A Smart Hiking Stick that helps the visually impaired navigate and stay safe, while also providing plenty of additional utility for any user.
For the UNIHIKER AIoT Innovation Challenge, I have chosen the Portable Device category:
Hardware components
DFRobot UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
Blues Notecarrier-F OR Blues Notecarrier-A
DFRobot Unihiker Expansion Board
USB Speaker
Ultrasonic Sensor - HC-SR04 (Generic) ×2
DFRobot Gravity: GNSS GPS Module
Digilent WS2812 Addressable LED Strip
Solar Powered Power Bank
Software apps and online services
DFRobot Mind+
Hand tools and fabrication machines
3D Printer (generic)
Introduction
It all begins with the mighty stick. It's already a staple of hiking, so if you're carrying it around anyway, why not increase its usefulness? Similarly, a guiding cane is one of the options those who are visually impaired have for navigation. Why not make it so that the guiding cane could also warn you of overhangs above you? Or warn you of incoming bad weather? The answer is: we're going to make it do all that, and way, way more.
The goal of this project is to not only specifically help those with visual impairments, but also to create a device that's generally useful enough that anyone would be happy to carry it around. It can be used to help visually impaired people navigate with ultrasonic sensors, monitor for falls, monitor walks to ensure the user is ok, and send alerts. And, for more general usefulness, it can tell you how far you are from home, send messages, check the time, check the weather, automatically warn you of bad weather, and more.
That's a lot to unpack, so let's just jump into it step by step, starting with what we're using to make it smart in the first place.
Smart Sticks Need a Brain
If the stick is going to be smart, we need to give it a brain. For this, we're using the UNIHIKER from DFRobot. They have their own documentation for getting started, but I'll provide some quick notes to get you on your way all the same. I used their homegrown software Mind+, since it felt like a safe bet that the UNIHIKER would work well with it. After you plug your UNIHIKER into your computer and it goes through its boot process, you'll first need to add the UNIHIKER library within your Mind+ project. Simply click Blocks --> Extensions, and you'll see UNIHIKER as an option in the Official Library tab.
After you do this, you'll see the option to "Connect Remote Terminal". This is how you connect to your UNIHIKER. By default, your UNIHIKER is reachable at 10.1.2.3.
Similarly, if you need to SSH into your UNIHIKER, use root@10.1.2.3 with the password dfrobot. When connected with Mind+, it checks for updates on your libraries, but in my experience it seemed to need to be connected to the internet for this. To that end, what worked for me was going directly to the URL 10.1.2.3/pc/network-setting on my computer. It may be that the Network Settings tab will appear for you, but I've used 2 UNIHIKERs now and am fairly confident this little tidbit will save people out there some time and brain power!
Another note is how to get the program to run on the go. You need to enable Auto Boot, as seen below. Once you do this, it runs the last program you ran. So, just enable Auto Boot within the UNIHIKER settings, run your program, and from then on your program will start with every reboot. This is obviously pretty important for programs like the Smart Hiking Stick that need to run automatically when powered on.
The blocks themselves are useful for seeing syntax and such for code you're unfamiliar with, which was super helpful in my case since this is the first time I've tackled a lot of the code involved in this project. You simply drag the blocks in, see how it works, and can make and run simple test code you can reference as you write your program. One bit of code that was somewhat familiar though was the usage of ultrasonic sensors, which brings us to our next section.
Helping the Vision Impaired Navigate
The Smart Hiking Stick does have the "stick" aspect going for it, so it can inherently act a bit like a guiding cane. However, on a hike there may be hanging obstacles that one wouldn't feel with a guiding cane, and we may as well provide advance notice for obstacles in front of the user or on the ground so they don't have to bother having the stick bump into anything in the first place.
There are 2 ultrasonic sensors. One faces slightly up, and one faces slightly down. When an object is detected within a certain distance threshold, a sound is played through the UNIHIKER. To provide as much information as possible as simply as possible, there are separate sound ranges for each ultrasonic sensor. Within those ranges, the pitch changes based on how close or far the detected object is. By doing this, the user will be able to know where an object is in terms of distance and direction simply by learning the tone frequencies over time. The distance threshold for when we start playing a tone on object detection can be adjusted for the up and down sensors separately. The down sensor would be largely driven by the user's height, where most users would likely set it to just under the value where the floor is detected while walking. The up sensor would be more so based on user preference.
As you'll see in the code, because we're doing so much within the Smart Hiking Stick, there is a lot to process and a lot of multithreading. The initial approach I used was with the buzzer.pitch(pitch, buzz_duration) type logic. However, I found that this causes thread locks that halt the entire application, so eventually I moved to the approach you'll now see within the code. This approach plays a tone, sleeps, then stops the tone. It's a little less concise than I'd prefer, but it works with the multithreading needs the Smart Hiking Stick requires and it forced my hand into having the tones go back and forth between the top and bottom ultrasonic sensor, which is probably a good thing for user clarity.
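To make the approach concrete, here's a minimal sketch of the distance-to-pitch mapping and the alternating play/sleep/stop loop. The thresholds, frequency bands, and function names (`play_tone`, `stop_tone`, `read_up`, `read_down`) are placeholders of my own - the real program wires these to the HC-SR04 readings and the UNIHIKER buzzer.

```python
import time

# Hypothetical tone ranges - tune per user preference and mounting height
UP_RANGE = (200.0, 120.0)    # cm: start beeping at 200, pin at "very close" at 120
UP_PITCH = (400, 800)        # Hz: low pitch when far, high pitch when close
DOWN_RANGE = (150.0, 40.0)
DOWN_PITCH = (900, 1500)     # a distinct band so the user can tell the sensors apart

def distance_to_pitch(distance_cm, dist_range, pitch_range):
    """Map a distance reading to a tone frequency: closer object -> higher pitch.
    Returns None when the object is beyond the alert threshold."""
    far, near = dist_range
    low, high = pitch_range
    if distance_cm > far:
        return None  # nothing close enough to warn about
    # Clamp so anything nearer than `near` just pins at the highest pitch
    distance_cm = max(distance_cm, near)
    fraction = (far - distance_cm) / (far - near)  # 0.0 at threshold, 1.0 at closest
    return int(low + fraction * (high - low))

def alert_loop(read_up, read_down, play_tone, stop_tone, beep_s=0.15):
    """Alternate between the up- and down-facing sensors: play, sleep, stop.
    `read_*` return cm; `play_tone`/`stop_tone` wrap the buzzer (hardware-specific)."""
    while True:
        for read, dist_range, pitch_range in (
            (read_up, UP_RANGE, UP_PITCH),
            (read_down, DOWN_RANGE, DOWN_PITCH),
        ):
            pitch = distance_to_pitch(read(), dist_range, pitch_range)
            if pitch is not None:
                play_tone(pitch)
                time.sleep(beep_s)
                stop_tone()
```

Because the loop always finishes one sensor's beep before starting the other's, the back-and-forth cadence mentioned above falls out naturally.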
Voice Commands
It took me an embarrassingly long time to realize that there was a microphone built into the UNIHIKER. Until this point, I was intending to use the built-in buttons with braille labels to make them friendly for vision impaired users, but voice commands were a clear upgrade, so I moved in that direction immediately. I left in some limited button functionality, since having more capabilities isn't a bad thing. Considering that the project is geared toward those with limited vision, integrating voice commands into the project was a huge upgrade in usability.
One super important note for setting it up was an install that I had to do through SSH instead of Mind+. As I mentioned in the UNIHIKER section, use root@10.1.2.3 with password dfrobot to SSH in. Then, you can run the following command:
sudo apt-get install flac
This is important because Google's API for interpreting voices doesn't seem to work without it.
With it, we can control all the various functions on the Smart Hiking Stick with voice commands, which is a night and day improvement in the user experience regardless of whether the user has disabilities. Naturally, I've also added text-to-speech responses for the hiking stick itself and a speaker for audio playback.
Here is a quick run down of what functions I've added:
"Set home."
"Where am I?"
"Help" or "SOS" for emergencies
"Extend walk by [duration]."
"Start a [duration] walk."
"Extend walk."
"Start walk."
"End the walk."
"What time is it?"
"How far am I from home?"
"How fast am I going?"
"What's my elevation?"
"Send a message."
"Send my location."
"Disable distance sensor."
"Enable distance sensor."
"What is the weather?"
"What is the temperature?"
"What is the chance of rain?"
"What is the wind speed?"
"What is the humidity?"
"List voice commands."
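For the commands that take a [duration], the parsing can be sketched with a simple regex. This is an illustrative helper of my own, not the exact code from the program:

```python
import re

def parse_duration_minutes(command):
    """Pull a duration like '30 minute' or '1 hour' out of a spoken command.
    Returns the duration in minutes, or None if no duration was recognized."""
    command = command.lower()
    match = re.search(r'(\d+)\s*(minute|min|hour|hr)', command)
    if not match:
        return None
    value, unit = int(match.group(1)), match.group(2)
    return value * 60 if unit in ('hour', 'hr') else value
```

When no duration is heard, the program can fall back to a sensible default walk length.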
Conveniently, that leads us directly into actually building out all of this functionality!
GPS Functionality
As we'll get to, and as you'd expect, location data is pivotal to many of the features in the Smart Hiking Stick. To that end, a shoutout to this tutorial for getting me going on using the GNSS for location data.
The GPS coordinates are used in quite a bit of the functionality within the program, which includes providing coordinates when alerts are sent out, getting weather information, geofencing, and more.
What ended up working for me was taking the relevant code from the DFRobot_GNSS_I2C class in the aforementioned tutorial and simply including it within my Python script. Beyond that, getting set up within Mind+ is similar to setting up the UNIHIKER itself. This time you go to Blocks --> Extensions --> User-Ext and search for gnss.
One important note is that the device needs to be somewhere with good signal. The red light isn't just a power indicator - it means you don't have signal and won't get readings! Conveniently, there won't be any issue getting a clear signal hiking in the great outdoors, but just setting it on the desk while debugging doesn't seem to cut it. Propping it up against a window works, which you'll be able to see when the light turns green. At that point the provided code will cooperate and you'll get good GPS readings.
Geofencing
The stick is meant to be so feature-rich that it is just universally useful on any hike for any user. However, there are 2 main use cases I had in mind that inspired the project in the first place. These both rely on geofencing to automatically perform functions for convenience and safety.
Vision Impaired Solo Hikes - The goal is that a vision impaired user would be able to just get up and go on hikes with drastically decreased risk. There are utilities built into the stick to make the process that much smoother, as well as safety features. First, the user defines their home geolocation, which can be set with a simple voice command. This can be changed anytime, so if the user moves or is even just staying somewhere else for a while, this can be updated painlessly. A geofence is then set on the user's home location. When the user leaves their home with the Smart Hiking Stick, a hike is automatically determined to have started. There is then a time frame in which the user is expected to complete the walk, and if the walk isn't completed in that time frame we know to send an alert with the user's location. This can be easily extended or cancelled to avoid false alarms. When the user returns to their home geofence, the walk is concluded automatically, as we know the user has returned home safe and sound.
Extended Through-Hikes - This is one of the aspects that initially inspired the project. Those doing extremely long hikes, like those through-hiking the Appalachian Trail, require food drop-offs. Realistically, a vision impaired user would likely use the previously mentioned solo hike features rather than the food drop-offs, but through-hikers in general would clearly benefit, and it feels worth at least opening that realm of possibilities for disabled users. This would likely be something best done with a group of people, and the Smart Hiking Stick would automatically provide massive convenience to everyone involved. Coordinating food drop-offs requires a good amount of pre-planning and specific timing as is. With the Smart Hiking Stick, there would be pre-defined geofences that trigger notifications. When the user enters a geofence, the person(s) in charge of bringing food for the drop-off would automatically receive a message.
When we check the geofences, we take 5 GPS readings and use the median value. This gets rid of outliers - if we simply averaged them and one value was way off, the average would also be off. We then have a simple equation to check whether our current location is within a geofence. If so, the relevant functionality is triggered.
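That median-plus-distance check can be sketched as follows. The fence dict matches the shape of the home_location dict in the attached code, but the helper names here are mine:

```python
from math import radians, sin, cos, sqrt, atan2
from statistics import median

def haversine_m(lat1, lng1, lat2, lng2):
    """Great-circle distance between two points, in meters."""
    r = 6371000  # mean Earth radius in meters
    dlat, dlng = radians(lat2 - lat1), radians(lng2 - lng1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlng / 2) ** 2
    return r * 2 * atan2(sqrt(a), sqrt(1 - a))

def median_fix(readings):
    """Take the median latitude/longitude of several GPS readings to reject outliers."""
    lats = [lat for lat, _ in readings]
    lngs = [lng for _, lng in readings]
    return median(lats), median(lngs)

def in_geofence(lat, lng, fence):
    """True when the point is within the fence's radius (meters) of its center."""
    return haversine_m(lat, lng, fence["lat"], fence["lng"]) <= fence["radius"]
```

A single wildly wrong reading out of five leaves the median untouched, which is exactly the property an average lacks.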
Blues Wireless
I went all in with Blues Wireless this time around. The Smart Hiking Stick is useful on the go, but a huge part of its feature set is related to safety for those with visual disabilities. We have several alerts that get sent out whenever there is a safety concern or crisis. Since we have that set up, I also included code for simply sending messages via voice commands. As we'll get to shortly, Blues can also make calls to external APIs for fetching data while out and about, far from wireless connectivity.
It's worth noting that even though we send the gps coordinates from the hiking stick, Blues Wireless has this data as well even without being given these values.
I included both the Notecarrier-F and the Notecarrier-A in the project because I started with the Notecarrier-F but went ahead and also set up a Notecarrier-A, which can be used instead. You only need one of these for the project. I'll go through setting up the Notecarrier-A below:
There's a little mounting screw on the Notecarrier - unscrew it and put in the Notecard you want to use. The black one that came with the Superbox is the Notecard WiFi, which is WiFi-only, so we're going to use the green one, which has GPS enabled. Simply slot it into place, put the mounting screw back in, and plug it into your computer.
Next, navigate to the quickstart guide. This is a guide, but it's also how you get your device up and running within your project. You should be able to click the button to Connect a Notecard at this point; if you can't, just switch out your USB cable - it needs to be a cable that can actually transfer data, not just power the device.
Test your setup by pasting the following into the command line: {"req":"card.version"}
This should give you basic data about the device, which confirms it's cooperating. Next, navigate to https://notehub.io/projects and create a project, if you don't already have one ready for this device. This provides the project UID that you need to get your device connected to your project. Simply modify and run the following commands and you'll be up and running.
{"req":"hub.set", "product":"<your project's UID>"}
{"req":"hub.sync"}
As far as the code side of things goes, the UNIHIKER runs Python, so we're using the Blues Python library. In terms of getting everything working correctly, I tried a few approaches, but what I had success with was connecting via serial. Sometimes the simplest approach is the best - I simply plugged the Notecarrier into the UNIHIKER via USB and was able to connect via serial.
We initialize the Notecard when the program starts, but I included a setting for whether to send Blues alerts. We initialize the Notecard regardless because this allows us to fetch weather data even if we don't want to send alerts/messages, which we'll get to shortly. When we send alerts, there's a simple function that all alert/message logic flows into, where we add the last known coordinates to the message in the request object and send the transaction.
It's worth noting that since we're using a Notecard with GPS capabilities, coordinates are already available from the Blues transactions themselves. However, since we use the GNSS coordinates anyway, we have the lat/lng readily available.
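The alert funnel described above boils down to building a note.add request with the coordinates attached. note.add is the standard Notecard request for queuing an outbound event, but the Notefile name and body fields here are assumptions of mine rather than the exact ones in the program:

```python
def build_alert_request(message, lat, lng, alert_type="alert"):
    """Package a message plus last known coordinates into a Notecard note.add request."""
    return {
        "req": "note.add",
        "file": "alerts.qo",  # hypothetical outbound Notefile name
        "sync": True,         # push immediately rather than waiting for a sync cycle
        "body": {
            "type": alert_type,
            "message": message,
            "lat": lat,
            "lng": lng,
        },
    }

# Sending it requires the serial-connected Notecard (hardware-specific, sketched only):
# port = serial.Serial('/dev/ttyACM0', 9600)
# card = notecard.OpenSerial(port)
# card.Transaction(build_alert_request("Fall detected", lat, lng))
```

Every alert, SOS, and voice message can funnel through this one builder, so the coordinates are never forgotten.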
We use Blues a lot here already, since it's what we use for all the safety alerts with location data, so why not also use it to send out messages in general? Among the various commands is "send a message". When prompted, we listen for what the user wants to say and send the message.
To that end, there are a lot of options for where to route the events that come in. The one that stands out to me as most intuitive is to have these routed into text messages, which can be accomplished with Twilio. As one may expect, this can be accomplished by clicking the Create Route button and filling out your Twilio information as seen below.
In the next step, we need to make a call to an external API. Since we're discussing the Blues Wireless part of the equation, let's go ahead and set that up now. First, go to your project.
In the upper right corner, click the Create Route button. Within the options below, you'll see Proxy for Notecard Web Requests. Click that, and fill in the form similarly to what is shown below. You can pick your own route name and alias. I made them the same to keep things easy, but you don't need to. We'll get to the specifics of the weather API calls in the following section.
Weather
The feedback I got for the project was largely positive, but the main concern brought to my attention was weather. Relying on other senses, like hearing, while it's pouring rain would understandably make things difficult. I would also think it'd be a little harder to tell that rough weather is coming until it's too late. As such, it felt critical to add a feature where the hiking stick itself regularly checks for poor weather on the go. This can be accomplished with the Blues networking services we just set up. Blues can do more than send alerts - it can be used to make calls to services, which in our case will be the OpenWeatherMap API.
To get access to the nicer API calls, you have to sign up for a subscription, but it's free for the first 1000 calls a day, which is more than enough for personal use. For anyone following along and building this themselves, I'd recommend setting up the subscription sooner rather than later. It takes a while for the account status to process, and in the meantime it gives an error on seemingly valid API keys, which can lead to confused ooga booga noises. I may or may not know this from experience.
why api call no work? (give it time after subscribing)
As for the code itself, included in the project is a simple test wherein we make the call for weather information via Blues wireless. This program succinctly shows how to make the calls to the weather api via Blues while also providing you a functional starting place. We print out a ton of information in this test function, which of course was cherry picked within our Smart Hiking Stick application based on what users would likely find useful.
Through some extensive trial and error, I found that what worked best for getting this API to work correctly is to use the following as your route URL (with your own API key, of course): https://api.openweathermap.org/data/3.0/onecall?lat=[.lat]&lon=[.lon]&appid=<your api key here>&exclude=minutely,hourly
That allows you to inject your latitude and longitude from your code. I encountered some weird issues from the OpenWeatherMap API that weren't replicated in a browser, but using that URL works. Excluding the minutely and hourly weather reports is also necessary - you'll get a response length error if you don't cut down on the information being received.
We check the weather every 15 minutes while the stick is in use. The user can activate all weather updates if desired, but I have this feature turned off by default. Otherwise, the user is notified only if there is concerning weather, along with the percent chance of that weather.
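The "only speak up when the weather is concerning" decision might look like this. The daily/pop/weather fields follow the One Call response shape, while the thresholds and the list of concerning conditions are placeholders of my own:

```python
def weather_concern(onecall, rain_threshold=0.5):
    """Return a spoken warning string if today's forecast looks rough, else None.
    `onecall` is a parsed One Call API response containing 'daily' forecasts."""
    today = onecall["daily"][0]
    pop = today.get("pop", 0)                  # probability of precipitation, 0.0-1.0
    condition = today["weather"][0]["main"]    # e.g. "Rain", "Clear", "Thunderstorm"
    if condition in ("Thunderstorm", "Snow") or pop >= rain_threshold:
        return f"Warning: {condition} with a {int(pop * 100)} percent chance of precipitation."
    return None
```

The returned string can be handed straight to the text-to-speech engine; a None result means the 15-minute check stays silent.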
Beyond that, we have access to weather information at our fingertips at this point, so I also included voice commands that allow the user to get updates. I added the ability to inquire about all the weather items I could see a user actually wanting to know: temperature, chance of rain, wind speed, and humidity.
Fall Detection
In searching for the syntax to correctly utilize the accelerometer functionality within the UNIHIKER, I came across a project that was specifically a fall alert system utilizing the UNIHIKER, so shout out to that project for providing a good starting point. One key thing I added was the gyroscopic values. The user may do any number of things that would cause a sudden increase in acceleration, and we don't want false positives on fall alerts for intentional actions. Instead, we check whether the stick is more horizontal than it should be and only trigger if that is the case.
The first point of action if a fall is detected is that we request an update from the user. The text to speech of "A fall was detected. Are you okay? Please say yes or no." is played. Despite having the response request be a simple yes or no, there is a larger set of valid responses a user can say that will be understood by the hiking stick. The text to speech helps the user find the stick and also provides a very straightforward initial process for determining if the user is injured.
If the accelerometer picks up a rapid fall, we first give it a moment to see if the stick is picked back up. We can determine this with the values we can pull from the UNIHIKER as well. If the stick is moved back upright, we cancel the alert. If the user is hurt, they can use a voice command to send an alert. This allows us to avoid sending unnecessary alerts while also ensuring the user can get help if needed.
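A sketch of that detection logic: a spike in acceleration only counts as a fall if the stick also ends up closer to horizontal than vertical. The thresholds here and the assumption that the stick's long axis is z are mine, not values from the actual program:

```python
from math import sqrt, degrees, acos

FALL_G_THRESHOLD = 2.5    # hypothetical: sudden acceleration spike, in g
HORIZONTAL_DEGREES = 60   # hypothetical: tilt from vertical beyond which we treat as fallen

def accel_magnitude(ax, ay, az):
    """Overall acceleration magnitude from the three axis readings."""
    return sqrt(ax * ax + ay * ay + az * az)

def tilt_from_vertical(ax, ay, az):
    """Angle between the stick's long axis (assumed z) and gravity, in degrees."""
    mag = accel_magnitude(ax, ay, az)
    if mag == 0:
        return 0.0
    return degrees(acos(max(-1.0, min(1.0, az / mag))))

def looks_like_fall(spike_g, ax, ay, az):
    """A fall = an acceleration spike AND the stick lying closer to horizontal."""
    return spike_g > FALL_G_THRESHOLD and tilt_from_vertical(ax, ay, az) > HORIZONTAL_DEGREES
```

If the tilt returns to near-vertical within the grace period, the pending alert can simply be cancelled - the stick was picked back up.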
User confirming they are ok
When the fall is detected, a timer starts. If we haven't cancelled the alert within the allocated time, which I have set to 1 minute, we send out an alert. The assumption here is that the user is injured or unconscious, and we need to take action and request help.
Importantly, the user can also just tell the stick to call for help anytime by saying help or SOS.
Automated Walk Safety
The logistics of through-hiking the AT are actually much easier than setting up a walk. The geofencing triggers alerts: we forward which geofence was triggered, mark it as triggered, and we're good to go. What's a bit trickier is setting up walks that automatically start for the user and ensure they make it back safely.
We have an ongoing check for geofences. If the user isn't on a walk and leaves the home geofence, we start the walk. The user can also opt to start the walk themselves, so we also have a flag to indicate that the user has left their home. This way, when a user starts a walk it isn't ended as soon as the next geofence check sees that they're home.
The home geofence is easily set by saying "I'm home". Since it's so easy to set, this also means that if a user goes on vacation, they can have the same type of setup where they can have automated walks with their hotel/BnB/etc as their temporary home location.
The duration of a walk can be set by the user if they use a voice command. Otherwise, the assumption is that the most common walk has a known duration, so that value can be the default within the stick. This keeps the user interaction to a minimum, so they can just get up and walk. If they decide to take a longer walk, they simply extend the duration with a voice command.
When the user heads out for a walk, a timer is started. When the timer completes, we check in on the user to see if they're ok. We use the same positive or negative evaluation to provide ample ways to respond. If they give a positive affirmation that they're ok, we extend the walk duration. If they don't respond or say no, we send an alert via Blues.
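The whole walk lifecycle can be sketched as a small state machine. The class and its defaults are illustrative (the real program uses threads and timers); I've injected a clock here so the logic is easy to test:

```python
import time

class WalkMonitor:
    """Sketch of the automatic start/extend/end lifecycle of a walk."""

    def __init__(self, default_minutes=30, clock=time.time):
        self.default_minutes = default_minutes
        self.clock = clock
        self.walk_active = False
        self.left_home = False
        self.deadline = None

    def start_walk(self, minutes=None):
        self.walk_active = True
        self.deadline = self.clock() + 60 * (minutes or self.default_minutes)

    def extend_walk(self, minutes):
        if self.walk_active:
            self.deadline += 60 * minutes

    def update(self, in_home_geofence):
        """Called on each geofence check. Returns 'alert' if the walk ran over,
        'walk ended' when the user arrives back home, else None."""
        if not in_home_geofence:
            self.left_home = True
            if not self.walk_active:
                self.start_walk()        # leaving home auto-starts a walk
        elif self.walk_active and self.left_home:
            self.walk_active = False     # back inside the home geofence: safe
            self.left_home = False
            return "walk ended"
        if self.walk_active and self.clock() > self.deadline:
            return "alert"               # overdue: check in / send a Blues alert
        return None
```

The left_home flag is what prevents a user-initiated walk from ending on the very next geofence check while they're still standing at home.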
More Code Notes
There is quite a lot of code within this project, and I did my best to cover what the code did within each relevant section. However, there are a few more notes to touch on because, again, there is a lot going on.
For one, a lot of processes need to happen simultaneously. We don't want the ultrasonic sensor to stop producing tones because you're talking and the stick thinks it needs to process a voice command, right? This is relevant to basically every thread we have running - if all the various functionalities had to wait on each other, the stick would offer a lot less utility. As such, I utilized multithreading. This can be kind of a pain sometimes. In some ways, this caused fewer issues than I expected. In other ways, I did have to make adjustments to the code to accommodate this approach. The two main examples that are top of mind are with voice commands, where a timeout is added so that we don't have a chance of getting indefinitely stuck waiting for the next command, and the ultrasonic sensors, which required a different approach than the buzzer function. I wanted to note this so that anyone who decides to build off this doesn't revert those changes and end up confused when their program locks up!
Beyond that, to adjust how often threads run, just adjust how long they sleep before running through their loop again. There are some variables set up at the start that keep the program running well, such as specifying the audio codec so that the text-to-speech output goes through the speaker as intended.
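The per-thread pattern looks roughly like this - a shared stop event plus a per-loop wait, so a loop's cadence is just its sleep interval. The helper names are mine:

```python
import threading

stop_event = threading.Event()

def run_periodically(task, interval_s):
    """Run `task` in a loop, waiting `interval_s` between passes, until stopped."""
    while not stop_event.is_set():
        task()
        stop_event.wait(interval_s)  # wakes immediately on shutdown, unlike time.sleep

def start_worker(task, interval_s):
    """Launch a daemon thread so it never blocks program exit."""
    t = threading.Thread(target=run_periodically, args=(task, interval_s), daemon=True)
    t.start()
    return t
```

The weather thread would pass a long interval (900 seconds for the 15-minute check), while the ultrasonic and geofence loops use much shorter ones.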
I believe the following is a full list of the pip installs required for the Smart Hiking Stick program to run in full (threading, queue, sys, logging, re, and os are part of the Python standard library, so they don't need to be installed separately):
pip install pyserial
pip install note-python
pip install pinpong
pip install pygame
pip install pyttsx3
pip install SpeechRecognition
pip install paho-mqtt
These can be installed via the Library Management tab in Mind+.
Hopefully the code was explained well enough throughout this journey! The full program is attached.
Modeling the Stick
There are quite a lot of electronic gizmos to keep safe from the elements here, so 3D modeling a nice enclosure for everything was inevitable. Since 3D modeling isn't my strong suit, I utilized a VR modeling program called Medium to create the initial design. Then, I brought in pieces of smaller existing models to ensure a good end result. The model is included within the project. To give credit where credit is due, I used the front plate section of a UNIHIKER-based project for the UNIHIKER housing, and I used part of the handle from this gun handle project so that there's a comfortable place for the fingers to rest on the stick.
A picture is worth a thousand words for understanding the setup of the model, but I'll do a quick run down anyway. The top section is an enclosure for the speaker, which keeps it safe from the elements and ensures that audio playback faces toward the user in a convenient manner. There is a grip at hand height for comfort. There is a slot for the UNIHIKER to fit into. As noted in the video, the initial model I had included enough height for the UNIHIKER but the initial version I printed didn't account for the expansion board - that is fixed for the model I've attached (along with numerous other fixes). The holes you see throughout are for cables - the intention is for nice cable management (improvements were made since the initial print for this as well). There are little overhangs to protect electronics from the elements by guiding water away if it comes up. The large slot below the grip is for the power bank, because....
One feature I determined early on would be really important was solar power. When exploring power bank options, I discovered that some come with solar power harvesting built in. So, that meant the solar power integration part of the project was put into the modeling part of the process. I added an outward facing slot for the power bank to sit in that would keep it secure but allow it to absorb the sun's rays. This elongates the usage period and allows the user to continuously recharge the stick during any breaks.
As far as the one I printed goes, I do realize that black isn't the most logical choice for a hiking stick, since it'll be more likely to heat up. However, I'm building this project out for others and am just creating a very, very thorough proof of concept, so I went with a color scheme I thought looked nice. It's also made out of PLA, which would likely deform if left out in the sun for extended periods. It's a great material for solid prototyping, though! For those wanting to put this together to take out on adventures in the sun, I'd suggest both a lighter color for the print and a material with a higher melting point.
You'll notice in the video that I discovered modifications were needed as I printed. As such, the model attached to the project includes improvements, hence not matching the one in the video.
Lights (Camera, Action?)
You never know when you'll get turned around or even just underestimate how long a hike will take and end up walking in the dark. Or maybe you're just out camping! The hiking stick itself can turn into a light source. The initial intention was to have this built into the UNIHIKER setup as well, but the LED strip I bought with those capabilities came DOA. Thankfully, plan B is very nearly as good. I wrapped a simple button-activated LED strip around the stick so that with a single press the stick illuminates. Unless the user is 100% blind, this just feels like a universally helpful feature. In the case of 100% blindness, the ultrasonic sensors function in the dark regardless.
Bonus Functionality
Another project I put together that utilizes the UNIHIKER is a Smart Home system, which can be controlled by voice commands. Our Smart Hiking Stick just so happens to have voice commands already setup as well as wifi capabilities, so it stands to reason that we give it the ability to control these custom Smart Home functionalities as well.
Within our voice command logic, we listen for the word "butler". What we hear after "butler" is the command we send to a flask server we have setup on a CM4Stack from M5Stack. With this, the Smart Hiking Stick will be able to control any and all extra features we add to our Smart Home setup without any changes required.
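Extracting the command spoken after "butler" can be sketched like so. The parsing helper is mine; and since the attached code imports paho-mqtt with broker settings, the forwarding presumably looks like the publish.single call sketched in the comment:

```python
def extract_butler_command(heard):
    """Return the smart-home command spoken after the word 'butler', or None."""
    words = heard.lower().split()
    if "butler" not in words:
        return None
    command = " ".join(words[words.index("butler") + 1:])
    return command or None

# Forwarding the command (hardware/network-specific, sketched only):
# import paho.mqtt.publish as publish
# publish.single(TOPIC, payload=command, hostname=BROKER,
#                auth={"username": USERNAME, "password": PASSWORD})
```

Anything heard without the trigger word is simply ignored, so normal voice commands never leak into the smart home.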
Onward to the Great Outdoors!
The end result is a solar powered Smart Hiking Stick with a built-in light, safety monitoring in many forms, alerts, navigation features, the capability to automate walks and hikes, and more. It can help act as the eyes of those who have vision disabilities, and it offers a plethora of features such that those without impairments would have plenty of use for it as well. There are also pretty clear additional user groups that would benefit greatly, such as the elderly. As noted in the feedback, there are different groups of people with vision impairments, and they may have different needs and different methods of navigating. My goal here was to provide as many tools as possible, so that the stick covers the needs of any user, from fully blind to not impaired at all. With the amount of time and effort I put into these types of projects, I like to be able to put them to use myself, and with this iteration I can already be warned of bad weather, send a message, or even see how fast I'm going.
This project took quite a lot of time and effort, so hopefully this helps someone out there! It works, but I probably won't hike the AT for another 15 years or so, so there's plenty of time for improvements as inspiration hits - I'd love to see any remixes or add-ons anyone puts together!
import serial
import threading
from queue import Queue
from notecard import notecard
from pinpong.board import *
from pinpong.extension.unihiker import *
from time import sleep, time
from datetime import datetime, timedelta
from math import radians, cos, sin, sqrt, atan2
import pygame
import sys
import logging
import pyttsx3
import speech_recognition as sr
import re
import os
import json  # needed for the settings save/load functions below
import paho.mqtt.publish as publish
sys.path.append("/root/mindplus/.lib/thirdExtension/liliang-gravitygnss-thirdex")
# Configuration
PORT = '/dev/ttyACM0'
BAUD_RATE = 9600
PRODUCT_UID = "com.<company name>:<project name>"
# Constants for GNSS Device
GNSS_DEVICE_ADDR = 0x20
MODE_GPS_BEIDOU_GLONASS = 0x07
I2C_LAT_1 = 0x07
I2C_LON_1 = 0x0D
home_location = {"lat": 0, "lng": 0, "radius": 50} # Home radius in meters
home_set = False
walk_active = False
walk_start_time = None
user_left_home=False # in case the user starts the walk themselves, we need to ensure they leave the house before eventually triggering the walk end when they return home
walk_extension = 0
all_weather_updates = False # whether or not to say every weather update, or only inform the user when there's bad weather
lat=0
lng=0
walk_timer = None
fell_timer = None
activate_blues_alerts = False
sos_sent = False  # tracks whether an SOS alert is currently active
notecard_port = None
# Specify the card and device number - this seems to be needed for it to work correctly
os.environ['ALSA_CARD'] = '1' # Use card 1 for the onboard audio codec
os.environ['ALSA_DEVICE'] = '0'
os.environ['AUDIODEV'] = 'hw:1,0'
# Initialize TTS engine
engine = pyttsx3.init()
# Directly set the voice to "english" (from running some tests it seems to sound better)
engine.setProperty('voice', 'english')
# Set volume to maximum
engine.setProperty('volume', 1.0)
# MQTT Broker Settings
BROKER = "<ip_of_your_server>"
MQTT_PORT = 1883  # kept distinct from the Notecard serial PORT defined above
TOPIC = "home/<your_topic>"
USERNAME = "<your_username>"
PASSWORD = "<your_password>"
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
settings_file = 'settings.json'
if not os.path.exists(settings_file):
print("No settings file. Creating one.")
with open(settings_file, 'w') as file:
file.write('{}') # start with an empty file if it doesn't exist - we'll write to it with defaults shortly
print(f"Created default settings file at {settings_file}")
def save_settings(settings):
try:
with open(settings_file, 'w') as file:
json.dump(settings, file)
logging.info("Settings saved successfully.")
except Exception as e:
logging.error(f"Error saving settings: {e}")
def load_settings():
try:
settings = {
"home_location": {"lat": None, "lng": None},
"home_set": False,
"ultrasonic_sensor_enabled": True,
"activate_blues_alerts": True,
"all_weather_updates": False
}
        if os.path.exists(settings_file):
            with open(settings_file, 'r') as file:
                settings.update(json.load(file))  # merge saved values over the defaults so missing keys fall back safely
home_location = settings.get('home_location', {"lat": None, "lng": None})
home_set = settings.get('home_set', False)
ultrasonic_sensor_enabled = settings.get('ultrasonic_sensor_enabled', True)
activate_blues_alerts = settings.get('activate_blues_alerts', True)
all_weather_updates = settings.get('all_weather_updates', False)
logging.info("Settings loaded successfully.")
return settings, home_location, home_set, ultrasonic_sensor_enabled, activate_blues_alerts, all_weather_updates
else:
logging.warning("Settings file not found. Creating default settings.")
save_settings(settings)
except Exception as e:
logging.error(f"Error loading settings: {e}")
save_settings(settings)
return settings, settings['home_location'], settings['home_set'], settings['ultrasonic_sensor_enabled'], settings['activate_blues_alerts'], settings['all_weather_updates']
settings, home_location, home_set, ultrasonic_sensor_enabled, activate_blues_alerts, all_weather_updates = load_settings()
geofences = [
{"name": "add your locations here", "center": {"lat": 31.0959, "lng": -73.2668}, "radius": 25000, "triggered": False}
]
Board().begin()
ultrasonic_sensor_up = SR04_URM10(Pin(Pin.P0), Pin(Pin.P1))
ultrasonic_sensor_down = SR04_URM10(Pin(Pin.P2), Pin(Pin.P8))
tone = Tone(Pin(Pin.P26)) # Create a tone object with Pin 26
LOG_ULTRASONIC_VALUES = False # These values are useful but can be annoying if you're debugging something else
#Credit to "Lupin" from GNSS tutorial - this is a simplified version of the class they use (only keeping what we need for the smart hiking stick)
class DFRobot_GNSS_I2C:
def __init__(self, i2c_addr=0x20, bus=0):
self._i2c = I2C(bus)
self._addr = i2c_addr
self._lock = threading.Lock()
def set_gnss_mode(self, mode):
with self._lock:
self._i2c.writeto_mem(self._addr, 0x22, bytearray([mode]))
sleep(0.1)
def enable_power(self):
with self._lock:
self._i2c.writeto_mem(self._addr, 0x23, bytearray([0x00]))
def get_coordinate(self, reg):
with self._lock:
try:
result = self._i2c.readfrom_mem(self._addr, reg, 6)
if result is None:
raise ValueError("Received no data from the GNSS sensor.")
degree = result[0] + result[1] / 60.0 + (result[2] * 65536 + result[3] * 256 + result[4]) / 100000.0 / 60.0
return degree
except Exception as e:
logging.error(f"Unexpected error encountered: {str(e)}")
return None # Handle other possible exceptions
    def get_latitude(self):
        # return 31.5582  # Test loc (Burnt Corn Alabama) if/when your gnss is red (needs to be green to get values) - uncomment to test items that require gps while in low signal areas
        global lat
        lat = self.get_coordinate(I2C_LAT_1)  # get_coordinate handles the I2C lock internally, so don't re-acquire it here
        return lat
    def get_longitude(self):
        # return 87.1728  # Test loc
        global lng
        lng = self.get_coordinate(I2C_LON_1)  # longitude register 0x0D
        return lng
def set_home_location():
global settings, home_location, home_set
# latitude = get_latitude()
# longitude = get_longitude()
global lat,lng
if lat is not None and lng is not None:
settings['home_location'] = {"lat": lat, "lng": lng}
settings['home_set'] = True
home_set = True
save_settings(settings)
logging.info(f"Home location set to latitude: {lat}, longitude: {lng}")
speak("Home location set")
home_location["lat"] = lat
home_location["lng"] = lng
else:
settings['home_set'] = False
home_set = False
speak("Failed to set home location. Please try again.")
# Save home location to file
home_location_file = 'home_location.json'  # kept separate from settings.json
def save_home_location(location):
    with open(home_location_file, 'w') as file:
        json.dump(location, file)
logging.info("Home location saved.")
def check_geofence(latitude, longitude, geofence):
    # Fences may store coordinates at the top level (home_location) or under a "center" key (the geofences list)
    center = geofence.get("center", geofence)
    distance = haversine_distance(latitude, longitude, center["lat"], center["lng"])
    return distance <= geofence["radius"]
def manage_walk():
    global walk_active, walk_start_time, walk_extension, user_left_home, lat, lng
    if not home_set or home_location["lat"] is None:
        return  # can't manage walks without a home location
    current_time = datetime.now()
    if not walk_active and not check_geofence(lat, lng, home_location):
        # User has left the home geofence - start a walk automatically
        walk_active = True
        user_left_home = True
        walk_start_time = current_time
        logging.info("Walk has started.")
        speak("You have started a walk.")
    elif walk_active:
        if check_geofence(lat, lng, home_location) and user_left_home:
            # User returned to the home geofence - end the walk
            walk_active = False
            user_left_home = False
            logging.info("Walk has ended.")
        elif (current_time - walk_start_time).total_seconds() > 3600 + walk_extension * 60:
            # Walk time exceeded - check in with the user (walk_extension is tracked in minutes, matching extend_walk)
            speak("You've passed your expected walk duration. Are you ok? Say. Yes. or. no. ")  # we check for other responses, but yes or no is easier to understand. Adding periods for tts clarity.
            response_evaluation = evaluate_response(listen_for_response())  # reduces the reply to a yes/no/unknown flag
            if response_evaluation:
                print("User is ok - Extend the walk")
                speak("Extending walk duration.")
                walk_extension += 30  # Extend by 30 minutes
                logging.info("Walk time extended by 30 minutes.")
            elif response_evaluation is False:
                send_alert("User is not okay on a walk. Last known location is:")  # we always send location data
            elif button_b.is_pressed():  # Button B can also be used to extend the walk
                walk_extension += 30
                logging.info("Walk time extended by 30 minutes.")
            else:
                send_alert("Walk time exceeded without extension.")
                logging.info("Alert sent - user passed walk time.")
        else:
            # User is out and about - mark that they've actually left home (covers walks started manually from home)
            user_left_home = True
def calculate_pitch(distance, is_up, min_distance, max_distance):
try:
if distance <= min_distance:
if LOG_ULTRASONIC_VALUES:
print(f"Calculating pitch for very close distance: {distance} cm")
return 440 # Lowest pitch for close proximity
elif distance > max_distance:
pitch = 880 if is_up else 220
if LOG_ULTRASONIC_VALUES:
print(f"Calculating pitch for distance beyond max threshold: {distance} cm, Pitch: {pitch}")
return pitch # High pitch for up, low pitch for down for far distances
else:
# Scale pitch dynamically based on the distance
dynamic_pitch = int(440 + (440 * (distance - min_distance) / (max_distance - min_distance))) if is_up \
else int(220 - (110 * (distance - min_distance) / (max_distance - min_distance)))
if LOG_ULTRASONIC_VALUES:
print(f"Calculating dynamic pitch for distance: {distance} cm, Pitch: {dynamic_pitch}")
return dynamic_pitch
except Exception as e:
if LOG_ULTRASONIC_VALUES:
print(f"Error calculating pitch for distance: {distance} cm, is_up: {is_up}. Error: {str(e)}")
return 440 # Default to a safe pitch if there's an error so the user still knows that there's an obstacle
def get_distance(sensor=None):
    # Defaults to the upward-facing sensor if none is specified
    return (sensor or ultrasonic_sensor_up).distance_cm()
gnss = DFRobot_GNSS_I2C()
gnss.set_gnss_mode(0x07)
gnss.enable_power()
def gps_monitor():
while True:
latitudes = []
longitudes = []
for _ in range(5): # Collect 5 samples rapidly
latitudes.append(gnss.get_latitude())
longitudes.append(gnss.get_longitude())
sleep(0.2) # Rapid sampling interval
# Pull the median to get rid of outliers - outliers would still skew an average, so the median is safest
median_latitude = sorted(latitudes)[len(latitudes)//2]
median_longitude = sorted(longitudes)[len(longitudes)//2]
global lat,lng
lat=median_latitude
lng=median_longitude
        # The geofence list stores coordinates under "center"; announce each arrival only once
        for geofence in geofences:
            if not geofence["triggered"] and haversine_distance(median_latitude, median_longitude, geofence["center"]["lat"], geofence["center"]["lng"]) <= geofence["radius"]:
                geofence["triggered"] = True
                send_message_over_blues(f"User has arrived at {geofence['name']}")
sleep(60) # Wait for a minute before next readings
def haversine_distance(lat1, lon1, lat2, lon2):
# Calculate the great circle distance between two points
R = 6371000 # Earth radius in meters
dLat = radians(lat2 - lat1)
dLon = radians(lon2 - lon1)
a = sin(dLat/2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dLon/2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return R * c
def button_watch():
last_state_a = False
last_state_b = False
try:
while True:
current_state_a = button_a.is_pressed()
current_state_b = button_b.is_pressed()
if current_state_a and not last_state_a:
logging.info("Button A pressed")
handle_button_press()
if current_state_b and not last_state_b:
logging.info("Button B pressed")
sos_button_pressed()
last_state_a = current_state_a
last_state_b = current_state_b
sleep(0.1)
except KeyboardInterrupt:
logging.info("Button watch stopped.")
def speak(text):
print("Tts: ",text)
engine.say(text)
engine.runAndWait()
print("Tts done")
if not home_set:
speak("Say set home to set your home location, or press button a")
def sos_button_pressed():
global sos_sent
if sos_sent:
logging.info("SOS Cancelled.")
send_alert("SOS alert has been cancelled. Last known location is: ") # We always send the location with all messages
speak("Emergency alert cancelled.")
sos_sent = False # Reset the SOS sent flag
else:
logging.info("SOS activated")
send_alert("SOS! User needs help at location: ")
speak("Emergency alert sent.")
sos_sent = True
def send_alert(message):
global lat,lng
location= f"Latitude: {lat}, Longitude: {lng}"
if activate_blues_alerts:
req = {"req": "note.add", "body": {"text": message + f" Last known coordinates: {location}"}}
rsp = notecard_port.Transaction(req)
print(f"Alert sent: {rsp}")
fall_detected = False
def handle_button_press():
    global fall_detected
    logging.info("Button was pressed!")
if not home_set:
set_home_location()
if fall_detected:
fall_detected = False
def detect_fall():
global fall_detected, walk_timer
g = 9.81
threshold_acceleration = 1.15 * g
upright_threshold = 0.1 * g
cos_threshold_angle = math.cos(math.radians(90 - 30)) # adjust the 30 here if you want to adjust your angle threshold
while True:
Ax = accelerometer.get_x() / 16384.0
Ay = accelerometer.get_y() / 16384.0
Az = accelerometer.get_z() / 16384.0
# Calculate angles to check if the stick is significantly tilted
cos_angle_x = Ax / math.sqrt(Ax**2 + Ay**2 + Az**2)
cos_angle_y = Ay / math.sqrt(Ax**2 + Ay**2 + Az**2)
acc_magnitude = math.sqrt(Ax**2 + Ay**2 + Az**2) * g
# Uncomment if you're having issues with your accelerometer values
# print(f"Acc magnitude: {acc_magnitude} m/s^2, Angle X: {math.degrees(math.acos(cos_angle_x))}, Angle Y: {math.degrees(math.acos(cos_angle_y))}")
if acc_magnitude > threshold_acceleration and (cos_angle_x < cos_threshold_angle or cos_angle_y < cos_threshold_angle):
user_fell()
# Check if the device is upright - if a user fell and they move their stick upright, we can assume theyre ok and that they can request an alert if not
if fall_detected and abs(acc_magnitude - g) < upright_threshold:
fall_detected = False
speak("Normal stick usage detected. Cancelling alert. You can send an alert by saying 'I fell' or 'Help'.")
            if walk_timer and walk_timer.is_alive():  # walk_timer starts out as None
                walk_timer.cancel()
sleep(0.5)
def user_fell():
global fall_detected,fell_timer
fall_detected = True
# Start a timer to send an alert if no clear response
fell_timer = threading.Timer(60, send_fall_alert)
fell_timer.start()
print("Fall detected!")
speak("A fall was detected. Are you okay? Please say yes or no.") # we check for other responses, but yes or no is easier to understand
# Listen for a response from the user
response = listen_for_response()
response_evaluation = evaluate_response(response)
if response_evaluation:
print("Timer and alert cancelled.")
speak("Cancelling fall alert.")
fell_timer.cancel()
    elif response_evaluation is False:  # None means no clear response, so compare explicitly
send_alert("User has fallen and is injured. Last known location is:") # we always send location data
def send_fall_alert():
global fall_detected # Check to see if the user cancelled detected fall
if fall_detected:
send_alert("User has fallen and no response has been received in the allocated time. Last known location: ")
speak("Fall alert sent")
def listen_for_voice_commands():
recognizer = sr.Recognizer()
microphone = sr.Microphone()
print("listen_for_voice_commands")
while True:
try:
with microphone as source:
recognizer.adjust_for_ambient_noise(source)
audio = recognizer.listen(source, timeout=5.0, phrase_time_limit=10.0) # 5 seconds to start speaking, 10 seconds for speaking
# process speech with Google's speech recognition
command = recognizer.recognize_google(audio)
print("You said:", command)
handle_voice_command(command, microphone, recognizer)
except sr.UnknownValueError:
print("Google Speech Recognition could not understand audio")
except sr.RequestError as e:
print("Could not request results; {0}".format(e))
except sr.WaitTimeoutError:
print("Listening timed out while waiting for phrase to start")
except KeyboardInterrupt:
print("Voice command listening terminated.")
break
def handle_voice_command(command, microphone, recognizer):
command = command.lower() # Normalize the command to lowercase for easier comparison
global ultrasonic_sensor_enabled, activate_blues_alerts, sos_sent
print(f"Voice command: {command}")
    if "set home" in command or "i'm home" in command or "i'm at home" in command:
set_home_location()
elif "where am i" in command:
speak(process_location())
elif "help" in command:
sos_sent = False
activate_blues_alerts = True # turn blues alerts back on in the case of an emergency
sos_button_pressed()
elif "i'm okay" in command or "im ok" in command:
speak("Glad to hear that you're okay. Sending status message.")
send_alert("User has stated that they are ok. Location:")
elif "enable blues alerts" in command or "enable alerts" in command:
settings['activate_blues_alerts'] = True
save_settings(settings)
activate_blues_alerts = True
# initialize_blues_service()
speak("Blues alerts enabled.")
elif "disable blues alerts" in command or "disable alerts" in command:
settings['activate_blues_alerts'] = False
save_settings(settings)
activate_blues_alerts = False
# terminate_blues_service() # I commented this out so we can get weather updates
speak("Blues alerts disabled.")
elif "enable ultrasonic sensor" in command or "enable proximity sensor" in command or "enable distance sensor" in command:
settings['ultrasonic_sensor_enabled'] = True
save_settings(settings)
ultrasonic_sensor_enabled = True
speak("Ultrasonic sensor enabled.")
elif "disable ultrasonic sensor" in command or "disable proximity sensor" in command or "disable distance sensor" in command or "disable distance" in command or "disable object sensor" in command or "disable distant sensor" in command:
settings['ultrasonic_sensor_enabled'] = False
save_settings(settings)
ultrasonic_sensor_enabled = False
print("Ultrasonic sensor disabled.")
speak("Ultrasonic sensor disabled.")
elif "extend walk by" in command:
time_phrase = command.split("extend walk by")[-1].strip()
minutes = parse_time_to_minutes(time_phrase)
if minutes > 0:
extend_walk(minutes)
else:
speak("Could not understand the duration. Please repeat the command with a number.")
elif "start a" in command and "walk" in command:
time_phrase = command.split("start a")[-1].split("walk")[0].strip()
minutes = parse_time_to_minutes(time_phrase)
if minutes > 0:
start_timed_walk(minutes)
else:
speak("Could not understand the walk duration. Please specify a number of minutes.")
elif "extend walk" in command:
extend_walk()
elif "start walk" in command:
start_walk()
elif "end walk" in command or "i am home" in command:
end_walk()
elif "i fell" in command:
speak("User fell and requested an alert be sent.")
elif "what time is it" in command:
current_time = datetime.now().strftime('%H:%M')
speak(f"The current time is {current_time}")
elif "how far am i from home" in command:
report_distance_from_home()
elif "how fast am i going" in command:
get_speed()
elif "what's my elevation" in command:
get_elevation()
elif "send a message" in command or "send message" in command:
prompt_message(microphone, recognizer)
elif "send my location" in command:
send_my_location()
elif "what is the weather" in command:
speak_weather(fetch_latest_weather_data())
elif "what is the temperature" in command:
get_temperature()
elif "what is the chance of rain" in command:
get_chance_of_rain()
elif "what is the wind speed" in command:
get_wind_speed()
elif "what is the humidity" in command:
get_humidity()
elif "list voice commands" in command:
list_voice_commands()
elif 'butler' in command:
command = command.split('butler', 1)[1].strip() # Get the part after 'butler'
if command:
print(f"Command received: {command}")
# Send the command to the MQTT broker
            publish.single(TOPIC, command, hostname=BROKER, port=1883, auth={'username': USERNAME, 'password': PASSWORD})  # 1883 is the MQTT broker port
print("Command sent to the Flask server via MQTT.")
else:
print("No command detected after the trigger word.")
def parse_time_to_minutes(input_time):
"""
Convert spoken time phrases into minutes.
Handles phrases like '30 minute walk', '1 hour walk', 'an hour and a half walk'.
"""
    input_time = input_time.lower()
    # Capture the number that appears directly before each time unit, e.g. "30 minute", "2 hour"
    hours = sum(int(n) for n in re.findall(r'(\d+)\s*hour', input_time))
    minutes = sum(int(n) for n in re.findall(r'(\d+)\s*minute', input_time))
    # "a" or "an" before a unit typically means one, e.g. "an hour", "a minute"
    if 'hour' in input_time and hours == 0 and ('an ' in input_time or ' a ' in input_time):
        hours += 1
    if 'minute' in input_time and minutes == 0 and ('a ' in input_time or ' an ' in input_time):
        minutes += 1
    # Calculate total minutes
    total_minutes = minutes + hours * 60
    # Handle phrases like "an hour and a half"
    if 'half' in input_time and 'hour' in input_time:
        total_minutes += 30
    return total_minutes
def listen_for_response():
recognizer = sr.Recognizer()
microphone = sr.Microphone()
with microphone as source:
recognizer.adjust_for_ambient_noise(source)
audio = recognizer.listen(source)
try:
response = recognizer.recognize_google(audio).lower()
return response
except sr.UnknownValueError:
print("Speech Recognition could not understand audio")
except sr.RequestError as e:
print(f"Could not request results from Google Speech Recognition service; {e}")
return ""
def evaluate_response(response):
affirmative_responses = ["yes", "i am ok", "i'm ok", "i am okay", "i'm okay", "all good", "no problem", "fine"]
negative_responses = ["no", "i fell", "help", "i'm not okay", "not ok", "not okay", "i am hurt", "injured", "bad"]
if any(word in response for word in affirmative_responses):
print("User confirmed they are okay.")
return True
elif any(word in response for word in negative_responses):
print("User indicated they are not okay or needs help.")
return False
else:
print("Unrecognized response.")
return None
def initialize_blues_service():
global notecard_port
if not notecard_port:
try:
serial_port = setup_serial_connection(PORT, BAUD_RATE)
if serial_port is not None:
notecard_port = setup_notecard(serial_port)
else:
print("Failed to set up the serial port connection.")
except Exception as e:
speak("Failed to connect to Blues services.")
print(f"Error: {str(e)}")
def get_weather_data(lat, lon):
print("get_weather_data")
req = {
"req": "web.get",
"route": "weatherInfo", # Replace with your for Proxy Route if it's different
"name": "", # Leave this empty - we will use placeholders in the route
"body": {
"lat": lat,
"lon": lon
},
"content": "application/json"
}
response = notecard_port.Transaction(req)
return response
# This is a nice way to visualize a bunch of different weather information
def print_weather_data(weather_data):
print("Current Weather:")
current = weather_data.get("current", {})
    print(f"  Temperature: {kelvin_to_fahrenheit(current.get('temp', 273.15))}F")
    print(f"  Feels Like: {kelvin_to_fahrenheit(current.get('feels_like', 273.15))}F")
print(f" Humidity: {current.get('humidity', 'N/A')}%")
print(f" Pressure: {current.get('pressure', 'N/A')} hPa")
print(f" Wind Speed: {current.get('wind_speed', 'N/A')} m/s")
print(f" Wind Direction: {current.get('wind_deg', 'N/A')}")
print(f" UV Index: {current.get('uvi', 'N/A')}")
print(f" Cloudiness: {current.get('clouds', 'N/A')}%")
print(f" Visibility: {current.get('visibility', 'N/A')} meters")
print(f" Weather Description: {current.get('weather', [{}])[0].get('description', 'N/A')}")
print(f" Rain Volume (1h): {current.get('rain', {}).get('1h', 'N/A')} mm")
print(f" Snow Volume (1h): {current.get('snow', {}).get('1h', 'N/A')} mm")
print("\nDaily Forecast:")
    daily = weather_data.get("daily", [{}])[0]  # Get today's forecast (default avoids an IndexError on empty data)
    print(f"  Morning Temperature: {kelvin_to_fahrenheit(daily.get('temp', {}).get('morn', 273.15))}F")
    print(f"  Day Temperature: {kelvin_to_fahrenheit(daily.get('temp', {}).get('day', 273.15))}F")
    print(f"  Evening Temperature: {kelvin_to_fahrenheit(daily.get('temp', {}).get('eve', 273.15))}F")
    print(f"  Night Temperature: {kelvin_to_fahrenheit(daily.get('temp', {}).get('night', 273.15))}F")
    print(f"  Min Temperature: {kelvin_to_fahrenheit(daily.get('temp', {}).get('min', 273.15))}F")
    print(f"  Max Temperature: {kelvin_to_fahrenheit(daily.get('temp', {}).get('max', 273.15))}F")
print(f" Humidity: {daily.get('humidity', 'N/A')}%")
print(f" Pressure: {daily.get('pressure', 'N/A')} hPa")
print(f" Wind Speed: {daily.get('wind_speed', 'N/A')} m/s")
print(f" Wind Direction: {daily.get('wind_deg', 'N/A')}")
print(f" UV Index: {daily.get('uvi', 'N/A')}")
print(f" Cloudiness: {daily.get('clouds', 'N/A')}%")
    print(f"  Probability of Precipitation: {daily.get('pop', 0) * 100}%")
print(f" Rain Volume: {daily.get('rain', 'N/A')} mm")
print(f" Snow Volume: {daily.get('snow', 'N/A')} mm")
print(f" Weather Description: {daily.get('weather', [{}])[0].get('description', 'N/A')}")
def terminate_blues_service():
global notecard_port
if notecard_port:
notecard_port.close()
notecard_port = None
speak("Blues service terminated.")
def extend_walk(additional_minutes=30):
global walk_extension, walk_start_time
if walk_active:
walk_extension += additional_minutes
new_end_time = walk_start_time + timedelta(minutes=walk_extension)
speak(f"Walk extended by {additional_minutes} minutes. New end time is {new_end_time.strftime('%H:%M')}.")
else:
speak("There is no active walk to extend.")
def start_walk():
global walk_active, walk_start_time
if not walk_active:
walk_active = True
walk_start_time = datetime.now()
speak("Walk started. Have a good trip!")
else:
speak("A walk is already active.")
def end_walk():
global walk_active
if walk_active:
walk_active = False
speak("Walk has ended.")
else:
speak("No active walk to end.")
def check_geofence_status():
global lat,lng
for geofence in geofences:
if check_geofence(lat, lng, geofence):
speak(f"You are within the geofence of {geofence['name']}.")
return
speak("You are not within any known geofence.")
def get_speed():
    # Note: get_sog() and get_alt() come from the full DFRobot GNSS library; the simplified class above only keeps
    # coordinate reads, so restore those methods from the library if you use the speed and elevation commands
    sog = gnss.get_sog()
    speak(f"Your speed is {sog} meters per second.")
def get_elevation():
    alt = gnss.get_alt()
    speak(f"Your elevation is {alt} meters.")
def send_my_location():
    global lat, lng
    message = f"My current location is latitude {lat}, longitude {lng}."
    send_message_over_blues(message)
def prompt_message(microphone, recognizer):
speak("What's your message?")
with microphone as source:
audio = recognizer.listen(source, timeout=5.0, phrase_time_limit=10.0)
try:
message = recognizer.recognize_google(audio)
confirm_and_send_message(message, microphone, recognizer)
except sr.UnknownValueError:
speak("I didn't catch that. Please say your message again.")
prompt_message(microphone, recognizer)
def confirm_and_send_message(message, microphone, recognizer):
speak(f"You said: {message}. Would you like to send it?")
    with microphone as source:
        audio = recognizer.listen(source, timeout=5.0, phrase_time_limit=10.0)
try:
# Repurpose evaluate_response so the user can say more than just yes or no
response = evaluate_response(recognizer.recognize_google(audio).lower())
# if "yes" in response:
if response:
send_message_over_blues(message)
speak("Message sent.")
# elif "no" in response:
        elif response is False:  # None means no clear response
speak("Would you like to start over?")
handle_start_over(microphone, recognizer)
else:
speak("Please say yes or no.")
confirm_and_send_message(message, microphone, recognizer)
except sr.UnknownValueError:
speak("Please say yes or no.")
confirm_and_send_message(message, microphone, recognizer)
def handle_start_over(microphone, recognizer):
    with microphone as source:
        audio = recognizer.listen(source, timeout=5.0, phrase_time_limit=10.0)
try:
response = recognizer.recognize_google(audio).lower()
if "yes" in response:
prompt_message(microphone, recognizer)
elif "no" in response:
speak("Okay, not sending the message.")
except sr.UnknownValueError:
speak("Please say yes or no.")
handle_start_over(microphone, recognizer)
def send_message_over_blues(message):
if activate_blues_alerts:
req = {"req": "note.add", "body": {"text": message}}
notecard_port.Transaction(req)
def list_voice_commands():
commands = [
"Set home.",
"Where am I?",
"Help or SOS for emergencies",
"Extend walk by [duration].",
"Start a [duration] walk.",
"Extend walk.",
"Start walk.",
"End the walk.",
"What time is it?",
"How far am I from home?",
"How fast am I going?",
"What's my elevation?",
"Send a message.",
"Send my location.",
"Disable distance sensor.",
"Enable distance sensor.",
"What is the weather?",
"What is the temperature?",
"What is the chance of rain?",
"What is the wind speed?",
"What is the humidity?",
"List voice commands."
]
for command in commands:
speak(command)
sleep(1)
def report_distance_from_home():
if not home_set:
speak("Home location is not set.")
return
global lat,lng
    # Reuse the haversine helper above for the distance calculation
    distance = haversine_distance(lat, lng, home_location["lat"], home_location["lng"])
speak(f"You are approximately {distance:.2f} meters from home.")
def setup_notecard(serial_port):
card = notecard.OpenSerial(serial_port)
req = {"req": "hub.set", "product": PRODUCT_UID, "mode": "continuous"}
rsp = card.Transaction(req)
print(f"Setup response from Notecard: {rsp}")
return card
def setup_serial_connection(port, baud_rate):
try:
return serial.Serial(port, baud_rate)
except Exception as e:
print(f"Failed to open serial port: {e}")
return None
def process_location():
try:
for geofence in geofences:
if haversine_distance(lat, lng, geofence["center"]["lat"], geofence["center"]["lng"]) <= geofence["radius"]:
return f"You are in {geofence['name']}."
        return f"Your coordinates are approximately {lat:.4f}, {lng:.4f}."
except Exception as e:
print(f"Error in process_location: {e}")
sleep(1)
def check_weather():
global all_weather_updates
while True:
try:
print("checking weather")
weather_data = fetch_latest_weather_data()
if weather_data is not None:
if all_weather_updates:
speak_weather(weather_data)
else:
check_for_bad_weather(weather_data)
sleep(900) # Check weather every 15 minutes
except Exception as e:
print(f"Error in check_weather: {e}")
sleep(90) # Try again sooner if we error out
def check_for_bad_weather(weather_data):
    current = weather_data.get("current", {})
    # rain and snow here are volumes in millimeters over the last hour, not percentages
    rain = current.get("rain", {}).get('1h', 0)
    snow = current.get("snow", {}).get('1h', 0)
    wind_speed = current.get("wind_speed", 0)
    if rain > 40:
        speak(f"Heavy rain of {rain} millimeters within the hour")
    if snow > 40:
        speak(f"Heavy snow of {snow} millimeters within the hour")
    if wind_speed > 10:
        speak(f"There will be wind speeds of {wind_speed} meters per second within the hour")
def kelvin_to_fahrenheit(kelvin):
return round((kelvin - 273.15) * 9/5 + 32, 2)
def speak_weather(weather_data):
current = weather_data.get("current", {})
    temp_f = kelvin_to_fahrenheit(current.get("temp", 273.15))  # default of 273.15 K avoids a crash when data is missing
weather_description = current.get("weather", [{}])[0].get("description", "N/A")
speak(f"The current weather is {weather_description} with a temperature of {temp_f}F.")
def get_temperature():
weather_data = fetch_latest_weather_data()
    temp_f = kelvin_to_fahrenheit(weather_data.get("current", {}).get("temp", 273.15))  # default avoids a crash on missing data
speak(f"The current temperature is {temp_f}F.")
def get_chance_of_rain():
weather_data = fetch_latest_weather_data()
    daily = weather_data.get("daily", [{}])[0]  # default avoids an IndexError on empty data
    pop = daily.get("pop", 0)  # default to 0 so the arithmetic below is always safe
speak(f"The chance of rain is {pop * 100}%.")
def get_wind_speed():
weather_data = fetch_latest_weather_data()
wind_speed = weather_data.get("current", {}).get("wind_speed", "N/A")
speak(f"The current wind speed is {wind_speed} meters per second.")
def get_humidity():
weather_data = fetch_latest_weather_data()
humidity = weather_data.get("current", {}).get("humidity", "N/A")
speak(f"The current humidity is {humidity}%.")
def fetch_latest_weather_data():
# print("fetch latest weather")
try:
print("lat long found for fetch weather")
global lat,lng
weather_response = get_weather_data(lat, lng)
print("weather response found")
if weather_response.get("result") == 200:
print("Good weather response received")
return weather_response.get("body", {})
else:
print("Failed to fetch weather data")
print("Response received:", weather_response)
return {}
except Exception as e:
print(f"Error in fetch_latest_weather_data: {e}")
sleep(1)
return {}
def handle_ultrasonic_sensors():
global tone, ultrasonic_sensor_up, ultrasonic_sensor_down, ultrasonic_sensor_enabled
buzz_duration = .5
max_distance_top = 100
max_distance_bottom = 50 # make the bottom one have a lower range so we arent triggering constantly
min_distance = 1
is_up_sensor = True # Start with the 'up' sensor
while True:
if ultrasonic_sensor_enabled:
try:
# Choose the current sensor based on the flag
current_sensor = ultrasonic_sensor_up if is_up_sensor else ultrasonic_sensor_down
max_distance = max_distance_top if is_up_sensor else max_distance_bottom
distance = current_sensor.distance_cm()
if LOG_ULTRASONIC_VALUES:
print(f"{'Up' if is_up_sensor else 'Down'} sensor distance: {distance} cm")
if distance is None or distance == 0:
sleep(1) # Adjust sleep time as needed
is_up_sensor = not is_up_sensor # Switch to the other sensor
continue
                # tick_threshold scales 1-10 with distance; it's currently unused, since feedback is pitch-based
                if distance > max_distance:
                    tick_threshold = 10
                elif distance <= min_distance:
                    tick_threshold = 1
                else:
                    tick_threshold = int(1 + 9 * (distance - min_distance) / (max_distance - min_distance))
pitch = calculate_pitch(distance, is_up_sensor, min_distance, max_distance)
# Using this setup instead of the buzzer function works way better with the threading - don't switch to the other way or it'll lock
tone.freq(pitch)
tone.on()
sleep(buzz_duration)
tone.off()
sleep(1)
is_up_sensor = not is_up_sensor # Switch to the other sensor after handling the current one
except Exception as e:
print(f"Error in ultrasonic sensor thread: {e}")
sleep(1)
else:
sleep(10)
def manage_walk_thread():
while True:
manage_walk()
        sleep(15)  # 'time' was imported as a function from the time module, so use sleep directly
def main():
    try:
        initialize_blues_service()
        threads = [
            threading.Thread(target=listen_for_voice_commands),
            threading.Thread(target=gps_monitor),
            threading.Thread(target=handle_ultrasonic_sensors),
            threading.Thread(target=manage_walk_thread),
            threading.Thread(target=button_watch),  # You can comment this thread out - most users will prefer voice commands
            threading.Thread(target=check_weather),
            threading.Thread(target=detect_fall),
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()  # keep the main thread alive so Ctrl-C is caught below
    except KeyboardInterrupt:
        logging.info("Program terminated by user.")
    except Exception as e:
        logging.error(f"Unhandled exception: {e}")
    finally:
        pygame.quit()
if __name__ == "__main__":
main()
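If you want to sanity-check the geofencing math before strapping anything to a stick, the haversine helper from the listing can be exercised on its own. This is a minimal standalone sketch of the same great-circle formula; the test coordinates are arbitrary and just exploit the fact that one degree of latitude is roughly 111 km anywhere on Earth:

```python
from math import radians, cos, sin, sqrt, atan2

# Same great-circle distance formula used in the listing above
def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371000  # Earth radius in meters
    dLat = radians(lat2 - lat1)
    dLon = radians(lon2 - lon1)
    a = sin(dLat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dLon / 2) ** 2
    return R * 2 * atan2(sqrt(a), sqrt(1 - a))

# Two points exactly one degree of latitude apart
d = haversine_distance(31.0, -73.0, 32.0, -73.0)
print(f"{d / 1000:.1f} km")  # about 111.2 km
```

If this prints something far from ~111 km, the geofence radius checks built on top of it won't behave, so it's a cheap first thing to verify.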
Schematic: