diff options
| -rw-r--r-- | hcid/Makefile.am | 4 | ||||
| -rwxr-xr-x | hcid/dbus-test | 283 | 
2 files changed, 286 insertions, 1 deletions
| diff --git a/hcid/Makefile.am b/hcid/Makefile.am index 856f6bb5..d9e8ddbe 100644 --- a/hcid/Makefile.am +++ b/hcid/Makefile.am @@ -15,6 +15,8 @@ endif  sbin_PROGRAMS = hcid +noinst_SCRIPTS = dbus-test +  if DBUS  dbus_hcid_sources = dbus.h dbus.c  dbus_hcid_libs    = @DBUS_LIBS@ @@ -42,7 +44,7 @@ AM_YFLAGS = -d  CLEANFILES = lexer.c parser.c parser.h -EXTRA_DIST = $(man_MANS) $(conf_DATA) dbus.h dbus.c bluez-hcid.conf +EXTRA_DIST = $(man_MANS) $(conf_DATA) dbus.h dbus.c dbus-test bluez-hcid.conf  MAINTAINERCLEANFILES = Makefile.in diff --git a/hcid/dbus-test b/hcid/dbus-test new file mode 100755 index 00000000..830d3bf7 --- /dev/null +++ b/hcid/dbus-test @@ -0,0 +1,283 @@ +#!/usr/bin/env python + +import dbus +import dbus.glib +import gobject +import sys +import getopt +from signal import * + +mgr_cmds = [ "DeviceList", "DefaultDevice" ] +dev_cmds = [ "Inquiry", "CancelInquiry", "PeriodicInquiry", "CancelPeriodic", +             "RemoteName", "Connections", "Authenticate", "RoleSwitch" ] + +class Tester: +    exit_events = [] +    dev_path = None +    need_dev = False +    listen = False +    at_interrupt = None + +    def __init__(self, argv): +        self.name = argv[0] + +        self.parse_args(argv[1:]) + +        try: +            self.dbus_setup() +        except dbus.DBusException, e: +            print 'Failed to do D-BUS setup: %s' % e +            sys.exit(1) + +        self.dev_setup() + +    def parse_args(self, argv): +        try: +            opts, args = getopt.getopt(argv, "hli:") +        except getopt.GetoptError: +            self.usage() +            sys.exit(1) + +        for o, a in opts: +            if o == "-h": +                self.usage() +                sys.exit() +            elif o == "-l": +                self.need_dev = True +                self.listen = True +            elif o == "-i": +                if a[0] == '/': +                    self.dev_path = a +                else: +                    self.dev_path = '/org/bluez/Device/%s' % a + +        if not (args or self.listen): +            self.usage() +            sys.exit(1) + +        if args: +            self.cmd = args[0] +            self.cmd_args = args[1:] + +            if not self.cmd in mgr_cmds: +                self.need_dev = True + +    def dev_setup(self): +        if self.need_dev and not self.dev_path: +            try: +                self.dev_path = self.manager.DefaultDevice() +            except dbus.DBusException, e: +                print 'Failed to get default device: %s' % e +                sys.exit(1) + +        if self.dev_path: +            try: +                obj = self.bus.get_object('org.bluez', self.dev_path) +                self.dev = dbus.Interface(obj, 'org.bluez.Device') + +                self.dev.connect_to_signal('Up', self.dev_up) +                self.dev.connect_to_signal('Down', self.dev_down) + +                obj = self.bus.get_object('org.bluez', '%s/Controller' % self.dev_path) +                self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller') + +                self.ctl.connect_to_signal('InquiryStart', self.inquiry_start) +                self.ctl.connect_to_signal('InquiryResult', self.inquiry_result) +                self.ctl.connect_to_signal('InquiryComplete', self.inquiry_complete) +                self.ctl.connect_to_signal('RemoteName', self.remote_name) +                self.ctl.connect_to_signal('RemoteNameFailed', self.remote_name_failed) +                self.ctl.connect_to_signal('AuthenticationComplete', self.authentication_complete) + +            except dbus.DBusException, e: +                print 'Failed to setup device path: %s' % e +                sys.exit(1) + +    def dbus_setup(self): +        self.bus = dbus.SystemBus() +        manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager') +        self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager') +        self.manager.connect_to_signal('DeviceAdded', self.device_added) +        self.manager.connect_to_signal('DeviceRemoved', self.device_removed) + +    def usage(self): +        print 'Usage: %s [-i] [-l] [-h] <cmd> [arg1..]' % self.name +        print '  -i <dev>   Specify device (e.g. "hci0" or "/dev/bluez/Device/hci0")' +        print '  -l         Listen for events (no command required)' +        print '  -h         Show this help' +        print 'Manager commands:' +        for cmd in mgr_cmds: +            print '\t%s' % cmd +        print 'Device commands:' +        for cmd in dev_cmds: +            print '\t%s' % cmd + +    def device_added(self, path): +        print 'DeviceAdded: %s' % path + +    def device_removed(self, path): +        print 'DeviceRemoved: %s' % path + +    def remote_name(self, bda, name): +        print 'RemoteName: %s, %s' % (bda, name) +        if 'RemoteName' in self.exit_events: +            self.main_loop.quit() + +    def remote_name_failed(self, bda, status): +        print 'RemoteNameFailed: %s, 0x%02X' % (bda, status) +        if 'RemoteNameFailed' in self.exit_events: +            self.main_loop.quit() + +    def inquiry_start(self): +        print 'InquiryStart' + +    def inquiry_complete(self): +        print 'InquiryComplete' +        if 'InquiryComplete' in self.exit_events: +            self.main_loop.quit() + +    def inquiry_result(self, bda, cls, rssi): +        print 'InquiryResult: %s, %06X, %02X' % (bda, cls, rssi) + +    def authentication_complete(self, bda, status): +        print 'AuthenticationComplete: %s, 0x%02X' % (bda, status) +        if 'AuthenticationComplete' in self.exit_events: +            self.main_loop.quit() + +    def dev_up(self): +        print 'Up' + +    def dev_down(self): +        print 'Down' + +    def signal_cb(self, sig, frame): +        print 'Caught signal, exiting' +        if self.at_interrupt: +            self.at_interrupt() +        self.main_loop.quit() + +    def run(self): +        # Manager methods +        if self.listen: +            print 'Listening for events...' +        elif self.cmd == 'DeviceList': +            for dev in self.manager.DeviceList(): +                print dev +        elif self.cmd == 'DefaultDevice': +            print self.manager.DefaultDevice() + +        # Device methods +        elif self.cmd == 'Up': +            try: +                self.dev.Up() +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) +        elif self.cmd == 'Down': +            try: +                self.dev.Down() +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) + +        # Device.Controller methods +        elif self.cmd == 'Inquiry': +            if len(self.cmd_args) != 2: +                length, maxrsp = (10, 100) +            else: +                length, maxrsp = self.cmd_args +            try: +                self.ctl.Inquiry(dbus.Byte(length), dbus.Byte(maxrsp)) +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) +            self.listen = True +            self.exit_events.append('InquiryComplete') +            self.at_interrupt = self.ctl.CancelInquiry + +        elif self.cmd == 'CancelInquiry': +            try: +                self.ctl.CancelInquiry() +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) + +        elif self.cmd == 'RemoteName': +            if len(self.cmd_args) < 1: +                print 'Bluetooth address needed' +                sys.exit(1) +            try: +                self.ctl.RemoteName(self.cmd_args[0]) +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) +            self.listen = True +            self.exit_events.append('RemoteNameFailed') +            self.exit_events.append('RemoteName') + +        elif self.cmd == 'PeriodicInquiry': +            if len(self.cmd_args) != 3: +                length, min, max = (6, 20, 60) +            else: +                length, min, max = self.cmd_args +            self.listen = True +            try: +                self.ctl.PeriodicInquiry(dbus.Byte(length), dbus.Byte(min), dbus.Byte(max)) +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) + +        elif self.cmd == 'CancelPeriodic': +            try: +                self.ctl.CancelPeriodic() +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) + +        elif self.cmd == 'Authenticate': +            if len(self.cmd_args) < 1: +                print 'Bluetooth address needed' +                sys.exit(1) +            try: +                self.ctl.Authenticate(self.cmd_args[0]) +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) +            self.listen = True +            self.exit_events.append('AuthenticationComplete') + +        elif self.cmd == 'RoleSwitch': +            if len(self.cmd_args) < 2: +                print 'Bluetooth address and role needed' +                exit.exit(1) +            bda, role = self.cmd_args +            if not (role == '0' or role == '1'): +                print 'Role should be 0 (master) or 1 (slave)' +                sys.exit(1) +            try: +                self.ctl.RoleSwitch(bda, dbus.Byte(role)) +            except dbus.DBusException, e: +                print 'Sending %s failed: %s' % (self.cmd, e) +                sys.exit(1) + +        elif self.cmd == 'Connections': +            connections = self.ctl.Connections() +            for conn in connections: +                print conn + +        else: +            print 'Unknown command: %s' % self.cmd +            sys.exit(1) + +        if self.listen: +            signal(SIGINT, self.signal_cb) +            signal(SIGTERM, self.signal_cb) +            self.main_loop = gobject.MainLoop() +            self.main_loop.run() + +if __name__ == '__main__': +    gobject.threads_init() +    dbus.glib.init_threads() + +    tester = Tester(sys.argv) +    tester.run() + | 
