This project is a hands-on tutorial for creating a voice-controlled smart home lighting system using the MQTT protocol. It guides you through configuring an MQTT broker on the UNIHIKER device, flashing MicroPython firmware onto a FireBeetle ESP32-C6 microcontroller, and integrating a Gravity: Offline Language Learning Voice Recognition Sensor to control an RGBW smart bulb (Shelly Duo - RGBW) through voice commands.
You will learn to program the system to respond to voice commands such as "turn on the light" and "turn off the light", and to change the bulb color to red, green, blue, or white.
I have already written a detailed tutorial that describes the installation and configuration of Mosquitto. You can find this tutorial here. In this project, I only show the content of the configuration files used for Mosquitto on the UNIHIKER. Please create or adapt them, then restart the service.
# stop mosquitto
$ systemctl stop mosquitto.service
# show mosquitto configuration file
$ cat /etc/mosquitto/conf.d/secure.conf
# show mosquitto passwd file
$ cat /etc/mosquitto/mqttpasswd
# show mosquitto acl file
$ cat /etc/mosquitto/user.acl
secure.conf
listener 1883
allow_anonymous false
password_file /etc/mosquitto/mqttpasswd
acl_file /etc/mosquitto/user.acl
mqttpasswd
admin:test123
shelly_bulb:test456
esp32:esp_test123
user.acl
user admin
topic readwrite #
topic read $SYS/#
user shelly_bulb
topic readwrite shellies/#
user esp32
topic write shellies/+/color/+/command
topic write shellies/+/color/+/set
pattern write $SYS/broker/connection/%c/state
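The ACL entries above rely on MQTT topic wildcards: '+' matches exactly one topic level and '#' matches all remaining levels. The following is a minimal Python sketch of that matching rule, meant only to illustrate which topics the 'esp32' user may write to. It is not Mosquitto's implementation, the function name 'topic_matches' and the device ID 'bulb-1234' are made up for the example, and it ignores the special handling of '$SYS' topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')

    for index, level in enumerate(filter_levels):
        if level == '#':
            # multi-level wildcard: matches the parent level and everything below it
            return True
        if index >= len(topic_levels):
            # the filter has more levels than the topic
            return False
        if level != '+' and level != topic_levels[index]:
            # a literal level does not match
            return False

    # all filter levels matched, so the topic must not have extra levels
    return len(filter_levels) == len(topic_levels)


# hypothetical device ID, used only for this example
print(topic_matches('shellies/+/color/+/command', 'shellies/bulb-1234/color/0/command'))
```

With such a helper you can quickly try out an ACL line before restarting the broker, for example to confirm that 'shellies/+/color/+/set' does not cover a status topic.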
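Note that Mosquitto expects hashed passwords in the password file. If you entered the passwords in plain text as in the 'mqttpasswd' example, convert the file once with 'mosquitto_passwd -U /etc/mosquitto/mqttpasswd'. After you're done with the configuration, start the service.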
# start mosquitto
$ systemctl start mosquitto.service
Once the MQTT broker is running on the UNIHIKER, you can use the Shelly RGBW light bulb and connect it to the broker.
Shelly Duo - RGBW
Note: Your IP address will be different! Please adjust it to your setup.
Download and install the latest VCP drivers. These are needed to recognize the ESP and communicate with it.
To flash the firmware and communicate with the ESP, a few Python libraries are required. These can be installed via 'pip', either globally on your system or (recommended) into a Python 'virtualenv'.
It is best practice to list the necessary libraries in a text file (requirements.txt).
requirements.txt
rshell
esptool
micropython-esp32-stubs
esptool - to flash firmware
rshell - to communicate with ESP
micropython-esp32-stubs - for better development (optional)
# upgrade pip (optional)
$ pip install -U pip
# install required libraries
$ pip install -r requirements.txt
# verify installation (optional)
$ pip freeze
Download the latest MicroPython firmware for the ESP32-C6 from this source.
Now connect the ESP to your computer via USB and flash the firmware.
# scan for device (macOS)
$ ls -l /dev/cu.*
# erase the entire flash
$ esptool.py --chip esp32c6 --port /dev/cu.usbmodem14301 erase_flash
# install firmware
$ esptool.py --chip esp32c6 --port /dev/cu.usbmodem14301 --baud 460800 write_flash -z 0x0 esp32c6-20241025-v1.24.0.bin
Note: The port name 'usbmodem14301' and/or the firmware file 'esp32c6-20241025-v1.24.0.bin' can be different for you! The version used in the example was the most current version when this tutorial was created.
Test MicroPython firmware
# run MicroPython REPL via rshell
$ rshell -p /dev/cu.usbmodem14301 repl
# list installed modules
>>> help('modules')
To stop and leave the REPL, press `control` + `x`.
Important: Now disconnect the FireBeetle from the USB cable (power off)!
The I2C cabling is quite simple! 3.3V to VCC, GND to GND, SCL to C/R and SDA to D/T. As soon as you have completed the I2C cabling, you can reconnect the ESP to your computer via USB.
Note: Make sure the sensor communication mode is on I2C!
To check whether the I2C cabling is correct and the ESP can communicate with the sensor, you can create and use the following MicroPython script.
i2c_scanner.py
from micropython import const
from machine import I2C, Pin
from usys import exit

I2C_SDA_PIN = const(19)
I2C_SCL_PIN = const(20)
I2C_FREQUENCY = const(400000)


def list_devices(i2c_devices: list) -> None:
    """
    print out the list of I2C devices
    :param i2c_devices: list of I2C devices
    :return: None
    """
    count = len(i2c_devices)

    if count == 0:
        print('[INFO] No I2C device found')
    else:
        print(f'[INFO] {count} device(s) found')
        for device in i2c_devices:
            print(f'[INFO] Decimal address: {device} Hex address: {hex(device)}')


if __name__ == '__main__':
    # accept only frequencies between 1 Hz and 499999 Hz
    if not 0 < I2C_FREQUENCY < 500000:
        print('[ERROR] Wrong value for I2C frequency')
        exit()

    i2c = None

    try:
        i2c = I2C(0, scl=Pin(I2C_SCL_PIN), sda=Pin(I2C_SDA_PIN), freq=I2C_FREQUENCY)
    except Exception as err:
        print(f'[ERROR] I2C bus initialization failed: {err}')

    if i2c:
        print('[INFO] Scanning the I2C bus')
        devices = i2c.scan()
        list_devices(devices)
The MicroPython script can be easily copied to the ESP via RSHELL (as main.py) and executed.
# copy I2C scan file to ESP
$ rshell -p /dev/cu.usbmodem14301 cp i2c_scanner.py /pyboard/main.py
# start REPL
$ rshell -p /dev/cu.usbmodem14301 repl
Note: The port name 'usbmodem14301' will be different on your system!
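The driver used later in this tutorial expects the sensor at I2C address 0x64. As a small sketch (the helper name 'sensor_found' is hypothetical), the list returned by 'i2c.scan()' could be checked for that address directly instead of reading the output by eye.

```python
SENSOR_ADDR = 0x64  # default address used by the driver in this tutorial


def sensor_found(devices, address=SENSOR_ADDR) -> bool:
    """Return True if the given address is in a list of scanned I2C addresses."""
    return address in devices


# On the ESP this would be called as sensor_found(i2c.scan());
# here a hand-written list stands in for the scan result.
print(sensor_found([0x64]))
```

If the check fails although the cabling looks right, the sensor's communication mode switch is a likely candidate, since the sensor only answers on the bus in I2C mode.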
All other folders and files are created in this step. When you're done, your local environment should look something like this.
$ tree .
├── lib
│ └── DFRobot_DF2301Q_I2C.py
├── boot.py
├── i2c_scanner.py
├── main.py
└── requirements.txt
boot.py
After switching on (power supply), the ESP first executes the file 'boot.py' and then 'main.py'. So it makes sense to create the WLAN connection via 'boot.py'. Adjust the constants (WLAN_SSID & WLAN_PASSWORD) according to your needs.
from micropython import const
from network import WLAN, STA_IF
from utime import sleep_ms

WLAN_SSID = 'YOUR WLAN SSID'
WLAN_PASSWORD = 'YOUR WLAN PASSWORD'
WLAN_CONNECT_DELAY = const(500)
WLAN_MAX_RETRIES = const(15)


def connect_to_ap() -> bool:
    """
    connect as station to access point
    :return: bool
    """
    attempt = 0

    sta = WLAN(STA_IF)
    sta.active(True)
    sta.connect(WLAN_SSID, WLAN_PASSWORD)

    while not sta.isconnected():
        sleep_ms(WLAN_CONNECT_DELAY)
        attempt += 1
        print(f'[INFO] {attempt}. connect to {WLAN_SSID} ... please wait')

        if attempt >= WLAN_MAX_RETRIES:
            return False

    return True


if connect_to_ap():
    print(f'[INFO] connection to {WLAN_SSID} established')
else:
    print(f'[ERROR] connection to {WLAN_SSID} failed')
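With the constants above, 'boot.py' waits at most 15 × 500 ms = 7.5 seconds for the connection. The polling pattern can be tried on a desktop Python without the 'network' module. In the sketch below, 'wait_until', the 'answers' iterator and the injected 'sleep' function are assumptions for illustration and not part of MicroPython. Unlike 'boot.py', the helper checks the condition once more after the last wait.

```python
import time


def wait_until(check, delay_ms=500, max_retries=15, sleep=time.sleep) -> bool:
    """Poll check() and sleep after each failed attempt, at most max_retries times."""
    for _ in range(max_retries):
        if check():
            return True
        sleep(delay_ms / 1000)

    # one final check after the last wait
    return check()


# stand-in for sta.isconnected(): reports success on the third call
answers = iter([False, False, True])
print(wait_until(lambda: next(answers), delay_ms=10))
```

Passing the sleep function as a parameter keeps the helper testable, because a test can replace it with a function that only records the requested delays.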
DFRobot_DF2301Q_I2C.py
The file 'DFRobot_DF2301Q_I2C.py' in the directory 'lib' serves as a MicroPython driver for the Gravity: Offline Language Learning Voice Recognition Sensor.
from micropython import const
from machine import I2C, Pin
from utime import sleep

DF2301Q_I2C_ADDR = const(0x64)


class DFRobot_DF2301Q_I2C:
    """
    MicroPython class for communication with the DF2301Q from DFRobot via I2C
    """

    DF2301Q_I2C_REG_CMDID = const(0x02)
    DF2301Q_I2C_REG_PLAY_CMDID = const(0x03)
    DF2301Q_I2C_REG_SET_MUTE = const(0x04)
    DF2301Q_I2C_REG_SET_VOLUME = const(0x05)
    DF2301Q_I2C_REG_WAKE_TIME = const(0x06)
    DF2301Q_I2C_8BIT_RANGE = const(0xFF)
    DF2301Q_I2C_PLAY_CMDID_DURATION = const(1)

    def __init__(self, sda, scl, i2c_addr=DF2301Q_I2C_ADDR, i2c_bus=0):
        """
        Initialize the DF2301Q I2C communication
        :param sda: I2C SDA pin
        :param scl: I2C SCL pin
        :param i2c_addr: I2C address
        :param i2c_bus: I2C bus number
        """
        self._addr = i2c_addr

        try:
            self._i2c = I2C(i2c_bus, sda=Pin(sda), scl=Pin(scl))
        except Exception as err:
            print(f'Could not initialize i2c! bus: {i2c_bus}, sda: {sda}, scl: {scl}, error: {err}')

    def _write_reg(self, reg, data) -> None:
        """
        Writes data to the I2C register
        :param reg: register address
        :param data: data to write
        :return: None
        """
        if isinstance(data, int):
            data = [data]

        try:
            self._i2c.writeto_mem(self._addr, reg, bytearray(data))
        except Exception as err:
            print(f'Write issue: {err}')

    def _read_reg(self, reg, length) -> bytes:
        """
        Reads data from the I2C register
        :param reg: register address
        :param length: number of bytes to read
        :return: bytes (zero-filled if the read failed)
        """
        try:
            result = self._i2c.readfrom_mem(self._addr, reg, length)
        except Exception as err:
            print(f'Read issue: {err}')
            # fall back to zero bytes so that callers can still decode the result
            result = bytes(length)

        return result

    def get_cmdid(self) -> int:
        """
        Returns the current command id
        :return: int
        """
        val = self._read_reg(self.DF2301Q_I2C_REG_CMDID, 1)
        return int.from_bytes(val, "big")

    def get_wake_time(self) -> int:
        """
        Returns the current wake-up duration
        :return: int
        """
        # the register is returned as bytes and has to be decoded, int(bytes) would fail
        val = self._read_reg(self.DF2301Q_I2C_REG_WAKE_TIME, 1)
        return int.from_bytes(val, "big")

    def play_by_cmdid(self, cmdid: int) -> None:
        """
        Play the current command words by command id
        :param cmdid: command words as integer
        :return: None
        """
        self._write_reg(self.DF2301Q_I2C_REG_PLAY_CMDID, int(cmdid))
        sleep(self.DF2301Q_I2C_PLAY_CMDID_DURATION)

    def set_wake_time(self, wake_time: int) -> None:
        """
        Set the wake-up duration of the device
        :param wake_time: integer between 0 and 255
        :return: None
        """
        wake_up_time = int(wake_time) & self.DF2301Q_I2C_8BIT_RANGE
        self._write_reg(self.DF2301Q_I2C_REG_WAKE_TIME, wake_up_time)

    def set_volume(self, vol: int) -> None:
        """
        Set the volume of the device
        :param vol: integer between 1 and 7
        :return: None
        """
        self._write_reg(self.DF2301Q_I2C_REG_SET_VOLUME, int(vol))

    def set_mute_mode(self, mode) -> None:
        """
        Set the mute mode of the device
        :param mode: integer 0 for off, 1 for on
        :return: None
        """
        self._write_reg(self.DF2301Q_I2C_REG_SET_MUTE, int(bool(mode)))
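The driver only talks to the sensor through 'writeto_mem' and 'readfrom_mem', so its register protocol can be exercised without hardware. The sketch below is a host-side stand-in. The class 'FakeI2C' is hypothetical and not part of MicroPython, and it simply stores one byte per register, which is an assumption about how the sensor behaves.

```python
REG_CMDID = 0x02       # register numbers taken from the driver in this tutorial
REG_SET_VOLUME = 0x05
SENSOR_ADDR = 0x64


class FakeI2C:
    """In-memory replacement for machine.I2C that stores one byte per register."""

    def __init__(self):
        self.registers = {}

    def writeto_mem(self, addr, reg, data):
        # keep only the first byte, like a single 8-bit register
        self.registers[(addr, reg)] = bytes(data)[:1]

    def readfrom_mem(self, addr, reg, length):
        # registers that were never written read as zero bytes
        return self.registers.get((addr, reg), bytes(length))


bus = FakeI2C()
bus.writeto_mem(SENSOR_ADDR, REG_SET_VOLUME, bytearray([5]))

# pretend that the sensor recognized command id 103 ("Turn on the light")
bus.registers[(SENSOR_ADDR, REG_CMDID)] = bytes([103])

cmd_id = int.from_bytes(bus.readfrom_mem(SENSOR_ADDR, REG_CMDID, 1), 'big')
print(cmd_id)
```

The decoding step is the same 'int.from_bytes(..., "big")' call that the driver uses in 'get_cmdid'.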
main.py
The 'main.py' file contains the actual logic of the program. The sensor returns an ID for each recognized voice command. These IDs are mapped to a topic and a payload, which are then published via the MQTT protocol to the broker (UNIHIKER).
The following command IDs are used in this example (ALLOWED_COMMAND_IDS): 103 (Turn on the light), 104 (Turn off the light), 116 (Set to red), 119 (Set to green), 121 (Set to blue) and 123 (Set to white).
Note: Change the constant values for 'MQTT_BROKER', 'MQTT_TOPIC_POWER' and 'MQTT_TOPIC_COLOR' to your needs! I found the topic names of my bulb very easily by connecting to the broker as user 'admin' with MQTT Explorer.
from micropython import const
from urandom import randint
from umqtt.simple import MQTTClient
from lib.DFRobot_DF2301Q_I2C import DFRobot_DF2301Q_I2C
from utime import sleep_ms

SDA_PIN = const(19)
SCL_PIN = const(20)

MQTT_BROKER = 'BROKER IP'
MQTT_PORT = 1883
MQTT_USER = 'esp32'
MQTT_PASSWORD = 'esp_test123'
MQTT_TOPIC_POWER = 'shellies/[device-id]/color/0/command'
MQTT_TOPIC_COLOR = 'shellies/[device-id]/color/0/set'
MQTT_CLIENT_ID = f'esp32-{randint(0, 1000)}'

ALLOWED_COMMAND_IDS = {
    103: (MQTT_TOPIC_POWER, 'on'),
    104: (MQTT_TOPIC_POWER, 'off'),
    116: (MQTT_TOPIC_COLOR, '{"red": 255, "green": 0, "blue": 0}'),
    119: (MQTT_TOPIC_COLOR, '{"red": 0, "green": 255, "blue": 0}'),
    121: (MQTT_TOPIC_COLOR, '{"red": 0, "green": 0, "blue": 255}'),
    123: (MQTT_TOPIC_COLOR, '{"red": 255, "green": 255, "blue": 255}')
}

DELAY_MS = const(300)


def publish_topic(client, topic, payload) -> None:
    """
    Publish MQTT message
    :param client: MQTT client instance
    :param topic: Topic on which to publish the message
    :param payload: Message payload to send
    :return: None
    """
    client.publish(topic, payload.encode() if isinstance(payload, str) else payload)


if __name__ == '__main__':
    mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD)
    mqtt_client.connect()

    sensor = DFRobot_DF2301Q_I2C(SDA_PIN, SCL_PIN)
    sensor.set_volume(5)
    sensor.set_mute_mode(0)
    sensor.set_wake_time(20)

    while True:
        cmd_id = int(sensor.get_cmdid())

        if cmd_id in ALLOWED_COMMAND_IDS:
            mqtt_topic, mqtt_payload = ALLOWED_COMMAND_IDS[cmd_id]
            print(f'[INFO] Command id: {cmd_id} -> Publishing to {mqtt_topic} with payload: {mqtt_payload}')
            publish_topic(mqtt_client, mqtt_topic, mqtt_payload)
        else:
            print(f'[WARNING] undefined command ID: {cmd_id}')

        sleep_ms(DELAY_MS)
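The color payloads in 'ALLOWED_COMMAND_IDS' are JSON strings written by hand. As a sketch, they could be generated with a small helper so that additional colors are less error-prone. The function name 'color_payload' and the clamping to 0..255 are assumptions for illustration, and the example is written for desktop Python.

```python
import json


def color_payload(red: int, green: int, blue: int) -> str:
    """Build the JSON payload for the color topic, clamping each channel to 0..255."""
    channels = {'red': red, 'green': green, 'blue': blue}
    clamped = {name: max(0, min(255, int(value))) for name, value in channels.items()}
    return json.dumps(clamped)


# an orange tone as an example of a custom color
print(color_payload(255, 128, 0))
```

An entry such as '116: (MQTT_TOPIC_COLOR, color_payload(255, 0, 0))' would then replace the hand-written string for red.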
The three files (boot.py, main.py and DFRobot_DF2301Q_I2C.py) now need to be copied to the ESP. RSHELL is used again for this.
# copy DFRobot_DF2301Q_I2C.py to ESP
$ rshell -p /dev/cu.usbmodem14301 cp -r lib/DFRobot_DF2301Q_I2C.py /pyboard/lib
# copy main.py to ESP
$ rshell -p /dev/cu.usbmodem14301 cp main.py /pyboard/
# copy boot.py to ESP
$ rshell -p /dev/cu.usbmodem14301 cp boot.py /pyboard/
Note: The port name 'usbmodem14301' will be different on your system!
To test, start the REPL again via RSHELL. Press `control` + `d` to trigger a soft reboot, which executes all files.
The following error can sometimes occur: OSError: Wifi Internal Error. In this case, check the WLAN connection data or press `control` + `d` again (for another soft reboot).
Now speak a command to the sensor, for example "Hello Robot ... turn on the light".
To make this project even more powerful and personalized, you could consider the following ideas for further expansion:
Add more commands for enhanced functionality, such as brightness adjustments, dimming, or custom color combinations.
Besides controlling a single bulb, you can extend MQTT to manage multiple smart devices, such as additional bulbs, smart plugs, or sensors.
Implement Secure Remote Access (e.g., TLS/SSL) for MQTT.
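For the multi-device idea, the two topic constants from 'main.py' could be derived from a device ID rather than hard-coded. A minimal sketch, assuming every bulb follows the same 'shellies/[device-id]/color/0/...' scheme used in this tutorial. The device IDs and the helper 'bulb_topics' are invented for illustration.

```python
def bulb_topics(device_id: str) -> dict:
    """Return the power and color topics for one bulb."""
    base = f'shellies/{device_id}/color/0'
    return {'power': f'{base}/command', 'color': f'{base}/set'}


# hypothetical device IDs; replace them with the ones shown by your broker
for device_id in ('bulb-kitchen', 'bulb-hallway'):
    print(bulb_topics(device_id)['power'])
```

Because the ACL for the 'esp32' user already uses the '+' wildcard for the device level, additional bulbs with this topic scheme would not need new ACL entries.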