diff options
| -rw-r--r-- | ChangeLog | 20 | ||||
| -rw-r--r-- | python/decorators.py | 17 | ||||
| -rw-r--r-- | python/introspect_parser.py | 27 | ||||
| -rw-r--r-- | python/service.py | 4 | ||||
| -rwxr-xr-x | test/python/test-client.py | 46 | ||||
| -rwxr-xr-x | test/python/test-service.py | 34 | 
6 files changed, 113 insertions, 35 deletions
@@ -1,5 +1,25 @@  2005-11-07  Robert McQueen  <robot101@debian.org> +	* python/decorators.py: Change emit_signal function to use the +	signature annotation of the signal when marhsalling the arguments from +	the service. Fix a bug where the code checking signature length +	against argument length referenced the wrong variable. + +	* python/introspect_parser.py: Avoid adding the type signature of +	signal arguments to any methods which occur after them in the +	introspection data (!) by making the parser a little more careful +	about its current state. + +	* python/service.py: Remove debug prints from last commit (again :D). + +	* test/python/test-client.py, test/python/test-service.py: Add test +	signals with signature decorators to test the strict marshalling code +	gives errors at the right time. Could do with checking the signals +	actually get emitted too, given that the test does nothing with +	signals at the moment... + +2005-11-07  Robert McQueen  <robot101@debian.org> +  	* python/_dbus.py: Add WeakReferenceDictionary cache of dbus.Bus  	instances to stop madness of creating new instances representing  	the same bus connection all the time, rendering any tracking of diff --git a/python/decorators.py b/python/decorators.py index 3973f3e4..e4cc0249 100644 --- a/python/decorators.py +++ b/python/decorators.py @@ -39,20 +39,25 @@ def signal(dbus_interface, signature=None):      _util._validate_interface_or_name(dbus_interface)      def decorator(func):          def emit_signal(self, *args, **keywords): -            func(self, *args, **keywords)            +            func(self, *args, **keywords)              message = dbus_bindings.Signal(self._object_path, dbus_interface, func.__name__)              iter = message.get_iter(True) -            -            for arg in args: -                iter.append(arg) -       + +            if emit_signal._dbus_signature: +                signature = tuple(dbus_bindings.Signature(emit_signal._dbus_signature)) +                for (arg, sig) in zip(args, signature): +                    iter.append_strict(arg, sig) +            else: +                for arg in args: +                    iter.append(arg) +              self._connection.send(message)          args = inspect.getargspec(func)[0]          args.pop(0)          if signature: -            sig = tuple(dbus_bindings.Signature(func._dbus_signature)) +            sig = tuple(dbus_bindings.Signature(signature))              if len(sig) > len(args):                  raise ValueError, 'signal signature is longer than the number of arguments provided' diff --git a/python/introspect_parser.py b/python/introspect_parser.py index 40cae1e7..47c9806e 100644 --- a/python/introspect_parser.py +++ b/python/introspect_parser.py @@ -13,35 +13,36 @@ def process_introspection_data(data):      reader = input_source.newTextReader("urn:introspect")      ret = reader.Read() -    current_iface='' -    current_method='' +    current_iface = None +    current_method = None      current_sigstr = '' -     +      while ret == 1:          name = reader.LocalName()          if reader.NodeType() == XMLREADER_START_ELEMENT_NODE_TYPE: -            if name == 'interface': +            if (not current_iface and not current_method and name == 'interface'):                  current_iface = reader.GetAttribute('name') -            elif name == 'method': +            elif (current_iface and not current_method and name == 'method'):                  current_method = reader.GetAttribute('name')                  if reader.IsEmptyElement(): -                    method_map[current_iface + '.' + current_method] = ''  -                    current_method = '' +                    method_map[current_iface + '.' + current_method] = '' +                    current_method = None                      current_sigstr = '' -                     -            elif name == 'arg': + +            elif (current_iface and current_method and name == 'arg'):                  direction = reader.GetAttribute('direction')                  if not direction or direction == 'in':                      current_sigstr = current_sigstr + reader.GetAttribute('type')          elif reader.NodeType() == XMLREADER_END_ELEMENT_NODE_TYPE: -            if name == 'method': -                method_map[current_iface + '.' + current_method] = current_sigstr  -                current_method = '' +            if (current_iface and not current_method and name == 'interface'): +                current_iface = None +            if (current_iface and current_method and name == 'method'): +                method_map[current_iface + '.' + current_method] = current_sigstr +                current_method = None                  current_sigstr = '' -                   ret = reader.Read()      if ret != 0: diff --git a/python/service.py b/python/service.py index d1973b04..3eec65f6 100644 --- a/python/service.py +++ b/python/service.py @@ -20,21 +20,19 @@ class BusName(object):          # otherwise register the name          retval = dbus_bindings.bus_request_name(bus.get_connection(), name) -        print retval +          # TODO: more intelligent tracking of bus name states?          if retval == dbus_bindings.REQUEST_NAME_REPLY_PRIMARY_OWNER:              pass          elif retval == dbus_bindings.REQUEST_NAME_REPLY_IN_QUEUE:              # you can't arrive at this state via the high-level bindings              # because you can't put flags in, but... who knows? -            print "joined queue for %s" % name              pass          elif retval == dbus_bindings.REQUEST_NAME_REPLY_EXISTS:              raise dbus_bindings.DBusException('requested name %s already exists' % name)          elif retval == dbus_bindings.REQUEST_NAME_REPLY_ALREADY_OWNER:              # if this is a shared bus which is being used by someone              # else in this process, this can happen legitimately -            print "already owner of %s" % name              pass          else:              raise dbus_bindings.DBusException('requesting name %s returned unexpected value %s' % (name, retval)) diff --git a/test/python/test-client.py b/test/python/test-client.py index 26ce375c..e207d491 100755 --- a/test/python/test-client.py +++ b/test/python/test-client.py @@ -110,8 +110,8 @@ class TestDBusBindings(unittest.TestCase):          main_loop.run() -    def testReturnMarshalling(self): -        print "\n********* Testing return marshalling ***********" +    def testStrictMarshalling(self): +        print "\n********* Testing strict return & signal marshalling ***********"          # these values are the same as in the server, and the          # methods should only succeed when they are called with @@ -120,33 +120,51 @@ class TestDBusBindings(unittest.TestCase):          # with a different number          values = ["", ("",""), ("","",""), [], {}, ["",""], ["","",""]]          methods = [ -                    (self.iface.ReturnOneString, set([0]), set([0])), -                    (self.iface.ReturnTwoStrings, set([1, 5]), set([5])), -                    (self.iface.ReturnStruct, set([1, 5]), set([1])), +                    (self.iface.ReturnOneString, 'SignalOneString', set([0]), set([0])), +                    (self.iface.ReturnTwoStrings, 'SignalTwoStrings', set([1, 5]), set([5])), +                    (self.iface.ReturnStruct, 'SignalStruct', set([1, 5]), set([1])),                      # all of our test values are sequences so will marshall correctly into an array :P -                    (self.iface.ReturnArray, set(range(len(values))), set([3, 5, 6])), -                    (self.iface.ReturnDict, set([0, 3, 4]), set([4])) +                    (self.iface.ReturnArray, 'SignalArray', set(range(len(values))), set([3, 5, 6])), +                    (self.iface.ReturnDict, 'SignalDict', set([0, 3, 4]), set([4]))                  ] -        for (method, success_values, return_values) in methods: +        for (method, signal, success_values, return_values) in methods:              print "\nTrying correct behaviour of", method._method_name              for value in range(len(values)):                  try:                      ret = method(value)                  except Exception, e: -                    print "%s(%s) raised %s" % (method._method_name, repr(values[value]), e.__class__) +                    print "%s(%r) raised %s" % (method._method_name, values[value], e.__class__)                      # should fail if it tried to marshal the wrong type -                    self.assert_(value not in success_values, "%s should succeed when we ask it to return %s\n%s" % (method._method_name, repr(values[value]), e)) +                    self.assert_(value not in success_values, "%s should succeed when we ask it to return %r\n%s\n%s" % (method._method_name, values[value], e.__class__, e))                  else: -                    print "%s(%s) returned %s" % (method._method_name, repr(values[value]), repr(ret)) +                    print "%s(%r) returned %r" % (method._method_name, values[value], ret)                      # should only succeed if it's the right return type -                    self.assert_(value in success_values, "%s should fail when we ask it to return %s" % (method._method_name, repr(values[value]))) +                    self.assert_(value in success_values, "%s should fail when we ask it to return %r" % (method._method_name, values[value]))                      # check the value is right too :D                      returns = map(lambda n: values[n], return_values) -                    self.assert_(ret in returns, "%s should return one of %s" % (method._method_name, repr(returns))) +                    self.assert_(ret in returns, "%s should return one of %r" % (method._method_name, returns)) + +            print "\nTrying correct emission of", signal +            for value in range(len(values)): +                try: +                    self.iface.EmitSignal(signal, value) +                except Exception, e: +                    print "EmitSignal(%s, %r) raised %s" % (signal, values[value], e.__class__) + +                    # should fail if it tried to marshal the wrong type +                    self.assert_(value not in success_values, "EmitSignal(%s) should succeed when we ask it to return %r\n%s\n%s" % (signal, values[value], e.__class__, e)) +                else: +                    print "EmitSignal(%s, %r) appeared to succeed" % (signal, values[value]) + +                    # should only succeed if it's the right return type +                    self.assert_(value in success_values, "EmitSignal(%s) should fail when we ask it to return %r" % (signal, values[value])) + +                    # FIXME: wait for the signal here +          print      def testInheritance(self): @@ -229,6 +247,8 @@ class TestDBusBindings(unittest.TestCase):                  else:                      names[name] = busname +            print +  class TestDBusPythonToGLibBindings(unittest.TestCase):      def setUp(self):          self.bus = dbus.SessionBus() diff --git a/test/python/test-service.py b/test/python/test-service.py index 820514ae..d5488c44 100755 --- a/test/python/test-service.py +++ b/test/python/test-service.py @@ -77,6 +77,40 @@ class TestObject(dbus.service.Object, TestInterface):      def ReturnDict(self, test):          return self.returnValue(test) +    @dbus.service.signal("org.freedesktop.DBus.TestSuiteInterface", signature='s') +    def SignalOneString(self, test): +        pass + +    @dbus.service.signal("org.freedesktop.DBus.TestSuiteInterface", signature='ss') +    def SignalTwoStrings(self, test, test2): +        pass + +    @dbus.service.signal("org.freedesktop.DBus.TestSuiteInterface", signature='(ss)') +    def SignalStruct(self, test): +        pass + +    @dbus.service.signal("org.freedesktop.DBus.TestSuiteInterface", signature='as') +    def SignalArray(self, test): +        pass + +    @dbus.service.signal("org.freedesktop.DBus.TestSuiteInterface", signature='a{ss}') +    def SignalDict(self, test): +        pass + +    @dbus.service.method("org.freedesktop.DBus.TestSuiteInterface", in_signature='su', out_signature='') +    def EmitSignal(self, signal, value): +        sig = getattr(self, signal, None) +        assert(sig != None) + +        val = self.returnValue(value) +        # make two string case work by passing arguments in by tuple +        if (signal == 'SignalTwoStrings' and (value == 1 or value == 5)): +            val = tuple(val) +        else: +            val = tuple([val]) + +        sig(*val) +      def CheckInheritance(self):          return True  | 
