summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hcid/Makefile.am4
-rwxr-xr-xhcid/dbus-test283
2 files changed, 286 insertions, 1 deletions
diff --git a/hcid/Makefile.am b/hcid/Makefile.am
index 856f6bb5..d9e8ddbe 100644
--- a/hcid/Makefile.am
+++ b/hcid/Makefile.am
@@ -15,6 +15,8 @@ endif
sbin_PROGRAMS = hcid
+noinst_SCRIPTS = dbus-test
+
if DBUS
dbus_hcid_sources = dbus.h dbus.c
dbus_hcid_libs = @DBUS_LIBS@
@@ -42,7 +44,7 @@ AM_YFLAGS = -d
CLEANFILES = lexer.c parser.c parser.h
-EXTRA_DIST = $(man_MANS) $(conf_DATA) dbus.h dbus.c bluez-hcid.conf
+EXTRA_DIST = $(man_MANS) $(conf_DATA) dbus.h dbus.c dbus-test bluez-hcid.conf
MAINTAINERCLEANFILES = Makefile.in
diff --git a/hcid/dbus-test b/hcid/dbus-test
new file mode 100755
index 00000000..830d3bf7
--- /dev/null
+++ b/hcid/dbus-test
@@ -0,0 +1,283 @@
+#!/usr/bin/env python
+
+import dbus
+import dbus.glib
+import gobject
+import sys
+import getopt
+from signal import *
+
+mgr_cmds = [ "DeviceList", "DefaultDevice" ]
+dev_cmds = [ "Inquiry", "CancelInquiry", "PeriodicInquiry", "CancelPeriodic",
+ "RemoteName", "Connections", "Authenticate", "RoleSwitch" ]
+
+class Tester:
+ exit_events = []
+ dev_path = None
+ need_dev = False
+ listen = False
+ at_interrupt = None
+
+ def __init__(self, argv):
+ self.name = argv[0]
+
+ self.parse_args(argv[1:])
+
+ try:
+ self.dbus_setup()
+ except dbus.DBusException, e:
+ print 'Failed to do D-BUS setup: %s' % e
+ sys.exit(1)
+
+ self.dev_setup()
+
+ def parse_args(self, argv):
+ try:
+ opts, args = getopt.getopt(argv, "hli:")
+ except getopt.GetoptError:
+ self.usage()
+ sys.exit(1)
+
+ for o, a in opts:
+ if o == "-h":
+ self.usage()
+ sys.exit()
+ elif o == "-l":
+ self.need_dev = True
+ self.listen = True
+ elif o == "-i":
+ if a[0] == '/':
+ self.dev_path = a
+ else:
+ self.dev_path = '/org/bluez/Device/%s' % a
+
+ if not (args or self.listen):
+ self.usage()
+ sys.exit(1)
+
+ if args:
+ self.cmd = args[0]
+ self.cmd_args = args[1:]
+
+ if not self.cmd in mgr_cmds:
+ self.need_dev = True
+
+ def dev_setup(self):
+ if self.need_dev and not self.dev_path:
+ try:
+ self.dev_path = self.manager.DefaultDevice()
+ except dbus.DBusException, e:
+ print 'Failed to get default device: %s' % e
+ sys.exit(1)
+
+ if self.dev_path:
+ try:
+ obj = self.bus.get_object('org.bluez', self.dev_path)
+ self.dev = dbus.Interface(obj, 'org.bluez.Device')
+
+ self.dev.connect_to_signal('Up', self.dev_up)
+ self.dev.connect_to_signal('Down', self.dev_down)
+
+ obj = self.bus.get_object('org.bluez', '%s/Controller' % self.dev_path)
+ self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller')
+
+ self.ctl.connect_to_signal('InquiryStart', self.inquiry_start)
+ self.ctl.connect_to_signal('InquiryResult', self.inquiry_result)
+ self.ctl.connect_to_signal('InquiryComplete', self.inquiry_complete)
+ self.ctl.connect_to_signal('RemoteName', self.remote_name)
+ self.ctl.connect_to_signal('RemoteNameFailed', self.remote_name_failed)
+ self.ctl.connect_to_signal('AuthenticationComplete', self.authentication_complete)
+
+ except dbus.DBusException, e:
+ print 'Failed to setup device path: %s' % e
+ sys.exit(1)
+
+ def dbus_setup(self):
+ self.bus = dbus.SystemBus()
+ manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager')
+ self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager')
+ self.manager.connect_to_signal('DeviceAdded', self.device_added)
+ self.manager.connect_to_signal('DeviceRemoved', self.device_removed)
+
+ def usage(self):
+ print 'Usage: %s [-i] [-l] [-h] <cmd> [arg1..]' % self.name
+ print ' -i <dev> Specify device (e.g. "hci0" or "/dev/bluez/Device/hci0")'
+ print ' -l Listen for events (no command required)'
+ print ' -h Show this help'
+ print 'Manager commands:'
+ for cmd in mgr_cmds:
+ print '\t%s' % cmd
+ print 'Device commands:'
+ for cmd in dev_cmds:
+ print '\t%s' % cmd
+
+ def device_added(self, path):
+ print 'DeviceAdded: %s' % path
+
+ def device_removed(self, path):
+ print 'DeviceRemoved: %s' % path
+
+ def remote_name(self, bda, name):
+ print 'RemoteName: %s, %s' % (bda, name)
+ if 'RemoteName' in self.exit_events:
+ self.main_loop.quit()
+
+ def remote_name_failed(self, bda, status):
+ print 'RemoteNameFailed: %s, 0x%02X' % (bda, status)
+ if 'RemoteNameFailed' in self.exit_events:
+ self.main_loop.quit()
+
+ def inquiry_start(self):
+ print 'InquiryStart'
+
+ def inquiry_complete(self):
+ print 'InquiryComplete'
+ if 'InquiryComplete' in self.exit_events:
+ self.main_loop.quit()
+
+ def inquiry_result(self, bda, cls, rssi):
+ print 'InquiryResult: %s, %06X, %02X' % (bda, cls, rssi)
+
+ def authentication_complete(self, bda, status):
+ print 'AuthenticationComplete: %s, 0x%02X' % (bda, status)
+ if 'AuthenticationComplete' in self.exit_events:
+ self.main_loop.quit()
+
+ def dev_up(self):
+ print 'Up'
+
+ def dev_down(self):
+ print 'Down'
+
+ def signal_cb(self, sig, frame):
+ print 'Caught signal, exiting'
+ if self.at_interrupt:
+ self.at_interrupt()
+ self.main_loop.quit()
+
+ def run(self):
+ # Manager methods
+ if self.listen:
+ print 'Listening for events...'
+ elif self.cmd == 'DeviceList':
+ for dev in self.manager.DeviceList():
+ print dev
+ elif self.cmd == 'DefaultDevice':
+ print self.manager.DefaultDevice()
+
+ # Device methods
+ elif self.cmd == 'Up':
+ try:
+ self.dev.Up()
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+ elif self.cmd == 'Down':
+ try:
+ self.dev.Down()
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+
+ # Device.Controller methods
+ elif self.cmd == 'Inquiry':
+ if len(self.cmd_args) != 2:
+ length, maxrsp = (10, 100)
+ else:
+ length, maxrsp = self.cmd_args
+ try:
+ self.ctl.Inquiry(dbus.Byte(length), dbus.Byte(maxrsp))
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+ self.listen = True
+ self.exit_events.append('InquiryComplete')
+ self.at_interrupt = self.ctl.CancelInquiry
+
+ elif self.cmd == 'CancelInquiry':
+ try:
+ self.ctl.CancelInquiry()
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+
+ elif self.cmd == 'RemoteName':
+ if len(self.cmd_args) < 1:
+ print 'Bluetooth address needed'
+ sys.exit(1)
+ try:
+ self.ctl.RemoteName(self.cmd_args[0])
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+ self.listen = True
+ self.exit_events.append('RemoteNameFailed')
+ self.exit_events.append('RemoteName')
+
+ elif self.cmd == 'PeriodicInquiry':
+ if len(self.cmd_args) != 3:
+ length, min, max = (6, 20, 60)
+ else:
+ length, min, max = self.cmd_args
+ self.listen = True
+ try:
+ self.ctl.PeriodicInquiry(dbus.Byte(length), dbus.Byte(min), dbus.Byte(max))
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+
+ elif self.cmd == 'CancelPeriodic':
+ try:
+ self.ctl.CancelPeriodic()
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+
+ elif self.cmd == 'Authenticate':
+ if len(self.cmd_args) < 1:
+ print 'Bluetooth address needed'
+ sys.exit(1)
+ try:
+ self.ctl.Authenticate(self.cmd_args[0])
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+ self.listen = True
+ self.exit_events.append('AuthenticationComplete')
+
+ elif self.cmd == 'RoleSwitch':
+ if len(self.cmd_args) < 2:
+ print 'Bluetooth address and role needed'
+ exit.exit(1)
+ bda, role = self.cmd_args
+ if not (role == '0' or role == '1'):
+ print 'Role should be 0 (master) or 1 (slave)'
+ sys.exit(1)
+ try:
+ self.ctl.RoleSwitch(bda, dbus.Byte(role))
+ except dbus.DBusException, e:
+ print 'Sending %s failed: %s' % (self.cmd, e)
+ sys.exit(1)
+
+ elif self.cmd == 'Connections':
+ connections = self.ctl.Connections()
+ for conn in connections:
+ print conn
+
+ else:
+ print 'Unknown command: %s' % self.cmd
+ sys.exit(1)
+
+ if self.listen:
+ signal(SIGINT, self.signal_cb)
+ signal(SIGTERM, self.signal_cb)
+ self.main_loop = gobject.MainLoop()
+ self.main_loop.run()
+
+if __name__ == '__main__':
+ gobject.threads_init()
+ dbus.glib.init_threads()
+
+ tester = Tester(sys.argv)
+ tester.run()
+