#!/usr/bin/env python
"""Command-line tester for the BlueZ D-Bus interfaces.

Talks to the 'org.bluez' service on the system bus through the
org.bluez.Manager, org.bluez.Device and org.bluez.Device.Controller
interfaces.  A single command is sent per invocation; commands that
complete asynchronously keep a GLib main loop running until the matching
completion signal arrives.  With -l the script only listens for signals.

The syntax is valid on Python 2.6+ and Python 3, but the script still
depends on the legacy ``dbus.glib`` and ``gobject`` modules.
"""

import dbus
import dbus.decorators
import dbus.glib
import gobject
import sys
import getopt
from signal import *

# Commands served by the Manager object; these do not need a device.
mgr_cmds = ["DeviceList", "DefaultDevice"]

# Commands that need a device (see Tester.parse_args, which sets need_dev
# for every command that is not in mgr_cmds).
dev_cmds = ["Up", "Down", "SetProperty", "GetProperty", "Inquiry",
            "CancelInquiry", "PeriodicInquiry", "CancelPeriodic",
            "RemoteName", "Connections", "Authenticate", "RoleSwitch"]

# Property names (lower-case) whose SetProperty value is sent as a boolean.
dev_setprop_bool = ["auth", "encrypt", "discoverable", "connectable"]

# Property names (lower-case) whose SetProperty value is sent as a byte.
dev_setprop_byte = ["incmode"]

# Spellings accepted for boolean property values on the command line.
_TRUE_WORDS = ("1", "true", "yes", "on")
_FALSE_WORDS = ("0", "false", "no", "off")


def _parse_bool(text):
    """Turn a command-line word into a bool; print an error and exit(1) if unknown."""
    word = text.strip().lower()
    if word in _TRUE_WORDS:
        return True
    if word in _FALSE_WORDS:
        return False
    print('Invalid boolean value: %s' % text)
    sys.exit(1)


def _parse_int(text, base=10):
    """Turn a command-line word into an int; print an error and exit(1) if invalid.

    With base=0 the usual prefixes (0x..., 0o...) are honoured.
    """
    try:
        return int(text, base)
    except ValueError:
        print('Invalid number: %s' % text)
        sys.exit(1)


class Tester:
    """Parse the command line, connect to BlueZ over D-Bus and run one command."""

    # Object path of the device to operate on (None until known).
    dev_path = None
    # True when the chosen command (or -l) requires a device.
    need_dev = False
    # True when run() should enter the main loop and wait for signals.
    listen = False
    # Optional callable invoked from signal_cb before the main loop quits.
    at_interrupt = None

    def __init__(self, argv):
        """Set everything up from *argv* (program name first).

        Exits the process with status 1 if the D-Bus or device setup fails.
        """
        # Names of signals that end the main loop when received.  Kept per
        # instance so that separate Tester objects never share the list.
        self.exit_events = []

        self.name = argv[0]
        self.parse_args(argv[1:])

        try:
            self.dbus_setup()
        except dbus.DBusException as e:
            print('Failed to do D-BUS setup: %s' % e)
            sys.exit(1)

        self.dev_setup()

    def parse_args(self, argv):
        """Parse options and the command; print usage and exit on bad input."""
        try:
            opts, args = getopt.getopt(argv, "hli:")
        except getopt.GetoptError:
            self.usage()
            sys.exit(1)

        for o, a in opts:
            if o == "-h":
                self.usage()
                sys.exit()
            elif o == "-l":
                self.need_dev = True
                self.listen = True
            elif o == "-i":
                # startswith() instead of a[0] so an empty argument cannot
                # raise IndexError.
                if a.startswith('/'):
                    self.dev_path = a
                else:
                    self.dev_path = '/org/bluez/Device/%s' % a

        if not (args or self.listen):
            self.usage()
            sys.exit(1)

        if args:
            self.cmd = args[0]
            self.cmd_args = args[1:]
            if self.cmd not in mgr_cmds:
                self.need_dev = True

    def dev_setup(self):
        """Resolve the device path if needed and hook up device/controller signals.

        Exits with status 1 when the default device cannot be obtained or
        the proxies/signal handlers cannot be set up.
        """
        if self.need_dev and not self.dev_path:
            try:
                self.dev_path = self.manager.DefaultDevice()
            except dbus.DBusException as e:
                print('Failed to get default device: %s' % e)
                sys.exit(1)

        if not self.dev_path:
            return

        try:
            obj = self.bus.get_object('org.bluez', self.dev_path)
            self.dev = dbus.Interface(obj, 'org.bluez.Device')

            self.dev.connect_to_signal('Up', self.dev_up)
            self.dev.connect_to_signal('Down', self.dev_down)
            # Listen on the selected device rather than a fixed hci0 path so
            # that -i is honoured for name-change notifications as well.
            self.bus.add_signal_receiver(self.dev_name_changed,
                                         'DeviceNameChanged',
                                         'org.bluez.Device',
                                         'org.bluez',
                                         self.dev_path)

            obj = self.bus.get_object('org.bluez',
                                      '%s/Controller' % self.dev_path)
            self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller')

            self.ctl.connect_to_signal('InquiryStart', self.inquiry_start)
            self.ctl.connect_to_signal('InquiryResult', self.inquiry_result)
            self.ctl.connect_to_signal('InquiryComplete',
                                       self.inquiry_complete)
            self.ctl.connect_to_signal('RemoteName', self.remote_name)
            self.ctl.connect_to_signal('RemoteNameFailed',
                                       self.remote_name_failed)
            self.ctl.connect_to_signal('AuthenticationComplete',
                                       self.authentication_complete)
        except dbus.DBusException as e:
            print('Failed to setup device path: %s' % e)
            sys.exit(1)

    def dbus_setup(self):
        """Connect to the system bus and subscribe to the Manager signals.

        dbus.DBusException is left to propagate; __init__ handles it.
        """
        self.bus = dbus.SystemBus()
        manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager')
        self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager')
        self.manager.connect_to_signal('DeviceAdded', self.device_added)
        self.manager.connect_to_signal('DeviceRemoved', self.device_removed)

    def usage(self):
        """Print the option summary followed by all known commands."""
        print('Usage: %s [-i DEV] [-l] [-h] CMD [arg1..]' % self.name)
        print(' -i DEV Specify device (e.g. "hci0" or "/org/bluez/Device/hci0")')
        print(' -l Listen for events (no command required)')
        print(' -h Show this help')
        print('Manager commands:')
        for cmd in mgr_cmds:
            print('\t%s' % cmd)
        print('Device commands:')
        for cmd in dev_cmds:
            print('\t%s' % cmd)

    # ---- signal handlers -------------------------------------------------

    def device_added(self, path):
        """Report a DeviceAdded signal."""
        print('DeviceAdded: %s' % path)

    def device_removed(self, path):
        """Report a DeviceRemoved signal."""
        print('DeviceRemoved: %s' % path)

    def remote_name(self, bda, name):
        """Report a RemoteName signal; quit the loop if it was awaited."""
        print('RemoteName: %s, %s' % (bda, name))
        if 'RemoteName' in self.exit_events:
            self.main_loop.quit()

    def remote_name_failed(self, bda, status):
        """Report a RemoteNameFailed signal; quit the loop if it was awaited."""
        print('RemoteNameFailed: %s, 0x%02X' % (bda, status))
        if 'RemoteNameFailed' in self.exit_events:
            self.main_loop.quit()

    def inquiry_start(self):
        """Report an InquiryStart signal."""
        print('InquiryStart')

    def inquiry_complete(self):
        """Report an InquiryComplete signal; quit the loop if it was awaited."""
        print('InquiryComplete')
        if 'InquiryComplete' in self.exit_events:
            self.main_loop.quit()

    def inquiry_result(self, bda, cls, rssi):
        """Report one InquiryResult signal."""
        print('InquiryResult: %s, %06X, %02X' % (bda, cls, rssi))

    def authentication_complete(self, bda, status):
        """Report AuthenticationComplete; quit the loop if it was awaited."""
        print('AuthenticationComplete: %s, 0x%02X' % (bda, status))
        if 'AuthenticationComplete' in self.exit_events:
            self.main_loop.quit()

    def dev_up(self):
        """Report an Up signal."""
        print('Up')

    def dev_down(self):
        """Report a Down signal."""
        print('Down')

    @dbus.decorators.explicitly_pass_message
    def dev_name_changed(*args, **keywords):
        """Report a DeviceNameChanged signal together with its object path.

        The signature is fully variadic, so args[0] is the Tester instance
        and args[1] is the first signal argument (the new name).  The
        message itself arrives under the "dbus_message" keyword.
        """
        name = args[1]
        dbus_message = keywords["dbus_message"]
        print('Device %s name changed: %s' % (dbus_message.get_path(), name))

    def signal_cb(self, sig, frame):
        """SIGINT/SIGTERM handler: run the interrupt hook, then stop the loop."""
        print('Caught signal, exiting')
        if self.at_interrupt:
            self.at_interrupt()
        self.main_loop.quit()

    # ---- command execution -----------------------------------------------

    def _send(self, method, *args):
        """Call a D-Bus proxy method for the current command.

        Returns the method's result.  On dbus.DBusException the failure is
        printed and the process exits with status 1.
        """
        try:
            return method(*args)
        except dbus.DBusException as e:
            print('Sending %s failed: %s' % (self.cmd, e))
            sys.exit(1)

    def run(self):
        """Execute the selected command, then wait for signals if required.

        NOTE: when -l was given the command (if any) is not executed; the
        script only listens.
        """
        # Manager methods
        if self.listen:
            print('Listening for events...')

        elif self.cmd == 'DeviceList':
            for dev in self.manager.DeviceList():
                print(dev)

        elif self.cmd == 'DefaultDevice':
            print(self.manager.DefaultDevice())

        # Device methods
        elif self.cmd == 'Up':
            self._send(self.dev.Up)

        elif self.cmd == 'Down':
            self._send(self.dev.Down)

        elif self.cmd == 'SetProperty':
            if len(self.cmd_args) < 2:
                print('Usage: %s -i DEV SetProperty strPropName arg'
                      % self.name)
                sys.exit(1)
            prop, raw = self.cmd_args[0], self.cmd_args[1]
            key = prop.lower()
            if key in dev_setprop_bool:
                # A raw string would always be truthy ("0" included), so
                # the text is parsed before wrapping it.
                value = dbus.Boolean(_parse_bool(raw))
            elif key in dev_setprop_byte:
                # dbus-python documents Byte() as taking an integer or a
                # length-1 byte string; convert the digits to int first so
                # "6" means 6 and not the character '6'.
                value = dbus.Byte(_parse_int(raw))
            else:
                value = raw
            self._send(self.dev.SetProperty, prop, value)

        elif self.cmd == 'GetProperty':
            if len(self.cmd_args) < 1:
                print('Usage: %s -i DEV GetProperty strPropName' % self.name)
                sys.exit(1)
            print(self._send(self.dev.GetProperty, self.cmd_args[0]))

        # Device.Controller methods
        elif self.cmd == 'Inquiry':
            if len(self.cmd_args) != 2:
                self._send(self.ctl.Inquiry)
            else:
                length, lap = self.cmd_args
                self._send(self.ctl.Inquiry,
                           dbus.Byte(_parse_int(length)),
                           dbus.UInt32(_parse_int(lap, 0)))
            # Stay alive until the inquiry finishes; cancel it on Ctrl-C.
            self.listen = True
            self.exit_events.append('InquiryComplete')
            self.at_interrupt = self.ctl.CancelInquiry

        elif self.cmd == 'CancelInquiry':
            self._send(self.ctl.CancelInquiry)

        elif self.cmd == 'RemoteName':
            if len(self.cmd_args) < 1:
                print('Bluetooth address needed')
                sys.exit(1)
            self._send(self.ctl.RemoteName, self.cmd_args[0])
            self.listen = True
            self.exit_events.append('RemoteNameFailed')
            self.exit_events.append('RemoteName')

        elif self.cmd == 'PeriodicInquiry':
            # Fewer than three arguments selects the defaults (6, 20, 60).
            length, min_period, max_period = 6, 20, 60
            if len(self.cmd_args) >= 3:
                length, min_period, max_period = [
                    _parse_int(word) for word in self.cmd_args[:3]]
            call_args = [dbus.Byte(length),
                         dbus.UInt16(min_period),
                         dbus.UInt16(max_period)]
            # An optional fourth argument is the LAP; anything after it is
            # ignored instead of raising an unpacking error.
            if len(self.cmd_args) > 3:
                call_args.append(dbus.UInt32(_parse_int(self.cmd_args[3], 0)))
            self._send(self.ctl.PeriodicInquiry, *call_args)
            self.listen = True

        elif self.cmd == 'CancelPeriodic':
            self._send(self.ctl.CancelPeriodic)

        elif self.cmd == 'Authenticate':
            if len(self.cmd_args) < 1:
                print('Bluetooth address needed')
                sys.exit(1)
            self._send(self.ctl.Authenticate, self.cmd_args[0])
            self.listen = True
            self.exit_events.append('AuthenticationComplete')

        elif self.cmd == 'RoleSwitch':
            if len(self.cmd_args) < 2:
                print('Bluetooth address and role needed')
                sys.exit(1)
            bda, role = self.cmd_args[:2]
            if role not in ('0', '1'):
                print('Role should be 0 (master) or 1 (slave)')
                sys.exit(1)
            # role is known to be '0' or '1' here, so int() cannot fail.
            self._send(self.ctl.RoleSwitch, bda, dbus.Byte(int(role)))

        elif self.cmd == 'Connections':
            for conn in self._send(self.ctl.Connections):
                print(conn)

        else:
            print('Unknown command: %s' % self.cmd)
            sys.exit(1)

        if self.listen:
            signal(SIGINT, self.signal_cb)
            signal(SIGTERM, self.signal_cb)
            self.main_loop = gobject.MainLoop()
            self.main_loop.run()


if __name__ == '__main__':
    gobject.threads_init()
    dbus.glib.init_threads()

    tester = Tester(sys.argv)
    tester.run()