import dbus_bindings
import _dbus
import operator
from exceptions import UnknownMethodException
from decorators import method
from decorators import signal
class BusName:
"""A base class for exporting your own Named Services across the Bus
"""
def __init__(self, named_service, bus=None):
self._named_service = named_service
if bus == None:
# Get the default bus
self._bus = _dbus.Bus()
else:
self._bus = bus
dbus_bindings.bus_request_name(self._bus.get_connection(), named_service)
def get_bus(self):
"""Get the Bus this Service is on"""
return self._bus
def get_name(self):
"""Get the name of this service"""
return self._named_service
def _dispatch_dbus_method_call(self, argument_list, message):
"""Calls method_to_call using argument_list, but handles
exceptions, etc, and generates a reply to the DBus Message message
"""
try:
method_name = message.get_member()
dbus_interface = message.get_interface()
candidate = None
successful = False
# split up the cases when we do and don't have an interface because the
# latter is much simpler
if dbus_interface:
# search through the class hierarchy in python MRO order
for cls in self.__class__.__mro__:
# if we haven't got a candidate yet, and we find a class with a
# suitably named member, save this as a candidate
if (not candidate and method_name in cls.__dict__):
if ("_dbus_is_method" in cls.__dict__[method_name].__dict__
and "_dbus_interface" in cls.__dict__[method_name].__dict__):
# however if it is annotated for a different interface
# than we are looking for, it cannot be a candidate
if cls.__dict__[method_name]._dbus_interface == dbus_interface:
candidate = cls
sucessful = True
target_parent = cls.__dict__[method_name]
break
else:
pass
else:
candidate = cls
# if we have a candidate, carry on checking this and all
# superclasses for a method annoated as a dbus method
# on the correct interface
if (candidate and method_name in cls.__dict__
and "_dbus_is_method" in cls.__dict__[method_name].__dict__
and "_dbus_interface" in cls.__dict__[method_name].__dict__
and cls.__dict__[method_name]._dbus_interface == dbus_interface):
# the candidate is a dbus method on the correct interface,
# or overrides a method that is, success!
target_parent = cls.__dict__[method_name]
sucessful = True
break
else:
# simpler version of above
for cls in self.__class__.__mro__:
if (not candidate and method_name in cls.__dict__):
candidate = cls
if (candidate and method_name in cls.__dict__
and "_dbus_is_method" in cls.__dict__[method_name].__dict__):
target_parent = cls.__dict__[method_name]
sucessful = True
break
retval = candidate.__dict__[method_name](self, *argument_list)
target_name = str(candidate.__module__) + '.' + candidate.__name__ + '.' + method_name
if not sucessful:
if not dbus_interface:
raise UnknownMethodException('%s is not a valid method'%(message.get_member()))
else:
raise UnknownMethodException('%s is not a valid method of interface %s'%(message.get_member(), dbus_interface))
except Exception, e:
if e.__module__ == '__main__':
# FIXME: is it right to use .__name__ here?
error_name = e.__class__.__name__
else:
error_name = e.__module__ + '.' + str(e.__class__.__name__)
error_contents = str(e)
reply = dbus_bindings.Error(message, error_name, error_contents)
else:
reply = dbus_bindings.MethodReturn(message)
# do strict adding if an output signature was provided
if target_parent._dbus_out_signature != None:
# iterate signature into list of complete types
signature = tuple(dbus_bindings.Signature(target_parent._dbus_out_signature))
if retval == None:
if len(signature) != 0:
raise TypeError('%s returned nothing but output signature is %s' %
(target_name, target_parent._dbus_out_signature))
elif len(signature) == 1:
iter = reply.get_iter(append=True)
iter.append_strict(retval, signature[0])
elif len(signature) > 1:
if operator.isSequenceType(retval):
if len(signature) > len(retval):
raise TypeError('output signature %s is longer than the number of values returned by %s' %
(target_parent._dbus_out_signature, target_name))
elif len(retval) > len(signature):
raise TypeError('output signature %s is shorter than the number of values returned by %s' %
(target_parent._dbus_out_signature, target_name))
else:
iter = reply.get_iter(append=True)
for (value, sig) in zip(retval, signature):
iter.append_strict(value, sig)
else:
raise TypeError('output signature %s has multiple values but %s didn\'t return a sequence type' %
(target_parent._dbus_out_signature, target_name))
# try and guess the return type
elif retval != None:
iter = reply.get_iter(append=True)
iter.append(retval)
return reply
class InterfaceType(type):
def __init__(cls, name, bases, dct):
# these attributes are shared between all instances of the Interface
# object, so this has to be a dictionary that maps class names to
# the per-class introspection/interface data
class_table = getattr(cls, '_dbus_class_table', {})
cls._dbus_class_table = class_table
interface_table = class_table[cls.__module__ + '.' + name] = {}
# merge all the name -> method tables for all the interfaces
# implemented by our base classes into our own
for b in bases:
base_name = b.__module__ + '.' + b.__name__
if getattr(b, '_dbus_class_table', False):
for (interface, method_table) in class_table[base_name].iteritems():
our_method_table = interface_table.setdefault(interface, {})
our_method_table.update(method_table)
# add in all the name -> method entries for our own methods/signals
for func in dct.values():
if getattr(func, '_dbus_interface', False):
method_table = interface_table.setdefault(func._dbus_interface, {})
method_table[func.__name__] = func
super(InterfaceType, cls).__init__(name, bases, dct)
# methods are different to signals, so we have two functions... :)
def _reflect_on_method(cls, func):
args = func._dbus_args
if func._dbus_in_signature:
# convert signature into a tuple so length refers to number of
# types, not number of characters
in_sig = tuple(dbus_bindings.Signature(func._dbus_in_signature))
if len(in_sig) > len(args):
raise ValueError, 'input signature is longer than the number of arguments taken'
elif len(in_sig) < len(args):
raise ValueError, 'input signature is shorter than the number of arguments taken'
else:
# magic iterator which returns as many v's as we need
in_sig = dbus_bindings.VariantSignature()
if func._dbus_out_signature:
out_sig = dbus_bindings.Signature(func._dbus_out_signature)
else:
# its tempting to default to dbus_bindings.Signature('v'), but
# for methods that return nothing, providing incorrect
# introspection data is worse than providing none at all
out_sig = []
reflection_data = ' \n' % (func.__name__)
for pair in zip(in_sig, args):
reflection_data += ' \n' % pair
for type in out_sig:
reflection_data += ' \n' % type
reflection_data += ' \n'
return reflection_data
def _reflect_on_signal(cls, func):
args = func._dbus_args
if func._dbus_signature:
# convert signature into a tuple so length refers to number of
# types, not number of characters
sig = tuple(dbus_bindings.Signature(func._dbus_signature))
if len(sig) > len(args):
raise ValueError, 'signal signature is longer than the number of arguments provided'
elif len(sig) < len(args):
raise ValueError, 'signal signature is shorter than the number of arguments provided'
else:
# magic iterator which returns as many v's as we need
sig = dbus_bindings.VariantSignature()
reflection_data = ' \n' % (func.__name__)
for pair in zip(sig, args):
reflection_data = reflection_data + ' \n' % pair
reflection_data = reflection_data + ' \n'
return reflection_data
class Interface(object):
__metaclass__ = InterfaceType
class Object(Interface):
"""A base class for exporting your own Objects across the Bus.
Just inherit from Object and provide a list of methods to share
across the Bus
"""
def __init__(self, bus_name, object_path):
self._object_path = object_path
self._name = bus_name
self._bus = bus_name.get_bus()
self._connection = self._bus.get_connection()
self._connection.register_object_path(object_path, self._unregister_cb, self._message_cb)
def _unregister_cb(self, connection):
print ("Unregister")
def _message_cb(self, connection, message):
try:
target_method_name = message.get_member()
args = message.get_args_list()
reply = _dispatch_dbus_method_call(self, args, message)
self._connection.send(reply)
except Exception, e:
error_reply = dbus_bindings.Error(message,
"org.freedesktop.DBus.Python.%s" % e.__class__.__name__,
str(e))
self._connection.send(error_reply)
@method('org.freedesktop.DBus.Introspectable', in_signature='', out_signature='s')
def Introspect(self):
reflection_data = '\n'
reflection_data += '\n' % (self._object_path)
interfaces = self._dbus_class_table[self.__class__.__module__ + '.' + self.__class__.__name__]
for (name, funcs) in interfaces.iteritems():
reflection_data += ' \n' % (name)
for func in funcs.values():
if getattr(func, '_dbus_is_method', False):
reflection_data += self.__class__._reflect_on_method(func)
elif getattr(func, '_dbus_is_signal', False):
reflection_data += self.__class__._reflect_on_signal(func)
reflection_data += ' \n'
reflection_data += '\n'
return reflection_data