[UNIHIKER+ChatGPT] Empowering Smart Home Automation with SBC (Single Board Computer) and LLM
1.Introduction
Imagine stepping through the door, you utter,”Hey, please turn on the lights and start the fan”, then the room comes alive with a soft glow and a welcoming breeze. Now,with the combination of a Single Board Computer —— UNIHIKER and ChatGPT, we can create a smart home assistant, enabling hardware control through voice interaction, turn the ordinary SBC UNIHIKER into an AI SBC.
We know the exceptional natural language understanding and interactive capabilities demonstrated by LLM large language models like ChatGPT. Leveraging ChatGPT's function calling feature facilitates seamless integration with hardware control. UNIHIKER , distinguished as a leading Single Board Computer (SBCs), excels in its ability to effortlessly incorporate AI applications. PinPong Library is a Python library for controlling open-source hardware development board like UNIHIKER. With the PinPong Library, we can use Python code to control various common open-source hardware devices.When combining UNIHIKER with ChatGPT and utilizing the PinPong Library, users can implement well-defined functions, tapping into the natural language power to effortlessly command and control diverse hardware devices.
This post aims to inspire by showcasing the potential of leveraging large language models to control hardware. The demonstration involves simulating the sound, light, and movement of smart home devices, such as on-board buzzers, LED light rings, and fans, as illustrated in the diagram below.
2. Function Calling-let ChatGPT connect to external world
In the June 2023, OpenAI introduced the function calling feature for Chat Completions models. While Chat Completions already allowed for dialogue with ChatGPT through API calls, the addition of Function Calling takes it a step further. Now, the model can not only answer knowledge-based questions based on its pre-training data but also integrate with an external function library. It can dynamically search the function library based on user queries, execute external functions, and provide the results according to the user's specific needs.
The introduction of function calling has profoundly transformed the way developers interact with the model. This feature allows developers to describe functions to the AI model, which can intelligently determine and output a JSON object containing the parameters for calling these functions. In simpler terms, the function calling capability in large models enables us to invoke functions in natural language, facilitating actions like having the GPT model make calls to the Google Mail API (Gmail API), automatically read emails, and respond accordingly. The current models that support function calling are summarized in the following diagram.
With the function calling feature, users can effortlessly control hardware through simple statements without delving into the intricacies of underlying code logic.
For example, suppose I have defined a function called "play_music" that controls the onboard buzzer to play tones, and I have informed the model about the functionality of this function. A conversation scenario could unfold as follows:
The entire implementation logic is as shown in the diagram.
Zhipu AI has outlined the procedure for utilizing models to invoke tools in the official documentation. The implementation logic for both ChatGLM3-6b and GPT3.5 is identical, allowing us to draw parallels. Therefore, we can use the same reference.
Next, we need to create a tool list, providing gpt3.5 with information about these function tools, including their names, specific functionalities, required parameters for calling the functions, specific types of parameters, and detailed information about each parameter. Following the official example is recommended to achieve optimal performance."
tools = [
{
"name": "track",
"description": "Track real-time prices of a specified stock",
"parameters": {
"type": "object",
"properties": {
"symbol": {
"description": "Stock code to be tracked"
}
},
"required": ['symbol']
}
},
{
"name": "text-to-speech",
"description": "Convert text to speech",
"parameters": {
"type": "object",
"properties": {
"text": {
"description": "Text to be converted into speech"
},
"voice": {
"description": "Voice type to be used (male, female, etc.)"
},
"speed": {
"description": "Speech speed (fast, medium, slow, etc.)"
}
},
"required": ['text']
}
}
]
system_info = {"role": "system", "content": "Answer the following questions as best as you can. You have access to the following tools:", "tools": tools}
Lastly, parse the user's input in JSON format and pass it to the model. The model, based on the user's input, implements the invocation of the corresponding tool.
3. Preparation
1.Apply for an OpenAI API key
2.Apply for Microsoft Speech Services API.
3.API calls require internet connection. Therefore, in this project, we need to connect the UNIHIKER to the network.
(1) Open a browser and enter '10.1.2.3' to access the UNIHIKER page.
(2) Select 'Network Settings,' choose WIFI, enter the password, noting that the UNIHIKER only supports 2.4GHz WIFI hotspots. Click 'Connect,' and a successful connection will be displayed as 'Connected Successfully.
4. Function Definitionand Tool Description
Next, I will write code following the implementation process of function calling. I will follow the steps of "Writing hardware control function tools – Defining a tool list describing detailed information about function tools – Parsing user input and passing it to the model."
4.1 Buzzer
4.1.1 Example Program of onboard buzzer in the Pinpong Library
For fully implement the function-calling feature to control the on-board buzzer of the UNIHIKER let's take a look at how the example code for controlling the buzzer to play tones is written in the Pinpong library. The following is an example program related to the buzzer in the Pinpong library:
import time
from pinpong.board import Board, Pin
from pinpong.extension.UNIHIKER import *
Board().begin() # Initialize
# Music DADADADUM ENTERTAINER PRELUDE ODE NYAN RINGTONE FUNK BLUES BIRTHDAY WEDDING FUNERAL PUNCHLINE
# Music BADDY CHASE BA_DING WAWAWAWAA JUMP_UP JUMP_DOWN POWER_UP POWER_DOWN
# Playback modes: Once (play once) Forever (play continuously) OnceInBackground (play once in the background) ForeverInBackground (play continuously in the background)
buzzer.play(buzzer.DADADADUM, buzzer.Once) # Play music once
while True:
time.sleep(1) # Wait for 1 second, maintaining the state
4.1.2 Define the function 'play_music' to control the onboard buzzer
Based on the example program in the Pinpong library for controlling the onboard buzzer of the UNIHIKER, we can write a 'play_music' function to control the buzzer to play built-in music. It takes a parameter 'music' to specify the music name to be played. An 'if music in [...]' statement checks whether the passed music name is in the predefined list of music. If it is, the 'buzzer.play' function is used to play the music; otherwise, it notifies that the specified music name was not found, and playback cannot proceed.
def play_music(music):
if music in ["DADADADUM", "ENTERTAINER", "PRELUDE", "ODE", "NYAN", "RINGTONE", "FUNK", "BLUES", "BIRTHDAY", "WEDDING", "FUNERAL", "PUNCHLINE", "BADDY", "CHASE", "BA_DING", "WAWAWAWAA", "JUMP_UP", "JUMP_DOWN", "POWER_UP", "POWER_DOWN"]:
buzzer.play(getattr(buzzer, music), buzzer.OnceInBackground)
print(f"Function called, playing {music}")
else:
print("Sorry, I don't have that music.")
4.1.3 Define the list of tools
Following the example format, a detailed description of the 'play_music' function, including its name, description, and parameters.
tools = [
{
"type": "function",
"function": {
"name": "play_music",
"description": "Plays music in the house when the user is bored or needs relaxation. The music list includes 'DADADADUM', 'ENTERTAINER', 'PRELUDE', 'ODE', 'NYAN', 'RINGTONE', 'FUNK', 'BLUES', 'BIRTHDAY', 'WEDDING', 'FUNERAL', 'PUNCHLINE', 'BADDY', 'CHASE', 'BA_DING', 'WAWAWAWAA', 'JUMP_UP', 'JUMP_DOWN', 'POWER_UP', 'POWER_DOWN'.",
"parameters": {
"type": "object",
"properties": {
"music": {
"type": "string",
"enum": ["DADADADUM", "ENTERTAINER", "PRELUDE", "ODE", "NYAN", "RINGTONE", "FUNK", "BLUES", "BIRTHDAY", "WEDDING", "FUNERAL", "PUNCHLINE", "BADDY", "CHASE", "BA_DING", "WAWAWAWAA", "JUMP_UP", "JUMP_DOWN", "POWER_UP", "POWER_DOWN"],
"description": "The music to play in the house."
},
},
"required": ["music"],
},
}
}
]
Next, define a function that sends a chat completion request to gpt3.5.
def chat_completion_request(messages, tools=None, tool_choice=None):
"""
Sends a Chat Completion request to the OpenAI GPT-3.5-turbo model and returns the model's response.
Args:
- messages: A list containing conversation history, where each conversation history is a dictionary with "role" and "content".
- tools: (Optional) A list containing tool descriptions, where each tool is a dictionary.
- tool_choice: (Optional) Tool choice specifying which tool to use.
Returns:
- response: Model response after sending the request.
"""
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + OPENAI_API_KEY,
}
# Build the JSON data for the request
json_data = {"model": "gpt-3.5-turbo", "messages": messages}
if tools is not None:
json_data.update({"tools": tools})
if tool_choice is not None:
json_data.update({"tool_choice": tool_choice})
try:
# Send a POST request to the OpenAI API
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=json_data,
)
return response
except Exception as e:
# Handle exceptions
print("Unable to generate ChatCompletion response")
print(f"Exception: {e}")
return e
Next, in the main loop, retrieve user input and the model's response. Unlike the typical printing of the model's response, an additional step is to check whether the model's response contains relevant information for tool invocation (tool_calls). If present, extract the function and parameter information, then call the corresponding function to execute the operation.
while True:
# User input obtained through the command line
user_input = input('Please enter:')
# Add user input to the conversation history, with user role
conversation.append({"role": "user", "content": user_input})
# Call chat_completion_request function, send conversation history to the model, and get model response
response = chat_completion_request(conversation, tools)
print("Model Response: ", response.json())
# Extract the assistant's response content
assistant_response = response.json()["choices"][0]["message"]["content"]
# If the assistant's response is empty, set it to the default value "Performing operation"
if assistant_response is None:
assistant_response = "Performing operation"
# Print the assistant's response
print(f"Assistant: {assistant_response}")
# Check if the model's response contains tool_calls (Function Calling operations)
if 'tool_calls' in response.json()["choices"][0]["message"]:
# Extract function and arguments information from the tool call
function_call = response.json()["choices"][0]["message"]["tool_calls"][0]["function"]
arguments = json.loads(function_call["arguments"])
# Call the play_music function to execute music playback based on the model's instruction
play_music(arguments["music"])
# Add the assistant's response to the conversation history, with assistant role
conversation.append({"role": "assistant", "content": assistant_response})
4.1.4 The complete program for controlling the on-board buzzer
import requests
import json
from pinpong.board import Board, Pin
from pinpong.extension.UNIHIKER import *
import openai
Board().begin() # Initialization
OPENAI_API_KEY=' ' # OpenAI API Key
def chat_completion_request(messages, tools=None, tool_choice=None):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + OPENAI_API_KEY,
}
json_data = {"model": "gpt-3.5-turbo", "messages": messages}
if tools is not None:
json_data.update({"tools": tools})
if tool_choice is not None:
json_data.update({"tool_choice": tool_choice})
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=json_data,
)
return response
except Exception as e:
print("Unable to generate ChatCompletion response")
print(f"Exception: {e}")
return e
tools = [
{
"type": "function",
"function": {
"name": "play_music",
"description": "Plays music in the house when the user is bored or needs relaxation. The music list includes 'DADADADUM', 'ENTERTAINER', 'PRELUDE', 'ODE', 'NYAN', 'RINGTONE', 'FUNK', 'BLUES', 'BIRTHDAY', 'WEDDING', 'FUNERAL', 'PUNCHLINE', 'BADDY', 'CHASE', 'BA_DING', 'WAWAWAWAA', 'JUMP_UP', 'JUMP_DOWN', 'POWER_UP', 'POWER_DOWN'.",
"parameters": {
"type": "object",
"properties": {
"music": {
"type": "string",
"enum": ["DADADADUM", "ENTERTAINER", "PRELUDE", "ODE", "NYAN", "RINGTONE", "FUNK", "BLUES", "BIRTHDAY", "WEDDING", "FUNERAL", "PUNCHLINE", "BADDY", "CHASE", "BA_DING", "WAWAWAWAA", "JUMP_UP", "JUMP_DOWN", "POWER_UP", "POWER_DOWN"],
"description": "The music to play in the house."
},
},
"required": ["music"],
},
}
}
]
def play_music(music):
if music in ["DADADADUM", "ENTERTAINER", "PRELUDE", "ODE", "NYAN", "RINGTONE", "FUNK", "BLUES", "BIRTHDAY", "WEDDING", "FUNERAL", "PUNCHLINE", "BADDY", "CHASE", "BA_DING", "WAWAWAWAA", "JUMP_UP", "JUMP_DOWN", "POWER_UP", "POWER_DOWN"]:
buzzer.play(getattr(buzzer, music), buzzer.OnceInBackground)
print(f"Function called, playing {music}")
else:
print("Sorry, I don't have that music.")
conversation = [{"role": "system", "content": "You are a smart home assistant that can play music for relaxation when the user is bored or needs it. You can suggest playing music when the user seems bored or tired."}]
while True:
user_input = input('Please enter: ')
conversation.append({"role": "user", "content": user_input})
response = chat_completion_request(conversation, tools)
print("Model Response: ", response.json())
assistant_response = response.json()["choices"][0]["message"]["content"]
if assistant_response is None:
assistant_response = "Performing operation"
print(f"Assistant: {assistant_response}")
if 'tool_calls' in response.json()["choices"][0]["message"]:
function_call = response.json()["choices"][0]["message"]["tool_calls"][0]["function"]
arguments = json.loads(function_call["arguments"])
play_music(arguments["music"])
conversation.append({"role": "assistant", "content": assistant_response})
4.2 Fan
In daily life, when using a fan, we can change the fan speed by adjusting the gear. Here, we can achieve this function by simulating output using PWM.
4.2.1 The example program in the Pinpong library for controlling a fan with PWM output
We can refer to the official documentation for information about the pin numbers on the UNIHIKER that support PWM Output: PWM wiki.
According to the PWM documentation, here is the code for controlling the fan using PWM:
from pinpong.board import Board, Pin
Board(" ").begin() # Initialize the Pinpong board with a specific parameter (possibly a board name or ID)
fan = Pin(Pin.P22, Pin.PWM) # Initialize the fan pin location for PWM control
fan.write_analog(800) # PWM output to control the fan speed; the range is 0 to 1023, where 0 keeps the fan off, and 1023 sets the maximum speed.
4.2.2 Define the function to control the fan.
Through the example program, we can know that by using function calling, gpt3.5 can control the fan. First, we need to inform gpt3.5 about the initialization of the fan's pin number. Then, we need to instruct gpt3.5 on the operations for the fan, including turning on the fan, increasing speed, decreasing speed, and turning off the fan (controlled through PWM output to adjust the speed
We will continue by writing functions, completing the description of the tool, and generating the code for tool invocation. Let's start with the code for fan initialization. From the user's perspective, when using a fan, they would connect the fan to the pin number on the UNIHIKER that supports PWM using a pin wire. Then, they inform the model about the pin location. To achieve this, we need to define a list of pins that support PWM.
Define a global variable 'fan' to store the fan's pin information.
pin_map = {
21: Pin.P21,
22: Pin.P22,
23: Pin.P23,
}
Next, define the function 'fan_action' to control the fan. Use a branching structure to determine the specified fan type and complete the initialization and control of the fan based on user input. 'fan_action' has three parameters: 'action' specifies the action to be executed and is used for controlling the fan's state through branching structures. 'pn_number' is used to determine the pin number to which the fan is connected during initialization. 'user_input' is used to determine the user's control command for the fan.
def fan_action(action, pin_number=None, user_input=None):
"""
Control the behavior of the fan.
Parameters:
- action: Specifies the action to be performed, can be "initialize" or other operations.
- pin_number: (Required only when action is "initialize") Pin number to which the fan is connected.
- user_input: (Required only for other operations) User input to determine the specific operation.
Returns:
No return value. Informs the user of the result of the operation through print statements.
Detailed Explanation:
1. Use the global variable `fan` to store information about the fan pin.
2. If the action is "initialize," perform the initialization operation.
a. Check if the provided pin number is in the pin_map; if not, print an error message.
b. Create a fan object using the Pin class and set it to PWM mode.
c. Print the successful initialization message.
3. If the action is not "initialize," execute the corresponding operation.
a. Check if the fan has been initialized; if not, print an error message.
b. Execute the operation based on user input:
- "turn on": Turn on the fan, set PWM to 800.
- "turn off": Turn off the fan, set PWM to 0.
- "increase": Increase fan speed, set PWM to 1023.
- "decrease": Decrease fan speed, set PWM to 512.
c. Print the successful execution message.
Usage Example:
```
# Initialize the fan, connected to pin number 5
fan_action("initialize", pin_number=5)
# Perform an operation, control the fan based on user input
fan_action("operate", user_input="turn on")
```
Note: Make sure to set the global variable pin_map with the correct pin mappings before using.
"""
global fan
# If it's an initialization operation
if action == "initialize":
if pin_number not in pin_map:
print(f"Invalid pin number: {pin_number}")
return
# Create a fan object using the Pin class and set it to PWM mode
fan = Pin(pin_map[pin_number], Pin.PWM)
print(f"Fan has been initialized, connected to pin {pin_number}")
else:
# If the fan is not initialized
if fan is None:
print("Fan has not been initialized")
return
# Execute the corresponding operation based on user input
if "turn on" in user_input:
fan.write_analog(800)
print("Fan has been turned on")
elif "turn off" in user_input:
fan.write_analog(0)
print("Fan has been turned off")
elif "increase" in user_input:
fan.write_analog(1023)
print("Fan speed has been increased")
elif "decrease" in user_input:
fan.write_analog(512)
print("Fan speed has been decreased")
4.2.3 Add a tool description for the fan function.
Provide detailed explanations of the function and the roles of each parameter.
tools = [
{
"type": "function",
"function": {
"name": "fan_action",
"description": "This function initializes or controls the fan based on the given action and user's command. It can initialize the fan, turn on the fan, turn off the fan, increase the fan speed, or decrease the fan speed. It will print an error message if the fan has not been initialized.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["initialize", "turn_on", "turn_off", "increase_speed", "decrease_speed"],
"description": "The action to perform on the fan.",
},
"pin_number": {
"type": "integer",
"description": "The pin number where the fan is connected. Required for 'initialize' action.",
},
"user_input": {
"type": "string",
"description": "The user's command to control the fan. Required for 'turn on', 'turn off', 'increase speed', 'decrease speed' actions.",
},
},
"required": ["action"],
},
},
},
]
4.2.4 The complete program for controlling the fan using the function
# Import required libraries and modules
import requests
import json
from pinpong.board import Board, Pin
import openai
import time
# Initialize Pinpong Board
Board(" ").begin()
# Set your OpenAI API key
OPENAI_API_KEY = ' '
# Initialize fan variable
fan = None
# Pin mapping to Pinpong Pins
pin_map = {
21: Pin.P21,
22: Pin.P22,
23: Pin.P23,
24: Pin.P24,
}
# Function to make a chat completion request to the OpenAI API
def chat_completion_request(messages, tools=None, tool_choice=None):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + OPENAI_API_KEY,
}
json_data = {"model": "gpt-3.5-turbo-1106", "messages": messages}
if tools is not None:
json_data.update({"tools": tools})
if tool_choice is not None:
json_data.update({"tool_choice": tool_choice})
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=json_data,
)
return response
except Exception as e:
print("Unable to generate ChatCompletion response")
print(f"Exception: {e}")
return e
# Define fan control tool
tools = [
{
"type": "function",
"function": {
"name": "fan_action",
"description": "This function initializes or controls the fan based on the given action and user command. It can initialize the fan, turn it on, turn it off, increase fan speed, or decrease fan speed. If the fan is not initialized, an error message will be printed.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["initialize", "turn_on", "turn_off", "increase_speed", "decrease_speed"],
"description": "The action to be performed on the fan.",
},
"pin_number": {
"type": "integer",
"description": "The pin number to which the fan is connected. Required for the 'initialize' action.",
},
"user_input": {
"type": "string",
"description": "User command to control the fan. Required for 'turn on', 'turn off', 'increase speed', 'decrease speed' actions.",
},
},
"required": ["action"],
},
},
},
]
# Function to perform fan operations based on user commands
def fan_action(action, pin_number=None, user_input=None):
global fan
if action == "initialize":
if pin_number not in pin_map:
print(f"Invalid pin number: {pin_number}")
return
fan = Pin(pin_map[pin_number], Pin.PWM)
print(f"Fan has been initialized, connected to pin {pin_number}")
else:
if fan is None:
print("Fan has not been initialized")
return
if "turn on" in user_input:
fan.write_analog(800)
print("Fan has been turned on")
elif "turn off" in user_input:
fan.write_analog(0)
print("Fan has been turned off")
elif "increase" in user_input:
fan.write_analog(1023)
print("Fan speed has been increased")
elif "decrease" in user_input:
fan.write_analog(512)
print("Fan speed has been decreased")
# System-initiated conversation message
conversation = [
{"role": "system", "content": "You are a helpful assistant that can control a fan."},
]
# Main loop
while True:
user_input = input('Enter: ')
conversation.append({"role": "user", "content": user_input})
# Make a chat request to the OpenAI API
response = chat_completion_request(conversation, tools)
print("Model Response: ", response.json())
# Process the model's response and execute the requested function
if 'tool_calls' in response.json()["choices"][0]["message"]:
function_call = response.json()["choices"][0]["message"]["tool_calls"][0]["function"]
print(f"Model called function: {function_call['name']}")
if function_call["name"] == "fan_action":
arguments = json.loads(function_call["arguments"])
fan_action(arguments["action"], arguments.get("pin_number"), arguments.get("user_input"))
# Display the assistant's response
assistant_response = response.json()["choices"][0]["message"]["content"]
if assistant_response is None:
assistant_response = "Performing operation"
print(f"Assistant: {assistant_response}")
# Append the assistant's response to the conversation
conversation.append({"role": "assistant", "content": assistant_response})
4.3 LED Light Ring
4.3.1 The example program in the Pinpong library for controlling an LED ring
From the example program in the Pinpong library, it is evident that the initialization of an LED strip requires two parameters: (1) the pin location of the LED Light Ring and (2) the number of LED beads. The color setting for each LED is achieved by modifying the RGB values.
# -*- coding: utf-8 -*-
import time
from pinpong.board import Board, Pin, NeoPixel
# Define NeoPixel settings
NEOPIXEL_PIN = Pin.D7 # Pin number for the NeoPixel
PIXELS_NUM = 4 # Number of LEDs in the NeoPixel strip
# Initialize the NeoPixel strip
np = NeoPixel(Pin(NEOPIXEL_PIN), PIXELS_NUM)
while True:
# Set RGB brightness for each LED
np[0] = (0, 255, 0) # Set RGB brightness for the first LED
np[1] = (255, 0, 0) # Set RGB brightness for the second LED
np[2] = (0, 0, 255) # Set RGB brightness for the third LED
np[3] = (255, 0, 255) # Set RGB brightness for the fourth LED
# Pause for 1 second
time.sleep(1)
4.3.2 Define the function to light up the LED ring
When defining the function to light up the LED Light Ring, let's consider how users would use the LED Light Ring. After connecting the LED Light ring to the pins on the UNIHIKER , users would first inform the model about the pin number of the Light Ring and the number of LED beads. Only then can the correct initialization of the LED Light Ring be achieved. Therefore, we need to define two global variables for the LED Light Ring's initialization.
led = None # Used for initializing the LED strip object
num_beads_global = None # Used for initializing the number of LED beads
After the initialization of the LED Light Ring, users will inform the model about the number and color of LED beads they want to light up, such as 'I want to light up 4 beads in red.' In this case, we need to convert the color 'red' into its corresponding RGB values. To achieve this functionality, we can define a color dictionary that maps color names to their RGB values.
# Create a color dictionary
COLOR_DICT = {
"red": [255, 0, 0],
"green": [0, 255, 0],
"blue": [0, 0, 255],
"white": [255, 255, 255],
"black": [0, 0, 0],
"yellow": [255, 255, 0],
"pink": [255, 105, 180],
"purple": [128, 0, 128]
}
Next, define the function 'led_light_action' for initializing and controlling the LED light ring, supporting two operations: 'initialize' and 'lightup.' In the 'initialize' operation, it initializes the LED Light Ring based on the given pin number and the number of LED beads. In the 'lightup' operation, it lights up the specified number of LED beadss and sets their color. The function also performs error checks to ensure that necessary parameters are provided and outputs corresponding error messages in case of errors.
def led_light_action(action, pin_number=None, num_lights=None, color=None):
global led
global num_beads_global
# If the action is initialization
if action == "initialize":
# Check if parameters are complete
if pin_number is None or num_lights is None:
print("Error: 'pin_number' and 'num_lights' are required in the 'initialize' action.")
return
# Initialize the LED lights
led = NeoPixel(Pin(pin_number), num_lights)
num_beads_global = num_lights
print(f"LED lights have been initialized with {num_lights} beads on pin {pin_number}.")
# If the action is lighting up
elif action == "lightup":
# Check if LED is initialized
if led is None:
print("Error: LED lights have not been initialized.")
return
# Check if parameters are complete
if num_lights is None or color is None:
print("Error: 'num_lights' and 'color' are required in the 'lightup' action.")
return
# Check if 'num_lights' is a positive integer and not greater than the global number of beads
if not isinstance(num_lights, int) or num_lights <= 0 or num_lights > num_beads_global:
print("Error: 'num_lights' must be a positive integer and not greater than the number of beads.")
return
# Get the RGB values corresponding to the color
color_rgb = COLOR_DICT.get(color.lower())
if color_rgb is None:
print(f"Error: Unknown color '{color}'")
return
# Light up the specified number of beads one by one
for i in range(num_lights):
led[i] = tuple(color_rgb)
led.write(i, color_rgb[0], color_rgb[1], color_rgb[2])
print(f"Lights up {num_lights} beads with the color {color}.")
# If the action is unknown
else:
print(f"Error: Unknown action '{action}'")
4.3.3 Tool description for the function to light up the LED Light Ring
tools = [
{
"type": "function",
"function": {
"name": "led_light_action",
"description": "This function performs actions on a LED light. It can initialize the LED light on a specific pin with a certain number of beads, or light up a certain number of beads in a specific color.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["initialize", "lightup"],
"description": "The action to perform. 'initialize' will set up the LED light on a specific pin with a certain number of beads. 'lightup' will light up a certain number of beads in a specific color."
},
"pin_number": {
"type": "integer",
"description": "The pin number where the LED light is connected. This is required when the 'action' is 'initialize'."
},
"num_lights": {
"type": "integer",
"description": "The number of beads to light up or initialize. This is required when the 'action' is 'initialize' or 'lightup'."
},
"color": {
"type": "string",
"description": "The color to use when lighting up the beads. This is required when the 'action' is 'lightup'."
},
},
"required": ["action"],
},
},
}
]
4.3.4 The Complete Program for lighting up the LED Light Ring
import requests
import json
from pinpong.board import Board, Pin, NeoPixel
Board().begin() # Initialize
OPENAI_API_KEY = ' ' # OpenAI API Key
# Create a color dictionary
COLOR_DICT = {
"red": [255, 0, 0],
"green": [0, 255, 0],
"blue": [0, 0, 255],
"white": [255, 255, 255],
"black": [0, 0, 0],
"yellow": [255, 255, 0],
"pink": [255, 105, 180],
"purple": [128, 0, 128]
}
led = None
num_beads_global = None
def chat_completion_request(messages, tools=None, tool_choice=None):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + OPENAI_API_KEY,
}
json_data = {"model": "gpt-3.5-turbo", "messages": messages}
if tools is not None:
json_data.update({"tools": tools})
if tool_choice is not None:
json_data.update({"tool_choice": tool_choice})
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=json_data,
)
return response
except Exception as e:
print("Unable to generate ChatCompletion response")
print(f"Exception: {e}")
return None
def led_light_action(action, pin_number=None, num_lights=None, color=None):
global led
global num_beads_global
if action == "initialize":
if pin_number is None or num_lights is None:
print("Error: 'pin_number' and 'num_lights' are required for 'initialize' action.")
return
led = NeoPixel(Pin(pin_number), num_lights)
num_beads_global = num_lights
print(f"LED light initialized on pin {pin_number} with {num_lights} beads.")
elif action == "lightup":
if led is None:
print("Error: LED light has not been initialized.")
return
if num_lights is None or color is None:
print("Error: 'num_lights' and 'color' are required for 'lightup' action.")
return
if not isinstance(num_lights, int) or num_lights <= 0 or num_lights > num_beads_global:
print("Error: 'num_lights' must be a positive integer and not greater than the number of beads.")
return
color_rgb = COLOR_DICT.get(color.lower())
if color_rgb is None:
print(f"Error: Unknown color '{color}'")
return
for i in range(num_lights):
led[i] = tuple(color_rgb)
led.write(i, color_rgb[0], color_rgb[1], color_rgb[2])
print(f"Lit up {num_lights} beads with color {color}.")
else:
print(f"Error: Unknown action '{action}'")
tools = [
{
"type": "function",
"function": {
"name": "led_light_action",
"description": "This function performs actions on a LED light. It can initialize the LED light on a specific pin with a certain number of beads or light up a certain number of beads in a specific color.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["initialize", "lightup"],
"description": "The action to perform. 'initialize' will set up the LED light on a specific pin with a certain number of beads. 'lightup' will light up a certain number of beads in a specific color."
},
"pin_number": {
"type": "integer",
"description": "The pin number where the LED light is connected. This is required when the 'action' is 'initialize'."
},
"num_lights": {
"type": "integer",
"description": "The number of beads to light up or initialize. This is required when the 'action' is 'initialize' or 'lightup'."
},
"color": {
"type": "string",
"description": "The color to use when lighting up the beads. This is required when the 'action' is 'lightup'."
},
},
"required": ["action"],
},
},
]
]
messages = [
{"role": "system", "content": "You are a helpful assistant."},
]
while True:
user_input = input("User: ")
messages.append({"role": "user", "content": user_input})
response = chat_completion_request(messages, tools)
if not isinstance(response, requests.Response):
print("Error: Failed to generate ChatCompletion response.")
break
print("Model JSON Response: ", response.json()) # Print the model's JSON response
model_response = response.json()["choices"][0]["message"]["content"]
print("Model: ", model_response)
#messages.append({"role": "assistant", "content": model_response}) # Add the model's response to the message history
if "tool_calls" in response.json()["choices"][0]["message"]:
function_call = response.json()["choices"][0]["message"]["tool_calls"][0]["function"]
if function_call["name"] == "led_light_action":
arguments = json.loads(function_call["arguments"])
led_light_action(arguments["action"], arguments.get("pin_number"), arguments.get("num_lights"), arguments.get("color"))
4.4 Adding TTS and STT
After implementing the core functionality in the code, the next step is to add interaction and design the user interface. Utilize Microsoft's speech services to convert user speech to text and model responses from text to speech.
import azure.cognitiveservices.speech as speechsdk
from azure.cognitiveservices.speech import SpeechConfig, SpeechRecognizer, SpeechSynthesizer, AudioConfig
# Initialize speech recognition and text-to-speech synthesis services
speech_config = SpeechConfig(subscription=" ", region="eastus") # Fill in your Microsoft Speech API subscription key
recognizer = SpeechRecognizer(speech_config=speech_config)
synthesizer = SpeechSynthesizer(speech_config=speech_config)
import speechsdk
# Speech-to-text function
def recognize_from_microphone():
# Configure audio parameters
audio_config = speechsdk.AudioConfig(use_default_microphone=True)
# Create a speech recognizer object
speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)
# Asynchronously perform a single speech recognition
result = speech_recognizer.recognize_once_async().get()
# Process based on the recognition result
if result.reason == speechsdk.ResultReason.RecognizedSpeech:
print("Recognized text: ", result.text) # Print the recognition result to the console
return result.text
elif result.reason == speechsdk.ResultReason.NoMatch:
print("Unable to recognize any speech: {}".format(result.no_match_details))
elif result.reason == speechsdk.ResultReason.Canceled:
# Handle the cancellation case
cancellation_details = result.cancellation_details
print("Speech recognition canceled: {}".format(cancellation_details.reason))
if cancellation_details.reason == speechsdk.CancellationReason.Error:
print("Error details: {}".format(cancellation_details.error_details))
print("Have you set the speech resource key and region?")
# Text-to-speech synthesis function
def tts(text):
# Check if the input text is empty
if text is None:
print("No text to synthesize.")
return
# Configure text-to-speech parameters
speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_RequestSentenceBoundary, value='true')
# Configure audio output parameters
audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)
# Create a speech synthesizer object
speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
# Asynchronously perform text-to-speech synthesis
speech_synthesis_result = speech_synthesizer.speak_text_async(text).get()
Next, we proceed with interface design by adding images and prompts for recording.
from UNIHIKER import Audio
from UNIHIKER import GUI
# Initialize the Audio class
audio = Audio()
# Initialize the GUI
u_gui = GUI()
u_gui.draw_text(x=120, y=10, text="Start Home Assistant", origin='top', color="blue", font_size=15)
u_gui.draw_image(image="mic.jpg", x=120, y=230, w=180, h=50, origin='center', onclick=on_mic_click)
u_gui.draw_image(image="Mind.jpg", x=120, y=120, w=350, h=150, origin='center')
recording_status = u_gui.draw_text(x=120, y=300, text="Press to start recording", origin='center', color="blue", font_size=15)
You can use a variable named flag to record the audio state. If the flag's value is 1, it indicates that the user has clicked the microphone button, and the program will switch to the recording state, calling the 'recognize_from_microphone' function to obtain the user's speech input. If the flag's value is 2, it indicates that the user's speech input has been processed, and the program will call the model to generate a response and execute the corresponding tool calls (if any). If the flag's value is 0, it indicates that the program has processed the user's input and is ready to accept the next recording.
flag = 0
def on_mic_click():
global flag
flag = 1
def main():
global flag
global user_input # Declare user_input as a global variable
user_input = ''
u_gui.draw_text(x=120, y=10, text="Start Home Assistant", origin='top', color="blue", font_size=15)
u_gui.draw_image(image="mic.jpg", x=120, y=230, w=180, h=50, origin='center', onclick=on_mic_click)
u_gui.draw_image(image="Mind.jpg", x=120, y=120, w=350, h=150, origin='center')
recording_status = u_gui.draw_text(x=120, y=300, text="Press to start recording", origin='center', color="blue", font_size=15)
while True:
# Check the value of the flag and update the UI
if flag == 1:
recording_status.config(text="listening...")
user_input = recognize_from_microphone() # Get user input here
time.sleep(3)
flag = 2
# If the user has finished speaking, set the flag value to 2 and send the user input to the model
if user_input != "" and flag == 2:
recording_status.config(text="thinking...")
conversation.append({"role": "user", "content": user_input})
response = chat_completion_request(conversation, tools)
print("Model Response: ", response.json())
# Convert the model's reply to speech and play it
try:
assistant_response = response.json()["choices"][0]["message"]["content"]
except KeyError:
assistant_response = "I have successfully operated for you"
if assistant_response is None:
assistant_response = "I have successfully operated for you"
print(f"Assistant: {assistant_response}")
#tts(assistant_response)
# Execute tool calls from the model's reply
if 'tool_calls' in response.json()["choices"][0]["message"]:
function_call = response.json()["choices"][0]["message"]["tool_calls"][0]["function"]
arguments = json.loads(function_call["arguments"])
if function_call["name"] == "play_music":
play_music(arguments["music"])
elif function_call["name"] == "fan_action":
fan_action(arguments["action"], arguments.get("pin_number"), arguments.get("user_input"))
elif function_call["name"] == "led_light_action":
led_light_action(arguments["action"], arguments.get("pin_number"), arguments.get("num_lights"), arguments.get("color"))
conversation.append({"role": "assistant", "content": assistant_response})
flag = 0 # Set the flag value to 1, indicating that the user's input has been processed
recording_status.config(text="Press to start recording")
if __name__ == "__main__":
main()
5. Complete Code
import requests
import json
import openai
import time
import azure.cognitiveservices.speech as speechsdk
from azure.cognitiveservices.speech import SpeechConfig, SpeechRecognizer, SpeechSynthesizer, AudioConfig
import os
from pinpong.extension.UNIHIKER import *
from UNIHIKER import Audio
from UNIHIKER import GUI
from pinpong.board import Board, Pin, NeoPixel
Board(" ").begin()
# Initialize the Audio class
audio = Audio()
# Initialize GUI
u_gui = GUI()
flag = 0
# Initialize speech recognition and speech synthesis services
speech_config = SpeechConfig(subscription=" ", region="eastus")
recognizer = SpeechRecognizer(speech_config=speech_config)
synthesizer = SpeechSynthesizer(speech_config=speech_config)
# OpenAI API key
OPENAI_API_KEY = ' '
global fan, led, num_beads_global
fan, led, num_beads_global = None, None, None
pin_map = {
21: Pin.P21,
22: Pin.P22,
23: Pin.P23,
24: Pin.P24,
}
# Create a color dictionary
COLOR_DICT = {
"red": [255, 0, 0],
"green": [0, 255, 0],
"blue": [0, 0, 255],
"white": [255, 255, 255],
"black": [0, 0, 0],
"yellow": [255, 255, 0],
"pink": [255, 105, 180],
"purple": [128, 0, 128]
}
def recognize_from_microphone():
audio_config = AudioConfig(use_default_microphone=True)
speech_recognizer = SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)
result = speech_recognizer.recognize_once_async().get()
if result.reason == speechsdk.ResultReason.RecognizedSpeech:
print("Recognized text: ", result.text) # This line prints the recognition result in the terminal
return result.text
elif result.reason == speechsdk.ResultReason.NoMatch:
print("No speech could be recognized: {}".format(result.no_match_details))
elif result.reason == speechsdk.ResultReason.Canceled:
cancellation_details = result.cancellation_details
print("Speech Recognition canceled: {}".format(cancellation_details.reason))
if cancellation_details.reason == speechsdk.CancellationReason.Error:
print("Error details: {}".format(cancellation_details.error_details))
print("Did you set the speech resource key and region values?")
def tts(text):
if text is None:
print("No text to synthesize.")
return
speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_RequestSentenceBoundary, value='true')
audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)
speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
speech_synthesis_result = speech_synthesizer.speak_text_async(text).get()
def chat_completion_request(messages, tools=None, tool_choice=None):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + OPENAI_API_KEY,
}
json_data = {"model": "gpt-3.5-turbo-1106", "messages": messages}
if tools is not None:
json_data.update({"tools": tools})
if tool_choice is not None:
json_data.update({"tool_choice": tool_choice})
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=json_data,
)
return response
except Exception as e:
print("Unable to generate ChatCompletion response")
print(f"Exception: {e}")
return e
def play_music(music):
if music in ["DADADADUM", "ENTERTAINER", "PRELUDE", "ODE", "NYAN", "RINGTONE", "FUNK", "BLUES", "BIRTHDAY",
"WEDDING", "FUNERAL", "PUNCHLINE", "BADDY", "CHASE", "BA_DING", "WAWAWAWAA", "JUMP_UP", "JUMP_DOWN",
"POWER_UP", "POWER_DOWN"]:
buzzer.play(getattr(buzzer, music), buzzer.OnceInBackground)
print(f"Called, playing {music}")
else:
print("Sorry, I don't have that music.")
def fan_action(action, pin_number=None, user_input=None):
global fan
if action == "initialize":
if pin_number not in pin_map:
print(f"Invalid pin number: {pin_number}")
return
fan = Pin(pin_map[pin_number], Pin.PWM)
print(f"Fan initialized, connected to pin {pin_number}")
assistant_response = (f"Fan initialized, connected to pin {pin_number}")
else:
if fan is None:
print("Okay, the fan has not been initialized")
return
if "turn on" in user_input:
fan.write_analog(800)
print("Okay, the fan is now on")
assistant_response = "Okay, the fan is now on"
elif "turn off" in user_input:
fan.write_analog(0)
print("Okay, the fan is now off")
assistant_response = "Okay, the fan is now off"
elif "increase" in user_input:
fan.write_analog(1023)
print("Okay, the fan speed has increased")
assistant_response = "Okay, the fan speed has increased"
elif "decrease" in user_input:
fan.write_analog(512)
print("Okay, the fan speed has decreased")
assistant_response = "Okay, the fan speed has decreased"
def led_light_action(action, pin_number=None, num_lights=None, color=None):
global led
global num_beads_global
if action == "initialize":
if pin_number is None or num_lights is None:
print("Error, 'initialize' action for LED light requires 'pin_number' and 'num_lights'.")
return
led = NeoPixel(Pin(pin_number), num_lights)
num_beads_global = num_lights
print(f"Okay, now I know the LED light is initialized on pin {pin_number} with {num_lights} beads.")
assistant_response = (
f"Okay, now I know the LED light is initialized on pin {pin_number} with {num_lights} beads.")
elif action == "lightup":
if led is None:
print("Error: LED light has not been initialized.")
return
if num_lights is None or color is None:
print("'lightup' action requires 'num_lights' and 'color'. Tell me the complete details.")
return
if not isinstance(num_lights, int) or num_lights <= 0 or num_lights > num_beads_global:
print("'num_lights' must be a positive integer and cannot exceed the initialized number of beads.")
return
color_rgb = COLOR_DICT.get(color.lower())
if color_rgb is None:
print(f"Error: Unknown color '{color}'. Please add this color to the color dictionary.")
return
for i in range(num_lights):
led[i] = tuple(color_rgb)
led.write(i, color_rgb[0], color_rgb[1], color_rgb[2])
print(f"Okay, I have now lit up {num_lights} beads with the color {color}.")
assistant_response = (f"Okay, I have now lit up {num_lights} beads with the color {color}.")
else:
print(f"Error: Unknown action '{action}'")
tools = [
{
"type": "function",
"function": {
"name": "play_music",
"description": "When the user requests to play music, this function is called. Plays music in the house when the user is bored or needs relaxation. The music list includes 'DADADADUM', 'ENTERTAINER', 'PRELUDE', 'ODE', 'NYAN', 'RINGTONE', 'FUNK', 'BLUES', 'BIRTHDAY', 'WEDDING', 'FUNERAL', 'PUNCHLINE', 'BADDY', 'CHASE', 'BA_DING', 'WAWAWAWAA', 'JUMP_UP', 'JUMP_DOWN', 'POWER_UP', 'POWER_DOWN'.",
"parameters": {
"type": "object",
"properties": {
"music": {
"type": "string",
"enum": ["DADADADUM", "ENTERTAINER", "PRELUDE", "ODE", "NYAN", "RINGTONE", "FUNK", "BLUES", "BIRTHDAY",
"WEDDING", "FUNERAL", "PUNCHLINE", "BADDY", "CHASE", "BA_DING", "WAWAWAWAA", "JUMP_UP",
"JUMP_DOWN", "POWER_UP", "POWER_DOWN"],
"description": "The music to play in the house."
},
},
"required": ["music"],
},
}
},
{
"type": "function",
"function": {
"name": "fan_action",
"description": "This function initializes or controls the fan based on the given action and user's command. It can initialize the fan, turn on the fan, turn off the fan, increase the fan speed, or decrease the fan speed. It will print an error message if the fan has not been initialized.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["initialize", "turn_on", "turn_off", "increase_speed", "decrease_speed"],
"description": "The action to perform on the fan.",
},
"pin_number": {
"type": "integer",
"description": "The pin number where the fan is connected. Required for 'initialize' action.",
},
"user_input": {
"type": "string",
"description": "The user's command to control the fan. Required for 'turn on', 'turn off', 'increase speed', 'decrease speed' actions.",
},
},
"required": ["action"],
},
},
},
{
"type": "function",
"function": {
"name": "led_light_action",
"description": "This function performs actions on a LED light. It can initialize the LED light on a specific pin with a certain number of beads, or light up a certain number of beads in a specific color.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["initialize", "lightup"],
"description": "The action to perform. 'initialize' will set up the LED light on a specific pin with a certain number of beads. 'lightup' will light up a certain number of beads in a specific color."
},
"pin_number": {
"type": "integer",
"description": "The pin number where the LED light is connected. This is required when the 'action' is 'initialize'."
},
"num_lights": {
"type": "integer",
"description": "The number of beads to light up or initialize. This is required when the 'action' is 'initialize' or 'lightup'."
},
"color": {
"type": "string",
"description": "The color to use when lighting up the beads. This is required when the 'action' is 'lightup'."
},
},
"required": ["action"],
},
},
}
]
conversation = [
{"role": "system", "content": "You are a smart home assistant that can play music for relaxation when the user is bored or needs it. You can suggest playing music when the user seems bored or tired. You are a helpful assistant that can also control a fan and a LED light."},
]
flag = 0
def on_mic_click():
global flag
flag = 1
def main():
global flag
global user_input # Declare user_input as a global variable
user_input = ''
u_gui.draw_text(x=120, y=10, text="Start Home Assistant", origin='top', color="blue", font_size=15)
u_gui.draw_image(image="mic.jpg", x=120, y=230, w=180, h=50, origin='center', onclick=on_mic_click)
u_gui.draw_image(image="Mind.jpg", x=120, y=120, w=350, h=150, origin='center')
recording_status = u_gui.draw_text(x=120, y=300, text="Press to start recording", origin='center', color="blue", font_size=15)
while True:
# Check the value of flag and update the UI
if flag == 1:
recording_status.config(text="listening...")
user_input = recognize_from_microphone() # Get user input here
time.sleep(3)
flag = 2 #
# If the user has finished speaking, set flag value to 2 and send the user's input to the model
if user_input != "" and flag == 2:
recording_status.config(text="thinking...")
conversation.append({"role": "user", "content": user_input})
response = chat_completion_request(conversation, tools)
print("Model Response: ", response.json())
# Convert the model's response to speech and play
try:
assistant_response = response.json()["choices"][0]["message"]["content"]
except KeyError:
assistant_response = "I have successfully performed the operation for you"
if assistant_response is None:
assistant_response = "I have successfully performed the operation for you"
print(f"Assistant: {assistant_response}")
tts(assistant_response)
# Execute the tool calls in the model's response
if 'tool_calls' in response.json()["choices"][0]["message"]:
function_call = response.json()["choices"][0]["message"]["tool_calls"][0]["function"]
arguments = json.loads(function_call["arguments"])
if function_call["name"] == "play_music":
play_music(arguments["music"])
elif function_call["name"] == "fan_action":
fan_action(arguments["action"], arguments.get("pin_number"), arguments.get("user_input"))
elif function_call["name"] == "led_light_action":
led_light_action(arguments["action"], arguments.get("pin_number"), arguments.get("num_lights"), arguments.get("color"))
conversation.append({"role": "assistant", "content": assistant_response})
flag = 0 # Set the flag value to 1, indicating that user input has been processed
recording_status.config(text="Press to start recording")
if __name__ == "__main__":
main()