From 3313f2a7f2e56d6c8e02fc4cf36ba54e33fb8e37 Mon Sep 17 00:00:00 2001 From: Marcel Holtmann Date: Thu, 3 Nov 2005 10:32:32 +0000 Subject: Add the dbus-test script --- hcid/Makefile.am | 4 +- hcid/dbus-test | 283 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+), 1 deletion(-) create mode 100755 hcid/dbus-test diff --git a/hcid/Makefile.am b/hcid/Makefile.am index 856f6bb5..d9e8ddbe 100644 --- a/hcid/Makefile.am +++ b/hcid/Makefile.am @@ -15,6 +15,8 @@ endif sbin_PROGRAMS = hcid +noinst_SCRIPTS = dbus-test + if DBUS dbus_hcid_sources = dbus.h dbus.c dbus_hcid_libs = @DBUS_LIBS@ @@ -42,7 +44,7 @@ AM_YFLAGS = -d CLEANFILES = lexer.c parser.c parser.h -EXTRA_DIST = $(man_MANS) $(conf_DATA) dbus.h dbus.c bluez-hcid.conf +EXTRA_DIST = $(man_MANS) $(conf_DATA) dbus.h dbus.c dbus-test bluez-hcid.conf MAINTAINERCLEANFILES = Makefile.in diff --git a/hcid/dbus-test b/hcid/dbus-test new file mode 100755 index 00000000..830d3bf7 --- /dev/null +++ b/hcid/dbus-test @@ -0,0 +1,283 @@ +#!/usr/bin/env python + +import dbus +import dbus.glib +import gobject +import sys +import getopt +from signal import * + +mgr_cmds = [ "DeviceList", "DefaultDevice" ] +dev_cmds = [ "Inquiry", "CancelInquiry", "PeriodicInquiry", "CancelPeriodic", + "RemoteName", "Connections", "Authenticate", "RoleSwitch" ] + +class Tester: + exit_events = [] + dev_path = None + need_dev = False + listen = False + at_interrupt = None + + def __init__(self, argv): + self.name = argv[0] + + self.parse_args(argv[1:]) + + try: + self.dbus_setup() + except dbus.DBusException, e: + print 'Failed to do D-BUS setup: %s' % e + sys.exit(1) + + self.dev_setup() + + def parse_args(self, argv): + try: + opts, args = getopt.getopt(argv, "hli:") + except getopt.GetoptError: + self.usage() + sys.exit(1) + + for o, a in opts: + if o == "-h": + self.usage() + sys.exit() + elif o == "-l": + self.need_dev = True + self.listen = True + elif o == "-i": + if a[0] == '/': + self.dev_path = a + else: + self.dev_path = '/org/bluez/Device/%s' % a + + if not (args or self.listen): + self.usage() + sys.exit(1) + + if args: + self.cmd = args[0] + self.cmd_args = args[1:] + + if not self.cmd in mgr_cmds: + self.need_dev = True + + def dev_setup(self): + if self.need_dev and not self.dev_path: + try: + self.dev_path = self.manager.DefaultDevice() + except dbus.DBusException, e: + print 'Failed to get default device: %s' % e + sys.exit(1) + + if self.dev_path: + try: + obj = self.bus.get_object('org.bluez', self.dev_path) + self.dev = dbus.Interface(obj, 'org.bluez.Device') + + self.dev.connect_to_signal('Up', self.dev_up) + self.dev.connect_to_signal('Down', self.dev_down) + + obj = self.bus.get_object('org.bluez', '%s/Controller' % self.dev_path) + self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller') + + self.ctl.connect_to_signal('InquiryStart', self.inquiry_start) + self.ctl.connect_to_signal('InquiryResult', self.inquiry_result) + self.ctl.connect_to_signal('InquiryComplete', self.inquiry_complete) + self.ctl.connect_to_signal('RemoteName', self.remote_name) + self.ctl.connect_to_signal('RemoteNameFailed', self.remote_name_failed) + self.ctl.connect_to_signal('AuthenticationComplete', self.authentication_complete) + + except dbus.DBusException, e: + print 'Failed to setup device path: %s' % e + sys.exit(1) + + def dbus_setup(self): + self.bus = dbus.SystemBus() + manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager') + self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager') + self.manager.connect_to_signal('DeviceAdded', self.device_added) + self.manager.connect_to_signal('DeviceRemoved', self.device_removed) + + def usage(self): + print 'Usage: %s [-i] [-l] [-h] [arg1..]' % self.name + print ' -i Specify device (e.g. "hci0" or "/dev/bluez/Device/hci0")' + print ' -l Listen for events (no command required)' + print ' -h Show this help' + print 'Manager commands:' + for cmd in mgr_cmds: + print '\t%s' % cmd + print 'Device commands:' + for cmd in dev_cmds: + print '\t%s' % cmd + + def device_added(self, path): + print 'DeviceAdded: %s' % path + + def device_removed(self, path): + print 'DeviceRemoved: %s' % path + + def remote_name(self, bda, name): + print 'RemoteName: %s, %s' % (bda, name) + if 'RemoteName' in self.exit_events: + self.main_loop.quit() + + def remote_name_failed(self, bda, status): + print 'RemoteNameFailed: %s, 0x%02X' % (bda, status) + if 'RemoteNameFailed' in self.exit_events: + self.main_loop.quit() + + def inquiry_start(self): + print 'InquiryStart' + + def inquiry_complete(self): + print 'InquiryComplete' + if 'InquiryComplete' in self.exit_events: + self.main_loop.quit() + + def inquiry_result(self, bda, cls, rssi): + print 'InquiryResult: %s, %06X, %02X' % (bda, cls, rssi) + + def authentication_complete(self, bda, status): + print 'AuthenticationComplete: %s, 0x%02X' % (bda, status) + if 'AuthenticationComplete' in self.exit_events: + self.main_loop.quit() + + def dev_up(self): + print 'Up' + + def dev_down(self): + print 'Down' + + def signal_cb(self, sig, frame): + print 'Caught signal, exiting' + if self.at_interrupt: + self.at_interrupt() + self.main_loop.quit() + + def run(self): + # Manager methods + if self.listen: + print 'Listening for events...' + elif self.cmd == 'DeviceList': + for dev in self.manager.DeviceList(): + print dev + elif self.cmd == 'DefaultDevice': + print self.manager.DefaultDevice() + + # Device methods + elif self.cmd == 'Up': + try: + self.dev.Up() + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + elif self.cmd == 'Down': + try: + self.dev.Down() + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + + # Device.Controller methods + elif self.cmd == 'Inquiry': + if len(self.cmd_args) != 2: + length, maxrsp = (10, 100) + else: + length, maxrsp = self.cmd_args + try: + self.ctl.Inquiry(dbus.Byte(length), dbus.Byte(maxrsp)) + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + self.listen = True + self.exit_events.append('InquiryComplete') + self.at_interrupt = self.ctl.CancelInquiry + + elif self.cmd == 'CancelInquiry': + try: + self.ctl.CancelInquiry() + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + + elif self.cmd == 'RemoteName': + if len(self.cmd_args) < 1: + print 'Bluetooth address needed' + sys.exit(1) + try: + self.ctl.RemoteName(self.cmd_args[0]) + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + self.listen = True + self.exit_events.append('RemoteNameFailed') + self.exit_events.append('RemoteName') + + elif self.cmd == 'PeriodicInquiry': + if len(self.cmd_args) != 3: + length, min, max = (6, 20, 60) + else: + length, min, max = self.cmd_args + self.listen = True + try: + self.ctl.PeriodicInquiry(dbus.Byte(length), dbus.Byte(min), dbus.Byte(max)) + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + + elif self.cmd == 'CancelPeriodic': + try: + self.ctl.CancelPeriodic() + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + + elif self.cmd == 'Authenticate': + if len(self.cmd_args) < 1: + print 'Bluetooth address needed' + sys.exit(1) + try: + self.ctl.Authenticate(self.cmd_args[0]) + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + self.listen = True + self.exit_events.append('AuthenticationComplete') + + elif self.cmd == 'RoleSwitch': + if len(self.cmd_args) < 2: + print 'Bluetooth address and role needed' + exit.exit(1) + bda, role = self.cmd_args + if not (role == '0' or role == '1'): + print 'Role should be 0 (master) or 1 (slave)' + sys.exit(1) + try: + self.ctl.RoleSwitch(bda, dbus.Byte(role)) + except dbus.DBusException, e: + print 'Sending %s failed: %s' % (self.cmd, e) + sys.exit(1) + + elif self.cmd == 'Connections': + connections = self.ctl.Connections() + for conn in connections: + print conn + + else: + print 'Unknown command: %s' % self.cmd + sys.exit(1) + + if self.listen: + signal(SIGINT, self.signal_cb) + signal(SIGTERM, self.signal_cb) + self.main_loop = gobject.MainLoop() + self.main_loop.run() + +if __name__ == '__main__': + gobject.threads_init() + dbus.glib.init_threads() + + tester = Tester(sys.argv) + tester.run() + -- cgit