基于BlueZ的BLE Peripheral开发

原创
2023/06/14 10:13
阅读数 591

基于BlueZ的BLE Peripheral开发

概述

BLE Peripheral设备通常以GATT Server模式存在。Peripheral设备首先需要向外广播Advertisement,以通知周围Central设备。Central设备发现Peripheral设备后会主动发起Peripheral设备连接操作和配对操作。只有当连接和配对操作成功后,Central设备作为GATT Client会向Peripheral设备发送读取所有GATT Services操作,之后根据GATT Server提供的service/characteristic执行读写、订阅通知等操作。

Advertising

  1. 概述。
    Advertising是Peripheral设备周期性的广播小于31个字节的数据包,目的是能够通知周围的Central设备知道该Peripheral设备是否存在。
    BlueZ为Advertising定义了两个接口,接口名称分别是LEAdvertisement1和LEAdvertisingManager1。

  2. 创建Advertisement类。
    Advertisement类继承自dbus.servivce.Object类。除了具备Object类属性外,Advertisement类还添加了额外的属性,这些额外的数据包含在周期性广播数据包内。
    以python为例,创建Advertisement类代码如下:

class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/xxx/advertisement'
    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = 'Hello'
        self.include_tx_power = False
        self.data = None
        self.discoverable = True
        dbus.service.Object.__init__(self, bus, self.path)
  1. 添加Advertisement属性。
    前面步骤创建的Advertisement类以及在类构造函数中初始化了一些Advertisement类自定义的数据类型,这些数据除了定义外,还需要被BlueZ访问,之后由BlueZ负责将这些属性值添加到广播数据中。
    BlueZ通过D-Bus的org.freedesktop.DBus.Properties接口中的GetAll()方法获取所有属性值(属性值以字典格式存储)。
    Advertisement类需要一个成员函数get_properties()负责将所有属性值以字典格式输出。以python为例,get_properties()成员函数说明如下:
    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data, signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.discoverable is not None and self.discoverable == True:
            properties['Discoverable'] = dbus.Boolean(self.discoverable)
        if self.include_tx_power:
            properties['Includes'] = dbus.Array(["tx-power"], signature='s')
        if self.data is not None:
            properties['Data'] = dbus.Dictionary(self.data, signature='yv')
        print(properties)
        return {"org.bluez.LEAdvertisingManager1": properties}
  1. 实现必要的类方法。
  • get_path():获取DBus格式的Object路径的方法。以python为例,get_path()成员函数说明如下:
    def get_path(self):
        return dbus.ObjectPath(self.path)
  • GetAll():获取接口所有属性的方法。该方法是override类org.freedesktop.DBus.Properties的GetAll()方法。以python为例,GetAll()函数说明如下:
    @dbus.service.method("org.freedesktop.DBus.Properties", in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != "org.bluez.LEAdvertisement1":
            raise bluetooth_exceptions.InvalidArgsException()
        return self.get_properties()["org.bluez.LEAdvertisingManager1"]
  • Relase():当Advertisement对象从BlueZ中移除时被调用,目的是释放系统资源。该方法是override类org.bluez.LEAdvertisingManager1的Release()方法。以python为例,Release()函数说明如下:
    @dbus.service.method("org.bluez.LEAdvertisingManager1",in_signature='', out_signature='')
    def Release(self):
        print('%s: Released' % self.path)
  1. 获取LEAdvertisingManager1接口。
  • 获取system dbus接口。
    import dbus
    bus = dbus.SystemBus()
  • 获取蓝牙适配器对象。
    import dbus
    adapter_object = dbus.get_object("org.bluez", "/org/bluez/hci0")
  • 获取蓝牙适配器对象的LEAdvertisingManager1接口。
    import dbus
    adv_mgr_interface = dbus.Interface(adapter_object, "org.bluez.LEAdvertisingManager1")
  1. 注册Advertisement类对象。
  • 创建Advertisement类对象。
    adv = Advertisement(bus, 0, "peripheral")
  • 通过LEAdvertisingManager1接口注册Advertisement类对象。
    def register_ad_cb():
        print('Advertisement registered OK')
    def register_ad_error_cb(error):
        print('Error: Failed to register advertisement: ' + str(error))
        mainloop.quit()
    def start_advertising():
        global adv
        global adv_mgr_interface
        print("Registering advertisement",adv.get_path())
        adv_mgr_interface.RegisterAdvertisement(adv.get_path(), {},reply_handler=register_ad_cb, error_handler=register_ad_error_cb)

    start_advertising()

Connection and Disconnection

  1. 使用BlueZ和DBus处理连接事件。
  • InterfaceAdd信号: 当Central设备向Peripheral设备发起连接请求时,BlueZ会通过org.freedesktop.DBus.ObjectManager接口的InterfaceAdd信号广播待连接的蓝牙设备信息。
    def stop_advertising():
        global adv
        global adv_mgr_interface
        print("Unregistering advertisement",adv.get_path())
        adv_mgr_interface.UnregisterAdvertisement(adv.get_path())

    def set_connected_status(status):
        global connected
        if (status == 1):
            print("connected")
            connected = 1
            stop_advertising()
        else:
            print("disconnected")
            connected = 0
            start_advertising()

    def interfaces_added(path, interfaces):
        if "org.bluez.Device1" in interfaces:
            properties = interfaces["org.bluez.Device1"]
            if ("Connected" in properties):
                set_connected_status(properties["Connected"])

    bus.add_signal_receiver(interfaces_added, dbus_interface = bluetooth_constants.DBUS_OM_IFACE, signal_name = "InterfacesAdded")
  • PropertiesChanged信号:对于已经连接的设备来说,当连接事件发生时,BlueZ会通过org.freedesktop.DBus.Properties接口的PropertiesChanged信号广播蓝牙设备连接状态信息。
    def properties_changed(interface, changed, invalidated, path):
        if (interface == "org.bluez.Device1"):
            if ("Connected" in changed):
                set_connected_status(changed["Connected"])

    bus.add_signal_receiver(properties_changed, dbus_interface = bluetooth_constants.DBUS_PROPERTIES, signal_name = "PropertiesChanged", path_keyword = "path")
  1. 使用BlueZ和DBus处理断开连接事件。
  • PropertiesChanged信号:对于已经连接的设备来说,当断开连接事件发生时,BlueZ会通过org.freedesktop.DBus.Properties接口的PropertiesChanged信号广播蓝牙设备连接状态信息。
    def properties_changed(interface, changed, invalidated, path):
        if (interface == "org.bluez.Device1"):
            if ("Connected" in changed):
                set_connected_status(changed["Connected"])

    bus.add_signal_receiver(properties_changed, dbus_interface = bluetooth_constants.DBUS_PROPERTIES, signal_name = "PropertiesChanged", path_keyword = "path")

Implementing GATT Service and Characteristics

  1. Application。
  • Application类定义。 自定义的Application类需要继承自dbus.service.Object类,扩展自定义Application属性包括Application包含的services等信息。
    class Application(dbus.service.Object):
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        print("Adding TemperatureService to the Application")
        self.add_service(TemperatureService(bus, '/org/bluez/ldsg', 0))
  • Application类方法定义。
  • 获取Application路径方法。
     def get_path(self):
        return dbus.ObjectPath(self.path)
  • 添加services方法。
    def add_service(self, service):
        self.services.append(service)
  • 获取管理对象的方法。
    该方法由BlueZ调用,返回application包含的所有services/characteristics/descriptors信息。
    @dbus.service.method("org.freedesktop.DBus.ObjectManager", out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            print("GetManagedObjects: service="+service.get_path())
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response
  1. GATT Service。
  • GATT服务类定义。
    自定义的GATT服务需要继承自dbus.service.Object类,扩展自定义GATT服务属性包括uuid、service包含的characteristics等信息。
    class Service(dbus.service.Object):
        def __init__(self, bus, path_base, index, uuid, primary):
            self.path = path_base + "/service" + str(index)
            self.bus = bus
            self.uuid = uuid
            self.primary = primary
            self.characteristics = []
            dbus.service.Object.__init__(self, bus, self.path)
  • GATT服务类方法定义。
    • GATT服务类需要提供获取服务属性值的方法,即:GetAll方法。
    def get_properties(self):
        return {
            "org.bluez.GattService1": {
                'UUID': self.uuid,
                'Primary': self.primary,
                'Characteristics': dbus.Array(
                self.get_characteristic_paths(),
                signature='o')
        }
    }

    @dbus.service.method("org.freedesktop.DBus.Properties", in_signature='s',out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != "org.bluez.GattService1":
            raise bluetooth_exceptions.bluetooth_exceptions.InvalidArgsException()
        return self.get_properties()["org.bluez.GattService1"]
  • 获取服务路径的方法。
    def get_path(self):
        return dbus.ObjectPath(self.path)
  • 向GATT服务类添加Characteristic方法。
    def add_characteristic(self, characteristic):
        self.characteristics.append(characteristic)
  • 获取服务所有Characteristic方法。
    def get_characteristics(self):
        return self.characteristics
  • 获取服务所有Characteristic路径的方法。
    def get_characteristic_paths(self):
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result
  1. GATT Characteristics。
  • GATT属性类定义。 自定义的GATT属性需要继承自dbus.service.Object类,扩展自定义GATT属性类属性包括uuid、属性flags、属性所属服务、属性包含的descriptors等信息。
    class Characteristic(dbus.service.Object):
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)
  • GATT属性类方法定义。
  • GATT属性类需要提供获取属性值的方法,即:GetAll方法。
    def get_properties(self):
        return {
                "org.bluez.GattCharacteristic1": {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }
    
    @dbus.service.method("org.freedesktop.DBus.Properties",
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != "org.bluez.GattCharacteristic1":
            raise bluetooth_exceptions.bluetooth_exceptions.InvalidArgsException()

        return self.get_properties()["org.bluez.GattCharacteristic1"]
  • 获取属性路径的方法。
    def get_path(self):
        return dbus.ObjectPath(self.path)
  • 添加属性描述信息的方法。
    def add_descriptor(self, descriptor):
        self.descriptors.append(descriptor)
  • 获取属性描述信息集合的方法。
    def get_descriptor_paths(self):
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        return self.descriptors
  • 属性读/写/通知方法。
    @dbus.service.method("org.bluez.GattCharacteristic1",
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise bluetooth_exceptions.NotSupportedException()

    @dbus.service.method("org.bluez.GattCharacteristic1", in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise bluetooth_exceptions.NotSupportedException()

    @dbus.service.method("org.bluez.GattCharacteristic1")
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise bluetooth_exceptions.NotSupportedException()

    @dbus.service.method("org.bluez.GattCharacteristic1")
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise bluetooth_exceptions.NotSupportedException()

    @dbus.service.signal("org.freedesktop.DBus.Properties", signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        pass
  1. GATT Descriptors。
  • Descriptor类定义。 自定义Descriptor类需要继承自dbus.service.Object类,扩展Descriptor类属性包括uuid、flags以及所属characteristic等信息。
    class Descriptor(dbus.service.Object):
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)
  • Descriptor类方法定义。
  • 提供获取属性值的方法,即:GetAll方法。
    def get_properties(self):
        return {
                "org.bluez.GattDescriptor1": {
                        'Characteristic': self.chrc.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                }
        }
    
    @dbus.service.method("org.freedesktop.DBus.Properties",
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != "org.bluez.GattDescriptor1":
            raise bluetooth_exceptions.InvalidArgsException()

        return self.get_properties()["org.bluez.GattDescriptor1"]
  • 获取Descriptor路径的方法。
    def get_path(self):
        return dbus.ObjectPath(self.path)
  • Descriptor读/写方法。
    @dbus.service.method("org.bluez.GattDescriptor1",
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print ('Default ReadValue called, returning error')
        raise bluetooth_exceptions.NotSupportedException()

    @dbus.service.method("org.bluez.GattDescriptor1", in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise bluetooth_exceptions.NotSupportedException()

Permissions

  1. 蓝牙permission分类。
  • access permission:访问控制许可,包括读、写、读写。
  • encryption permission:只有加密的蓝牙连接才允许访问。
  • authentication permission:只有经过指定配对模式才允许访问,例如:mode 1 level 3以上。
  • authorization permission:自定义的授权方式。
  1. 蓝牙配对模式。
  • Just work模式。
    • 当蓝牙配对的某一方的agent是NoInputNoOutput模式时,蓝牙配对模式会使用Just work模式。
    • 不推荐,无法防止中间人攻击。
  • Passkey模式。
    • 蓝牙配对双方的agent都是KeyBoardDisplay模式时,蓝牙配对模式会使用Passkey模式。
    • 推荐,可以防止中间人攻击。
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部