Building a Microsoft-like Recall Function on a Tabletop Device
Imagine having a device that can recall and summarize information for you, just like a personal assistant. Sounds like something out of a sci-fi movie, right? Well, with the UNIHIKER - IoT Python Single Board Computer with Touchscreen, you can build a device that does just that.

Models and Techniques Used
* **Vosk Speech Recognition Model**: An offline, deep-learning-based speech recognizer that runs in real time. This project uses the small English model, `vosk-model-small-en-us-0.15`.
* **Transformers Text Summarization Model**: A seq2seq summarization model (`Shobhank-iiitdwd/Distil_BERT_summary`, loaded through the Hugging Face `transformers` library) that condenses long transcripts into short, readable summaries.
* **Tkinter GUI Library**: Python's standard GUI toolkit, used to build the touchscreen interface for the device.
Important Parts of the Code
* **Audio Capture**: Captures raw audio data from the microphone using the `alsaaudio` library.
* **Speech Recognition**: Converts the captured audio into text using the `vosk` library.
* **Text Summarization**: Condenses the recognized text into a short summary using the `transformers` library.
* **GUI Creation**: Builds the touchscreen interface using the `tkinter` library. These pieces run on separate threads connected by a queue, as the sketch after this list shows.
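A minimal sketch of that wiring, condensed from the full listing at the end of this article (`capture_audio` and `Transcriber` are defined there):
```
import threading
import queue

audio_queue = queue.Queue()    # raw audio batches travel through here
print_lock = threading.Lock()  # serializes log output between threads

# Producer: capture_audio() pushes ~6-second audio batches onto the queue
capture_thread = threading.Thread(target=capture_audio, daemon=True)
capture_thread.start()

# Consumer: a Transcriber thread pops batches and feeds them to Vosk
transcriber = Transcriber(audio_queue, print_lock)
transcriber.start()
```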
Code Highlights
* **Audio Capture**:
```
import alsaaudio

RATE = 16000        # 16 kHz mono, the rate the Vosk model expects
card = 'default'    # ALSA capture device
chunk_size = 8192   # frames per read

inp = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE,
                    format=alsaaudio.PCM_FORMAT_S16_LE,
                    channels=1, rate=RATE,
                    periodsize=chunk_size, device=card)
```
This sets the capture parameters (16 kHz, 16-bit mono) and opens the ALSA PCM device that the capture loop reads from, as shown below.
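The capture loop itself (taken from the full listing at the end of this article) batches roughly six seconds of audio before handing it off through a queue; `inp`, `RATE`, and `chunk_size` are the objects defined above:
```
import queue
import logging

audio_queue = queue.Queue()

def capture_audio():
    while True:
        try:
            # Collect ~6 seconds of 16-bit mono samples
            audio_data = bytearray()
            for _ in range(int(RATE / chunk_size * 6)):
                length, data = inp.read()
                if length > 0:
                    audio_data.extend(data)
            # Hand the batch to the transcriber thread
            audio_queue.put(audio_data)
        except Exception as e:
            logging.error(f"Error capturing audio: {e}")
```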
* **Speech Recognition**:
```
import vosk

# Load the small English model and create a recognizer at the capture rate
model = vosk.Model('/root/models/vosk-model-small-en-us-0.15')
rec = vosk.KaldiRecognizer(model, RATE)
```
This loads the Vosk model from local storage and creates the recognizer that turns the captured audio into text.
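Feeding a captured batch to the recognizer looks like this (a trimmed-down version of the `Transcriber` thread in the full listing); `rec.Result()` returns a JSON string whose `text` field holds the transcript:
```
import json

def transcribe(audio_data):
    # Vosk consumes raw PCM bytes and returns its result as JSON
    rec.AcceptWaveform(bytes(audio_data))
    result = json.loads(rec.Result())
    return result.get("text", "")
```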
* **Text Summarization**:
```
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Download the summarization model and its tokenizer (cached after the first run)
tokenizer = AutoTokenizer.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
summary_model = AutoModelForSeq2SeqLM.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
```
This loads the tokenizer and seq2seq model used to summarize the recognized speech.
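Producing a summary is then a standard encode/generate/decode round trip, shown here as a small helper using the same parameters as the full listing. Note that the full code passes `skip_special_tokens=False` and strips the `[CLS]`/`[SEP]` markers later in the GUI; this sketch decodes them away directly:
```
def summarize(text):
    # Truncate the transcript to the model's 512-token input limit
    tokens_input = tokenizer.encode("summarize: " + text,
                                    return_tensors="pt",
                                    max_length=512, truncation=True)
    summary_ids = summary_model.generate(tokens_input,
                                         min_length=150, max_length=512)
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
```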
* **GUI Creation**:
```
import tkinter as tk
from tkinter import ttk
import re
from datetime import datetime
import threading
import yaml

class DocumentRecallGUI:
    def __init__(self, master=None):
        self.master = master
        if master:
            master.title("Document Recall")
            master.geometry("240x360")
            self.current_index = 0
            self.summaries = self.load_summaries()
            self.create_widgets()
```
This defines the GUI: a 240x360 window with a Recall button that opens a scrollable, touch-friendly view of the saved summaries.
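In the full listing the GUI runs on a daemon thread so capture and transcription keep working while the screen is in use (Tkinter is not generally thread-safe; this works here because only the GUI thread ever touches the widgets):
```
def start_gui_thread():
    gui = DocumentRecallGUI()
    gui_thread = threading.Thread(target=gui.run, daemon=True)
    gui_thread.start()
    return gui_thread
```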
IMPORTANT
The UNIHIKER has very limited memory for this workload, so I added extra swap and set swappiness to 100 to keep everything running. Run these commands at the start:
```
swapon -s                           # list the active swap areas
sudo sysctl -w vm.swappiness=100    # swap aggressively to keep RAM free
sudo sysctl -w vm.min_free_kbytes=10240
```
For setting up the extra swap itself, here is an excellent blog post to refer to: How to Increase Swap Space in Linux.
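If no swap file exists yet, a typical recipe looks like the sketch below (assuming a 1 GB file at `/swapfile`; pick a size that fits your storage):
```
sudo fallocate -l 1G /swapfile   # reserve 1 GB for swap
sudo chmod 600 /swapfile         # mkswap requires restricted permissions
sudo mkswap /swapfile            # format the file as swap space
sudo swapon /swapfile            # enable it immediately
swapon -s                        # confirm the new swap area is listed
```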
Conclusion
Building a Microsoft-like recall function on a tabletop device is a complex task, but with the right tools and guidance, it's definitely possible. By following these steps, you can create a device that can recall and summarize information for you, just like a personal assistant.
So why not give it a try? With the UNIHIKER - IoT Python Single Board Computer with Touchscreen, you can build a device that's truly one-of-a-kind.
Full Code (On-Device Summarization)
```
# Full on-device version; note that summarization is still buggy.
import threading
import queue
import vosk
from vosk import SetLogLevel
import json
from datetime import datetime, timedelta
import time
import alsaaudio
import sys
import schedule
import yaml
import re
import logging
from unihiker import GUI
import psutil
from pinpong.board import Board
#import pinpong.tone as tone
import ruamel.yaml
from threading import Thread
import tkinter as tk
#import tkinter
from time import sleep
from PIL import Image, ImageTk, ImageOps
import signal
#from pinpong.base.pin_data import PinData
#from pinpong.base.private_constants import PrivateConstants
from yaml import load_all, FullLoader

#yaml = ruamel.yaml.YAML()

#######################
# Define the capture_audio function
def capture_audio():
    print(f"==========================Audio Capture Started==========================")
    while True:
        try:
            audio_data = bytearray()
            for _ in range(0, int(RATE / chunk_size * 6)):
                l, data = inp.read()
                if l > 0:
                    audio_data.extend(data)
            # Put the audio data in the queue
            audio_queue.put(audio_data)
        except Exception as e:
            logging.error(f"Error capturing audio: {e}")

#############################
with open("/root/transcription.yaml", "w") as clearfile:
    clearfile.write("")  # comment out if you want to keep transcriptions

#################################
class Transcriber(threading.Thread):
    def __init__(self, audio_queue, print_lock):
        super().__init__()
        self.rec = vosk.KaldiRecognizer(model, RATE)
        self.stop_event = threading.Event()
        self.audio_queue = audio_queue
        self.print_lock = print_lock

    def run(self):
        print(f"==========================Audio Transcription Started==========================")
        while not self.stop_event.is_set():
            try:
                audio_data = self.audio_queue.get()
                self.rec.AcceptWaveform(bytes(audio_data))
                result = self.rec.Result()
                with open("/root/transcription.yaml", "a") as f:
                    f.write("---" + "\n" + result + "\n")
                with self.print_lock:
                    logging.info(f"Transcription: {result}")
                    sys.stdout.flush()
                self.audio_queue.task_done()
            except Exception as e:
                logging.error(f"Error transcribing audio: {e}")

    def stop(self):
        self.stop_event.set()

###################
class SummaryGenerator:
    def __init__(self):
        self.stop_event = threading.Event()
        self.thread = None

    def generate_summary(self):
        while True:
            time.sleep(3)  # wait for 3 seconds
            #print(f"==============looping==============")
            with open("/root/transcription.yaml", "r+") as file:
                transcription_data = list(yaml.load_all(file, Loader=yaml.FullLoader))
                if len(transcription_data) >= 10:
                    result_string = ', '.join([item.get('text', '') for item in transcription_data])
                    t = time.localtime()
                    timendate = time.strftime("%Y-%m-%d %I:%M:%S %p", t)
                    # Save the raw transcription to a YAML file
                    with open("/root/transcriptions.yaml", "a") as savetranscription:
                        savetranscription.write("---" + "\n {" + timendate + " :" + result_string + "} \n")
                    print(f"==========================TRANSCRIPTION SAVED=============================")
                    print(f"=================Emptying Transcription.yaml===================")
                    file.seek(0)
                    file.write("")
                    file.truncate()
                    print(f"============================GENERATING SUMMARY===============================")
                    tokens_input = tokenizer.encode("summarize: " + result_string, return_tensors='pt', max_length=512, truncation=True)
                    summary_ids = summary_model.generate(tokens_input, min_length=150, max_length=512)
                    summary_response = tokenizer.decode(summary_ids[0], skip_special_tokens=False)
                    # Save the summary to a YAML file
                    with open("/root/summary.yaml", "a") as summaryfile:
                        summaryfile.write("---" + "\n {" + timendate + " :" + summary_response + "} \n")
                    print(f"==========================SUMMARY SAVED=============================")
                    #transcription_data = ""
                    time.sleep(1)  # wait for 1 second
                else:
                    #print("Less than 10 documents, waiting...")
                    time.sleep(1)  # wait for 1 second

    def start(self):
        self.thread = threading.Thread(target=self.generate_summary)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        if self.thread:
            self.thread.join()

###
generator = SummaryGenerator()
generator.start()

##########################
import tkinter as tk
from tkinter import ttk
import re
from datetime import datetime
import threading
import yaml

class DocumentRecallGUI:
    def __init__(self, master=None):
        self.master = master
        if master:
            master.title("Document Recall")
            master.geometry("240x360")
            self.current_index = 0
            self.summaries = self.load_summaries()
            self.create_widgets()

    def create_widgets(self):
        self.recall_button = tk.Button(self.master, text="Recall", command=self.show_summary_window, font=("Helvetica", 16))
        self.recall_button.pack(expand=True)
        self.listening_label = tk.Label(self.master, text="Listening...")
        self.listening_label.pack(expand=True)

    def load_summaries(self):
        with open("/root/summary.yaml", "r") as file:
            content = file.read()
        entries = [entry.strip() for entry in content.split('---') if entry.strip()]
        summaries = []
        for entry in entries:
            match = re.match(r'\{(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [AP]M) :\[CLS\] (.+)\[SEP\]\}', entry)
            if match:
                date_str, text = match.groups()
                date_obj = datetime.strptime(date_str, "%Y-%m-%d %I:%M:%S %p")
                summaries.append({date_obj: text.strip()})
        return summaries

    def show_summary_window(self):
        for widget in self.master.winfo_children():
            widget.destroy()
        self.date_label = tk.Label(self.master, text="", font=("Helvetica", 12, "bold"), wraplength=220)
        self.date_label.pack(pady=5)
        # Create a frame to hold the text widget
        self.text_frame = tk.Frame(self.master)
        self.text_frame.pack(fill=tk.BOTH, expand=True)
        # Create the text widget
        self.summary_text = tk.Text(self.text_frame, wrap=tk.WORD, font=("Helvetica", 10), width=26, height=14)
        self.summary_text.pack(fill=tk.BOTH, expand=True)
        # Navigation buttons
        self.nav_frame = tk.Frame(self.master)
        self.nav_frame.pack(fill=tk.X, pady=5)
        self.up_button = tk.Button(self.nav_frame, text="↑", command=self.scroll_up, width=5)
        self.up_button.pack(side=tk.LEFT, padx=5)
        self.down_button = tk.Button(self.nav_frame, text="↓", command=self.scroll_down, width=5)
        self.down_button.pack(side=tk.LEFT, padx=5)
        self.back_button = tk.Button(self.nav_frame, text="Back", command=self.show_recall_button)
        self.back_button.pack(side=tk.RIGHT, padx=5)
        # Initialize last_y
        self.last_y = 0
        # Disable text selection
        self.summary_text.config(state=tk.DISABLED)
        # Bind touch scrolling
        self.summary_text.bind("<ButtonPress-1>", self.touch_start)
        self.summary_text.bind("<B1-Motion>", self.touch_move)
        self.summary_text.bind("<ButtonRelease-1>", self.touch_end)
        self.display_summary(self.current_index)

    def touch_start(self, event):
        self.last_y = event.y
        self.summary_text.config(cursor="fleur")  # Change cursor to indicate scrolling

    def touch_move(self, event):
        delta = self.last_y - event.y
        self.summary_text.yview_scroll(int(delta), "units")
        self.last_y = event.y

    def touch_end(self, event):
        self.summary_text.config(cursor="")

    def display_summary(self, index):
        if 0 <= index < len(self.summaries):
            summary = self.summaries[index]
            date_obj, text = list(summary.items())[0]
            if isinstance(date_obj, datetime):
                date_str = date_obj.strftime("%Y-%m-%d %I:%M:%S %p")
            else:
                date_str = date_obj
            date_obj = datetime.strptime(date_str, "%Y-%m-%d %I:%M:%S %p")
            formatted_date = date_obj.strftime("%B %d, %Y %I:%M %p")
            self.date_label.config(text=formatted_date)
            self.summary_text.config(state=tk.NORMAL)
            self.summary_text.delete(1.0, tk.END)
            self.summary_text.insert(tk.END, text)
            self.summary_text.config(state=tk.DISABLED)
            self.current_index = index

    # The three methods below are unused leftovers from an earlier canvas-based layout
    def on_frame_configure(self, event=None):
        self.canvas.configure(scrollregion=self.canvas.bbox("all"))

    def scroll_start(self, event):
        self.canvas.scan_mark(event.x, event.y)

    def scroll_move(self, event):
        self.canvas.scan_dragto(event.x, event.y, gain=1)

    def show_recall_button(self):
        for widget in self.master.winfo_children():
            widget.destroy()
        self.create_widgets()

    def scroll_up(self):
        if self.current_index > 0:
            self.current_index -= 1
            self.display_summary(self.current_index)

    def scroll_down(self):
        if self.current_index < len(self.summaries) - 1:
            self.current_index += 1
            self.display_summary(self.current_index)

    def run(self):
        self.root = tk.Tk()
        self.root.title("Document Recall")
        self.root.geometry("240x360")
        self.__init__(self.root)
        self.root.mainloop()

def start_gui_thread():
    gui = DocumentRecallGUI()
    gui_thread = threading.Thread(target=gui.run)
    gui_thread.daemon = True
    gui_thread.start()
    return gui_thread

if __name__ == "__main__":
    gui_thread = start_gui_thread()
    # If you need to wait for the GUI to close before exiting:
    # gui_thread.join()

#####################
# Set up logging
logging.basicConfig(level=logging.INFO)
SetLogLevel(-1)

# Set up audio capture
RATE = 16000
card = 'default'
chunk_size = 8192
inp = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE, format=alsaaudio.PCM_FORMAT_S16_LE, channels=1, rate=RATE, periodsize=chunk_size)

# Set up Vosk model
model = vosk.Model('/root/models/vosk-model-small-en-us-0.15')
rec = vosk.KaldiRecognizer(model, RATE)

# Load model directly from cache if available
from huggingface_hub import try_to_load_from_cache

# Load model directly using transformers
#from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
#tokenizer = AutoTokenizer.from_pretrained("cnicu/t5-small-booksum")
#summary_model = AutoModelForSeq2SeqLM.from_pretrained("cnicu/t5-small-booksum")
#from transformers import AutoTokenizer, AutoModel
#tokenizer = AutoTokenizer.from_pretrained("Lucas-Hyun-Lee/T5_small_lecture_summarization")
#summary_model = AutoModel.from_pretrained("Lucas-Hyun-Lee/T5_small_lecture_summarization")
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
tokenizer = AutoTokenizer.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")
summary_model = AutoModelForSeq2SeqLM.from_pretrained("Shobhank-iiitdwd/Distil_BERT_summary")

# Create and start multiple transcriber threads
num_threads = 1  # recommended: 1 with on-device summarization, 2 with Ollama
audio_queue = queue.Queue()
print_lock = threading.Lock()

# Create and start the capture thread
capture_thread = threading.Thread(target=capture_audio)
capture_thread.daemon = True
capture_thread.start()

# Create and start the transcriber threads
transcriber_threads = [Transcriber(audio_queue, print_lock) for _ in range(num_threads)]
for thread in transcriber_threads:
    thread.start()

# Wait for all threads to finish
capture_thread.join()
for thread in transcriber_threads:
    thread.join()
gui_thread.join()

######################
# Run the scheduler
while True:
    time.sleep(0.1)
```
Older Version (LM Studio Server)
```
# This is an older version of the program that uses LM Studio on a local
# server to summarize with LLMs; use it as a starting point for adding API calls.
import threading
import queue
import vosk
from vosk import SetLogLevel
import json
from datetime import datetime, timedelta
import time
import alsaaudio
import sys
import requests
from openai import OpenAI
import schedule
import yaml
import httpx
import logging
from unihiker import GUI
import psutil
from pinpong.board import Board
#import pinpong.tone as tone
import ruamel.yaml
from threading import Thread
import tkinter as tk
#import tkinter
from time import sleep
from PIL import Image, ImageTk, ImageOps
import signal
#from pinpong.base.pin_data import PinData
#from pinpong.base.private_constants import PrivateConstants
from yaml import load_all, FullLoader

#yaml = ruamel.yaml.YAML()

Board().begin()
gui = GUI()
img = gui.fill_rect(x=0, y=0, w=240, h=320, color="#666666")
if img is None:
    print("Error: Unable to create background")
    exit(1)
gui.fill_rect(x=50, y=20, w=140, h=30, color="#2ff542")
gui.fill_rect(x=80, y=50, w=80, h=23, color="#2ff542")
text1 = gui.draw_text(x=120, y=20, text='CPU Usage:', origin='top')
text_cpu = gui.draw_text(x=120, y=45, text='', font_size=14, origin='top')
if text1 is None:
    print("Error: Unable to create text element")
    exit(1)

# Create the button
button = gui.fill_circle(x=120, y=200, r=60, color=(255, 255, 255), fill="#b774f2")
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255), (0, 255, 255)]
i = 0

#######################
# Define the capture_audio function
def capture_audio():
    print(f"==========================Audio Capture Started==========================")
    while True:
        try:
            audio_data = bytearray()
            for _ in range(0, int(RATE / chunk_size * 6)):
                l, data = inp.read()
                if l > 0:
                    audio_data.extend(data)
            # Put the audio data in the queue
            audio_queue.put(audio_data)
        except Exception as e:
            logging.error(f"Error capturing audio: {e}")

#############################
with open("transcription.yaml", "w") as clearfile:
    clearfile.write("")

#################################
class Transcriber(threading.Thread):
    def __init__(self, audio_queue, print_lock):
        super().__init__()
        self.rec = vosk.KaldiRecognizer(model, RATE)
        self.stop_event = threading.Event()
        self.audio_queue = audio_queue
        self.print_lock = print_lock

    def run(self):
        print(f"==========================Audio Transcription Started==========================")
        while not self.stop_event.is_set():
            try:
                audio_data = self.audio_queue.get()
                self.rec.AcceptWaveform(bytes(audio_data))
                result = self.rec.Result()
                with open("transcription.yaml", "a") as f:
                    f.write("---" + "\n" + result + "\n")
                with self.print_lock:
                    logging.info(f"Transcription: {result}")
                    sys.stdout.flush()
                self.audio_queue.task_done()
            except Exception as e:
                logging.error(f"Error transcribing audio: {e}")

    def stop(self):
        self.stop_event.set()

###################
class SummaryGenerator:
    def __init__(self):
        self.stop_event = threading.Event()
        self.thread = None

    def generate_summary(self):
        while True:
            time.sleep(3)
            #print(f"==============looping==============")
            with open("transcription.yaml", "r+") as file:
                transcription_data = list(yaml.load_all(file, Loader=yaml.FullLoader))
                if len(transcription_data) >= 20:
                    result_string = ', '.join([item.get('text', '') for item in transcription_data])
                    #print(f"=================Emptying Transcription.yaml===================")
                    file.seek(0)
                    file.write("")
                    file.truncate()
                    print(f"============================SENDING TO SERVER FOR INFERENCING===============================")
                    client = OpenAI(base_url="http://192.168.1.2:1234/v1", api_key="lm-studio")
                    completion = client.chat.completions.create(
                        model="QuantFactory/Phi-3-mini-128k-instruct-GGUF",
                        messages=[
                            {"role": "system", "content": "You are given text snippets, if possible try to identify if there are multiple people present, then summarize the transcription."},
                            {"role": "user", "content": result_string}
                        ],
                        temperature=0.7,
                    )
                    summary_response = completion.choices[0].message.content
                    # Save the summary to a YAML file
                    with open("summary.yaml", "a") as summaryfile:
                        t = time.localtime()
                        timendate = time.strftime("%Y-%m-%d %I:%M:%S %p", t)
                        summaryfile.write("---" + "\n {" + timendate + " :" + summary_response + "} \n")
                    print(f"==========================SUMMARY SAVED=============================")
                    #transcription_data = ""
                    time.sleep(1)
                else:
                    #print("Less than 10 documents, waiting...")
                    time.sleep(1)

    def start(self):
        self.thread = threading.Thread(target=self.generate_summary)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        if self.thread:
            self.thread.join()

###
generator = SummaryGenerator()
generator.start()

##########################
def display():
    def update_button_color():
        global i
        button.config(fill=colors[i])
        i = (i + 1) % len(colors)
        time.sleep(0.04)

    clock_text = gui.draw_text(x=120, y=270, text='', font_size=28, color="#DDDDDD", origin='top')
    emj1 = gui.draw_emoji(x=77, y=60, w=120, h=120, emoji="Nerve", duration=0.1)
    update_queue1 = queue.Queue()

    def update_all():
        while True:
            t = time.localtime()
            clock_text.config(text=time.strftime("%I:%M:%S %p", t))
            time.sleep(1)
            cpu_usage = psutil.cpu_percent()
            update_queue1.put(("cpu_usage", str(cpu_usage) + '%'))
            if cpu_usage < 10:
                update_queue1.put(("emoji", "Shock"))
            elif cpu_usage < 30:
                update_queue1.put(("emoji", "Sleep"))
            elif cpu_usage < 45:
                update_queue1.put(("emoji", "Smile"))
            elif cpu_usage < 50:
                update_queue1.put(("emoji", "Peace"))
            elif cpu_usage < 65:
                update_queue1.put(("emoji", "Think"))
            elif cpu_usage < 90:
                update_queue1.put(("emoji", "Sweat"))
            else:
                update_queue1.put(("emoji", "Angry"))

    def main_loop():
        while True:
            try:
                update_button_color()
                message = update_queue1.get_nowait()
                if message[0] == "cpu_usage":
                    text_cpu.config(text=message[1])
                elif message[0] == "emoji":
                    emj1.config(emoji=message[1])
            except queue.Empty:
                pass
            time.sleep(0.01)

    thread = threading.Thread(target=update_all)
    thread.start()
    main_loop()

#####################
# Set up logging
logging.basicConfig(level=logging.INFO)
SetLogLevel(-1)

# Set up audio capture
RATE = 16000
card = 'default'
chunk_size = 8192
inp = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE, format=alsaaudio.PCM_FORMAT_S16_LE, channels=1, rate=RATE, periodsize=chunk_size)

# Set up Vosk model
model = vosk.Model('models/vosk-model-small-en-us-0.15')
rec = vosk.KaldiRecognizer(model, RATE)

# Create and start multiple transcriber threads
num_threads = 2  # recommended: 2
audio_queue = queue.Queue()
print_lock = threading.Lock()

# Create and start the GUI thread
gui_thread = threading.Thread(target=display)
gui_thread.daemon = True
gui_thread.start()

# Create and start the capture thread
capture_thread = threading.Thread(target=capture_audio)
capture_thread.daemon = True
capture_thread.start()

# Create and start the transcriber threads
transcriber_threads = [Transcriber(audio_queue, print_lock) for _ in range(num_threads)]
for thread in transcriber_threads:
    thread.start()

# Wait for all threads to finish
#gui_thread.join()
capture_thread.join()
for thread in transcriber_threads:
    thread.join()

######################
# Run the scheduler
while True:
    time.sleep(0.1)
```
