# -*- Mode: Python -*- # jdahlin is the most coolest and awesomest person in the world # and wrote all the good parts of this code. all the bad parts # where python conditionals have a ( ) around them, thus violating # PEP-8 were written by the lame wannabe python programmer seth #FIXME: find memory leaks that I am sure exist #include "dbus_h_wrapper.h" cdef extern from "stdlib.h": cdef void *malloc(size_t size) cdef void free(void *ptr) cdef void *calloc(size_t nmemb, size_t size) cdef extern from "dbus-glib.h": ctypedef struct GMainContext cdef void dbus_connection_setup_with_g_main (DBusConnection *connection, GMainContext *context) cdef void dbus_server_setup_with_g_main (DBusServer *server, GMainContext *context) cdef void dbus_g_thread_init () cdef extern from "Python.h": void Py_XINCREF (object) void Py_XDECREF (object) object PyString_FromStringAndSize(char *, int) ctypedef struct DBusError: char *name char *message unsigned int dummy1 unsigned int dummy2 unsigned int dummy3 unsigned int dummy4 unsigned int dummy5 void *padding1 ctypedef struct DBusMessageIter: void *dummy1 void *dummy2 dbus_uint32_t dummy3 int dummy4 int dummy5 int dummy6 int dummy7 int dummy8 int dummy9 int dummy10 int dummy11 int pad1 int pad2 void *pad3 ctypedef struct DBusObjectPathVTable: DBusObjectPathUnregisterFunction unregister_function DBusObjectPathMessageFunction message_function void (* dbus_internal_pad1) (void *) void (* dbus_internal_pad2) (void *) void (* dbus_internal_pad3) (void *) void (* dbus_internal_pad4) (void *) _user_data_references = [ ] class DBusException(Exception): pass class ConnectionError(Exception): pass class ObjectPath(str): def __init__(self, value): str.__init__(value) class ByteArray(str): def __init__(self, value): str.__init__(value) class Signature(str): def __init__(self, value): str.__init__(value) #forward delcerations cdef class Connection cdef class Message cdef class PendingCall cdef class Watch cdef class MessageIter cdef void cunregister_function_handler (DBusConnection *connection, void *user_data): cdef Connection conn tup = user_data assert (type(tup) == list) function = tup[1] conn = Connection() conn.__cinit__(None, connection) args = [conn] function(*args) cdef DBusHandlerResult cmessage_function_handler (DBusConnection *connection, DBusMessage *msg, void *user_data): cdef Connection conn cdef Message message tup = user_data assert (type(tup) == list) function = tup[0] message = Message(_create=0) message._set_msg(msg) conn = Connection() conn.__cinit__(None, connection) args = [conn, message] retval = function(*args) if (retval == None): retval = DBUS_HANDLER_RESULT_HANDLED return retval cdef class Connection: cdef DBusConnection *conn def __init__(self, address=None, Connection _conn=None): cdef DBusConnection *c_conn cdef char *c_address c_conn=NULL if (_conn != None): c_conn = _conn.conn if (address != None or _conn != None): self.__cinit__(c_address, c_conn) # hack to be able to pass in a c pointer to the constructor # while still alowing python programs to create a Connection object cdef __cinit__(self, address, DBusConnection *_conn): cdef DBusError error dbus_error_init(&error) if _conn != NULL: self.conn = _conn dbus_connection_ref(self.conn) else: self.conn = dbus_connection_open(address, &error) if dbus_error_is_set(&error): raise DBusException, error.message cdef _set_conn(self, DBusConnection *conn): self.conn = conn cdef DBusConnection *_get_conn(self): return self.conn def get_unique_name(self): return bus_get_unique_name(self) def setup_with_g_main(self): dbus_connection_setup_with_g_main(self.conn, NULL) def disconnect(self): dbus_connection_disconnect(self.conn) def get_is_connected(self): return dbus_connection_get_is_connected(self.conn) def get_is_authenticated(self): return dbus_connection_get_is_authenticated(self.conn) def flush(self): dbus_connection_flush(self.conn) def borrow_message(self): cdef Message m m = Message(_create=0) m._set_msg(dbus_connection_borrow_message(self.conn)) return m def return_message(self, Message message): cdef DBusMessage *msg msg = message._get_msg() dbus_connection_return_message(self.conn, msg) def steal_borrowed_message(self, Message message): cdef DBusMessage *msg msg = message._get_msg() dbus_connection_steal_borrowed_message(self.conn, msg) def pop_message(self): cdef DBusMessage *msg cdef Message m msg = dbus_connection_pop_message(self.conn) if msg != NULL: m = Message(_create=0) m._set_msg(msg) else: m = None return m def get_dispatch_status(self): return dbus_connection_get_dispatch_status(self.conn) def dispatch(self): return dbus_connection_dispatch(self.conn) def send(self, Message message): #cdef dbus_uint32_t client_serial #if type(message) != Message: # raise TypeError cdef DBusMessage *msg msg = message._get_msg() retval = dbus_connection_send(self.conn, msg, NULL) return retval def send_with_reply(self, Message message, timeout_milliseconds): cdef dbus_bool_t retval cdef DBusPendingCall *cpending_call cdef DBusError error cdef DBusMessage *msg cdef PendingCall pending_call dbus_error_init(&error) cpending_call = NULL msg = message._get_msg() retval = dbus_connection_send_with_reply(self.conn, msg, &cpending_call, timeout_milliseconds) if dbus_error_is_set(&error): raise DBusException, error.message if (cpending_call != NULL): pending_call = PendingCall() pending_call.__cinit__(cpending_call) else: pending_call = None return (retval, pending_call) def send_with_reply_and_block(self, Message message, timeout_milliseconds=0): cdef DBusMessage * retval cdef DBusError error cdef DBusMessage *msg cdef Message m dbus_error_init(&error) msg = message._get_msg() retval = dbus_connection_send_with_reply_and_block( self.conn, msg, timeout_milliseconds, &error) if dbus_error_is_set(&error): raise DBusException, error.message if retval == NULL: raise AssertionError m = Message(_create=0) m._set_msg(retval) return m def set_watch_functions(self, add_function, remove_function, data): pass def set_timeout_functions(self, add_function, remove_function, data): pass def set_wakeup_main_function(self, wakeup_main_function, data): pass # FIXME: set_dispatch_status_function, get_unix_user, set_unix_user_function def add_filter(self, filter_function): user_data = [ filter_function ] global _user_data_references _user_data_references.append(user_data) return dbus_connection_add_filter(self.conn, cmessage_function_handler, user_data, NULL) #FIXME: remove_filter # this is pretty tricky, we want to only remove the filter # if we truly have no more calls to our message_function_handler...ugh def set_data(self, slot, data): pass def get_data(self, slot): pass def set_max_message_size(self, size): dbus_connection_set_max_message_size(self.conn, size) def get_max_message_size(self): return dbus_connection_get_max_message_size(self.conn) def set_max_received_size(self, size): dbus_connection_set_max_received_size(self.conn, size) def get_max_received_size(self): return dbus_connection_get_max_received_size(self.conn) def get_outgoing_size(self): return dbus_connection_get_outgoing_size(self.conn) # preallocate_send, free_preallocated_send, send_preallocated def register_object_path(self, path, unregister_cb, message_cb): cdef DBusObjectPathVTable cvtable cvtable.unregister_function = cunregister_function_handler cvtable.message_function = cmessage_function_handler user_data = [message_cb, unregister_cb] global _user_data_references _user_data_references.append(user_data) return dbus_connection_register_object_path(self.conn, path, &cvtable, user_data) def register_fallback(self, path, unregister_cb, message_cb): cdef DBusObjectPathVTable cvtable cvtable.unregister_function = cunregister_function_handler cvtable.message_function = cmessage_function_handler user_data = [message_cb, unregister_cb] global _user_data_references _user_data_references.append(user_data) return dbus_connection_register_fallback(self.conn, path, &cvtable, user_data) #FIXME: unregister_object_path , see problems with remove_filter def list_registered (self, parent_path): cdef char **cchild_entries cdef dbus_bool_t retval retval = dbus_connection_list_registered(self.conn, parent_path, &cchild_entries) if (not retval): #FIXME: raise out of memory exception? return None i = 0 child_entries = [] while (cchild_entries[i] != NULL): child_entries.append(cchild_entries[i]) i = i + 1 dbus_free_string_array(cchild_entries) return child_entries cdef class PendingCall: cdef DBusPendingCall *pending_call def __init__(self, PendingCall _pending_call=None): if (_pending_call != None): self.__cinit__(_pending_call.pending_call) cdef void __cinit__(self, DBusPendingCall *_pending_call): self.pending_call = _pending_call dbus_pending_call_ref(self.pending_call) cdef DBusPendingCall *_get_pending_call(self): return self.pending_call def cancel(self): dbus_pending_call_cancel(self.pending_call) def get_completed(self): return dbus_pending_call_get_completed(self.pending_call) def get_reply(self): cdef Message message message = Message(_create=0) message._set_msg(dbus_pending_call_steal_reply(self.pending_call)) return message def block(self): dbus_pending_call_block(self.pending_call) cdef class Watch: cdef DBusWatch* watch def __init__(self): pass cdef __cinit__(self, DBusWatch *cwatch): self.watch = cwatch def get_fd(self): return dbus_watch_get_fd(self.watch) # FIXME: not picked up correctly by extract.py #def get_flags(self): # return dbus_watch_get_flags(self.watch) def handle(self, flags): return dbus_watch_handle(self.watch, flags) def get_enabled(self): return dbus_watch_get_enabled(self.watch) cdef class MessageIter: cdef DBusMessageIter *iter cdef DBusMessageIter real_iter cdef dbus_uint32_t level def __init__(self, level=0): self.iter = &self.real_iter self.level = level if(self.level > 32): raise TypeError, 'Type recurion is too deep' cdef __cinit__(self, DBusMessageIter *iter): self.real_iter = iter[0] cdef DBusMessageIter *_get_iter(self): return self.iter def has_next(self): return dbus_message_iter_has_next(self.iter) def next(self): return dbus_message_iter_next(self.iter) def get(self, arg_type=None): if(arg_type == None): arg_type = self.get_arg_type() if arg_type == TYPE_INVALID: raise TypeError, 'Invalid arg type in MessageIter' elif arg_type == TYPE_STRING: retval = self.get_string() elif arg_type == TYPE_INT16: retval = self.get_int16() elif arg_type == TYPE_UINT16: retval = self.get_uint16() elif arg_type == TYPE_INT32: retval = self.get_int32() elif arg_type == TYPE_UINT32: retval = self.get_uint32() elif arg_type == TYPE_INT64: retval = self.get_int64() elif arg_type == TYPE_UINT64: retval = self.get_uint64() elif arg_type == TYPE_DOUBLE: retval = self.get_double() elif arg_type == TYPE_BYTE: retval = self.get_byte() elif arg_type == TYPE_BOOLEAN: retval = self.get_boolean() elif arg_type == TYPE_SIGNATURE: retval = self.get_signature() elif arg_type == TYPE_ARRAY: array_type = self.get_element_type() if array_type == TYPE_DICT_ENTRY: retval = self.get_dict() else: retval = self.get_array(array_type) elif arg_type == TYPE_OBJECT_PATH: retval = self.get_object_path() elif arg_type == TYPE_STRUCT: retval = self.get_struct() elif arg_type == TYPE_VARIANT: retval = self.get_variant() elif arg_type == TYPE_DICT_ENTRY: raise TypeError, 'Dictionary Entries can only appear as part of an array container' else: raise TypeError, 'Unknown arg type %d in MessageIter' % (arg_type) return retval def get_arg_type(self): return dbus_message_iter_get_arg_type(self.iter) def get_element_type(self): return dbus_message_iter_get_element_type(self.iter) def get_byte(self): cdef char c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_boolean(self): cdef dbus_bool_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_signature(self): signature_string = self.get_string() return Signature(signature_string) def get_int16(self): cdef dbus_int16_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_uint16(self): cdef dbus_uint16_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_int32(self): cdef dbus_int32_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_uint32(self): cdef dbus_uint32_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_int64(self): cdef dbus_int64_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_uint64(self): cdef dbus_uint64_t c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_double(self): cdef double c_val dbus_message_iter_get_basic(self.iter, &c_val) return c_val def get_string(self): cdef char *c_str dbus_message_iter_get_basic(self.iter, &c_str) return c_str def get_object_path(self): object_path_string = self.get_string() return ObjectPath(object_path_string) def get_dict(self): cdef DBusMessageIter c_dict_iter cdef MessageIter dict_iter level = self.level + 1 dbus_message_iter_recurse(self.iter, &c_dict_iter) dict_iter = MessageIter(level) dict_iter.__cinit__(&c_dict_iter) python_dict = {} cur_arg_type = dict_iter.get_arg_type() while cur_arg_type == TYPE_DICT_ENTRY: if cur_arg_type != TYPE_DICT_ENTRY: raise TypeError, "Dictionary elements must be of type TYPE_DICT_ENTRY '%s != %s'" % (TYPE_DICT_ENTRY, cur_arg_type) dict_entry = dict_iter.get_struct() if len(dict_entry) != 2: raise TypeError, "Dictionary entries must be structs of two elements. This entry had %i elements.'" % (len(dict_entry)) python_dict[dict_entry[0]] = dict_entry[1] dict_iter.next() cur_arg_type = dict_iter.get_arg_type() return python_dict def get_array(self, type): cdef DBusMessageIter c_array_iter cdef MessageIter array_iter level = self.level + 1 dbus_message_iter_recurse(self.iter, &c_array_iter) array_iter = MessageIter(level) array_iter.__cinit__(&c_array_iter) python_list = [] cur_arg_type = array_iter.get_arg_type() while cur_arg_type != TYPE_INVALID: if cur_arg_type != type: raise TypeError, "Array elements must be of the same type '%s != %s'" % (type, cur_arg_type) value = array_iter.get(type) python_list.append(value) array_iter.next() cur_arg_type = array_iter.get_arg_type() return python_list def get_variant(self): cdef DBusMessageIter c_var_iter cdef MessageIter var_iter level = self.level + 1 dbus_message_iter_recurse(self.iter, &c_var_iter) var_iter = MessageIter(level) var_iter.__cinit__(&c_var_iter) return var_iter.get() def get_struct(self): cdef DBusMessageIter c_struct_iter cdef MessageIter struct_iter level = self.level + 1 dbus_message_iter_recurse(self.iter, &c_struct_iter) struct_iter = MessageIter(level) struct_iter.__cinit__(&c_struct_iter) python_list = [] while struct_iter.get_arg_type() != TYPE_INVALID: value = struct_iter.get() python_list.append(value) struct_iter.next() return tuple(python_list) def python_value_to_dbus_sig(self, value, level = 0): if(level > 32): raise TypeError, 'Type recurion is too deep' level = level + 1 ptype = type(value) ret = "" if ptype == bool: ret = TYPE_BOOL ret = str(chr(ret)) elif ptype == int: ret = TYPE_INT32 ret = str(chr(ret)) elif ptype == long: ret = TYPE_INT64 ret = str(chr(ret)) elif ptype == str: ret = TYPE_STRING ret = str(chr(ret)) elif ptype == float: ret = TYPE_FLOAT ret = str(chr(ret)) elif ptype == dict: dict_list = value.items() key, value = dict_list[0] ret = str(chr(TYPE_ARRAY)) + str(chr(DICT_ENTRY_BEGIN)) ret = ret + self.python_value_to_dbus_sig(key, level) ret = ret + self.python_value_to_dbus_sig(value, level) ret = ret + str(chr(DICT_ENTRY_END)) elif ptype == tuple: ret = str(chr(STRUCT_BEGIN)) for item in value: ret = ret + self.python_value_to_dbus_sig(item, level) ret = ret + str(chr(STRUCT_END)) elif ptype == list: ret = str(chr(TYPE_ARRAY)) ret = ret + self.python_value_to_dbus_sig(value[0], level) elif isinstance(value, ObjectPath): ret = TYPE_PATH ret = str(chr(ret)) elif isinstance(ByteArray): ret = str(chr(TYPE_ARRAY)) + str(chr(TYPE_BYTE)) elif isinstance(Signature): ret = TYPE_SIGNATURE ret = str(chr(ret)) else: raise TypeError, "Argument of unknown type '%s'" % (ptype) return ret #FIXME: handle all the different types? def append(self, value): value_type = type(value) if value_type == bool: retval = self.append_boolean(value) elif value_type == int: retval = self.append_int32(value) elif value_type == long: retval = self.append_int64(value) elif value_type == str: retval = self.append_string(value) elif value_type == float: retval = self.append_double(value) elif value_type == dict: retval = self.append_dict(value) elif value_type == tuple: retval = self.append_struct(value) elif value_type == list: retval = self.append_array(value) #elif value_type == None.__class__: # retval = self.append_nil() elif isinstance(value, ObjectPath): retval = self.append_object_path(value) elif isinstance(value, ByteArray): retval = self.append_array(value) elif isinstance(value, Signature): retval = self.append_signature(value) else: raise TypeError, "Argument of unknown type '%s'" % (value_type) return retval def append_boolean(self, value): cdef dbus_bool_t c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_BOOLEAN, &c_value) def append_byte(self, value): cdef char b if type(value) != str or len(value) != 1: raise TypeError b = ord(value) return dbus_message_iter_append_basic(self.iter, TYPE_BYTE, &b) def append_int32(self, value): cdef dbus_int32_t c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_INT32, &c_value) def append_uint32(self, value): cdef dbus_uint32_t c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_UINT32, &c_value) def append_int64(self, value): cdef dbus_int64_t c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_INT64, &c_value) def append_uint64(self, value): cdef dbus_uint64_t c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_UINT64, &c_value) def append_double(self, value): cdef double c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_DOUBLE, &c_value) def append_string(self, value): cdef char *c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_STRING, &c_value) def append_object_path(self, value): cdef char *c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_PATH, &c_value) def append_signature(self, value): cdef char *c_value c_value = value return dbus_message_iter_append_basic(self.iter, TYPE_SIGNATURE, &c_value) def append_dict(self, python_dict): cdef DBusMessageIter c_dict_iter, c_dict_entry_iter cdef MessageIter dict_iter, dict_entry_iter level = self.level + 1 dict_list = python_dict.items() key, value = dict_list[0] sig = str(chr(DICT_ENTRY_BEGIN)) sig = sig + self.python_value_to_dbus_sig(key) sig = sig + self.python_value_to_dbus_sig(value) sig = sig + str(chr(DICT_ENTRY_END)) dbus_message_iter_open_container(self.iter, TYPE_ARRAY, sig, &c_dict_iter) dict_iter = MessageIter(level) dict_iter.__cinit__(&c_dict_iter) for key, value in dict_list: dbus_message_iter_open_container(dict_iter.iter, TYPE_DICT_ENTRY, sig, &c_dict_entry_iter) dict_entry_iter = MessageIter(level) dict_entry_iter.__cinit__(&c_dict_entry_iter) dict_entry_iter.append(key) dict_entry_iter.append(value) dbus_message_iter_close_container(dict_iter.iter, dict_entry_iter.iter) dbus_message_iter_close_container(self.iter, dict_iter.iter) def append_struct(self, python_struct): cdef DBusMessageIter c_struct_iter cdef MessageIter struct_iter level = self.level + 1 dbus_message_iter_open_container(self.iter, TYPE_STRUCT, NULL, &c_struct_iter) struct_iter = MessageIter(level) struct_iter.__cinit__(&c_struct_iter) for item in python_struct: if not struct_iter.append(item): dbus_message_iter_close_container(self.iter, struct_iter.iter) return False dbus_message_iter_close_container(self.iter, struct_iter.iter) def append_array(self, python_list): cdef DBusMessageIter c_array_iter cdef MessageIter array_iter level = self.level + 1 sig = self.python_value_to_dbus_sig(python_list[0]) dbus_message_iter_open_container(self.iter, TYPE_ARRAY, sig, &c_array_iter) array_iter = MessageIter(level) array_iter.__cinit__(&c_array_iter) length = len(python_list) for item in python_list: if not array_iter.append(item): dbus_message_iter_close_container(self.iter, array_iter.iter) return False dbus_message_iter_close_container(self.iter, array_iter.iter) return True def __str__(self): cdef DBusMessageIter c_array_iter cdef MessageIter array_iter value_at_iter = True retval = "" while (value_at_iter): type = self.get_arg_type() if type == TYPE_INVALID: break elif type == TYPE_STRING: str = iter.get_string() arg = 'string:%s\n' % (str) elif type == TYPE_OBJECT_PATH: path = iter.get_object_path() arg = 'object_path:%s\n' % (path) elif type == TYPE_INT32: num = iter.get_int32() arg = 'int32:%d\n' % (num) elif type == TYPE_UINT32: num = iter.get_uint32() arg = 'uint32:%u\n' % (num) elif type == TYPE_INT64: num = iter.get_int64() arg = 'int64:%d\n' % (num) elif type == TYPE_UINT64: num = iter.get_uint64() arg = 'uint64:%u\n' % (num) elif type == TYPE_DOUBLE: num = iter.get_double() arg = 'double:%f\n' % (num) elif type == TYPE_BYTE: num = iter.get_byte() arg = 'byte:%x(%s)\n' % (num, str(chr(num))) elif type == TYPE_BOOLEAN: bool = iter.get_boolean() if (bool): str = "true" else: str = "false" arg = 'boolean:%s\n' % (str) elif type == TYPE_ARRAY: dbus_message_iter_recurse(self.iter, &c_array_iter) array_iter = MessageIter(self.level + 1) array_iter.__cinit__(&c_array_iter) if array_iter.has_next(): arg = 'array [' + str(array_iter) + ']' else: arg = 'array []' else: arg = '(unknown arg type %d)\n' % type retval = retval + arg value_at_iter = self.next() return retval (MESSAGE_TYPE_INVALID, MESSAGE_TYPE_METHOD_CALL, MESSAGE_TYPE_METHOD_RETURN, MESSAGE_TYPE_ERROR, MESSAGE_TYPE_SIGNAL) = range(5) (TYPE_INVALID, TYPE_BYTE, TYPE_BOOLEAN, TYPE_INT16, TYPE_UINT16, TYPE_INT32, TYPE_UINT32, TYPE_INT64, TYPE_UINT64, TYPE_DOUBLE, TYPE_STRING, TYPE_OBJECT_PATH, TYPE_SIGNATURE, TYPE_ARRAY, TYPE_STRUCT, STRUCT_BEGIN, STRUCT_END, TYPE_VARIANT, TYPE_DICT_ENTRY, DICT_ENTRY_BEGIN, DICT_ENTRY_END) = (0, ord('y'), ord('b'), ord('n'), ord('i'), ord('u'), ord('q'), ord('x'), ord('t'), ord('d'), ord('s'), ord('o'), ord('g'), ord('a'), ord('r'), ord('('), ord(')'), ord('v'), ord('e'), ord('{'), ord('}')) (HANDLER_RESULT_HANDLED, HANDLER_RESULT_NOT_YET_HANDLED, HANDLER_RESULT_NEED_MEMORY) = range(3) cdef class Message: cdef DBusMessage *msg def __init__(self, message_type=MESSAGE_TYPE_INVALID, service=None, path=None, interface=None, method=None, Message method_call=None, name=None, Message reply_to=None, error_name=None, error_message=None, _create=1): cdef char *cservice cdef DBusMessage *cmsg if (service == None): cservice = NULL else: cservice = service if not _create: return if message_type == MESSAGE_TYPE_METHOD_CALL: self.msg = dbus_message_new_method_call(cservice, path, interface, method) elif message_type == MESSAGE_TYPE_METHOD_RETURN: cmsg = method_call._get_msg() self.msg = dbus_message_new_method_return(cmsg) elif message_type == MESSAGE_TYPE_SIGNAL: self.msg = dbus_message_new_signal(path, interface, name) elif message_type == MESSAGE_TYPE_ERROR: cmsg = reply_to._get_msg() self.msg = dbus_message_new_error(cmsg, error_name, error_message) def type_to_name(self, type): if type == MESSAGE_TYPE_SIGNAL: return "signal" elif type == MESSAGE_TYPE_METHOD_CALL: return "method call" elif type == MESSAGE_TYPE_METHOD_RETURN: return "method return" elif type == MESSAGE_TYPE_ERROR: return "error" else: return "(unknown message type)" def __str__(self): message_type = self.get_type() sender = self.get_sender() if sender == None: sender = "(no sender)" if (message_type == MESSAGE_TYPE_METHOD_CALL) or (message_type == MESSAGE_TYPE_SIGNAL): retval = '%s interface=%s; member=%s; sender=%s' % (self.type_to_name(message_type), self.get_interface(), self.get_member(), sender) elif message_type == MESSAGE_TYPE_METHOD_RETURN: retval = '%s sender=%s' % (self.type_to_name(message_type), sender) elif message_type == MESSAGE_TYPE_ERROR: retval = '%s name=%s; sender=%s' % (self.type_to_name(message_type), self.get_error_name(), sender) else: retval = "Message of unknown type %d" % (message_type) # FIXME: should really use self.convert_to_tuple() here iter = self.get_iter() retval = retval + "\n" + str(iter) return retval cdef _set_msg(self, DBusMessage *msg): self.msg = msg cdef DBusMessage *_get_msg(self): return self.msg def get_iter(self, append=False): cdef DBusMessageIter iter cdef MessageIter message_iter cdef DBusMessage *msg msg = self._get_msg() if append: dbus_message_iter_init_append(msg, &iter) else: dbus_message_iter_init(msg, &iter) message_iter = MessageIter(0) message_iter.__cinit__(&iter) return message_iter def get_args_list(self): retval = [ ] iter = self.get_iter() try: retval.append(iter.get()) except TypeError, e: return [ ] value_at_iter = iter.next() while (value_at_iter): retval.append(iter.get()) value_at_iter = iter.next() return retval # FIXME: implement dbus_message_copy? def get_type(self): return dbus_message_get_type(self.msg) def set_path(self, object_path): return dbus_message_set_path(self.msg, object_path) def get_path(self): return dbus_message_get_path(self.msg) def set_interface(self, interface): return dbus_message_set_interface(self.msg, interface) def get_interface(self): return dbus_message_get_interface(self.msg) def set_member(self, member): return dbus_message_set_member(self.msg, member) def get_member(self): return dbus_message_get_member(self.msg) def set_error_name(self, name): return dbus_message_set_error_name(self.msg, name) def get_error_name(self): return dbus_message_get_error_name(self.msg) def set_destination(self, destination): return dbus_message_set_destination(self.msg, destination) def get_destination(self): return dbus_message_get_destination(self.msg) def set_sender(self, sender): return dbus_message_set_sender(self.msg, sender) def get_sender(self): cdef char *sender sender = dbus_message_get_sender(self.msg) if (sender == NULL): return None else: return sender def set_no_reply(self, no_reply): dbus_message_set_no_reply(self.msg, no_reply) def get_no_reply(self): return dbus_message_get_no_reply(self.msg) def is_method_call(self, interface, method): return dbus_message_is_method_call(self.msg, interface, method) def is_signal(self, interface, signal_name): return dbus_message_is_signal(self.msg, interface, signal_name) def is_error(self, error_name): return dbus_message_is_error(self.msg, error_name) def has_destination(self, service): return dbus_message_has_destination(self.msg, service) def has_sender(self, service): return dbus_message_has_sender(self.msg, service) def get_serial(self): return dbus_message_get_serial(self.msg) def set_reply_serial(self, reply_serial): return dbus_message_set_reply_serial(self.msg, reply_serial) def get_reply_serial(self): return dbus_message_get_reply_serial(self.msg) #FIXME: dbus_message_get_path_decomposed # FIXME: all the different dbus_message_*args* methods class Signal(Message): def __init__(self, spath, sinterface, sname): Message.__init__(self, MESSAGE_TYPE_SIGNAL, path=spath, interface=sinterface, name=sname) class MethodCall(Message): def __init__(self, mpath, minterface, mmethod): Message.__init__(self, MESSAGE_TYPE_METHOD_CALL, path=mpath, interface=minterface, method=mmethod) class MethodReturn(Message): def __init__(self, method_call): Message.__init__(self, MESSAGE_TYPE_METHOD_RETURN, method_call=method_call) class Error(Message): def __init__(self, reply_to, error_name, error_message): Message.__init__(self, MESSAGE_TYPE_ERROR, reply_to=reply_to, error_name=error_name, error_message=error_message) cdef class Server: cdef DBusServer *server def __init__(self, address): cdef DBusError error dbus_error_init(&error) self.server = dbus_server_listen(address, &error) if dbus_error_is_set(&error): raise DBusException, error.message def setup_with_g_main (self): dbus_server_setup_with_g_main(self.server, NULL) def disconnect(self): dbus_server_disconnect(self.server) def get_is_connected(self): return dbus_server_get_is_connected(self.server) # def set_new_connection_function(self, function, data): # dbus_server_set_new_connection_function(self.conn, function, # data, NULL) # def set_watch_functions(self, add_function, remove_function, data): # dbus_server_set_watch_functions(self.server, # add_function, remove_function, # data, NULL) # def set_timeout_functions(self, add_function, remove_function, data): # dbus_server_set_timeout_functions(self.server, # add_function, remove_function, # data, NULL) # def handle_watch(self, watch, condition): # dbus_server_handle_watch(self.conn, watch, condition) BUS_SESSION = DBUS_BUS_SESSION BUS_SYSTEM = DBUS_BUS_SYSTEM BUS_STARTER = DBUS_BUS_STARTER def bus_get (bus_type): cdef DBusError error cdef Connection conn dbus_error_init(&error) cdef DBusConnection *connection connection = dbus_bus_get(bus_type, &error) if dbus_error_is_set(&error): raise DBusException, error.message conn = Connection() conn.__cinit__(None, connection) return conn def bus_get_unique_name(Connection connection): cdef DBusConnection *conn conn = connection._get_conn() return dbus_bus_get_unique_name(conn) def bus_get_unix_user(Connection connection, service_name): cdef DBusError error dbus_error_init(&error) cdef int retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_get_unix_user(conn, service_name, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval #These are defines, not enums so they aren't auto generated DBUS_START_REPLY_SUCCESS = 0 DBUS_START_REPLY_ALREADY_RUNNING = 1 def bus_start_service_by_name(Connection connection, service_name, flags=0): cdef DBusError error dbus_error_init(&error) cdef dbus_bool_t retval cdef dbus_uint32_t results cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_start_service_by_name(conn, service_name, flags, &results, &error) return (retval, results) def bus_register(Connection connection): cdef DBusError error dbus_error_init(&error) cdef dbus_bool_t retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_register(conn, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval SERVICE_FLAG_PROHIBIT_REPLACEMENT = 0x1 SERVICE_FLAG_REPLACE_EXISTING = 0x2 def bus_request_name(Connection connection, service_name, flags=0): cdef DBusError error dbus_error_init(&error) cdef int retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_request_name(conn, service_name, flags, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval def bus_name_has_owner(Connection connection, service_name): cdef DBusError error dbus_error_init(&error) cdef dbus_bool_t retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_name_has_owner(conn, service_name, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval def bus_add_match(Connection connection, rule): cdef DBusError error cdef DBusConnection *conn dbus_error_init(&error) conn = connection._get_conn() dbus_bus_add_match (conn, rule, &error) if dbus_error_is_set(&error): raise DBusException, error.message def bus_remove_match(Connection connection, rule): cdef DBusError error cdef DBusConnection *conn dbus_error_init(&error) conn = connection._get_conn() dbus_bus_remove_match (conn, rule, &error) if dbus_error_is_set(&error): raise DBusException, error.message def init_gthreads (): dbus_g_thread_init ()