Building a Microsoft-like Recall Function on a Tabletop Device
Imagine having a device that can recall and summarize information for you, just like a personal assistant. Sounds like something out of a sci-fi movie, right? Well, with the UNIHIKER - IoT Python Single Board Computer with Touchscreen, you can build a device that does just that.
Models and Techniques Used
* **Vosk Speech Recognition Model**: This model handles speech recognition. Vosk is a lightweight, Kaldi-based toolkit that runs fully offline and can recognize speech in real time.
* **Transformers Text Summarization Model**: This model handles text summarization. It's a seq2seq model loaded through Hugging Face's `transformers` library that condenses long transcriptions into short, readable summaries.
* **Tkinter GUI Library**: Tkinter is Python's built-in GUI toolkit; we use it to build the touch-friendly interface for the device.
Important Parts of the Code
* **Audio Capture**: Captures raw audio from the microphone using the `alsaaudio` library.
* **Speech Recognition**: Converts the captured audio into text with the `vosk` library.
* **Text Summarization**: Condenses the recognized text into short summaries with the `transformers` library.
* **GUI Creation**: Builds the on-screen interface with the `tkinter` library. The sketch below shows how these pieces fit together.
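These four pieces form a producer/consumer pipeline: the capture thread fills a queue, a transcriber thread drains it into `transcription.yaml`, and a summary thread periodically turns that file into `summary.yaml` for the GUI. Here is the wiring, condensed from the full script at the end of this post (`capture_audio`, `Transcriber`, and `SummaryGenerator` are defined there):
```
import queue
import threading

audio_queue = queue.Queue()
print_lock = threading.Lock()

# Producer: microphone reader fills the queue
capture_thread = threading.Thread(target=capture_audio, daemon=True)
capture_thread.start()

# Consumer: Vosk transcriber drains the queue into transcription.yaml
transcriber = Transcriber(audio_queue, print_lock)
transcriber.start()

# Polls transcription.yaml and appends summaries to summary.yaml
generator = SummaryGenerator()
generator.start()
```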
Code Highlights
* **Audio Capture**:
```
import alsaaudio
RATE = 16000
card = 'default'
chunk_size = 8192
inp = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE, format=alsaaudio.PCM_FORMAT_S16_LE, channels=1, rate=RATE, periodsize=chunk_size)
```
This configures the ALSA capture device: 16 kHz, 16-bit little-endian mono PCM, read in 8192-frame periods.
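The capture loop itself (trimmed down from the `capture_audio` function in the full script below, reusing `RATE`, `chunk_size`, and `inp` from the snippet above) reads fixed-size chunks and pushes roughly six seconds of audio at a time onto a queue:
```
import queue

audio_queue = queue.Queue()

def capture_audio():
    while True:
        audio_data = bytearray()
        # 16000 Hz / 8192 frames per read * 6 s is about 11 reads per chunk
        for _ in range(int(RATE / chunk_size * 6)):
            length, data = inp.read()
            if length > 0:
                audio_data.extend(data)
        audio_queue.put(audio_data)
```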
* **Speech Recognition**:
```
import vosk
model = vosk.Model('/root/models/vosk-model-small-en-us-0.15')
rec = vosk.KaldiRecognizer(model, RATE)
```
This loads the small English Vosk model from local storage and creates a `KaldiRecognizer` bound to the 16 kHz sample rate.
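Recognition is then a two-step call per audio chunk, exactly as the `Transcriber` thread in the full script does: feed the PCM bytes to the recognizer, then read back a JSON result:
```
audio_data = audio_queue.get()
rec.AcceptWaveform(bytes(audio_data))
result = rec.Result()  # JSON string such as {"text": "hello world"}
```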
* **Text Summarization**:
```
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
tokenizer = AutoTokenizer.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
summary_model = AutoModelForSeq2SeqLM.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
```
This downloads (or loads from the local cache) the tokenizer and seq2seq summarization model from the Hugging Face Hub.
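Summarization follows the usual encode/generate/decode pattern; this mirrors the `SummaryGenerator` class in the full script, and the `"summarize: "` prefix is the T5-style convention for summarization prompts:
```
text = "long transcription text to condense..."
tokens_input = tokenizer.encode("summarize: " + text, return_tensors='pt', max_length=512, truncation=True)
summary_ids = summary_model.generate(tokens_input, min_length=150, max_length=512)
# The full script uses skip_special_tokens=False because the GUI's
# regex expects the [CLS]/[SEP] wrapper; True gives clean text instead.
summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
```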
* **GUI Creation**:
```
import tkinter as tk
from tkinter import ttk
import re
from datetime import datetime
import threading
import yaml
class DocumentRecallGUI:
    def __init__(self, master=None):
        self.master = master
        if master:
            master.title("Document Recall")
            master.geometry("240x360")
            self.current_index = 0
            self.summaries = self.load_summaries()
            self.create_widgets()
This is the skeleton of the `tkinter` GUI; the full class in the code listing below adds the summary viewer, touch scrolling, and navigation buttons.
IMPORTANT
The UNIHIKER has very limited memory, so to make this project run reliably I added extra swap space and set swappiness to 100. Run these commands at startup (the first one just confirms that swap is active):
swapon -s
sudo sysctl -w vm.swappiness=100
sudo sysctl -w vm.min_free_kbytes=10240
For adding the extra swap itself, there is an excellent blog post to refer to: How to Increase Swap Space in Linux.
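If you have not created a swap file yet, the usual sequence looks like this (a sketch of the standard Linux procedure; the 1 GB size is my assumption, pick what your storage can spare):
```
# Create, protect, format, and enable a swap file
sudo fallocate -l 1G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
```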
Conclusion
Building a Microsoft-like recall function on a tabletop device is a complex task, but with the right tools and guidance, it's definitely possible. By following these steps, you can create a device that can recall and summarize information for you, just like a personal assistant.
So why not give it a try? With the UNIHIKER - IoT Python Single Board Computer with Touchscreen, you can build a device that's truly one-of-a-kind.
Full Code
Below is the complete on-device version; note that the summarization step is still buggy.
```
import threading
import queue
import vosk
from vosk import SetLogLevel
import json
from datetime import datetime, timedelta
import time
import alsaaudio
import sys
import schedule
import yaml
import re
import logging
from unihiker import GUI
import psutil
from pinpong.board import Board
#import pinpong.tone as tone
import ruamel.yaml
from threading import Thread
import time
import tkinter as tk
#import tkinter
from time import sleep
from PIL import Image, ImageTk, ImageOps
import signal
#from pinpong.base.pin_data import PinData
#from pinpong.base.private_constants import PrivateConstants
from yaml import load_all, FullLoader
#yaml = ruamel.yaml.YAML()
#######################
# Define the capture_audio function
def capture_audio():
    print("==========================Audio Capture Started==========================")
    while True:
        try:
            audio_data = bytearray()
            # Read roughly six seconds of audio per chunk
            for _ in range(0, int(RATE / chunk_size * 6)):
                l, data = inp.read()
                if l > 0:
                    audio_data.extend(data)
            # Put the audio data in the queue
            audio_queue.put(audio_data)
        except Exception as e:
            logging.error(f"Error capturing audio: {e}")
#############################
with open("/root/transcription.yaml", "w") as clearfile:
clearfile.write("") #comment out if you want to keep transcriptions
#################################
class Transcriber(threading.Thread):
    def __init__(self, audio_queue, print_lock):
        super().__init__()
        self.rec = vosk.KaldiRecognizer(model, RATE)
        self.stop_event = threading.Event()
        self.audio_queue = audio_queue
        self.print_lock = print_lock

    def run(self):
        print("==========================Audio Transcription Started==========================")
        while not self.stop_event.is_set():
            try:
                audio_data = self.audio_queue.get()
                self.rec.AcceptWaveform(bytes(audio_data))
                result = self.rec.Result()
                with open("/root/transcription.yaml", "a") as f:
                    f.write("---" + "\n" + result + "\n")
                with self.print_lock:
                    logging.info(f"Transcription: {result}")
                    sys.stdout.flush()
                self.audio_queue.task_done()
            except Exception as e:
                logging.error(f"Error transcribing audio: {e}")

    def stop(self):
        self.stop_event.set()
###################
class SummaryGenerator:
    def __init__(self):
        self.stop_event = threading.Event()
        self.thread = None

    def generate_summary(self):
        while not self.stop_event.is_set():
            time.sleep(3)  # poll every 3 seconds
            with open("/root/transcription.yaml", "r+") as file:
                transcription_data = list(yaml.load_all(file, Loader=yaml.FullLoader))
                if len(transcription_data) >= 10:
                    result_string = ', '.join([item.get('text', '') for item in transcription_data if item])
                    t = time.localtime()
                    timendate = time.strftime("%Y-%m-%d %I:%M:%S %p", t)
                    # Save the raw transcription to a YAML file
                    with open("/root/transcriptions.yaml", "a") as savetranscription:
                        savetranscription.write("---" + "\n {" + timendate + " :" + result_string + "} \n")
                    print("==========================TRANSCRIPTION SAVED=============================")
                    print("=================Emptying transcription.yaml===================")
                    file.seek(0)
                    file.write("")
                    file.truncate()
                    print("============================GENERATING SUMMARY===============================")
                    tokens_input = tokenizer.encode("summarize: " + result_string, return_tensors='pt', max_length=512, truncation=True)
                    summary_ids = summary_model.generate(tokens_input, min_length=150, max_length=512)
                    # Keep special tokens: the GUI's regex expects the [CLS] ... [SEP] wrapper
                    summary_response = tokenizer.decode(summary_ids[0], skip_special_tokens=False)
                    # Save the summary to a YAML file
                    with open("/root/summary.yaml", "a") as summaryfile:
                        summaryfile.write("---" + "\n {" + timendate + " :" + summary_response + "} \n")
                    print("==========================SUMMARY SAVED=============================")
                    time.sleep(1)
                else:
                    # Fewer than 10 transcription documents yet, keep waiting
                    time.sleep(1)

    def start(self):
        self.thread = threading.Thread(target=self.generate_summary)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        if self.thread:
            self.thread.join()
###
generator = SummaryGenerator()
generator.start()
##########################
import tkinter as tk
from tkinter import ttk
import re
from datetime import datetime
import threading
import yaml
class DocumentRecallGUI:
    def __init__(self, master=None):
        self.master = master
        if master:
            master.title("Document Recall")
            master.geometry("240x360")
            self.current_index = 0
            self.summaries = self.load_summaries()
            self.create_widgets()
    def create_widgets(self):
        self.recall_button = tk.Button(self.master, text="Recall", command=self.show_summary_window, font=("Helvetica", 16))
        self.recall_button.pack(expand=True)
        self.listening_label = tk.Label(self.master, text="Listening...")
        self.listening_label.pack(expand=True)

    def load_summaries(self):
        with open("/root/summary.yaml", "r") as file:
            content = file.read()
        entries = [entry.strip() for entry in content.split('---') if entry.strip()]
        summaries = []
        for entry in entries:
            # Each entry looks like {YYYY-MM-DD HH:MM:SS AM :[CLS] summary text[SEP]}
            match = re.match(r'\{(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [AP]M) :\[CLS\] (.+)\[SEP\]\}', entry)
            if match:
                date_str, text = match.groups()
                date_obj = datetime.strptime(date_str, "%Y-%m-%d %I:%M:%S %p")
                summaries.append({date_obj: text.strip()})
        return summaries
    def show_summary_window(self):
        for widget in self.master.winfo_children():
            widget.destroy()
        self.date_label = tk.Label(self.master, text="", font=("Helvetica", 12, "bold"), wraplength=220)
        self.date_label.pack(pady=5)
        # Create a frame to hold the text widget
        self.text_frame = tk.Frame(self.master)
        self.text_frame.pack(fill=tk.BOTH, expand=True)
        # Create the text widget
        self.summary_text = tk.Text(self.text_frame, wrap=tk.WORD, font=("Helvetica", 10), width=26, height=14)
        self.summary_text.pack(fill=tk.BOTH, expand=True)
        # Navigation buttons
        self.nav_frame = tk.Frame(self.master)
        self.nav_frame.pack(fill=tk.X, pady=5)
        self.up_button = tk.Button(self.nav_frame, text="↑", command=self.scroll_up, width=5)
        self.up_button.pack(side=tk.LEFT, padx=5)
        self.down_button = tk.Button(self.nav_frame, text="↓", command=self.scroll_down, width=5)
        self.down_button.pack(side=tk.LEFT, padx=5)
        self.back_button = tk.Button(self.nav_frame, text="Back", command=self.show_recall_button)
        self.back_button.pack(side=tk.RIGHT, padx=5)
        # Initialize last_y
        self.last_y = 0
        # Disable text selection
        self.summary_text.config(state=tk.DISABLED)
        # Bind touch scrolling
        self.summary_text.bind("<ButtonPress-1>", self.touch_start)
        self.summary_text.bind("<B1-Motion>", self.touch_move)
        self.summary_text.bind("<ButtonRelease-1>", self.touch_end)
        self.display_summary(self.current_index)
    def touch_start(self, event):
        self.last_y = event.y
        self.summary_text.config(cursor="fleur")  # change cursor to indicate scrolling

    def touch_move(self, event):
        delta = self.last_y - event.y
        self.summary_text.yview_scroll(int(delta), "units")
        self.last_y = event.y

    def touch_end(self, event):
        self.summary_text.config(cursor="")
    def display_summary(self, index):
        if 0 <= index < len(self.summaries):
            summary = self.summaries[index]
            date_obj, text = list(summary.items())[0]
            if isinstance(date_obj, datetime):
                date_str = date_obj.strftime("%Y-%m-%d %I:%M:%S %p")
            else:
                date_str = date_obj
                date_obj = datetime.strptime(date_str, "%Y-%m-%d %I:%M:%S %p")
            formatted_date = date_obj.strftime("%B %d, %Y %I:%M %p")
            self.date_label.config(text=formatted_date)
            self.summary_text.config(state=tk.NORMAL)
            self.summary_text.delete(1.0, tk.END)
            self.summary_text.insert(tk.END, text)
            self.summary_text.config(state=tk.DISABLED)
            self.current_index = index
    # The three methods below are leftovers from an earlier canvas-based
    # layout; they are not bound to any widget in this version.
    def on_frame_configure(self, event=None):
        self.canvas.configure(scrollregion=self.canvas.bbox("all"))

    def scroll_start(self, event):
        self.canvas.scan_mark(event.x, event.y)

    def scroll_move(self, event):
        self.canvas.scan_dragto(event.x, event.y, gain=1)
    def show_recall_button(self):
        for widget in self.master.winfo_children():
            widget.destroy()
        self.create_widgets()

    def scroll_up(self):
        if self.current_index > 0:
            self.current_index -= 1
            self.display_summary(self.current_index)

    def scroll_down(self):
        if self.current_index < len(self.summaries) - 1:
            self.current_index += 1
            self.display_summary(self.current_index)
    def run(self):
        self.root = tk.Tk()
        self.root.title("Document Recall")
        self.root.geometry("240x360")
        # Re-run __init__ now that a real root window exists
        self.__init__(self.root)
        self.root.mainloop()
def start_gui_thread():
    gui = DocumentRecallGUI()
    gui_thread = threading.Thread(target=gui.run)
    gui_thread.daemon = True
    gui_thread.start()
    return gui_thread

if __name__ == "__main__":
    gui_thread = start_gui_thread()  # keep a reference so it can be joined later
    # If you need to wait for the GUI to close before exiting:
    # gui_thread.join()
#####################
# Set up logging
logging.basicConfig(level=logging.INFO)
SetLogLevel(-1)
# Set up audio capture
RATE = 16000
card = 'default'
chunk_size = 8192
inp = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE, format=alsaaudio.PCM_FORMAT_S16_LE, channels=1, rate=RATE, periodsize=chunk_size)
# Set up Vosk model
model = vosk.Model('/root/models/vosk-model-small-en-us-0.15')
rec = vosk.KaldiRecognizer(model, RATE)
# Load model directly from cache if available
from huggingface_hub import try_to_load_from_cache
# Load model directly using transformers
#from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
#tokenizer = AutoTokenizer.from_pretrained("cnicu/t5-small-booksum")
#summary_model = AutoModelForSeq2SeqLM.from_pretrained("cnicu/t5-small-booksum")
#from transformers import AutoTokenizer, AutoModel
#tokenizer = AutoTokenizer.from_pretrained("Lucas-Hyun-Lee/T5_small_lecture_summarization")
#summary_model = AutoModel.from_pretrained("Lucas-Hyun-Lee/T5_small_lecture_summarization")
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
tokenizer = AutoTokenizer.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
summary_model = AutoModelForSeq2SeqLM.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
# Create and start multiple transcriber threads
num_threads = 1 # number of transcriber threads; recommended: 1 with on-device summarization, 2 for ollama
audio_queue = queue.Queue()
print_lock = threading.Lock()
# Create and start the capture thread
capture_thread = threading.Thread(target=capture_audio)
capture_thread.daemon = True
capture_thread.start()
# Create and start the transcriber thread
transcriber_threads = [Transcriber(audio_queue, print_lock) for _ in range(num_threads)]
for thread in transcriber_threads:
    thread.start()

# Wait for all threads to finish (in practice these joins block
# forever, since the capture loop never exits)
capture_thread.join()
for thread in transcriber_threads:
    thread.join()
gui_thread.join()
######################
# Keep the main thread alive
while True:
    time.sleep(0.1)
```
Older Version: Summarization via LM Studio
This older version of the program uses LM Studio running on a local server to summarize with an LLM. It is a good starting point if you want to modify the code to make API calls instead of summarizing on-device.
```
import threading
import queue
import vosk
from vosk import SetLogLevel
import json
from datetime import datetime, timedelta
import time
import alsaaudio
import sys
import requests
from openai import OpenAI
import schedule
import yaml
import httpx
import logging
from unihiker import GUI
import psutil
from pinpong.board import Board
#import pinpong.tone as tone
import ruamel.yaml
from threading import Thread
import time
import tkinter as tk
#import tkinter
from time import sleep
from PIL import Image, ImageTk, ImageOps
import signal
#from pinpong.base.pin_data import PinData
#from pinpong.base.private_constants import PrivateConstants
from yaml import load_all, FullLoader
#yaml = ruamel.yaml.YAML()
Board().begin()
gui = GUI()
img = gui.fill_rect(x=0, y=0, w=240, h=320, color="#666666")
if img is None:
    print("Error: Unable to create background")
    exit(1)
gui.fill_rect(x=50, y=20, w=140, h=30, color="#2ff542")
gui.fill_rect(x=80, y=50, w=80, h=23, color="#2ff542")
text1 = gui.draw_text(x=120, y=20, text='CPU Usage:', origin='top')
text_cpu = gui.draw_text(x=120, y=45, text='', font_size=14, origin='top')
if text1 is None:
    print("Error: Unable to create text element")
    exit(1)
# Create the button
button = gui.fill_circle(x=120, y=200, r=60, color=(255, 255, 255), fill="#b774f2")
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255), (0, 255, 255)]
i = 0
#######################
# Define the capture_audio function
def capture_audio():
    print("==========================Audio Capture Started==========================")
    while True:
        try:
            audio_data = bytearray()
            # Read roughly six seconds of audio per chunk
            for _ in range(0, int(RATE / chunk_size * 6)):
                l, data = inp.read()
                if l > 0:
                    audio_data.extend(data)
            # Put the audio data in the queue
            audio_queue.put(audio_data)
        except Exception as e:
            logging.error(f"Error capturing audio: {e}")
#############################
with open("transcription.yaml", "w") as clearfile:
clearfile.write("")
#################################
class Transcriber(threading.Thread):
    def __init__(self, audio_queue, print_lock):
        super().__init__()
        self.rec = vosk.KaldiRecognizer(model, RATE)
        self.stop_event = threading.Event()
        self.audio_queue = audio_queue
        self.print_lock = print_lock

    def run(self):
        print("==========================Audio Transcription Started==========================")
        while not self.stop_event.is_set():
            try:
                audio_data = self.audio_queue.get()
                self.rec.AcceptWaveform(bytes(audio_data))
                result = self.rec.Result()
                with open("transcription.yaml", "a") as f:
                    f.write("---" + "\n" + result + "\n")
                with self.print_lock:
                    logging.info(f"Transcription: {result}")
                    sys.stdout.flush()
                self.audio_queue.task_done()
            except Exception as e:
                logging.error(f"Error transcribing audio: {e}")

    def stop(self):
        self.stop_event.set()
###################
class SummaryGenerator:
    def __init__(self):
        self.stop_event = threading.Event()
        self.thread = None

    def generate_summary(self):
        while not self.stop_event.is_set():
            time.sleep(3)  # poll every 3 seconds
            with open("transcription.yaml", "r+") as file:
                transcription_data = list(yaml.load_all(file, Loader=yaml.FullLoader))
                if len(transcription_data) >= 20:
                    result_string = ', '.join([item.get('text', '') for item in transcription_data if item])
                    # Empty transcription.yaml before summarizing
                    file.seek(0)
                    file.write("")
                    file.truncate()
                    print("============================SENDING TO SERVER FOR INFERENCING===============================")
                    client = OpenAI(base_url="http://192.168.1.2:1234/v1", api_key="lm-studio")
                    completion = client.chat.completions.create(
                        model="QuantFactory/Phi-3-mini-128k-instruct-GGUF",
                        messages=[
                            {"role": "system", "content": "You are given text snippets, if possible try to identify if there are multiple people present, then summarize the transcription."},
                            {"role": "user", "content": result_string}
                        ],
                        temperature=0.7,
                    )
                    summary_response = completion.choices[0].message.content
                    # Save the summary to a YAML file
                    with open("summary.yaml", "a") as summaryfile:
                        t = time.localtime()
                        timendate = time.strftime("%Y-%m-%d %I:%M:%S %p", t)
                        summaryfile.write("---" + "\n {" + timendate + " :" + summary_response + "} \n")
                    print("==========================SUMMARY SAVED=============================")
                    time.sleep(1)
                else:
                    # Fewer than 20 transcription documents yet, keep waiting
                    time.sleep(1)

    def start(self):
        self.thread = threading.Thread(target=self.generate_summary)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        if self.thread:
            self.thread.join()
###
generator = SummaryGenerator()
generator.start()
##########################
def display():
    def update_button_color():
        global i
        button.config(fill=colors[i])
        i = (i + 1) % len(colors)
        time.sleep(0.04)

    clock_text = gui.draw_text(x=120, y=270, text='', font_size=28, color="#DDDDDD", origin='top')
    emj1 = gui.draw_emoji(x=77, y=60, w=120, h=120, emoji="Nerve", duration=0.1)
    update_queue1 = queue.Ueue() if False else queue.Queue()

    def update_all():
        while True:
            t = time.localtime()
            clock_text.config(text=time.strftime("%I:%M:%S %p", t))
            time.sleep(1)
            cpu_usage = psutil.cpu_percent()
            update_queue1.put(("cpu_usage", str(cpu_usage) + '%'))
            # Map CPU load to an emoji mood
            if cpu_usage < 10:
                update_queue1.put(("emoji", "Shock"))
            elif cpu_usage < 30:
                update_queue1.put(("emoji", "Sleep"))
            elif cpu_usage < 45:
                update_queue1.put(("emoji", "Smile"))
            elif cpu_usage < 50:
                update_queue1.put(("emoji", "Peace"))
            elif cpu_usage < 65:
                update_queue1.put(("emoji", "Think"))
            elif cpu_usage < 90:
                update_queue1.put(("emoji", "Sweat"))
            else:
                update_queue1.put(("emoji", "Angry"))

    def main_loop():
        while True:
            try:
                update_button_color()
                message = update_queue1.get_nowait()
                if message[0] == "cpu_usage":
                    text_cpu.config(text=message[1])
                elif message[0] == "emoji":
                    emj1.config(emoji=message[1])
            except queue.Empty:
                pass
            time.sleep(0.01)

    thread = threading.Thread(target=update_all)
    thread.start()
    main_loop()
#####################
# Set up logging
logging.basicConfig(level=logging.INFO)
SetLogLevel(-1)
# Set up audio capture
RATE = 16000
card = 'default'
chunk_size = 8192
inp = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE, format=alsaaudio.PCM_FORMAT_S16_LE, channels=1, rate=RATE, periodsize=chunk_size)
# Set up Vosk model
model = vosk.Model('models/vosk-model-small-en-us-0.15')
rec = vosk.KaldiRecognizer(model, RATE)
# Create and start multiple transcriber threads
num_threads = 2 # number of transcriber threads; recommended: 2
audio_queue = queue.Queue()
print_lock = threading.Lock()
# Create and start the GUI thread
gui_thread = threading.Thread(target=display)
gui_thread.daemon = True
gui_thread.start()
# Create and start the capture thread
capture_thread = threading.Thread(target=capture_audio)
capture_thread.daemon = True
capture_thread.start()
# Create and start the transcriber thread
transcriber_threads = [Transcriber(audio_queue, print_lock) for _ in range(num_threads)]
for thread in transcriber_threads:
    thread.start()

# Wait for all threads to finish
#gui_thread.join()
capture_thread.join()
for thread in transcriber_threads:
    thread.join()
######################
# Keep the main thread alive
while True:
    time.sleep(0.1)
```