"""Module for high-level communication over the FreeDesktop.org Bus (DBus)
DBus allows you to share and access remote objects between processes
running on the desktop, and also to access system services (such as
the print spool).
To use DBus, first get a Bus object, which provides a connection to one
of a few standard dbus-daemon instances that might be running. From the
Bus you can get a RemoteService. A service is provided by an application or
process connected to the Bus, and represents a set of objects. Once you
have a RemoteService you can get a RemoteObject that implements a specific interface
(an interface is just a standard group of member functions). Then you can call
those member functions directly.
You can think of a complete method call as looking something like:
Bus:SESSION -> Service:org.gnome.Evolution -> Object:/org/gnome/Evolution/Inbox -> Interface: org.gnome.Evolution.MailFolder -> Method: Forward('message1', 'seth@gnome.org')
This communicates over the SESSION Bus to the org.gnome.Evolution process to call the
Forward method of the /org/gnome/Evolution/Inbox object (which provides the
org.gnome.Evolution.MailFolder interface) with two string arguments.
For example, the dbus-daemon itself provides a service and some objects:
# Get a connection to the desktop-wide SESSION bus
bus = dbus.Bus(dbus.Bus.TYPE_SESSION)
# Get the service provided by the dbus-daemon named org.freedesktop.DBus
dbus_service = bus.get_service('org.freedesktop.DBus')
# Get a reference to the desktop bus' standard object, denoted
# by the path /org/freedesktop/DBus. The object /org/freedesktop/DBus
# implements the 'org.freedesktop.DBus' interface
dbus_object = dbus_service.get_object('/org/freedesktop/DBus',
'org.freedesktop.DBus')
# One of the member functions in the org.freedesktop.DBus interface
# is ListServices(), which provides a list of all the other services
# registered on this bus. Call it, and print the list.
print(dbus_object.ListServices())
"""
import dbus_bindings
import re
import inspect
version = (0, 40, 0)
_threads_initialized = 0
def init_gthreads ():
global _threads_initialized
if not _threads_initialized:
dbus_bindings.init_gthreads ()
_threads_initialized = 1
def _validate_interface_or_name(value):
elements = value.split('.')
if len(elements) <= 1:
raise ValidationException("%s must contain at least two elements seperated by a period ('.')"%(value))
validate = re.compile('[A-Za-z][\w_]*')
for element in elements:
if not validate.match(element):
raise ValidationException("Element %s of %s has invalid characters"%(element ,value))
#Decorators
def method(dbus_interface):
_validate_interface_or_name(dbus_interface)
def decorator(func):
func._dbus_is_method = True
func._dbus_interface = dbus_interface
func._dbus_args = inspect.getargspec(func)[0]
func._dbus_args.pop(0)
return func
return decorator
def signal(dbus_interface):
_validate_interface_or_name(dbus_interface)
def decorator(func):
def emit_signal(self, *args, **keywords):
func(self, *args, **keywords)
message = dbus_bindings.Signal(self._object_path, dbus_interface, func.__name__)
iter = message.get_iter(True)
for arg in args:
iter.append(arg)
self._connection.send(message)
emit_signal._dbus_is_signal = True
emit_signal._dbus_interface = dbus_interface
emit_signal.__name__ = func.__name__
emit_signal._dbus_args = inspect.getargspec(func)[0]
emit_signal._dbus_args.pop(0)
return emit_signal
return decorator
class Bus:
"""A connection to a DBus daemon.
One of three possible standard buses, the SESSION, SYSTEM,
or STARTER bus
"""
TYPE_SESSION = dbus_bindings.BUS_SESSION
TYPE_SYSTEM = dbus_bindings.BUS_SYSTEM
TYPE_STARTER = dbus_bindings.BUS_STARTER
"""bus_type=[Bus.TYPE_SESSION | Bus.TYPE_SYSTEM | Bus.TYPE_STARTER]
"""
START_REPLY_SUCCESS = dbus_bindings.DBUS_START_REPLY_SUCCESS
START_REPLY_ALREADY_RUNNING = dbus_bindings.DBUS_START_REPLY_ALREADY_RUNNING
def __init__(self, bus_type=TYPE_SESSION, glib_mainloop=True):
self._connection = dbus_bindings.bus_get(bus_type)
self._connection.add_filter(self._signal_func)
self._match_rule_to_receivers = { }
if (glib_mainloop):
self._connection.setup_with_g_main()
def get_connection(self):
return self._connection
def get_session():
"""Static method that returns the session bus"""
return SessionBus()
get_session = staticmethod(get_session)
def get_system():
"""Static method that returns the system bus"""
return SystemBus()
get_system = staticmethod(get_system)
def get_starter():
"""Static method that returns the starter bus"""
return StarterBus()
get_starter = staticmethod(get_starter)
def get_object(self, named_service, object_path):
"""Get a proxy object to call over the bus"""
return ProxyObject(self, named_service, object_path)
def add_signal_receiver(self, handler_function, signal_name=None, dbus_interface=None, named_service=None, path=None):
match_rule = self._get_match_rule(signal_name, dbus_interface, named_service, path)
if (not self._match_rule_to_receivers.has_key(match_rule)):
self._match_rule_to_receivers[match_rule] = [handler_function]
else:
self._match_rule_to_receivers[match_rule].append(handler_function)
dbus_bindings.bus_add_match(self._connection, match_rule)
def remove_signal_receiver(self, handler_function, signal_name=None, dbus_interface=None, named_service=None, path=None):
match_rule = self._get_match_rule(signal_name, dbus_interface, named_service, path)
if self._match_rule_to_receivers.has_key(match_rule):
if self._match_rule_to_receivers[match_rule].__contains__(handler_function):
self._match_rule_to_receivers[match_rule].pop(handler_function)
dbus_bindings.bus_remove_match(self._connection, match_rule)
def get_unix_user(self, named_service):
"""Get the unix user for the given named_service on this Bus"""
return dbus_bindings.bus_get_unix_user(self._connection, named_service)
def _get_match_rule(self, signal_name, dbus_interface, named_service, path):
match_rule = "type='signal'"
if (dbus_interface):
match_rule = match_rule + ",interface='%s'" % (dbus_interface)
if (named_service):
if (named_service[0] != ':' and named_service != "org.freedesktop.DBus"):
bus_object = self.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
named_service = bus_object.GetNameOwner(named_service, dbus_interface='org.freedesktop.DBus')
match_rule = match_rule + ",sender='%s'" % (named_service)
if (path):
match_rule = match_rule + ",path='%s'" % (path)
if (signal_name):
match_rule = match_rule + ",member='%s'" % (signal_name)
return match_rule
def _signal_func(self, connection, message):
if (message.get_type() != dbus_bindings.MESSAGE_TYPE_SIGNAL):
return dbus_bindings.HANDLER_RESULT_NOT_YET_HANDLED
dbus_interface = message.get_interface()
named_service = message.get_sender()
path = message.get_path()
signal_name = message.get_member()
match_rule = self._get_match_rule(signal_name, dbus_interface, named_service, path)
if (self._match_rule_to_receivers.has_key(match_rule)):
receivers = self._match_rule_to_receivers[match_rule]
for receiver in receivers:
args = message.get_args_list()
receiver(*args)
def start_service_by_name(self, named_service):
return dbus_bindings.bus_start_service_by_name(self._connection, named_service)
class SystemBus(Bus):
"""The system-wide message bus
"""
def __init__(self):
Bus.__init__(self, Bus.TYPE_SYSTEM)
class SessionBus(Bus):
"""The session (current login) message bus
"""
def __init__(self):
Bus.__init__(self, Bus.TYPE_SESSION)
class StarterBus(Bus):
"""The bus that activated this process (if
this process was launched by DBus activation)
"""
def __init__(self):
Bus.__init__(self, Bus.TYPE_STARTER)
class Interface:
"""An inteface into a remote object
An Interface can be used to wrap ProxyObjects
so that calls can be routed to their correct
dbus interface
"""
def __init__(self, object, dbus_interface):
self._obj = object
self._dbus_interface = dbus_interface
def connect_to_signal(self, signal_name, handler_function, dbus_interface = None):
if not dbus_interface:
dbus_interface = self._dbus_interface
self._obj.connect_to_signal(signal_name, handler_function, dbus_interface)
def __getattr__(self, member, **keywords):
if (keywords.has_key('dbus_interface')):
_dbus_interface = keywords['dbus_interface']
else:
_dbus_interface = self._dbus_interface
if member == '__call__':
return object.__call__
else:
return self._obj.__getattr__(member, dbus_interface=_dbus_interface)
class ProxyObject:
"""A proxy to the remote Object.
A ProxyObject is provided by the Bus. ProxyObjects
have member functions, and can be called like normal Python objects.
"""
def __init__(self, bus, named_service, object_path):
self._bus = bus
self._named_service = named_service
self._object_path = object_path
def connect_to_signal(self, signal_name, handler_function, dbus_interface=None):
self._bus.add_signal_receiver(handler_function,
signal_name=signal_name,
dbus_interface=dbus_interface,
named_service=self._named_service,
path=self._object_path)
def __getattr__(self, member, **keywords):
if member == '__call__':
return object.__call__
else:
iface = None
if (keywords.has_key('dbus_interface')):
iface = keywords['dbus_interface']
return ProxyMethod(self._bus.get_connection(),
self._named_service,
self._object_path, iface, member)
class ProxyMethod:
"""A proxy Method.
Typically a member of a ProxyObject. Calls to the
method produce messages that travel over the Bus and are routed
to a specific named Service.
"""
def __init__(self, connection, named_service, object_path, dbus_interface, method_name):
self._connection = connection
self._named_service = named_service
self._object_path = object_path
self._method_name = method_name
self._dbus_interface = dbus_interface
def __call__(self, *args, **keywords):
dbus_interface = self._dbus_interface
if (keywords.has_key('dbus_interface')):
dbus_interface = keywords['dbus_interface']
reply_handler = None
if (keywords.has_key('reply_handler')):
reply_handler = keywords['reply_handler']
error_handler = None
if (keywords.has_key('error_handler')):
error_handler = keywords['error_handler']
if not(reply_handler and error_handler):
if reply_handler:
raise MissingErrorself, HandlerException()
elif error_handler:
raise MissingReplyHandlerException()
message = dbus_bindings.MethodCall(self._object_path, dbus_interface, self._method_name)
message.set_destination(self._named_service)
# Add the arguments to the function
iter = message.get_iter(True)
for arg in args:
iter.append(arg)
if reply_handler:
result = self._connection.send_with_reply_handlers(message, -1, reply_handler, error_handler)
args_tuple = (result,)
else:
reply_message = self._connection.send_with_reply_and_block(message, -1)
args_tuple = reply_message.get_args_list()
if len(args_tuple) == 0:
return
elif len(args_tuple) == 1:
return args_tuple[0]
else:
return args_tuple
class Service:
"""A base class for exporting your own Services across the Bus
Just inherit from Service, providing the name of your service
(e.g. org.designfu.SampleService).
"""
def __init__(self, named_service, bus=None):
self._named_service = named_service
if bus == None:
# Get the default bus
self._bus = Bus()
else:
self._bus = bus
dbus_bindings.bus_request_name(self._bus.get_connection(), named_service)
def get_bus(self):
"""Get the Bus this Service is on"""
return self._bus
def get_name(self):
"""Get the name of this service"""
return self._named_service
def _dispatch_dbus_method_call(target_methods, self, argument_list, message):
"""Calls method_to_call using argument_list, but handles
exceptions, etc, and generates a reply to the DBus Message message
"""
try:
target_method = None
dbus_interface = message.get_interface()
if dbus_interface == None:
if target_methods:
target_method = target_methods[0]
else:
for dbus_method in target_methods:
if dbus_method._dbus_interface == dbus_interface:
target_method = dbus_method
break
if target_method:
retval = target_method(self, *argument_list)
else:
if not dbus_interface:
raise UnknownMethodException('%s is not a valid method'%(message.get_member()))
else:
raise UnknownMethodException('%s is not a valid method of interface %s'%(message.get_member(), dbus_interface))
except Exception, e:
if e.__module__ == '__main__':
# FIXME: is it right to use .__name__ here?
error_name = e.__class__.__name__
else:
error_name = e.__module__ + '.' + str(e.__class__.__name__)
error_contents = str(e)
reply = dbus_bindings.Error(message, error_name, error_contents)
else:
reply = dbus_bindings.MethodReturn(message)
if retval != None:
iter = reply.get_iter(append=True)
iter.append(retval)
return reply
class ObjectType(type):
def __init__(cls, name, bases, dct):
#generate out vtable
method_vtable = getattr(cls, '_dbus_method_vtable', {})
reflection_data = getattr(cls, '_dbus_reflection_data', "")
reflection_interface_method_hash = {}
reflection_interface_signal_hash = {}
for func in dct.values():
if getattr(func, '_dbus_is_method', False):
if method_vtable.has_key(func.__name__):
method_vtable[func.__name__].append(func)
else:
method_vtable[func.__name__] = [func]
#generate a hash of interfaces so we can group
#methods in the xml data
if reflection_interface_method_hash.has_key(func._dbus_interface):
reflection_interface_method_hash[func._dbus_interface].append(func)
else:
reflection_interface_method_hash[func._dbus_interface] = [func]
elif getattr(func, '_dbus_is_signal', False):
if reflection_interface_signal_hash.has_key(func._dbus_interface):
reflection_interface_signal_hash[func._dbus_interface].append(func)
else:
reflection_interface_signal_hash[func._dbus_interface] = [func]
for interface in reflection_interface_method_hash.keys():
reflection_data = reflection_data + ' \n'%(interface)
for func in reflection_interface_method_hash[interface]:
reflection_data = reflection_data + ' \n'%(func.__name__)
for arg in func._dbus_args:
reflection_data = reflection_data + ' \n'%(arg)
#reclaim some memory
func._dbus_args = None
reflection_data = reflection_data + ' \n'
if reflection_interface_signal_hash.has_key(interface):
for func in reflection_interface_signal_hash[interface]:
reflection_data = reflection_data + ' \n'%(func.__name__)
for arg in func._dbus_args:
reflection_data = reflection_data + ' \n'%(arg)
#reclaim some memory
func._dbus_args = None
reflection_data = reflection_data + ' \n'
del reflection_interface_signal_hash[interface]
reflection_data = reflection_data + ' \n'
for interface in reflection_interface_signal_hash.keys():
reflection_data = reflection_data + ' \n'%(interface)
for func in reflection_interface_signal_hash[interface]:
reflection_data = reflection_data + ' \n'%(func.__name__)
for arg in func._dbus_args:
reflection_data = reflection_data + ' \n'%(arg)
#reclaim some memory
func._dbus_args = None
reflection_data = reflection_data + ' \n'
reflection_data = reflection_data + ' \n'
cls._dbus_reflection_data = reflection_data
cls._dbus_method_vtable = method_vtable
super(ObjectType, cls).__init__(name, bases, dct)
class Object:
"""A base class for exporting your own Objects across the Bus.
Just inherit from Object and provide a list of methods to share
across the Bus. These will appear as member functions of your
ServiceObject.
"""
__metaclass__ = ObjectType
def __init__(self, object_path, service):
self._object_path = object_path
self._service = service
self._bus = service.get_bus()
self._connection = self._bus.get_connection()
self._connection.register_object_path(object_path, self._unregister_cb, self._message_cb)
def _unregister_cb(self, connection):
print ("Unregister")
def _message_cb(self, connection, message):
target_method_name = message.get_member()
target_methods = self._dbus_method_vtable[target_method_name]
args = message.get_args_list()
reply = _dispatch_dbus_method_call(target_methods, self, args, message)
self._connection.send(reply)
@method('org.freedesktop.DBus.Introspectable')
def Introspect(self):
reflection_data = '\n'
reflection_data = reflection_data + '\n'%(self._object_path)
reflection_data = reflection_data + self._dbus_reflection_data
reflection_data = reflection_data + '\n'
return reflection_data
#Exceptions
class MissingErrorHandlerException(Exception):
def __init__(self):
Exception.__init__(self)
def __str__(self):
return "error_handler not defined: if you define a reply_handler you must also define an error_handler"
class MissingReplyHandlerException(Exception):
def __init__(self):
Exception.__init__(self)
def __str__(self):
return "reply_handler not defined: if you define an error_handler you must also define a reply_handler"
class ValidationException(Exception):
def __init__(self, msg=''):
self.msg = msg
Exception.__init__(self)
def __str__(self):
return "Error validating string: %s" % self.msg
class UnknownMethodException(Exception):
def __init__(self, msg=''):
self.msg = msg
Exception.__init__(self)
def __str__(self):
return "Unknown method: %s" % self.msg
ObjectPath = dbus_bindings.ObjectPath
ByteArray = dbus_bindings.ByteArray