# -*- Mode: Python -*- # jdahlin is the most coolest and awesomest person in the world # and wrote all the good parts of this code. all the bad parts # where python conditionals have a ( ) around them, thus violating # PEP-8 were written by the lame wannabe python programmer seth #include "dbus_h_wrapper.h" cdef extern from "stdlib.h": cdef void *malloc(size_t size) cdef void free(void *ptr) cdef void *calloc(size_t nmemb, size_t size) cdef extern from "dbus-glib.h": ctypedef struct GMainContext cdef void dbus_connection_setup_with_g_main (DBusConnection *connection, GMainContext *context) cdef void dbus_server_setup_with_g_main (DBusServer *server, GMainContext *context) cdef void dbus_g_thread_init () cdef extern from "Python.h": void Py_XINCREF (object) void Py_XDECREF (object) object PyString_FromStringAndSize(char *, int) ctypedef struct DBusError: char *name char *message unsigned int dummy1 unsigned int dummy2 unsigned int dummy3 unsigned int dummy4 unsigned int dummy5 void *padding1 ctypedef struct DBusMessageIter: void *dummy1 void *dummy2 dbus_uint32_t dummy3 int dummy4 int dummy5 int dummy6 int dummy7 int dummy8 int dummy9 int dummy10 int dummy11 int pad1 int pad2 void *pad3 ctypedef struct DBusObjectPathVTable: DBusObjectPathUnregisterFunction unregister_function DBusObjectPathMessageFunction message_function void (* dbus_internal_pad1) (void *) void (* dbus_internal_pad2) (void *) void (* dbus_internal_pad3) (void *) void (* dbus_internal_pad4) (void *) _user_data_references = [ ] class DBusException(Exception): pass class ConnectionError(Exception): pass class ObjectPath(str): def __init__(self, value): str.__init__(value) class ByteArray(str): def __init__(self, value): str.__init__(value) #forward delcerations cdef class Connection cdef class Message cdef class PendingCall cdef class Watch cdef class MessageIter cdef void cunregister_function_handler (DBusConnection *connection, void *user_data): cdef Connection conn tup = user_data assert (type(tup) == list) function = tup[1] conn = Connection() conn.__cinit__(None, connection) args = [conn] function(*args) cdef DBusHandlerResult cmessage_function_handler (DBusConnection *connection, DBusMessage *msg, void *user_data): cdef Connection conn cdef Message message tup = user_data assert (type(tup) == list) function = tup[0] message = Message(_create=0) message._set_msg(msg) conn = Connection() conn.__cinit__(None, connection) args = [conn, message] retval = function(*args) if (retval == None): retval = DBUS_HANDLER_RESULT_HANDLED return retval cdef class Connection: cdef DBusConnection *conn def __init__(self, address=None, Connection _conn=None): cdef DBusConnection *c_conn cdef char *c_address c_conn=NULL if (_conn != None): c_conn = _conn.conn if (address != None or _conn != None): self.__cinit__(c_address, c_conn) # hack to be able to pass in a c pointer to the constructor # while still alowing python programs to create a Connection object cdef __cinit__(self, address, DBusConnection *_conn): cdef DBusError error dbus_error_init(&error) if _conn != NULL: self.conn = _conn dbus_connection_ref(self.conn) else: self.conn = dbus_connection_open(address, &error) if dbus_error_is_set(&error): raise DBusException, error.message cdef _set_conn(self, DBusConnection *conn): self.conn = conn cdef DBusConnection *_get_conn(self): return self.conn def get_base_service(self): return bus_get_base_service(self) def setup_with_g_main(self): dbus_connection_setup_with_g_main(self.conn, NULL) def disconnect(self): dbus_connection_disconnect(self.conn) def get_is_connected(self): return dbus_connection_get_is_connected(self.conn) def get_is_authenticated(self): return dbus_connection_get_is_authenticated(self.conn) def flush(self): dbus_connection_flush(self.conn) def borrow_message(self): cdef Message m m = Message(_create=0) m._set_msg(dbus_connection_borrow_message(self.conn)) return m def return_message(self, Message message): cdef DBusMessage *msg msg = message._get_msg() dbus_connection_return_message(self.conn, msg) def steal_borrowed_message(self, Message message): cdef DBusMessage *msg msg = message._get_msg() dbus_connection_steal_borrowed_message(self.conn, msg) def pop_message(self): cdef DBusMessage *msg cdef Message m msg = dbus_connection_pop_message(self.conn) if msg != NULL: m = Message(_create=0) m._set_msg(msg) else: m = None return m def get_dispatch_status(self): return dbus_connection_get_dispatch_status(self.conn) def dispatch(self): return dbus_connection_dispatch(self.conn) def send(self, Message message): #cdef dbus_uint32_t client_serial #if type(message) != Message: # raise TypeError cdef DBusMessage *msg msg = message._get_msg() retval = dbus_connection_send(self.conn, msg, NULL) return retval def send_with_reply(self, Message message, timeout_milliseconds): cdef dbus_bool_t retval cdef DBusPendingCall *cpending_call cdef DBusError error cdef DBusMessage *msg cdef PendingCall pending_call dbus_error_init(&error) cpending_call = NULL msg = message._get_msg() retval = dbus_connection_send_with_reply(self.conn, msg, &cpending_call, timeout_milliseconds) if dbus_error_is_set(&error): raise DBusException, error.message if (cpending_call != NULL): pending_call = PendingCall() pending_call.__cinit__(cpending_call) else: pending_call = None return (retval, pending_call) def send_with_reply_and_block(self, Message message, timeout_milliseconds=0): cdef DBusMessage * retval cdef DBusError error cdef DBusMessage *msg cdef Message m dbus_error_init(&error) msg = message._get_msg() retval = dbus_connection_send_with_reply_and_block( self.conn, msg, timeout_milliseconds, &error) if dbus_error_is_set(&error): raise DBusException, error.message if retval == NULL: raise AssertionError m = Message(_create=0) m._set_msg(retval) return m def set_watch_functions(self, add_function, remove_function, data): pass def set_timeout_functions(self, add_function, remove_function, data): pass def set_wakeup_main_function(self, wakeup_main_function, data): pass # FIXME: set_dispatch_status_function, get_unix_user, set_unix_user_function def add_filter(self, filter_function): user_data = [ filter_function ] global _user_data_references _user_data_references.append(user_data) return dbus_connection_add_filter(self.conn, cmessage_function_handler, user_data, NULL) #FIXME: remove_filter # this is pretty tricky, we want to only remove the filter # if we truly have no more calls to our message_function_handler...ugh def set_data(self, slot, data): pass def get_data(self, slot): pass def set_max_message_size(self, size): dbus_connection_set_max_message_size(self.conn, size) def get_max_message_size(self): return dbus_connection_get_max_message_size(self.conn) def set_max_received_size(self, size): dbus_connection_set_max_received_size(self.conn, size) def get_max_received_size(self): return dbus_connection_get_max_received_size(self.conn) def get_outgoing_size(self): return dbus_connection_get_outgoing_size(self.conn) # preallocate_send, free_preallocated_send, send_preallocated def register_object_path(self, path, unregister_cb, message_cb): cdef DBusObjectPathVTable cvtable cvtable.unregister_function = cunregister_function_handler cvtable.message_function = cmessage_function_handler user_data = [message_cb, unregister_cb] global _user_data_references _user_data_references.append(user_data) return dbus_connection_register_object_path(self.conn, path, &cvtable, user_data) def register_fallback(self, path, unregister_cb, message_cb): cdef DBusObjectPathVTable cvtable cvtable.unregister_function = cunregister_function_handler cvtable.message_function = cmessage_function_handler user_data = [message_cb, unregister_cb] global _user_data_references _user_data_references.append(user_data) return dbus_connection_register_fallback(self.conn, path, &cvtable, user_data) #FIXME: unregister_object_path , see problems with remove_filter def list_registered (self, parent_path): cdef char **cchild_entries cdef dbus_bool_t retval retval = dbus_connection_list_registered(self.conn, parent_path, &cchild_entries) if (not retval): #FIXME: raise out of memory exception? return None i = 0 child_entries = [] while (cchild_entries[i] != NULL): child_entries.append(cchild_entries[i]) i = i + 1 dbus_free_string_array(cchild_entries) return child_entries cdef class PendingCall: cdef DBusPendingCall *pending_call def __init__(self, PendingCall _pending_call=None): if (_pending_call != None): self.__cinit__(_pending_call.pending_call) cdef void __cinit__(self, DBusPendingCall *_pending_call): self.pending_call = _pending_call dbus_pending_call_ref(self.pending_call) cdef DBusPendingCall *_get_pending_call(self): return self.pending_call def cancel(self): dbus_pending_call_cancel(self.pending_call) def get_completed(self): return dbus_pending_call_get_completed(self.pending_call) def get_reply(self): cdef Message message message = Message(_create=0) message._set_msg(dbus_pending_call_steal_reply(self.pending_call)) return message def block(self): dbus_pending_call_block(self.pending_call) cdef class Watch: cdef DBusWatch* watch def __init__(self): pass cdef __cinit__(self, DBusWatch *cwatch): self.watch = cwatch def get_fd(self): return dbus_watch_get_fd(self.watch) # FIXME: not picked up correctly by extract.py #def get_flags(self): # return dbus_watch_get_flags(self.watch) def handle(self, flags): return dbus_watch_handle(self.watch, flags) def get_enabled(self): return dbus_watch_get_enabled(self.watch) cdef class MessageIter: cdef DBusMessageIter *iter cdef DBusMessageIter real_iter def __init__(self): self.iter = &self.real_iter cdef __cinit__(self, DBusMessageIter *iter): self.real_iter = iter[0] cdef DBusMessageIter *_get_iter(self): return self.iter def has_next(self): return dbus_message_iter_has_next(self.iter) def next(self): return dbus_message_iter_next(self.iter) def get(self): arg_type = self.get_arg_type() if arg_type == TYPE_INVALID: raise TypeError, 'Invalid arg type in MessageIter' elif arg_type == TYPE_NIL: retval = None elif arg_type == TYPE_STRING: retval = self.get_string() elif arg_type == TYPE_INT32: retval = self.get_int32() elif arg_type == TYPE_UINT32: retval = self.get_uint32() elif arg_type == TYPE_INT64: retval = self.get_int64() elif arg_type == TYPE_UINT64: retval = self.get_uint64() elif arg_type == TYPE_DOUBLE: retval = self.get_double() elif arg_type == TYPE_BYTE: retval = self.get_byte() elif arg_type == TYPE_BOOLEAN: retval = self.get_boolean() elif arg_type == TYPE_ARRAY: array_type = self.get_array_type() if array_type == TYPE_STRING: retval = self.get_string_array() elif array_type == TYPE_OBJECT_PATH: retval = self.get_object_path_array() elif array_type == TYPE_BYTE: retval = self.get_byte_array() elif array_type == TYPE_INT32: retval = self.get_int32_array() elif array_type == TYPE_UINT32: retval = self.get_uint32_array() elif array_type == TYPE_INT64: retval = self.get_int64_array() elif array_type == TYPE_UINT64: retval = self.get_uint64_array() elif array_type == TYPE_DOUBLE: retval = self.get_double_array() else: raise TypeError, "Unknown array type %d in MessageIter" % (array_type) elif arg_type == TYPE_DICT: retval = self.get_dict() elif arg_type == TYPE_OBJECT_PATH: retval = self.get_object_path() else: raise TypeError, 'Unknown arg type %d in MessageIter' % (arg_type) return retval def get_dict(self): cdef DBusMessageIter c_dict_iter cdef MessageIter dict_iter dbus_message_iter_init_dict_iterator(self.iter, &c_dict_iter) dict_iter = MessageIter() dict_iter.__cinit__(&c_dict_iter) dict = {} end_of_dict = False while True: key = dict_iter.get_dict_key() value = dict_iter.get() dict[key] = value if not dict_iter.has_next(): break dict_iter.next() return dict def get_arg_type(self): return dbus_message_iter_get_arg_type(self.iter) def get_array_type(self): return dbus_message_iter_get_array_type(self.iter) # FIXME: implement get_byte #def get_byte(self): # return dbus_message_iter_get_byte(self.iter) def get_boolean(self): return dbus_message_iter_get_boolean(self.iter) def get_int32(self): return dbus_message_iter_get_int32(self.iter) def get_uint32(self): return dbus_message_iter_get_uint32(self.iter) def get_int64(self): return dbus_message_iter_get_int64(self.iter) def get_uint64(self): return dbus_message_iter_get_uint64(self.iter) def get_double(self): return dbus_message_iter_get_double(self.iter) def get_string(self): return dbus_message_iter_get_string(self.iter) def get_object_path(self): object_path_string = dbus_message_iter_get_object_path(self.iter) return ObjectPath(object_path_string) def get_dict_key(self): return dbus_message_iter_get_dict_key(self.iter) # FIXME: implement dbus_message_iter_init_array_iterator def get_byte_array(self): cdef int len cdef unsigned char *bytearray cdef int i dbus_message_iter_get_byte_array(self.iter, &bytearray, &len) python_string = PyString_FromStringAndSize(bytearray, len) return python_string # FIXME: implement dbus_message_iter_get_boolean_array def get_int32_array(self): cdef int len cdef dbus_int32_t *retval cdef int i dbus_message_iter_get_int32_array(self.iter, &retval, &len) python_list = [] for i from 0 <= i < len: python_list.append(retval[i]) return python_list def get_uint32_array(self): cdef int len cdef dbus_uint32_t *retval cdef int i dbus_message_iter_get_uint32_array(self.iter, &retval, &len) python_list = [] for i from 0 <= i < len: python_list.append(retval[i]) return python_list def get_int64_array(self): cdef int len cdef dbus_int64_t *retval cdef int i dbus_message_iter_get_int64_array(self.iter, &retval, &len) python_list = [] for i from 0 <= i < len: python_list.append(retval[i]) return python_list def get_uint64_array(self): cdef int len cdef dbus_uint64_t *retval cdef int i dbus_message_iter_get_uint64_array(self.iter, &retval, &len) python_list = [] for i from 0 <= i < len: python_list.append(retval[i]) return python_list def get_double_array(self): cdef int len cdef double *retval cdef int i dbus_message_iter_get_double_array(self.iter, &retval, &len) python_list = [] for i from 0 <= i < len: python_list.append(retval[i]) return python_list def get_string_array(self): cdef int len cdef char **retval cdef int i dbus_message_iter_get_string_array(self.iter, &retval, &len) list = [] for i from 0 <= i < len: list.append(retval[i]) return list def get_object_path_array(self): cdef int len cdef char **retval cdef int i dbus_message_iter_get_object_path_array(self.iter, &retval, &len) list = [] for i from 0 <= i < len: list.append(ObjectPath(retval[i])) return list # dbus_message_append_iter_init included in class Message #FIXME: handle all the different types? def append(self, value): value_type = type(value) if value_type == bool: retval = self.append_boolean(value) elif value_type == int: retval = self.append_int32(value) elif value_type == long: retval = self.append_int64(value) elif value_type == str: retval = self.append_string(value) elif value_type == float: retval = self.append_double(value) elif value_type == dict: retval = self.append_dict(value) elif value_type == list: if len(value) == 0: # Empty lists are currently not supported, returning None instead retval = self.append(None) else: list_type = type(value[0]) if list_type == str: self.append_string_array(value) elif list_type == int: self.append_int32_array(value) elif list_type == long: self.append_int64_array(value) elif list_type == float: self.append_double_array(value) elif isinstance(value[0], ObjectPath): self.append_object_path_array(value) else: raise TypeError, "List of unknown type '%s'" % (list_type) elif value_type == None.__class__: retval = self.append_nil() elif isinstance(value, ObjectPath): retval = self.append_object_path(value) elif isinstance(value, ByteArray): retval = self.append_byte_array(value) else: raise TypeError, "Argument of unknown type '%s'" % (value_type) return retval def append_nil(self): return dbus_message_iter_append_nil(self.iter) def append_boolean(self, value): return dbus_message_iter_append_boolean(self.iter, value) def append_byte(self, value): if type(value) != str or len(value) != 1: raise TypeError return dbus_message_iter_append_byte(self.iter, ord(value)) def append_int32(self, value): return dbus_message_iter_append_int32(self.iter, value) def append_uint32(self, value): return dbus_message_iter_append_uint32(self.iter, value) def append_int64(self, value): return dbus_message_iter_append_int64(self.iter, value) def append_uint64(self, value): return dbus_message_iter_append_uint64(self.iter, value) def append_double(self, value): return dbus_message_iter_append_double(self.iter, value) def append_string(self, value): return dbus_message_iter_append_string(self.iter, value) def append_dict_key(self, value): return dbus_message_iter_append_dict_key(self.iter, value) def append_object_path(self, value): return dbus_message_iter_append_object_path(self.iter, value) # FIXME: append_array, append_boolean_array, append_uint32_array, # append_uint64_array def append_dict(self, python_dict): cdef DBusMessageIter c_dict_iter cdef MessageIter dict_iter dbus_message_iter_append_dict(self.iter, &c_dict_iter) dict_iter = MessageIter() dict_iter.__cinit__(&c_dict_iter) for key, value in python_dict.iteritems(): if type(key) != str: raise TypeError, "DBus dict keys must be strings" dict_iter.append_dict_key(key) dict_iter.append(value) def append_byte_array(self, python_list): cdef unsigned char * value cdef int length cdef int i length = len(python_list) value = malloc(length * sizeof(unsigned char)) for i from 0 <= i < length: item = python_list[i] if type(item) != str or len(item) != 1: raise TypeError value[i] = ord(item) return dbus_message_iter_append_byte_array(self.iter, value, length) def append_int32_array(self, python_list): cdef dbus_int32_t *value cdef int length cdef int i length = len(python_list) value = malloc(length * sizeof(dbus_int32_t)) for i from 0 <= i < length: item = python_list[i] if type(item) != int: raise TypeError value[i] = item return dbus_message_iter_append_int32_array(self.iter, value, length) def append_int64_array(self, python_list): cdef dbus_int64_t *value cdef int length cdef int i length = len(python_list) value = malloc(length * sizeof(dbus_int64_t)) for i from 0 <= i < length: item = python_list[i] if type(item) != int: raise TypeError value[i] = item return dbus_message_iter_append_int64_array(self.iter, value, length) def append_double_array(self, python_list): cdef double *value cdef int length cdef int i length = len(python_list) value = malloc(length * sizeof(double)) for i from 0 <= i < length: item = python_list[i] if type(item) != float: raise TypeError value[i] = item return dbus_message_iter_append_double_array(self.iter, value, length) def append_object_path_array(self, list): cdef char **value cdef int length cdef int i length = len(list) value = malloc(length * sizeof(char *)) for i from 0 <= i < length: item = list[i] if not isinstance(item, ObjectPath): raise TypeError value[i] = item return dbus_message_iter_append_object_path_array(self.iter, value, length) def append_string_array(self, python_list): cdef char **value cdef int length cdef dbus_bool_t return_code cdef int i length = len(python_list) value = malloc(length * sizeof(char *)) for i from 0 <= i < length: item = python_list[i] if type(item) != str: raise TypeError value[i] = item return dbus_message_iter_append_string_array(self.iter, value, length) (MESSAGE_TYPE_INVALID, MESSAGE_TYPE_METHOD_CALL, MESSAGE_TYPE_METHOD_RETURN, MESSAGE_TYPE_ERROR, MESSAGE_TYPE_SIGNAL) = range(5) (TYPE_INVALID, TYPE_NIL, TYPE_BYTE, TYPE_BOOLEAN, TYPE_INT32, TYPE_UINT32, TYPE_INT64, TYPE_UINT64, TYPE_DOUBLE, TYPE_STRING, TYPE_CUSTOM, TYPE_ARRAY, TYPE_DICT, TYPE_OBJECT_PATH) = (0, ord('v'), ord('y'), ord('b'), ord('i'), ord('u'), ord('x'), ord('t'), ord('d'), ord('s'), ord('c'), ord('a'), ord('m'), ord('o')) (HANDLER_RESULT_HANDLED, HANDLER_RESULT_NOT_YET_HANDLED, HANDLER_RESULT_NEED_MEMORY) = range(3) cdef class Message: cdef DBusMessage *msg def __init__(self, message_type=MESSAGE_TYPE_INVALID, service=None, path=None, interface=None, method=None, Message method_call=None, name=None, Message reply_to=None, error_name=None, error_message=None, _create=1): cdef char *cservice cdef DBusMessage *cmsg if (service == None): cservice = NULL else: cservice = service if not _create: return if message_type == MESSAGE_TYPE_METHOD_CALL: self.msg = dbus_message_new_method_call(cservice, path, interface, method) elif message_type == MESSAGE_TYPE_METHOD_RETURN: cmsg = method_call._get_msg() self.msg = dbus_message_new_method_return(cmsg) elif message_type == MESSAGE_TYPE_SIGNAL: self.msg = dbus_message_new_signal(path, interface, name) elif message_type == MESSAGE_TYPE_ERROR: cmsg = reply_to._get_msg() self.msg = dbus_message_new_error(cmsg, error_name, error_message) def type_to_name(self, type): if type == MESSAGE_TYPE_SIGNAL: return "signal" elif type == MESSAGE_TYPE_METHOD_CALL: return "method call" elif type == MESSAGE_TYPE_METHOD_RETURN: return "method return" elif type == MESSAGE_TYPE_ERROR: return "error" else: return "(unknown message type)" def __str__(self): message_type = self.get_type() sender = self.get_sender() if sender == None: sender = "(no sender)" if (message_type == MESSAGE_TYPE_METHOD_CALL) or (message_type == MESSAGE_TYPE_SIGNAL): retval = '%s interface=%s; member=%s; sender=%s' % (self.type_to_name(message_type), self.get_interface(), self.get_member(), sender) elif message_type == MESSAGE_TYPE_METHOD_RETURN: retval = '%s sender=%s' % (self.type_to_name(message_type), sender) elif message_type == MESSAGE_TYPE_ERROR: retval = '%s name=%s; sender=%s' % (self.type_to_name(message_type), self.get_error_name(), sender) else: retval = "Message of unknown type %d" % (message_type) # FIXME: should really use self.convert_to_tuple() here iter = self.get_iter() value_at_iter = True while (value_at_iter): type = iter.get_arg_type() if type == TYPE_INVALID: break elif type == TYPE_NIL: arg = 'nil:None\n' elif type == TYPE_STRING: str = iter.get_string() arg = 'string:%s\n' % (str) elif type == TYPE_OBJECT_PATH: path = iter.get_object_path() arg = 'object_path:%s\n' % (path) elif type == TYPE_INT32: num = iter.get_int32() arg = 'int32:%d\n' % (num) elif type == TYPE_UINT32: num = iter.get_uint32() arg = 'uint32:%u\n' % (num) elif type == TYPE_INT64: num = iter.get_int64() arg = 'int64:%d\n' % (num) elif type == TYPE_UINT64: num = iter.get_uint64() arg = 'uint64:%u\n' % (num) elif type == TYPE_DOUBLE: num = iter.get_double() arg = 'double:%f\n' % (num) elif type == TYPE_BYTE: num = iter.get_byte() arg = 'byte:%d\n' % (num) elif type == TYPE_BOOLEAN: bool = iter.get_boolean() if (bool): str = "true" else: str = "false" arg = 'boolean:%s\n' % (str) else: arg = '(unknown arg type %d)\n' % type retval = retval + arg value_at_iter = iter.next() return retval cdef _set_msg(self, DBusMessage *msg): self.msg = msg cdef DBusMessage *_get_msg(self): return self.msg def get_iter(self): cdef DBusMessageIter iter cdef MessageIter message_iter cdef DBusMessage *msg msg = self._get_msg() dbus_message_iter_init(msg, &iter) message_iter = MessageIter() message_iter.__cinit__(&iter) return message_iter def get_args_list(self): retval = [ ] iter = self.get_iter() try: retval.append(iter.get()) except TypeError, e: return [ ] value_at_iter = iter.next() while (value_at_iter): retval.append(iter.get()) value_at_iter = iter.next() return retval # FIXME: implement dbus_message_copy? def get_type(self): return dbus_message_get_type(self.msg) def set_path(self, object_path): return dbus_message_set_path(self.msg, object_path) def get_path(self): return dbus_message_get_path(self.msg) def set_interface(self, interface): return dbus_message_set_interface(self.msg, interface) def get_interface(self): return dbus_message_get_interface(self.msg) def set_member(self, member): return dbus_message_set_member(self.msg, member) def get_member(self): return dbus_message_get_member(self.msg) def set_error_name(self, name): return dbus_message_set_error_name(self.msg, name) def get_error_name(self): return dbus_message_get_error_name(self.msg) def set_destination(self, destination): return dbus_message_set_destination(self.msg, destination) def get_destination(self): return dbus_message_get_destination(self.msg) def set_sender(self, sender): return dbus_message_set_sender(self.msg, sender) def get_sender(self): cdef char *sender sender = dbus_message_get_sender(self.msg) if (sender == NULL): return None else: return sender def set_no_reply(self, no_reply): dbus_message_set_no_reply(self.msg, no_reply) def get_no_reply(self): return dbus_message_get_no_reply(self.msg) def is_method_call(self, interface, method): return dbus_message_is_method_call(self.msg, interface, method) def is_signal(self, interface, signal_name): return dbus_message_is_signal(self.msg, interface, signal_name) def is_error(self, error_name): return dbus_message_is_error(self.msg, error_name) def has_destination(self, service): return dbus_message_has_destination(self.msg, service) def has_sender(self, service): return dbus_message_has_sender(self.msg, service) def get_serial(self): return dbus_message_get_serial(self.msg) def set_reply_serial(self, reply_serial): return dbus_message_set_reply_serial(self.msg, reply_serial) def get_reply_serial(self): return dbus_message_get_reply_serial(self.msg) #FIXME: dbus_message_get_path_decomposed # FIXME: all the different dbus_message_*args* methods class Signal(Message): def __init__(self, spath, sinterface, sname): Message.__init__(self, MESSAGE_TYPE_SIGNAL, path=spath, interface=sinterface, name=sname) class MethodCall(Message): def __init__(self, mpath, minterface, mmethod): Message.__init__(self, MESSAGE_TYPE_METHOD_CALL, path=mpath, interface=minterface, method=mmethod) class MethodReturn(Message): def __init__(self, method_call): Message.__init__(self, MESSAGE_TYPE_METHOD_RETURN, method_call=method_call) class Error(Message): def __init__(self, reply_to, error_name, error_message): Message.__init__(self, MESSAGE_TYPE_ERROR, reply_to=reply_to, error_name=error_name, error_message=error_message) cdef class Server: cdef DBusServer *server def __init__(self, address): cdef DBusError error dbus_error_init(&error) self.server = dbus_server_listen(address, &error) if dbus_error_is_set(&error): raise DBusException, error.message def setup_with_g_main (self): dbus_server_setup_with_g_main(self.server, NULL) def disconnect(self): dbus_server_disconnect(self.server) def get_is_connected(self): return dbus_server_get_is_connected(self.server) # def set_new_connection_function(self, function, data): # dbus_server_set_new_connection_function(self.conn, function, # data, NULL) # def set_watch_functions(self, add_function, remove_function, data): # dbus_server_set_watch_functions(self.server, # add_function, remove_function, # data, NULL) # def set_timeout_functions(self, add_function, remove_function, data): # dbus_server_set_timeout_functions(self.server, # add_function, remove_function, # data, NULL) # def handle_watch(self, watch, condition): # dbus_server_handle_watch(self.conn, watch, condition) BUS_SESSION = DBUS_BUS_SESSION BUS_SYSTEM = DBUS_BUS_SYSTEM BUS_ACTIVATION = DBUS_BUS_ACTIVATION def bus_get (bus_type): cdef DBusError error cdef Connection conn dbus_error_init(&error) cdef DBusConnection *connection connection = dbus_bus_get(bus_type, &error) if dbus_error_is_set(&error): raise DBusException, error.message conn = Connection() conn.__cinit__(None, connection) return conn def bus_get_base_service(Connection connection): cdef DBusConnection *conn conn = connection._get_conn() return dbus_bus_get_base_service(conn) def bus_get_unix_user(Connection connection, service_name): cdef DBusError error dbus_error_init(&error) cdef int retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_get_unix_user(conn, service_name, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval #These are defines, not enums so they aren't auto generated ACTIVATION_REPLY_ACTIVATED = 0 ACTIVATION_REPLY_ALREADY_ACTIVE = 1 def bus_activate_service(Connection connection, service_name, flags=0): cdef DBusError error dbus_error_init(&error) cdef dbus_bool_t retval cdef dbus_uint32_t results cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_activate_service(conn, service_name, flags, &results, &error) return (retval, results) def bus_register(Connection connection): cdef DBusError error dbus_error_init(&error) cdef dbus_bool_t retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_register(conn, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval SERVICE_FLAG_PROHIBIT_REPLACEMENT = 0x1 SERVICE_FLAG_REPLACE_EXISTING = 0x2 def bus_acquire_service(Connection connection, service_name, flags=0): cdef DBusError error dbus_error_init(&error) cdef int retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_acquire_service(conn, service_name, flags, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval def bus_service_exists(Connection connection, service_name): cdef DBusError error dbus_error_init(&error) cdef dbus_bool_t retval cdef DBusConnection *conn conn = connection._get_conn() retval = dbus_bus_service_exists(conn, service_name, &error) if dbus_error_is_set(&error): raise DBusException, error.message return retval def bus_add_match(Connection connection, rule): cdef DBusError error cdef DBusConnection *conn dbus_error_init(&error) conn = connection._get_conn() dbus_bus_add_match (conn, rule, &error) if dbus_error_is_set(&error): raise DBusException, error.message def bus_remove_match(Connection connection, rule): cdef DBusError error cdef DBusConnection *conn dbus_error_init(&error) conn = connection._get_conn() dbus_bus_remove_match (conn, rule, &error) if dbus_error_is_set(&error): raise DBusException, error.message def init_gthreads (): dbus_g_thread_init ()