#!/usr/bin/env python # qpaeq is a equalizer interface for pulseaudio's equalizer sinks # Copyright (C) 2009 Jason Newton . import os,math,sys try: import PyQt4,sip from PyQt4 import QtGui,QtCore import dbus.mainloop.qt import dbus except ImportError,e: print 'There was an error importing need libraries' print 'Make sure you haveqt4 and dbus forthon installed' print 'The error that occured was' print '\t%s' %(str(e)) import sys sys.exit(-1) from functools import partial import signal signal.signal(signal.SIGINT, signal.SIG_DFL) SYNC_TIMEOUT = 4*1000 CORE_PATH = "/org/pulseaudio/core1" CORE_IFACE = "org.PulseAudio.Core1" def connect(): try: if 'PULSE_DBUS_SERVER' in os.environ: address = os.environ['PULSE_DBUS_SERVER'] else: bus = dbus.SessionBus() # Should be UserBus, but D-Bus doesn't implement that yet. server_lookup = bus.get_object('org.PulseAudio1', '/org/pulseaudio/server_lookup1') address = server_lookup.Get('org.PulseAudio.ServerLookup1', 'Address', dbus_interface='org.freedesktop.DBus.Properties') return dbus.connection.Connection(address) except Exception,e: print 'There was an error connecting to pulseaudio, please make sure you have the pulseaudio dbus' print 'and equalizer modules loaded, exiting...' import sys sys.exit(-1) #TODO: signals: sink Filter changed, sink reconfigured (window size) (sink iface) #TODO: manager signals: new sink, removed sink, new profile, removed profile #TODO: add support for changing of window_size 1000-fft_size (adv option) #TODO: reconnect support loop 1 second trying to reconnect #TODO: just resample the filters for profiles when loading to different sizes #TODO: add preamp prop_iface='org.freedesktop.DBus.Properties' eq_iface='org.PulseAudio.Ext.Equalizing1.Equalizer' device_iface='org.PulseAudio.Core1.Device' class QPaeq(QtGui.QWidget): manager_path='/org/pulseaudio/equalizing1' manager_iface='org.PulseAudio.Ext.Equalizing1.Manager' core_iface='org.PulseAudio.Core1' core_path='/org/pulseaudio/core1' def __init__(self): QtGui.QWidget.__init__(self) self.setWindowTitle('qpaeq') self.slider_widget=None self.sink_name=None self.filter_state=None self.create_layout() self.set_connection() self.connect_to_sink(self.sinks[0]) self.set_callbacks() self.setMinimumSize(self.sizeHint()) def create_layout(self): self.main_layout=QtGui.QVBoxLayout() self.setLayout(self.main_layout) toprow_layout=QtGui.QHBoxLayout() sizePolicy = QtGui.QSizePolicy(QtGui.QSizePolicy.Preferred, QtGui.QSizePolicy.Fixed) sizePolicy.setHorizontalStretch(0) sizePolicy.setVerticalStretch(0) #sizePolicy.setHeightForWidth(self.profile_box.sizePolicy().hasHeightForWidth()) toprow_layout.addWidget(QtGui.QLabel('Sink')) self.sink_box = QtGui.QComboBox() self.sink_box.setSizePolicy(sizePolicy) self.sink_box.setDuplicatesEnabled(False) self.sink_box.setInsertPolicy(QtGui.QComboBox.InsertAlphabetically) #self.sink_box.setSizeAdjustPolicy(QtGui.QComboBox.AdjustToContents) toprow_layout.addWidget(self.sink_box) toprow_layout.addWidget(QtGui.QLabel('Channel')) self.channel_box = QtGui.QComboBox() self.channel_box.setSizePolicy(sizePolicy) toprow_layout.addWidget(self.channel_box) toprow_layout.addWidget(QtGui.QLabel('Preset')) self.profile_box = QtGui.QComboBox() self.profile_box.setSizePolicy(sizePolicy) self.profile_box.setInsertPolicy(QtGui.QComboBox.InsertAlphabetically) #self.profile_box.setSizeAdjustPolicy(QtGui.QComboBox.AdjustToContents) toprow_layout.addWidget(self.profile_box) large_icon_size=self.style().pixelMetric(QtGui.QStyle.PM_LargeIconSize) large_icon_size=QtCore.QSize(large_icon_size,large_icon_size) save_profile=QtGui.QToolButton() save_profile.setIcon(self.style().standardIcon(QtGui.QStyle.SP_DriveFDIcon)) save_profile.setIconSize(large_icon_size) save_profile.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly) save_profile.clicked.connect(self.save_profile) remove_profile=QtGui.QToolButton() remove_profile.setIcon(self.style().standardIcon(QtGui.QStyle.SP_TrashIcon)) remove_profile.setIconSize(large_icon_size) remove_profile.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly) remove_profile.clicked.connect(self.remove_profile) toprow_layout.addWidget(save_profile) toprow_layout.addWidget(remove_profile) reset_button = QtGui.QPushButton('Reset') reset_button.clicked.connect(self.reset) toprow_layout.addStretch() toprow_layout.addWidget(reset_button) self.layout().addLayout(toprow_layout) self.profile_box.activated.connect(self.load_profile) self.channel_box.activated.connect(self.select_channel) def connect_to_sink(self,name): #TODO: clear slots for profile buttons #flush any pending saves for other sinks if self.filter_state is not None: self.filter_state.flush_state() sink=self.connection.get_object(object_path=name) self.sink_props=dbus.Interface(sink,dbus_interface=prop_iface) self.sink=dbus.Interface(sink,dbus_interface=eq_iface) self.filter_state=FilterState(sink) #sample_rate,filter_rate,channels,channel) self.channel_box.clear() self.channel_box.addItem('All',self.filter_state.channels) for i in xrange(self.filter_state.channels): self.channel_box.addItem('%d' %(i+1,),i) self.setMinimumSize(self.sizeHint()) self.set_slider_widget(SliderArray(self.filter_state)) self.sink_name=name #set the signal listener for this sink core=self._get_core() #temporary hack until signal filtering works properly core.ListenForSignal('',[dbus.ObjectPath(self.sink_name),dbus.ObjectPath(self.manager_path)]) #for x in ['FilterChanged']: # core.ListenForSignal("%s.%s" %(self.eq_iface,x),[dbus.ObjectPath(self.sink_name)]) #core.ListenForSignal(self.eq_iface,[dbus.ObjectPath(self.sink_name)]) self.sink.connect_to_signal('FilterChanged',self.read_filter) def set_slider_widget(self,widget): layout=self.layout() if self.slider_widget is not None: i=layout.indexOf(self.slider_widget) layout.removeWidget(self.slider_widget) self.slider_widget.deleteLater() layout.insertWidget(i,self.slider_widget) else: layout.addWidget(widget) self.slider_widget=widget self.read_filter() def _get_core(self): core_obj=self.connection.get_object(object_path=self.core_path) core=dbus.Interface(core_obj,dbus_interface=self.core_iface) return core def sink_added(self,sink): #TODO: preserve selected sink self.update_sinks() def sink_removed(self,sink): #TODO: preserve selected sink, try connecting to backup otherwise if sink==self.sink_name: #connect to new sink? pass self.update_sinks() def save_profile(self): #popup dialog box for name current=self.profile_box.currentIndex() profile,ok=QtGui.QInputDialog.getItem(self,'Preset Name','Preset',self.profiles,current) if not ok or profile=='': return if profile in self.profiles: mbox=QtGui.QMessageBox(self) mbox.setText('%s preset already exists'%(profile,)) mbox.setInformativeText('Do you want to save over it?') mbox.setStandardButtons(mbox.Save|mbox.Discard|mbox.Cancel) mbox.setDefaultButton(mbox.Save) ret=mbox.exec_() if ret!=mbox.Save: return self.sink.SaveProfile(self.filter_state.channel,dbus.String(profile)) if self.filter_state.channel==self.filter_state.channels: for x in range(1,self.filter_state.channels): self.sink.LoadProfile(x,dbus.String(profile)) def remove_profile(self): #find active profile name, remove it profile=self.profile_box.currentText() manager=dbus.Interface(self.manager_obj,dbus_interface=self.manager_iface) manager.RemoveProfile(dbus.String(profile)) def load_profile(self,x): profile=self.profile_box.itemText(x) self.filter_state.load_profile(profile) def select_channel(self,x): self.filter_state.channel = self.channel_box.itemData(x).toPyObject() self._set_profile_name() self.filter_state.readback() #TODO: add back in preamp! #print frequencies #main_layout.addLayout(self.create_slider(partial(self.update_coefficient,0), # 'Preamp')[0] #) def set_connection(self): self.connection=connect() self.manager_obj=self.connection.get_object(object_path=self.manager_path) manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface) self.sinks=manager_props.Get(self.manager_iface,'EqualizedSinks') def set_callbacks(self): manager=dbus.Interface(self.manager_obj,dbus_interface=self.manager_iface) manager.connect_to_signal('ProfilesChanged',self.update_profiles) manager.connect_to_signal('SinkAdded',self.sink_added) manager.connect_to_signal('SinkRemoved',self.sink_removed) #self._get_core().ListenForSignal(self.manager_iface,[]) #self._get_core().ListenForSignal(self.manager_iface,[dbus.ObjectPath(self.manager_path)]) #core=self._get_core() #for x in ['ProfilesChanged','SinkAdded','SinkRemoved']: # core.ListenForSignal("%s.%s" %(self.manager_iface,x),[dbus.ObjectPath(self.manager_path)]) self.update_profiles() self.update_sinks() def update_profiles(self): #print 'update profiles called!' manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface) self.profiles=manager_props.Get(self.manager_iface,'Profiles') self.profile_box.blockSignals(True) self.profile_box.clear() self.profile_box.addItems(self.profiles) self.profile_box.blockSignals(False) self._set_profile_name() def update_sinks(self): self.sink_box.blockSignals(True) self.sink_box.clear() for x in self.sinks: sink=self.connection.get_object(object_path=x) sink_props=dbus.Interface(sink,dbus_interface=prop_iface) simple_name=sink_props.Get(device_iface,'Name') self.sink_box.addItem(simple_name,x) self.sink_box.blockSignals(False) self.sink_box.setMinimumSize(self.sink_box.sizeHint()) def read_filter(self): #print self.filter_frequencies self.filter_state.readback() def reset(self): coefs=dbus.Array([1/math.sqrt(2.0)]*(self.filter_state.filter_rate//2+1)) preamp=1.0 self.filter_state.set_filter(preamp,coefs) def _set_profile_name(self): self.profile_box.blockSignals(True) profile_name=self.sink.BaseProfile(self.filter_state.channel) if profile_name is not None: i=self.profile_box.findText(profile_name) if i>=0: self.profile_box.setCurrentIndex(i) self.profile_box.blockSignals(False) class SliderArray(QtGui.QWidget): def __init__(self,filter_state,parent=None): super(SliderArray,self).__init__(parent) #self.setStyleSheet('padding: 0px; border-width: 0px; margin: 0px;') #self.setStyleSheet('font-size: 7pt; font-family: monospace;'+outline%('blue')) self.filter_state=filter_state self.setLayout(QtGui.QHBoxLayout()) self.sub_array=None self.set_sub_array(SliderArraySub(self.filter_state)) self.inhibit_resize=0 def set_sub_array(self,widget): if self.sub_array is not None: self.layout().removeWidget(self.sub_array) self.sub_array.disconnect_signals() self.sub_array.deleteLater() self.sub_array=widget self.layout().addWidget(self.sub_array) self.sub_array.connect_signals() self.filter_state.readback() def resizeEvent(self,event): super(SliderArray,self).resizeEvent(event) if self.inhibit_resize==0: self.inhibit_resize+=1 #self.add_sliders_to_fit() t=QtCore.QTimer(self) t.setSingleShot(True) t.setInterval(0) t.timeout.connect(partial(self.add_sliders_to_fit,event)) t.start() def add_sliders_to_fit(self,event): if event.oldSize().width()>0 and event.size().width()>0: i=len(self.filter_state.frequencies)*int(round(float(event.size().width())/event.oldSize().width())) else: i=len(self.filter_state.frequencies) t_w=self.size().width() def evaluate(filter_state, target, variable): base_freqs=self.filter_state.freq_proper(self.filter_state.DEFAULT_FREQUENCIES) filter_state._set_frequency_values(subdivide(base_freqs,variable)) new_widget=SliderArraySub(filter_state) w=new_widget.sizeHint().width() return w-target def searcher(initial,evaluator): i=initial def d(e): return 1 if e>=0 else -1 error=evaluator(i) old_direction=d(error) i-=old_direction while True: error=evaluator(i) direction=d(error) if direction!=old_direction: k=i-1 #while direction<0 and error!=0: # k-=1 # error=evaluator(i) # direction=d(error) return k, evaluator(k) i-=direction old_direction=direction searcher(i,partial(evaluate,self.filter_state,t_w)) self.set_sub_array(SliderArraySub(self.filter_state)) self.inhibit_resize-=1 class SliderArraySub(QtGui.QWidget): def __init__(self,filter_state,parent=None): super(SliderArraySub,self).__init__(parent) self.filter_state=filter_state self.setLayout(QtGui.QGridLayout()) self.slider=[None]*len(self.filter_state.frequencies) self.label=[None]*len(self.slider) #self.setStyleSheet('padding: 0px; border-width: 0px; margin: 0px;') #self.setStyleSheet('font-size: 7pt; font-family: monospace;'+outline%('blue')) qt=QtCore.Qt #self.layout().setHorizontalSpacing(1) def add_slider(slider,label, c): self.layout().addWidget(slider,0,c,qt.AlignHCenter) self.layout().addWidget(label,1,c,qt.AlignHCenter) self.layout().setColumnMinimumWidth(c,max(label.sizeHint().width(),slider.sizeHint().width())) def create_slider(slider_label): slider=QtGui.QSlider(QtCore.Qt.Vertical,self) label=SliderLabel(slider_label,filter_state,self) slider.setRange(-1000,2000) slider.setSingleStep(1) return (slider,label) self.preamp_slider,self.preamp_label=create_slider('Preamp') add_slider(self.preamp_slider,self.preamp_label,0) for i,hz in enumerate(self.filter_state.frequencies): slider,label=create_slider(self.hz2label(hz)) self.slider[i]=slider #slider.setStyleSheet('font-size: 7pt; font-family: monospace;'+outline%('red',)) self.label[i]=label c=i+1 add_slider(slider,label,i+1) def hz2label(self, hz): if hz==0: label_text='DC' elif hz==self.filter_state.sample_rate//2: label_text='Coda' else: label_text=hz2str(hz) return label_text def connect_signals(self): def connect(writer,reader,slider,label): slider.valueChanged.connect(writer) self.filter_state.readFilter.connect(reader) label_cb=partial(slider.setValue,0) label.clicked.connect(label_cb) return label_cb self.preamp_writer_cb=self.write_preamp self.preamp_reader_cb=self.sync_preamp self.preamp_label_cb=connect(self.preamp_writer_cb, self.preamp_reader_cb, self.preamp_slider, self.preamp_label) self.writer_callbacks=[None]*len(self.slider) self.reader_callbacks=[None]*len(self.slider) self.label_callbacks=[None]*len(self.label) for i in range(len(self.slider)): self.writer_callbacks[i]=partial(self.write_coefficient,i) self.reader_callbacks[i]=partial(self.sync_coefficient,i) self.label_callbacks[i]=connect(self.writer_callbacks[i], self.reader_callbacks[i], self.slider[i], self.label[i]) def disconnect_signals(self): def disconnect(writer,reader,label_cb,slider,label): slider.valueChanged.disconnect(writer) self.filter_state.readFilter.disconnect(reader) label.clicked.disconnect(label_cb) disconnect(self.preamp_writer_cb, self.preamp_reader_cb, self.preamp_label_cb, self.preamp_slider, self.preamp_label) for i in range(len(self.slider)): disconnect(self.writer_callbacks[i], self.reader_callbacks[i], self.label_callbacks[i], self.slider[i], self.label[i]) def write_preamp(self, v): self.filter_state.preamp=self.slider2coef(v) self.filter_state.seed() def sync_preamp(self): self.preamp_slider.blockSignals(True) self.preamp_slider.setValue(self.coef2slider(self.filter_state.preamp)) self.preamp_slider.blockSignals(False) def write_coefficient(self,i,v): self.filter_state.coefficients[i]=self.slider2coef(v)/math.sqrt(2.0) self.filter_state.seed() def sync_coefficient(self,i): slider=self.slider[i] slider.blockSignals(True) slider.setValue(self.coef2slider(math.sqrt(2.0)*self.filter_state.coefficients[i])) slider.blockSignals(False) @staticmethod def slider2coef(x): return (1.0+(x/1000.0)) @staticmethod def coef2slider(x): return int((x-1.0)*1000) outline='border-width: 1px; border-style: solid; border-color: %s;' class SliderLabel(QtGui.QLabel): clicked=QtCore.pyqtSignal() def __init__(self,label_text,filter_state,parent=None): super(SliderLabel,self).__init__(parent) self.setStyleSheet('font-size: 7pt; font-family: monospace;') self.setText(label_text) self.setMinimumSize(self.sizeHint()) def mouseDoubleClickEvent(self, event): self.clicked.emit() super(SliderLabel,self).mouseDoubleClickEvent(event) #until there are server side state savings, do it in the client but try and avoid #simulaneous broadcasting situations class FilterState(QtCore.QObject): #DEFAULT_FREQUENCIES=map(float,[25,50,75,100,150,200,300,400,500,800,1e3,1.5e3,3e3,5e3,7e3,10e3,15e3,20e3]) DEFAULT_FREQUENCIES=[31.75,63.5,125,250,500,1e3,2e3,4e3,8e3,16e3] readFilter=QtCore.pyqtSignal() def __init__(self,sink): super(FilterState,self).__init__() self.sink_props=dbus.Interface(sink,dbus_interface=prop_iface) self.sink=dbus.Interface(sink,dbus_interface=eq_iface) self.sample_rate=self.get_eq_attr('SampleRate') self.filter_rate=self.get_eq_attr('FilterSampleRate') self.channels=self.get_eq_attr('NChannels') self.channel=self.channels self.set_frequency_values(self.DEFAULT_FREQUENCIES) self.sync_timer=QtCore.QTimer() self.sync_timer.setSingleShot(True) self.sync_timer.timeout.connect(self.save_state) def get_eq_attr(self,attr): return self.sink_props.Get(eq_iface,attr) def freq_proper(self,xs): return [0]+xs+[self.sample_rate//2] def _set_frequency_values(self,freqs): self.frequencies=freqs #print 'base',self.frequencies self.filter_frequencies=map(lambda x: int(round(x)), \ self.translate_rates(self.filter_rate,self.sample_rate, self.frequencies) \ ) self.coefficients=[0.0]*len(self.frequencies) self.preamp=1.0 def set_frequency_values(self,freqs): self._set_frequency_values(self.freq_proper(freqs)) @staticmethod def translate_rates(dst,src,rates): return list(map(lambda x: x*dst/src,rates)) def seed(self): self.sink.SeedFilter(self.channel,self.filter_frequencies,self.coefficients,self.preamp) self.sync_timer.start(SYNC_TIMEOUT) def readback(self): coefs,preamp=self.sink.FilterAtPoints(self.channel,self.filter_frequencies) self.coefficients=coefs self.preamp=preamp self.readFilter.emit() def set_filter(self,preamp,coefs): self.sink.SetFilter(self.channel,dbus.Array(coefs),self.preamp) self.sync_timer.start(SYNC_TIMEOUT) def save_state(self): print 'saving state' self.sink.SaveState() def load_profile(self,profile): self.sink.LoadProfile(self.channel,dbus.String(profile)) self.sync_timer.start(SYNC_TIMEOUT) def flush_state(self): if self.sync_timer.isActive(): self.sync_timer.stop() self.save_state() def safe_log(k,b): i=0 while k//b!=0: i+=1 k=k//b return i def hz2str(hz): p=safe_log(hz,10.0) if p<3: return '%dHz' %(hz,) elif hz%1000==0: return '%dKHz' %(hz/(10.0**3),) else: return '%.1fKHz' %(hz/(10.0**3),) def subdivide(xs, t_points): while len(xs)