diff --git a/nordicsemi/dfu/dfu_transport_ble.py b/nordicsemi/dfu/dfu_transport_ble.py index 9d8b353..e1765f8 100644 --- a/nordicsemi/dfu/dfu_transport_ble.py +++ b/nordicsemi/dfu/dfu_transport_ble.py @@ -38,14 +38,18 @@ # Python standard library import os import sys +import time +import wrapt import Queue import struct import logging import binascii +from types import NoneType from nordicsemi.dfu.dfu_transport import DfuTransport, DfuEvent from pc_ble_driver_py.exceptions import NordicSemiException, IllegalStateException -from pc_ble_driver_py.ble_driver import BLEDriver, BLEDriverObserver, BLEEnableParams, BLEUUIDBase, BLEUUID, BLEAdvData, BLEGapConnParams, NordicSemiException +from pc_ble_driver_py.ble_driver import BLEDriver, BLEDriverObserver, BLEEnableParams, BLEUUIDBase, BLEGapSecKDist, BLEGapSecParams, \ + BLEGapIOCaps, BLEUUID, BLEAdvData, BLEGapConnParams, BLEEvtID, BLEGattHVXType, NordicSemiErrorCheck, BLEGapSecStatus, driver from pc_ble_driver_py.ble_driver import ATT_MTU_DEFAULT from pc_ble_driver_py.ble_adapter import BLEAdapter, BLEAdapterObserver, EvtSync @@ -64,29 +68,44 @@ class ValidationException(NordicSemiException): pass - class DFUAdapter(BLEDriverObserver, BLEAdapterObserver): - BASE_UUID = BLEUUIDBase([0x8E, 0xC9, 0x00, 0x00, 0xF3, 0x15, 0x4F, 0x60, - 0x9F, 0xB8, 0x83, 0x88, 0x30, 0xDA, 0xEA, 0x50]) + + BASE_UUID = BLEUUIDBase([0x8E, 0xC9, 0x00, 0x00, 0xF3, 0x15, 0x4F, 0x60, + 0x9F, 0xB8, 0x83, 0x88, 0x30, 0xDA, 0xEA, 0x50]) + + # Buttonless characteristics + BLE_DFU_BUTTONLESS_CHAR_UUID = BLEUUID(0x0003, BASE_UUID) + BLE_DFU_BUTTONLESS_BONDED_CHAR_UUID = BLEUUID(0x0004, BASE_UUID) + SERVICE_CHANGED_UUID = BLEUUID(0x2A05) + + # Bootloader characteristics CP_UUID = BLEUUID(0x0001, BASE_UUID) DP_UUID = BLEUUID(0x0002, BASE_UUID) - LOCAL_ATT_MTU = 23 + CONNECTION_ATTEMPTS = 3 + ERROR_CODE_POS = 2 + LOCAL_ATT_MTU = 247 - def __init__(self, adapter): + def __init__(self, adapter, bonded=False, keyset=None): super(DFUAdapter, self).__init__() - self.evt_sync = EvtSync(['connected', 'disconnected']) + + self.evt_sync = EvtSync(['connected', 'disconnected', 'sec_params', + 'auth_status', 'conn_sec_update']) self.conn_handle = None self.adapter = adapter + self.bonded = bonded + self.keyset = keyset self.notifications_q = Queue.Queue() + self.indication_q = Queue.Queue() + self.att_mtu = ATT_MTU_DEFAULT + self.packet_size = self.att_mtu - 3 self.adapter.observer_register(self) self.adapter.driver.observer_register(self) - def open(self): self.adapter.driver.open() ble_enable_params = BLEEnableParams(vs_uuid_count = 10, - service_changed = False, + service_changed = True, periph_conn_count = 0, central_conn_count = 1, central_sec_count = 1) @@ -97,61 +116,240 @@ def open(self): self.adapter.driver.ble_enable(ble_enable_params) self.adapter.driver.ble_vs_uuid_add(DFUAdapter.BASE_UUID) + def close(self): + if self.conn_handle is not None: + logger.info('BLE: Disconnecting from target') + self.adapter.disconnect(self.conn_handle) + self.evt_sync.wait('disconnected') + self.conn_handle = None + self.evt_sync = None + self.adapter.observer_unregister(self) + self.adapter.driver.observer_unregister(self) + self.adapter.driver.close() + def connect(self, target_device_name, target_device_addr): + """ Connect to Bootloader or Application with Buttonless Service. + + Args: + target_device_name (str): Device name to scan for. + target_device_addr (str): Device addr to scan for. + """ self.target_device_name = target_device_name self.target_device_addr = target_device_addr - logger.debug('BLE: connect: target address: 0x{}'.format(self.target_device_addr)) - logger.info('BLE: Scanning...') + + logger.info('BLE: Scanning for {}'.format(self.target_device_name)) self.adapter.driver.ble_gap_scan_start() - self.conn_handle = self.evt_sync.wait('connected') + self.verify_stable_connection() if self.conn_handle is None: raise NordicSemiException('Timeout. Target device not found.') - logger.info('BLE: Connected to target') - logger.debug('BLE: Service Discovery') + + logger.info('BLE: Service Discovery') + self.adapter.service_discovery(conn_handle=self.conn_handle) + + # Check if connected peer has Buttonless service. + if self.adapter.db_conns[self.conn_handle].get_cccd_handle(DFUAdapter.BLE_DFU_BUTTONLESS_CHAR_UUID): + self.jump_from_buttonless_mode_to_bootloader(DFUAdapter.BLE_DFU_BUTTONLESS_CHAR_UUID) + elif self.adapter.db_conns[self.conn_handle].get_cccd_handle(DFUAdapter.BLE_DFU_BUTTONLESS_BONDED_CHAR_UUID): + self.jump_from_buttonless_mode_to_bootloader(DFUAdapter.BLE_DFU_BUTTONLESS_BONDED_CHAR_UUID) + + if self.bonded: + # For combined Updates with bonds enabled, re-encryption is needed + self.encrypt() if nrf_sd_ble_api_ver >= 3: if DFUAdapter.LOCAL_ATT_MTU > ATT_MTU_DEFAULT: logger.info('BLE: Enabling longer ATT MTUs') self.att_mtu = self.adapter.att_mtu_exchange(self.conn_handle) - logger.info('BLE: ATT MTU: {}'.format(self.att_mtu)) else: logger.info('BLE: Using default ATT MTU') - self.adapter.service_discovery(conn_handle=self.conn_handle) logger.debug('BLE: Enabling Notifications') self.adapter.enable_notification(conn_handle=self.conn_handle, uuid=DFUAdapter.CP_UUID) return self.target_device_name, self.target_device_addr - def close(self): + def jump_from_buttonless_mode_to_bootloader(self, buttonless_uuid): + """ Function for going to bootloader mode from application with + buttonless service. It supports both bonded and unbonded + buttonless characteristics. + + Args: + buttonless_uuid: UUID of discovered buttonless characteristic. + + """ + if buttonless_uuid == DFUAdapter.BLE_DFU_BUTTONLESS_BONDED_CHAR_UUID: + logger.info("Bonded Buttonless characteristic discovered -> Bond") + self.bond() + else: + logger.info("Un-bonded Buttonless characteristic discovered -> Increment target device addr") + self.target_device_addr = "{:X}".format(int(self.target_device_addr, 16) + 1) + self.target_device_addr_type.addr[-1] += 1 + + # Enable indication for Buttonless DFU Service + self.adapter.enable_indication(self.conn_handle, buttonless_uuid) + + # Enable indication for Service changed Service + self.adapter.enable_indication(self.conn_handle, DFUAdapter.SERVICE_CHANGED_UUID) + + # Enter DFU mode + self.adapter.write_req(self.conn_handle, buttonless_uuid, [0x01]) + response = self.indication_q.get(timeout=DfuTransportBle.DEFAULT_TIMEOUT) + if response[DFUAdapter.ERROR_CODE_POS] != 0x01: + raise Exception("Error - Unexpected response") + + # Wait for buttonless peer to disconnect + self.evt_sync.wait('disconnected') + + # Reconnect + self.target_device_name = None + self.adapter.driver.ble_gap_scan_start() + self.verify_stable_connection() + if self.conn_handle is None: + raise NordicSemiException('Timeout. Target device not found.') + logger.info('BLE: Connected to target') + + logger.debug('BLE: Service Discovery') + self.adapter.service_discovery(conn_handle=self.conn_handle) + + def verify_stable_connection(self): + """ Verify connection event, and verify that unexpected disconnect + events are not received. + + Returns: + True if connected, else False. + + """ + self.conn_handle = self.evt_sync.wait('connected') if self.conn_handle is not None: - logger.info('BLE: Disconnecting from target') - self.adapter.disconnect(self.conn_handle) - self.evt_sync.wait('disconnected') - self.conn_handle = None - self.evt_sync = None - self.adapter.observer_unregister(self) - self.adapter.driver.observer_unregister(self) - self.adapter.driver.close() + retries = DFUAdapter.CONNECTION_ATTEMPTS + while retries: + if self.evt_sync.wait('disconnected', timeout=1) is None: + break + + logger.warning("Received unexpected disconnect event, " + "trying to re-connect to: {}".format(self.target_device_addr)) + time.sleep(1) + + self.adapter.connect(address=self.target_device_addr_type, + conn_params=self.conn_params) + self.conn_handle = self.evt_sync.wait('connected') + retries -= 1 + else: + if self.evt_sync.wait('disconnected', timeout=1) is not None: + raise Exception("Failure - Connection failed due to 0x3e") + + logger.info("Successfully Connected") + return + self.adapter.driver.ble_gap_scan_stop() + raise Exception("Connection Failure - Device not found!") + + def setup_keyset(self): + """ Setup keyset structure. + + """ + self.keyset = driver.ble_gap_sec_keyset_t() + + self.id_key_own = driver.ble_gap_id_key_t() + self.id_key_peer = driver.ble_gap_id_key_t() + + self.enc_key_own = driver.ble_gap_enc_key_t() + self.enc_key_peer = driver.ble_gap_enc_key_t() + + self.sign_info_own = driver.ble_gap_sign_info_t() + self.sign_info_peer = driver.ble_gap_sign_info_t() + + self.lesc_pk_own = driver.ble_gap_lesc_p256_pk_t() + self.lesc_pk_peer = driver.ble_gap_lesc_p256_pk_t() + + self.keyset.keys_own.p_enc_key = self.enc_key_own + self.keyset.keys_own.p_id_key = self.id_key_own + self.keyset.keys_own.p_sign_key = self.sign_info_own + self.keyset.keys_own.p_pk = self.lesc_pk_own + self.keyset.keys_peer.p_enc_key = self.enc_key_peer + self.keyset.keys_peer.p_id_key = self.id_key_peer + self.keyset.keys_peer.p_sign_key = self.sign_info_peer + self.keyset.keys_peer.p_pk = self.lesc_pk_peer + + def setup_sec_params(self): + """ Setup Security parameters. + + """ + + self.kdist_own = BLEGapSecKDist(enc=True, + id=True, + sign=False, + link=False) + self.kdist_peer = BLEGapSecKDist(enc=True, + id=True, + sign=False, + link=False) + self.sec_params = BLEGapSecParams(bond=True, + mitm=False, + lesc=False, + keypress=False, + io_caps=BLEGapIOCaps.none, + oob=False, + min_key_size=7, + max_key_size=16, + kdist_own=self.kdist_own, + kdist_peer=self.kdist_peer) + + def bond(self): + """ Bond to Application with Buttonless Service. + + """ + self.bonded = True + self.setup_sec_params() + self.setup_keyset() + + self.adapter.driver.ble_gap_authenticate(self.conn_handle, self.sec_params) + self.evt_sync.wait(evt="sec_params") + self.adapter.driver.ble_gap_sec_params_reply(self.conn_handle, + BLEGapSecStatus.success, + None, + self.keyset, + None) + + result = self.evt_sync.wait(evt="auth_status") + if result != BLEGapSecStatus.success: + raise NordicSemiException("Auth Status returned error code: {}".format(result)) + + def encrypt(self): + """ Re-encrypt to bootloader. + + """ + logger.info("Re-encryption to bootloader") + self.adapter.driver.ble_gap_encrypt(self.conn_handle, + self.keyset.keys_peer.p_enc_key.master_id, + self.keyset.keys_peer.p_enc_key.enc_info) + self.evt_sync.wait('conn_sec_update') def write_control_point(self, data): self.adapter.write_req(self.conn_handle, DFUAdapter.CP_UUID, data) - def write_data_point(self, data): self.adapter.write_cmd(self.conn_handle, DFUAdapter.DP_UUID, data) + def on_gap_evt_sec_params_request(self, ble_driver, conn_handle, peer_params): + logger.info("Got sec params req") + self.evt_sync.notify(evt='sec_params', data=conn_handle) + + def on_gap_evt_auth_status(self, ble_driver, conn_handle, auth_status): + logger.info("Got auth status:{}".format(auth_status)) + self.evt_sync.notify(evt='auth_status', data=auth_status) + + def on_gap_evt_conn_sec_update(self, ble_driver, conn_handle): + logger.info("Got Conn sec update") + self.evt_sync.notify(evt='conn_sec_update', data=conn_handle) def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params): self.evt_sync.notify(evt = 'connected', data = conn_handle) - logger.info('BLE: Connected to {}'.format(peer_addr)) - + logger.info('BLE: Connected to {}'.format(peer_addr.addr)) def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason): self.evt_sync.notify(evt = 'disconnected', data = conn_handle) self.conn_handle = None - logger.info('BLE: Disconnected') - + logger.info('BLE: Disconnected with reason: {}'.format(reason)) def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data): dev_name_list = [] @@ -163,43 +361,65 @@ def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_ty dev_name = "".join(chr(e) for e in dev_name_list) address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr) - logger.debug('Received advertisement report, address: 0x{}, device_name: {}'.format(address_string, dev_name)) + logger.info('Received advertisement report, address: 0x{}, device_name: {}'.format(address_string, dev_name)) if (dev_name == self.target_device_name) or (address_string == self.target_device_addr): - conn_params = BLEGapConnParams(min_conn_interval_ms = 15, - max_conn_interval_ms = 30, - conn_sup_timeout_ms = 4000, - slave_latency = 0) + self.conn_params = BLEGapConnParams(min_conn_interval_ms = 7.5, + max_conn_interval_ms = 30, + conn_sup_timeout_ms = 4000, + slave_latency = 0) logger.info('BLE: Found target advertiser, address: 0x{}, name: {}'.format(address_string, dev_name)) logger.info('BLE: Connecting to 0x{}'.format(address_string)) - self.adapter.connect(address = peer_addr, conn_params = conn_params) + self.adapter.connect(address = peer_addr, conn_params = self.conn_params) # store the address for subsequent connections self.target_device_addr = address_string - + self.target_device_addr_type = peer_addr def on_notification(self, ble_adapter, conn_handle, uuid, data): if self.conn_handle != conn_handle: return - if DFUAdapter.CP_UUID.value != uuid.value: return - #logger.debug(data) + if DFUAdapter.CP_UUID.value != uuid.value: + return self.notifications_q.put(data) + def on_indication(self, ble_adapter, conn_handle, uuid, data): + if self.conn_handle != conn_handle: return + if DFUAdapter.BLE_DFU_BUTTONLESS_BONDED_CHAR_UUID.value != uuid.value and \ + DFUAdapter.BLE_DFU_BUTTONLESS_CHAR_UUID.value != uuid.value: + return + self.indication_q.put(data) def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu): logger.info('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu)) - + self.att_mtu = att_mtu + self.packet_size = att_mtu - 3 def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs): - logger.info('ATT MTU exchange response: conn_handle={}'.format(conn_handle)) - + pass + + +class DfuBLEDriver(BLEDriver): + def __init__(self, serial_port, baud_rate=115200, auto_flash=False): + super(DfuBLEDriver, self).__init__(serial_port, baud_rate) + + @NordicSemiErrorCheck + @wrapt.synchronized(BLEDriver.api_lock) + def ble_gap_sec_params_reply(self, conn_handle, sec_status, sec_params, own_keys, peer_keys): + assert isinstance(sec_status, BLEGapSecStatus), 'Invalid argument type' + assert isinstance(sec_params, (BLEGapSecParams, NoneType)), 'Invalid argument type' + assert isinstance(peer_keys, NoneType), 'NOT IMPLEMENTED' + + return driver.sd_ble_gap_sec_params_reply(self.rpc_adapter, + conn_handle, + sec_status.value, + sec_params.to_c() if sec_params else None, + own_keys) class DfuTransportBle(DfuTransport): - DATA_PACKET_SIZE = 20 DEFAULT_TIMEOUT = 20 RETRIES_NUMBER = 3 - def __init__(self, serial_port, target_device_name=None, @@ -214,15 +434,18 @@ def __init__(self, self.dfu_adapter = None self.prn = prn + self.bonded = False + self.keyset = None + def open(self): if self.dfu_adapter: raise IllegalStateException('DFU Adapter is already open') super(DfuTransportBle, self).open() - driver = BLEDriver(serial_port = self.serial_port, - baud_rate = self.baud_rate) + driver = DfuBLEDriver(serial_port = self.serial_port, + baud_rate = self.baud_rate) adapter = BLEAdapter(driver) - self.dfu_adapter = DFUAdapter(adapter = adapter) + self.dfu_adapter = DFUAdapter(adapter=adapter, bonded=self.bonded, keyset=self.keyset) self.dfu_adapter.open() self.target_device_name, self.target_device_addr = self.dfu_adapter.connect( target_device_name = self.target_device_name, @@ -230,13 +453,17 @@ def open(self): self.__set_prn() def close(self): + + # Get bonded status and BLE keyset from DfuAdapter + self.bonded = self.dfu_adapter.bonded + self.keyset = self.dfu_adapter.keyset + if not self.dfu_adapter: raise IllegalStateException('DFU Adapter is already closed') super(DfuTransportBle, self).close() self.dfu_adapter.close() self.dfu_adapter = None - def send_init_packet(self, init_packet): def try_to_recover(): if response['offset'] == 0 or response['offset'] > len(init_packet): @@ -278,7 +505,6 @@ def try_to_recover(): else: raise NordicSemiException("Failed to send init packet") - def send_firmware(self, firmware): def try_to_recover(): if response['offset'] == 0: @@ -328,26 +554,22 @@ def try_to_recover(): raise NordicSemiException("Failed to send firmware") self._send_event(event_type=DfuEvent.PROGRESS_EVENT, progress=len(data)) - def __set_prn(self): logger.debug("BLE: Set Packet Receipt Notification {}".format(self.prn)) self.dfu_adapter.write_control_point([DfuTransportBle.OP_CODE['SetPRN']] + map(ord, struct.pack('= 6.0 ecdsa >= 0.13 behave protobuf -pc_ble_driver_py >= 0.8.1 +pc_ble_driver_py >= 0.11.1 tqdm piccata pyspinel