BLE: Temperature sensor with Unihiker K10 and M10

0 290 Easy

In this second part of the BLE tutorial series for DFRobots Unihiker platform, we will dive into how to use Bluetooth Low Energy (BLE) to transmit real-time temperature data from the Unihiker K10 (peripheral) to the Unihiker M10 (central). The K10 reads temperature sensor data and notifies the M10. On both displays will be a simple graphical interface.

You will learn:

- How to develop the code for K10 to advertise a BLE service with a temperature characteristic.
- How to develop the code to make the M10 scan, connect, and subscribe to BLE notifications from the K10.
- How to handle BLE data updates and show real-time temperature in a simple GUI.
- How to upload and run Python/MicroPython code on Unihiker boards.

If you followed the previous tutorial, you will already have a basic understanding of BLE on Unihiker. Now we will expand that with a practical sensor data example!

HARDWARE LIST
1 Unihiker M10
1 Unihiker K10
2 USB cables
STEP 1
Project Setup

As mentioned in the introduction, this tutorial builds on the previous one. Take a quick look at which folders and files we have already created. In this section, a new folder named TemperatureSensor is created in the BLE folder. This time, two additional folders named K10 and M10 are created within it. An empty file called main.py is created in each of the new folders (K10 and M10).

When you're done, the structure should look something like this:

CODE
BLE/
├── SimpleMessage/
│   ├── K10/
│   │   └── main.py
│   └── M10/
│       └── main.py
└── TemperatureSensor/
    ├── K10/
    │   └── main.py
    └── M10/
        └── main.py

Note: This time, I will skip the preparation for flashing the MicroPython firmware (K10), installing the Thonny IDE and to install on M10 the required Python modules. If you still need to do that, read the previous tutorial to learn how.

STEP 2
The code for both devices

MicroPython code for Unihiker K10 (BLE Peripheral)

This MicroPython code:

- Initializes the BLE peripheral
- Advertises a temperature service with a notify characteristic
- Periodically reads the onboard temperature sensor
- Sends updated temperature values over BLE

CODE
from micropython import const
from machine import Timer
from utime import sleep_ms
from struct import pack
from ubluetooth import BLE, UUID, FLAG_NOTIFY, FLAG_WRITE
from unihiker_k10 import screen, temp_humi


DELAY_MS = const(3000)
BG_COLOR = const(0x000000)
INFO_COLOR = const(0x0000FF)
MSG_COLOR = const(0xFFFFFF)


class TemperaturePeripheral:

    IRQ_CENTRAL_CONNECT = const(1)
    IRQ_CENTRAL_DISCONNECT = const(2)
    IRQ_GATTS_WRITE = const(3)
    MAX_CONNECTIONS = const(3)
    TEMPERATURE_SERVICE_UUID = UUID(0x1809)
    TEMPERATURE_CHAR_UUID = UUID(0x2A6E)
    FLAG_READ = const(0x02)
    FLAG_NOTIFY = const(0x10)

    def __init__(self, device_name, notify_interval_ms=3000):
        self._ble = BLE()
        self._ble.active(True)
        self._ble.config(gap_name=device_name)

        mac = self._ble.config('mac')[1]
        self.mac_str = ':'.join('{:02X}'.format(b) for b in mac)

        self._ble.irq(self._irq)
        self._connections = set()
        self.temp_celsius = None
        self._payload = self._make_payload(name=device_name)

        self._temperature_char = (self.TEMPERATURE_CHAR_UUID, self.FLAG_READ | self.FLAG_NOTIFY)

        self._service = (self.TEMPERATURE_SERVICE_UUID, (self._temperature_char, ), )

        ((self.tx_handle, ), ) = self._ble.gatts_register_services((self._service,))

        self._timer = Timer(-1)
        self._notify_interval_ms = notify_interval_ms
        self._start_notify_timer()

        self._advertise()

    def _irq(self, event, data) -> None:
        if event == self.IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
            print('[INFO] central connected')

            self._advertise()
        elif event == self.IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            self._connections.discard(conn_handle)
            print('[INFO] central disconnected')

            self._advertise()

    @staticmethod
    def _make_payload(name) -> bytearray:
        name_bytes = name.encode('utf-8')

        payload = bytearray()
        payload += bytes((2, 0x01, 0x06))
        payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes

        return payload

    def _advertise(self) -> None:
        if len(self._connections) < self.MAX_CONNECTIONS:
            print('[INFO] send ble advertise')
            self._ble.gap_advertise(100, self._payload)
        else:
            self._ble.gap_advertise(None)

    def _read_notify_temperature(self) -> None:
        self.temp_celsius = temp_humi.read_temp()

        if self._connections:
            value = pack('<f', self.temp_celsius)

            for conn_handle in self._connections:
                self._ble.gatts_notify(conn_handle, self.tx_handle, value)

    def _start_notify_timer(self) -> None:
        self._timer.init(
            period=self._notify_interval_ms,
            mode=Timer.PERIODIC,
            callback=lambda t: self._read_notify_temperature()
        )


if __name__ == '__main__':
    # variables
    ble_name = 'Unihiker K10'

    # initialize BLE
    ble = TemperaturePeripheral(device_name=ble_name, notify_interval_ms=DELAY_MS)

    # initialize display
    screen.init(dir=2)
    screen.show_bg(color=BG_COLOR)

    # display loop
    while True:
        screen.draw_text(text=ble_name, x=10, y=10, color=INFO_COLOR)
        screen.draw_line(x0=10, y0=30, x1=230, y1=30, color=INFO_COLOR)

        temp_value = ble.temp_celsius
        if temp_value is not None:
            print(f'[INFO] {round(temp_value, 2)} Celsius')

            txt = f'Sensor: {round(temp_value, 2)} °C'
            screen.draw_text(text=txt, x=10, y=40, color=MSG_COLOR)

        screen.show_draw()
        sleep_ms(DELAY_MS)

Python code for Unihiker M10 (BLE Central)

This python code:

- Scans for the K10 BLE signal
- Connects and subscribes to temperature notifications
- Displays temperature updates in a simple Tkinter GUI

CODE
import asyncio
from signal import signal, SIGINT
from struct import unpack
from threading import Thread
from tkinter import Tk, Label
from types import FrameType
from typing import Optional
from bleak import BleakScanner, BleakClient
from bleak.backends.characteristic import BleakGATTCharacteristic


DEVICE_NAME: str = "Unihiker K10"
SERVICE_UUID: str = "00001809-0000-1000-8000-00805f9b34fb"
CHAR_UUID: str = "00002a6e-0000-1000-8000-00805f9b34fb"
SCREEN_WIDTH: int = 240
SCREEN_HEIGHT: int = 320
BG_COLOR: str = "white"
FONT_COLOR: str = "black"


def update_label(text: str) -> None:
    """
    Updates the text of the global `temperature_label` widget to the provided value.

    :param text: New text value to update in the `temperature_label` widget.
    :type text: str
    :return: None
    """
    global temperature_label

    if temperature_label:
        temperature_label.after(0, temperature_label.config, {'text': text})


def handle_temperature(characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
    """
    Handles temperature data received from a Bluetooth Low Energy (BLE) GATT
    characteristic.

    :param characteristic: The BLE GATT characteristic from which the data is read.
    :type characteristic: BleakGATTCharacteristic
    :param data: The raw data received from the characteristic, encoded as a bytearray.
    :type data: bytearray
    :return: None
    """
    _ = characteristic

    temp = unpack("<f", data)[0]
    print(f"[INFO] Temperature: {temp:.2f} Celsius")

    update_label(f"{temp:.2f} °C")


def shutdown_handler(signum: int, frame: Optional[FrameType]) -> None:
    """
    Handles shutdown signals for clean application termination.

    :param signum: Signal number indicating the received signal.
    :type signum: int
    :param frame: Current stack frame when the signal was received.
    :type frame: Optional[FrameType]
    :return: None
    """
    _ = signum
    _ = frame

    global main_task

    print("[INFO] Disconnect requested...")
    if main_task:
        main_task.cancel()


async def main() -> None:
    """
    Discover and connect to a specific BLE device, then start receiving notifications.

    :raises asyncio.CancelledError: Raised when the user interrupts the operation.
    :raises Exception: For any unexpected errors during scanning, connecting, or notification handling.
    :return: None
    """
    global client

    print("[INFO] Scanning for BLE devices...")
    update_label("Scanning...")
    devices = await BleakScanner.discover(timeout=5.0)

    k10_device = next((d for d in devices if d.name == DEVICE_NAME), None)
    if not k10_device:
        print(f"[ERROR] {DEVICE_NAME} not found")
        update_label("Nothing found")
        return

    print(f"[INFO] Connect to {k10_device.name} ({k10_device.address})...")
    update_label("Connecting...")
    client = BleakClient(k10_device.address)

    try:
        await client.connect()
        print("[INFO] Connected. Starting notifications...")
        update_label("Connected...")

        await client.start_notify(CHAR_UUID, handle_temperature)

        print("[INFO] Waiting for notifications... Press Ctrl+C to exit")
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("[INFO] Cancelled by user (Ctrl+C)")
        update_label("Cancelled...")
    except Exception as e:
        print(f"[ERROR] {e}")
        update_label("Error")
    finally:
        if client and client.is_connected:
            print("[INFO] Stopping notifications and disconnecting...")
            await client.stop_notify(CHAR_UUID)
            await client.disconnect()
            print("[INFO] Connection closed")
            update_label("Disconnected...")


def start_ble_loop() -> None:
    """
    Starts the Bluetooth Low Energy (BLE) event loop.

    :return: None
    """
    global main_task

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    main_task = loop.create_task(main())

    try:
        loop.run_until_complete(main_task)
    finally:
        loop.close()


def start_gui_loop() -> None:
    """
    Starts the graphical user interface loop and initializes the main application window.

    :return: None
    """
    global temperature_label

    root = Tk()
    root.geometry(f'{SCREEN_WIDTH}x{SCREEN_HEIGHT}+0+0')
    root.resizable(False, False)
    root.configure(bg=BG_COLOR)

    logo_label = Label(root, text="Unihiker M10", font=("Arial", 24), fg=FONT_COLOR, bg=BG_COLOR)
    logo_label.pack()

    temperature_label = Label(root, text="-- °C", font=("Arial", 22, "bold"), fg=FONT_COLOR, bg=BG_COLOR)
    temperature_label.pack(expand=True, fill='both')

    ble_thread = Thread(target=start_ble_loop, daemon=True)
    ble_thread.start()

    root.mainloop()


if __name__ == "__main__":
    client = None
    main_task = None
    temperature_label = None

    signal(SIGINT, shutdown_handler)
    start_gui_loop()

Note: Make sure to copy the correct code into the corresponding main.py!

STEP 3
Upload and run

These steps are also no different from the previous tutorial! Make sure you load the correct main.py file onto the respective Unihiker device.

First, launch Unihiker K10. After a few seconds, the screen should look like this:

The final step is to run the Python code on the Unihiker M10. If everything goes well, it should look like this:

STEP 4
Further ideas

Ready to expand? Here are some cool next steps:

- Add a Humidity Sensor: Modify the K10 code to include humidity readings.
- Display Graphs on M10: Use matplotlib or a similar library to graph temperature trends in real time.
- Multiple Peripherals: Extend the code to connect the M10 to multiple K10 devices and aggregate their data.

License
All Rights
Reserved
licensBg
0