A robot arm that detects kids and throws candy at them.
Supplies:
SO-100 robot arm
Raspberry Pi 3 Model B
Webcam
Ultrasonic Sensor
USB Speaker
Introduction
I saw the SEEED Interactive Signage contest and the first thing that came to mind was all the booths at conventions, festivals, and so on. What actually works to lure people over? Is it a pretty sign? Is it a fancy hat? No, the answer is of course... Candy!
Booths have those bowls of candy. But it always feels so awkward. Who can have some? How many do you take? Are you allowed to dive in face-first like you're bobbing for apples, or is that frowned upon? It seems like not only do we have some social hurdles to overcome, but we also have room for improvement. Candy is great, but there's no zazz.
It seems like we need a way to up the ante, and conveniently I have just the robot arm for the job.
Prepping the Robot Arm
I put together a quick little unboxing video as I uncovered the secrets of the SO-100, but I wanted to write up the findings for anyone putting their own together.

First off, there are 2 arms in the package: a "follower" and a "leader" arm, where you're intended to move the leader and have the follower copy its movements. Like many others, I'm sure, I saw this as an opportunity to have 2 robot arms that can each move autonomously, rather than using one as a leader. The parts are open source and can just be 3D printed, and the follower and leader are identical up until the part that acts as a hand. That means at some point I'll have to fix my 3D printer (again), but for now one arm is plenty and I'm happy with it.
For the most part, assembly was super easy. The setup requires you to program each servo 1-6, and what I did (and recommend) was put a piece of tape on each and label them to ensure nothing wonky happens later. By far the biggest issue I had came from the 2 times I did something slightly wrong. Removing a screw once it's in is not easy, so as long as you never need to do that in the first place, the assembly is easy.
There were 2 other main learnings from the setup worth noting. There's a minimum and maximum threshold that the robot is limited to, and if you exceed those values the behavior gets really weird. The other learning, though, is that this most likely only happens if something messed up the calibration in the first place. My logic was that the arm could physically reach the positions I was moving it to, so the servos could handle them, but re-calibrating resolved a lot of headaches. So if you see weird behaviors, give that a check.
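To make the threshold issue concrete: the STS3215 servos take raw goal positions in the 0-4095 range, and commanding values outside the calibrated limits is where the weirdness shows up. Here's a minimal defensive sketch; the `clamp_position`/`clamp_all` helpers are my own for illustration, not part of the SO-100 tooling:

```python
# Hypothetical helpers: keep goal positions inside the servo's valid range.
# The SO-100's STS3215 servos use raw 0-4095 position units; commanding
# values outside the min/max limits is where the weird behavior shows up.
def clamp_position(value, min_limit=0, max_limit=4095):
    """Clamp a raw servo goal position into [min_limit, max_limit]."""
    return max(min_limit, min(max_limit, value))


def clamp_all(positions, limits):
    """Clamp every joint's goal using per-joint (min, max) limits."""
    return {
        joint: clamp_position(value, *limits.get(joint, (0, 4095)))
        for joint, value in positions.items()
    }
```

Something like this guards against a bad recording, but the real fix in my case was just re-calibrating.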
Doing the Candy Throwing Thing
Now for the part that makes the project interesting. The goal was to throw candy, so my thinking was that I'd record all the movements as I walked the robot through the process, then play them all back and we'd be all set. As hinted at, I had a couple points of confusion between the calibration and the thresholds, and tried quite a few iterations before simplifying things.
Instead, I recorded a few key points: raise up, move over to the candy, grab it, lean back, and throw. At first, it was either ridiculously fast or doing seemingly random nonsense. I tried a lot of ideas along the way: just slowing it down, interpolating the positions except for the fast part where it throws, and so on. It took quite a few iterations to make this work smoothly, but since the journey is only interesting when it's interesting, the point is we got there, and the code that worked is shared at the end.
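For the curious, the interpolation idea I was iterating on looks roughly like this. It's a sketch in my own naming, not the code I ended up shipping:

```python
# Sketch of linearly interpolating between two recorded joint snapshots.
# Each snapshot maps joint name -> raw servo position (0-4095).
def interpolate(start, end, steps):
    """Yield `steps` intermediate position dicts from start to end (inclusive of end)."""
    for i in range(1, steps + 1):
        t = i / steps
        yield {joint: round(start[joint] + (end[joint] - start[joint]) * t)
               for joint in start}

# The throw itself stays un-interpolated: you just command the final
# position at full speed, which is what a duration of 0 means later on.
```

Stepping through intermediate positions smooths the slow moves, while leaving the throw as a single full-speed command keeps the snap that actually launches the candy.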
There are 3 scripts included. The sequence recorder does as described: it lets you move the robot to each position you want it to hit, and also lets you say how much time it should take to reach each position. When you want it to go as fast as possible, enter a 0. For my successful sequence, I broke the throw into 2 segments, both set to 0. At the top of the throw the hand is still closed, and at the end it's fully open. This ensured the candy was held long enough to actually get a good throw.
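The recorder saves each run as JSON in a sequences/ folder. Here's a trimmed-down example of what a recorded throw looks like in that format; the position values are invented for illustration:

```python
# Illustrative sequence in the recorder's on-disk format; the numbers are made up.
sequence = {
    "name": "throw_candy",
    "total_positions": 3,
    "sequence": [
        # Step 1 is the starting pose.
        {"position": 1, "duration": 0.0,
         "positions": {"shoulder_pan": 2048, "shoulder_lift": 1500, "elbow_flex": 1800,
                       "wrist_flex": 2000, "wrist_roll": 2048, "gripper": 3000}},
        # Top of the throw: duration 0 means "as fast as possible", hand still closed.
        {"position": 2, "duration": 0.0,
         "positions": {"shoulder_pan": 2048, "shoulder_lift": 2600, "elbow_flex": 1200,
                       "wrist_flex": 1400, "wrist_roll": 2048, "gripper": 3000}},
        # Release: still fast, but now the gripper is fully open.
        {"position": 3, "duration": 0.0,
         "positions": {"shoulder_pan": 2048, "shoulder_lift": 2600, "elbow_flex": 1200,
                       "wrist_flex": 1400, "wrist_roll": 2048, "gripper": 1000}},
    ],
}
```

The only difference between the two throw steps is the gripper value, which is exactly why the candy stays in the hand until the top of the arc.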
The simple sequence playback one does as described and runs the sequence nicely. This was handy for testing and getting everything working as intended.
Once those aspects worked, I moved it over to a Raspberry Pi. I had started on one, but iterating on a Pi is a bit painful, so I moved to a desktop until everything cooperated. You can just copy-paste your working project onto the Raspberry Pi and everything works as it has been, which was an excellent time saver. But I wanted to add some extra elements from there.

The 3rd included script is what runs our full, fun candy robot process. As for what that entails...
Making it Interesting
The premise came from the interactive signage idea, but this is very clearly something that makes sense for Halloween, so it already had me thinking. Wouldn't it be more interesting if we threw candy at kids specifically? Programmatically, it's more fun and involved; in practice, it lures parents over to the booth in question (for the current purpose); and it's always fun to give kids sugar.
Even without those specifics, the plan was to add a webcam and leverage computer vision to comment on what we see in addition to giving or denying candy. Camera support is actually built into the LeRobot stack we're working with here, but since I also wanted commentary, I used the OpenAI API. It doesn't just process a frame from our webcam; it gives or denies candy based on whether it sees a kid, and comments according to some basic rules. Since we require a specific response format, we can parse it and run the robot arm sequence we recorded previously whenever we do give candy.
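The required format is two lines, ACTION: and RESPONSE:, which the full script parses before deciding whether to run the throw sequence. A stripped-down version of that parsing step:

```python
def parse_reply(reply):
    """Parse the model's 'ACTION: .../RESPONSE: ...' reply into (give_candy, speech)."""
    action, speech = None, None
    for line in reply.splitlines():
        if line.startswith("ACTION:"):
            action = line.split("ACTION:", 1)[1].strip()
        elif line.startswith("RESPONSE:"):
            speech = line.split("RESPONSE:", 1)[1].strip()
    if action is None or speech is None:
        # Fail safe: an unparseable reply never dispenses candy.
        return False, "Sorry, I'm having trouble thinking right now!"
    return action.upper() == "CANDY", speech
```

The important design choice is the fail-safe: anything the parser can't make sense of defaults to no candy, so a flaky API response never triggers the arm.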
Now that a paid API was involved, it made sense to keep things cheap by adding an ultrasonic sensor, so a frame from the webcam only gets sent to the vision API if someone has walked into proximity in the first place. I also limited how often the loop could be triggered with a short-ish cooldown, and found everything to be working well. So, it was time to throw candy at some kids. Conveniently, 2 live at my house, so it was time to give them a sugar rush.
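The gating logic is simple: only call the vision API when the ultrasonic reading is inside the detection range and the cooldown has elapsed. A distilled sketch, with the threshold and cooldown matching the values in the full script below:

```python
DETECTION_DISTANCE_CM = 10 * 30.48  # 10 feet in cm
COOLDOWN_SECONDS = 50               # minimum gap between API triggers

def should_trigger(distance_cm, now, last_trigger):
    """Return (trigger, new_last_trigger): fire only in range and past the cooldown."""
    if distance_cm <= DETECTION_DISTANCE_CM and now - last_trigger > COOLDOWN_SECONDS:
        return True, now
    return False, last_trigger
```

The main loop just calls this ~10 times per second; the cooldown is what keeps one lingering visitor from racking up a pile of vision API calls.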
A Robot Threw Candy at My Kids
I took this outside, in a non-bar-fighty way. I set up a bowl of candy and plugged in the candy robot. Now that I'd positioned it well and slowed it down just enough to stay interesting but not be a safety hazard, it was time to let my kids face the judgment of AI vision (spoilers: they're kids, so they got candy). They were thrilled. The robot worked very consistently and would reach in, grab something, and hurl candy at the happy kiddos.
Of course, now that I'd added the kid-specific aspects, I had to test the adult part. I had brought my son up to test the setup to ensure it'd throw candy at kids (he was rewarded for his efforts, of course), but I hadn't tested against adults. It was time for the moment of truth. My wife and I were both denied candy, though she was complimented and I was just denied - fair enough honestly.
Conclusion
Now I had a fully functioning robot that would identify kids and throw candy at them. I think it goes without saying that this will go beyond interactive signage and make a reappearance at Halloween. Hopefully you enjoyed the journey, and let me know what you think I should add to make the Halloween iteration a bit more spicy.
Have a good one.

Sequence Recorder

import argparse
import json
import time
from pathlib import Path

from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, TorqueMode
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig


class SimplePositionSequencer:
    def __init__(self, port="COM15"):
        self.port = port
        self.motor_bus = None
        self.motors_config = {
            "shoulder_pan": [1, "sts3215"],
            "shoulder_lift": [2, "sts3215"],
            "elbow_flex": [3, "sts3215"],
            "wrist_flex": [4, "sts3215"],
            "wrist_roll": [5, "sts3215"],
            "gripper": [6, "sts3215"],
        }

    def connect(self):
        """Connect and completely disable ALL limits"""
        print(f"Connecting to robot on {self.port}...")
        config = FeetechMotorsBusConfig(port=self.port, motors=self.motors_config)
        self.motor_bus = FeetechMotorsBus(config)
        self.motor_bus.connect()

        # Completely remove all angle limits
        print("REMOVING ALL LIMITS...")
        for motor_name in self.motors_config.keys():
            try:
                self.motor_bus.write("Min_Angle_Limit", 0, motor_name)
                self.motor_bus.write("Max_Angle_Limit", 4095, motor_name)
                print(f"  {motor_name}: Limits removed")
            except Exception as e:
                print(f"  {motor_name}: {e}")
        print("Connected and ALL limits removed")

    def disconnect(self):
        if self.motor_bus:
            self.motor_bus.disconnect()
            print("Disconnected")

    def torque_off(self):
        """Disable torque for manual movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.DISABLED.value)
        print("Torque OFF - move robot freely")

    def torque_on(self):
        """Enable torque for controlled movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
        print("Torque ON - robot under control")

    def get_positions(self):
        """Get current positions"""
        positions = {}
        for motor_name in self.motors_config.keys():
            pos = self.motor_bus.read("Present_Position", motor_name)
            if hasattr(pos, '__len__') and len(pos) == 1:
                pos = pos[0]
            positions[motor_name] = int(pos)
        return positions

    def move_to_position(self, positions, duration_seconds):
        """Move to position with timing - smooth movements"""
        if duration_seconds == 0:
            time_ms = 800  # Keep fast movements quick but not too fast
        else:
            time_ms = int(duration_seconds * 1000)
        for motor_name, position in positions.items():
            self.motor_bus.write("Goal_Time", time_ms, motor_name)
            self.motor_bus.write("Goal_Position", int(position), motor_name)

    def record_sequence(self, name):
        """Record a position sequence"""
        print(f"\n{'='*60}")
        print(f"RECORDING SEQUENCE: {name}")
        print(f"{'='*60}")
        print("INSTRUCTIONS:")
        print("1. Move robot to STARTING position")
        print("2. Press ENTER to record starting position")
        print("3. Move to next position")
        print("4. Enter seconds to reach position (0=fast, decimals OK)")
        print("5. Press ENTER to record position")
        print("6. Repeat steps 3-5 for each position")
        print("7. Press CTRL+C when done")

        self.torque_off()
        sequence = []
        position_num = 1

        try:
            # Starting position
            input("\nMove to STARTING position, press ENTER...")
            pos = self.get_positions()
            pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
            print(f"START: {pos_str}")
            sequence.append({
                "position": position_num,
                "positions": pos.copy(),
                "duration": 0.0  # Starting position has no timing
            })
            position_num += 1

            # Subsequent positions
            while True:
                print(f"\nMove to position {position_num}")
                duration_input = input(f"Enter seconds to reach position {position_num} (0=fast): ").strip()
                try:
                    duration = float(duration_input)
                    if duration < 0:
                        print("Duration cannot be negative")
                        continue
                except ValueError:
                    print("Invalid number")
                    continue

                # Record position
                pos = self.get_positions()
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                time_desc = "FAST" if duration == 0 else f"{duration}s"
                print(f"Position {position_num}: {pos_str} ({time_desc})")
                sequence.append({
                    "position": position_num,
                    "positions": pos.copy(),
                    "duration": duration
                })
                position_num += 1

        except KeyboardInterrupt:
            print(f"\nRecording finished! Captured {len(sequence)} positions")

        # Automatically add the starting position as the final position
        if len(sequence) > 1:  # Only if we have more than just the start position
            start_position = sequence[0]["positions"].copy()
            sequence.append({
                "position": position_num,
                "positions": start_position,
                "duration": 1.0  # 1 second to return to start
            })
            print("Added return to START position as final step (1.0s)")

        # Save sequence
        Path("sequences").mkdir(exist_ok=True)
        file_path = Path("sequences") / f"{name}.json"
        data = {
            "name": name,
            "recorded_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "total_positions": len(sequence),
            "sequence": sequence
        }
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)
        print(f"Saved to: {file_path}")

        # Re-enable torque
        self.torque_on()

    def play_sequence(self, name):
        """Play back a recorded sequence"""
        file_path = Path("sequences") / f"{name}.json"
        if not file_path.exists():
            print(f"Sequence file not found: {file_path}")
            return
        with open(file_path, 'r') as f:
            data = json.load(f)
        sequence = data["sequence"]

        print(f"\n{'='*60}")
        print(f"PLAYING SEQUENCE: {name}")
        print(f"{'='*60}")
        print(f"Positions: {len(sequence)}")

        # Show what we're going to do
        for step in sequence:
            pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in step["positions"].items()])
            duration = step["duration"]
            time_desc = "START" if duration == 0 and step["position"] == 1 else ("FAST" if duration == 0 else f"{duration}s")
            print(f"  {step['position']}. {pos_str} ({time_desc})")

        input("\nPress ENTER to start playback...")
        self.torque_on()

        # Remove limits again before playback
        print("Ensuring limits are removed for playback...")
        for motor_name in self.motors_config.keys():
            try:
                self.motor_bus.write("Min_Angle_Limit", 0, motor_name)
                self.motor_bus.write("Max_Angle_Limit", 4095, motor_name)
            except Exception:
                pass

        print("\nEXECUTING...")
        print("=" * 50)
        try:
            for step in sequence:
                pos = step["positions"]
                duration = step["duration"]
                position_num = step["position"]
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                if position_num == 1:
                    # Go to starting position instantly
                    print(f"Position {position_num}: {pos_str} (moving to start)")
                    self.move_to_position(pos, 0)
                    time.sleep(1.2)  # Give a bit more time to reach start
                else:
                    if duration == 0:
                        print(f"Position {position_num}: {pos_str} (FAST)")
                        self.move_to_position(pos, 0)
                        time.sleep(1.0)  # Wait for fast movement
                    else:
                        print(f"Position {position_num}: {pos_str} ({duration}s)")
                        self.move_to_position(pos, duration)
                        # Wait for the movement time plus a buffer
                        actual_time = duration * 4.0
                        time.sleep(actual_time + 1.0)
            print("\nSEQUENCE COMPLETE!")
        except KeyboardInterrupt:
            print("\nPLAYBACK STOPPED")


def main():
    parser = argparse.ArgumentParser(description="Simple Position Sequence Recorder")
    parser.add_argument("--mode", choices=["record", "play"], required=True)
    parser.add_argument("--name", required=True, help="Sequence name")
    parser.add_argument("--port", default="COM15", help="Serial port")
    args = parser.parse_args()

    sequencer = SimplePositionSequencer(args.port)
    try:
        sequencer.connect()
        if args.mode == "record":
            sequencer.record_sequence(args.name)
        elif args.mode == "play":
            sequencer.play_sequence(args.name)
    except Exception as e:
        print(f"ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        sequencer.disconnect()
    return 0


if __name__ == "__main__":
    exit(main())
Full Candy Robot Process

import base64
import json
import subprocess
import threading
import time
from pathlib import Path

import cv2
import numpy as np
import RPi.GPIO as GPIO
from openai import OpenAI

from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, TorqueMode
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig

ROBOT_SEQUENCE = "throw_candy_at_kids"  # the sequence name you want to use
ROBOT_SPEED = "slow"  # very_slow, slow, medium, fast
ROBOT_PORT = "/dev/ttyACM0"  # Change based on your Pi setup

# Ultrasonic sensor pins
TRIG_PIN = 18
ECHO_PIN = 24

# Detection settings
DETECTION_DISTANCE_FEET = 10
DETECTION_DISTANCE_CM = DETECTION_DISTANCE_FEET * 30.48  # Convert to cm

# Camera settings
CAMERA_INDEX = 0  # Usually 0 for USB webcam

# OpenAI API settings (set your API key)
OPENAI_API_KEY = "your-openai-api-key-here"  # CHANGE THIS!

# Text-to-speech settings
USE_ESPEAK = True  # Set to False to use festival instead

# =============================================================================
# CANDY DISPENSER CLASS
# =============================================================================

class CandyDispenser:
    def __init__(self):
        self.client = OpenAI(api_key=OPENAI_API_KEY)
        self.camera = None
        self.setup_gpio()
        self.setup_camera()
        self.robot_busy = False

    def setup_gpio(self):
        """Setup GPIO pins for ultrasonic sensor"""
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(TRIG_PIN, GPIO.OUT)
        GPIO.setup(ECHO_PIN, GPIO.IN)
        print(f"GPIO setup complete - Ultrasonic sensor on pins {TRIG_PIN}/{ECHO_PIN}")

    def setup_camera(self):
        """Setup webcam"""
        try:
            self.camera = cv2.VideoCapture(CAMERA_INDEX)
            if not self.camera.isOpened():
                raise Exception("Camera not found")
            print(f"Camera setup complete on index {CAMERA_INDEX}")
        except Exception as e:
            print(f"Camera setup failed: {e}")
            self.camera = None

    def measure_distance(self):
        """Measure distance using ultrasonic sensor"""
        try:
            # Send trigger pulse
            GPIO.output(TRIG_PIN, True)
            time.sleep(0.00001)  # 10 microseconds
            GPIO.output(TRIG_PIN, False)

            # Measure echo time (initialized so a silent sensor can't leave these unbound)
            start_time = time.time()
            pulse_start = start_time
            while GPIO.input(ECHO_PIN) == 0:
                pulse_start = time.time()
                if pulse_start - start_time > 0.1:  # Timeout after 100ms
                    return float('inf')
            pulse_end = pulse_start
            while GPIO.input(ECHO_PIN) == 1:
                pulse_end = time.time()
                if pulse_end - pulse_start > 0.1:  # Timeout after 100ms
                    return float('inf')

            # Calculate distance (half the round trip at the speed of sound)
            pulse_duration = pulse_end - pulse_start
            distance_cm = pulse_duration * 17150
            return distance_cm
        except Exception as e:
            print(f"Distance measurement error: {e}")
            return float('inf')

    def play_sequence_direct(
        self,
        name=ROBOT_SEQUENCE,
        speed_mode=ROBOT_SPEED,
        port=ROBOT_PORT,
        sequence_folder="sequences"
    ):
        """Directly play a LeRobot sequence using internal code, no subprocess or input()"""
        # This is nearly a copy of the playback script, refactored to be self-contained.
        motors_config = {
            "shoulder_pan": [1, "sts3215"],
            "shoulder_lift": [2, "sts3215"],
            "elbow_flex": [3, "sts3215"],
            "wrist_flex": [4, "sts3215"],
            "wrist_roll": [5, "sts3215"],
            "gripper": [6, "sts3215"],
        }
        file_path = Path(sequence_folder) / f"{name}.json"
        if not file_path.exists():
            print(f"Sequence file not found: {file_path}")
            return False
        with open(file_path, "r") as f:
            data = json.load(f)
        sequence = data["sequence"]

        # Connect to motors
        config = FeetechMotorsBusConfig(port=port, motors=motors_config)
        motor_bus = FeetechMotorsBus(config)
        motor_bus.connect()

        # Remove all angle limits
        print("REMOVING ALL LIMITS...")
        for motor_name in motors_config.keys():
            try:
                motor_bus.write("Min_Angle_Limit", 0, motor_name)
                motor_bus.write("Max_Angle_Limit", 4095, motor_name)
                print(f"  {motor_name}: Limits removed")
            except Exception as e:
                print(f"  {motor_name}: {e}")
        print("Connected and ALL limits removed")

        # Set speed mode (same settings as the playback script)
        speed_settings = {
            "very_slow": {"acceleration": 50, "max_accel": 50, "description": "Very Slow (50)"},
            "slow": {"acceleration": 100, "max_accel": 100, "description": "Slow (100)"},
            "medium": {"acceleration": 150, "max_accel": 150, "description": "Medium (150)"},
            "fast": {"acceleration": 254, "max_accel": 254, "description": "Fast (254 - Official LeRobot)"},
        }
        if speed_mode not in speed_settings:
            speed_mode = "slow"
        settings = speed_settings[speed_mode]
        print(f"\nSETTING SPEED: {settings['description']}")
        for motor_name in motors_config.keys():
            try:
                motor_bus.write("Mode", 0, motor_name)  # Position Control
                motor_bus.write("P_Coefficient", 16, motor_name)  # Smooth movement
                motor_bus.write("I_Coefficient", 0, motor_name)
                motor_bus.write("D_Coefficient", 32, motor_name)
                motor_bus.write("Lock", 0, motor_name)  # Unlock EPROM
                motor_bus.write("Maximum_Acceleration", settings["max_accel"], motor_name)
                motor_bus.write("Acceleration", settings["acceleration"], motor_name)
                print(f"  {motor_name}: Speed set to {settings['description']}")
            except Exception as e:
                print(f"  {motor_name}: {e}")

        # Torque ON
        motor_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
        print("Torque ON - robot under control")

        # Run sequence!
        try:
            print(f"\nPLAYING SEQUENCE: {name}")
            for step in sequence:
                pos = step["positions"]
                duration = step["duration"]
                position_num = step["position"]
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                if position_num == 1:
                    print(f"Position {position_num}: {pos_str} (moving to start)")
                    for motor_name, position in pos.items():
                        motor_bus.write("Goal_Position", int(position), motor_name)
                    time.sleep(1.5)
                else:
                    if duration == 0:
                        print(f"Position {position_num}: {pos_str} (SUPER FAST)")
                        for motor_name, position in pos.items():
                            motor_bus.write("Goal_Time", 200, motor_name)
                            motor_bus.write("Goal_Position", int(position), motor_name)
                        time.sleep(0.4)
                    else:
                        print(f"Position {position_num}: {pos_str} (SMOOTH ~{duration}s)")
                        for motor_name, position in pos.items():
                            motor_bus.write("Goal_Position", int(position), motor_name)
                        time.sleep(0.8)
            print("\nSEQUENCE COMPLETE!")
            return True
        except Exception as e:
            print(f"ERROR during playback: {e}")
            return False
        finally:
            try:
                motor_bus.disconnect()
                print("Disconnected")
            except Exception as e:
                print(f"Error on disconnect: {e}")

    def capture_image(self, delay_seconds=2):
        """Capture an image from the webcam, save it, and return a base64 string."""
        if not self.camera or not self.camera.isOpened():
            raise RuntimeError("Could not open camera")
        self.camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.75)
        self.camera.read()  # Dummy read to trigger auto exposure
        time.sleep(delay_seconds)
        for _ in range(5):  # Flush buffer
            self.camera.read()
        ret, frame = self.camera.read()
        if not ret:
            raise RuntimeError("Failed to capture image from camera")

        # Brightness boost (optional)
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)
        v = cv2.add(v, 50)
        v = np.clip(v, 0, 255)
        final_hsv = cv2.merge((h, s, v))
        frame = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)

        # Save image
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        image_path = f"/home/donuts/lerobot/captures/visitor_{timestamp}.jpg"
        Path("/home/donuts/lerobot/captures").mkdir(parents=True, exist_ok=True)
        cv2.imwrite(str(image_path), frame)

        # Encode to base64
        retval, buffer = cv2.imencode('.jpg', frame)
        if not retval:
            raise RuntimeError("Failed to encode image")
        encoded_image = base64.b64encode(buffer).decode('utf-8')
        return encoded_image

    def analyze_with_chatgpt(self, image_base64):
        """Send image to OpenAI's vision model using the responses endpoint"""
        try:
            response = self.client.responses.create(
                model="gpt-4o",  # or "gpt-4.1" if you have access
                input=[{
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": """Analyze this image for a candy dispenser robot.
RESPOND IN EXACTLY THIS FORMAT:
ACTION: [CANDY or NO_CANDY]
RESPONSE: [Your brief, funny response]
Rules:
- If you see a child/kid (anyone who looks under 16), respond with ACTION: CANDY
- If you see an adult or no person, respond with ACTION: NO_CANDY
- Keep RESPONSE to 1-2 sentences maximum
- Be funny if possible
- For kids: comment on their costume if applicable or just comment about something you see like attire or appearance (dont be mean)
- For adults: playfully deny them candy in a funny way
- If there are kids and adults, comment on the kids and do give them candy
- If there is no one there, you cant identify the image, or something of that sort, give no candy and comment according to what you see as best you can in the same fun way
Examples:
ACTION: CANDY
RESPONSE: Hey there little vampire! Nice cape! Here's some candy to fuel your spooky adventures!
ACTION: NO_CANDY
RESPONSE: Sorry grown-up, no sugar rush for you today!
Be sure to stick to that format, but be creative when possible with your response, and comment on things that make it clear you're actually seeing who you're looking at."""},
                        {
                            "type": "input_image",
                            "image_url": f"data:image/jpeg;base64,{image_base64}",
                            "detail": "high"
                        }
                    ],
                }]
            )
            return response.output_text.strip()
        except Exception as e:
            print(f"ChatGPT analysis error: {e}")
            return "ACTION: NO_CANDY\nRESPONSE: Oops! My robot brain is having a glitch. Try again in a moment!"

    def parse_chatgpt_response(self, response):
        """Parse ChatGPT response into action and speech"""
        try:
            lines = response.split('\n')
            action_line = None
            response_line = None
            for line in lines:
                if line.startswith('ACTION:'):
                    action_line = line.split('ACTION:')[1].strip()
                elif line.startswith('RESPONSE:'):
                    response_line = line.split('RESPONSE:')[1].strip()
            if not action_line or not response_line:
                print(f"Failed to parse response: {response}")
                return "NO_CANDY", "Sorry, I'm having trouble thinking right now!"
            should_give_candy = action_line.upper() == "CANDY"
            return "CANDY" if should_give_candy else "NO_CANDY", response_line
        except Exception as e:
            print(f"Response parsing error: {e}")
            return "NO_CANDY", "My circuits are a bit scrambled right now!"

    def speak_text(self, text):
        """Use OpenAI's TTS to generate a high-quality voice line and play it"""
        try:
            response = self.client.audio.speech.create(
                model="tts-1",  # Or use "tts-1-hd" for higher fidelity
                voice="echo",  # Other options: alloy, fable, nova, shimmer, onyx
                input=text
            )
            audio_path = "/tmp/response.mp3"
            with open(audio_path, "wb") as f:
                f.write(response.content)
            subprocess.run(["mpg123", audio_path], check=True)
            print(f"Spoke: {text}")
        except Exception as e:
            print(f"TTS error: {e}")
            print(f"Would have said: {text}")

    def handle_visitor(self):
        """Handle a detected visitor"""
        print("Visitor detected! Analyzing...")
        image_base64 = self.capture_image()

        # Analyze with ChatGPT
        print("Analyzing image with ChatGPT...")
        chatgpt_response = self.analyze_with_chatgpt(image_base64)
        print(f"ChatGPT response: {chatgpt_response}")

        # Parse response
        action, speech_text = self.parse_chatgpt_response(chatgpt_response)

        # Speak the response
        self.speak_text(speech_text)

        # Dispense candy if appropriate
        if action == "CANDY":
            print("Giving candy!")
            success = self.play_sequence_direct()
            if not success:
                self.speak_text("Oops! My arm got stuck. The candy is stuck too, but you deserve it anyway!")
        else:
            print("No candy this time")

    def run(self):
        """Main loop"""
        print("\nHALLOWEEN CANDY DISPENSER ACTIVE")
        print(f"Sequence: {ROBOT_SEQUENCE} | Speed: {ROBOT_SPEED} | Port: {ROBOT_PORT}")
        print(f"Detection range: {DETECTION_DISTANCE_FEET} feet")
        print("Watching for trick-or-treaters...")
        print("Press Ctrl+C to stop\n")

        last_detection_time = 0
        cooldown_period = 50  # seconds between detections
        try:
            while True:
                distance_cm = self.measure_distance()
                current_time = time.time()
                if distance_cm <= DETECTION_DISTANCE_CM:
                    if current_time - last_detection_time > cooldown_period:
                        print(f"Person detected at {distance_cm:.1f}cm!")
                        last_detection_time = current_time
                        # Handle visitor in a separate thread to avoid blocking
                        visitor_thread = threading.Thread(target=self.handle_visitor)
                        visitor_thread.start()
                time.sleep(0.1)  # Check 10 times per second
        except KeyboardInterrupt:
            print("\nShutting down candy dispenser...")
        finally:
            self.cleanup()

    def cleanup(self):
        """Clean up resources"""
        try:
            if self.camera:
                self.camera.release()
            GPIO.cleanup()
            print("Cleanup complete")
        except Exception as e:
            print(f"Cleanup error: {e}")

# =============================================================================
# MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    # Check if OpenAI API key is set
    if OPENAI_API_KEY == "your-openai-api-key-here":
        print("ERROR: Please set your OpenAI API key in the OPENAI_API_KEY variable!")
        exit(1)
    # Create and run candy dispenser
    dispenser = CandyDispenser()
    dispenser.run()
Speed-Controlled Sequence Playback

import argparse
import json
import time
from pathlib import Path

from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, TorqueMode
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig


class ProperSlowSequencer:
    def __init__(self, port="COM15"):  # found via the find_motors_bus_port script
        self.port = port
        self.motor_bus = None
        self.motors_config = {
            "shoulder_pan": [1, "sts3215"],
            "shoulder_lift": [2, "sts3215"],
            "elbow_flex": [3, "sts3215"],
            "wrist_flex": [4, "sts3215"],
            "wrist_roll": [5, "sts3215"],
            "gripper": [6, "sts3215"],
        }

    def connect(self):
        print(f"Connecting to robot on {self.port}...")
        config = FeetechMotorsBusConfig(port=self.port, motors=self.motors_config)
        self.motor_bus = FeetechMotorsBus(config)
        self.motor_bus.connect()

        # Remove all angle limits first
        print("REMOVING ALL LIMITS...")
        for motor_name in self.motors_config.keys():
            try:
                self.motor_bus.write("Min_Angle_Limit", 0, motor_name)
                self.motor_bus.write("Max_Angle_Limit", 4095, motor_name)
                print(f"  {motor_name}: Limits removed")
            except Exception as e:
                print(f"  {motor_name}: {e}")
        print("Connected and ALL limits removed")

    def disconnect(self):
        if self.motor_bus:
            self.motor_bus.disconnect()
            print("Disconnected")

    def set_speed_mode(self, speed_mode="slow"):
        """Set movement speed using the Acceleration parameter (the correct way!)"""
        speed_settings = {
            "very_slow": {"acceleration": 50, "max_accel": 50, "description": "Very Slow (50)"},
            "slow": {"acceleration": 100, "max_accel": 100, "description": "Slow (100)"},
            "medium": {"acceleration": 150, "max_accel": 150, "description": "Medium (150)"},
            "fast": {"acceleration": 254, "max_accel": 254, "description": "Fast (254 - Official LeRobot)"},
        }
        if speed_mode not in speed_settings:
            speed_mode = "slow"
        settings = speed_settings[speed_mode]
        print(f"\nSETTING SPEED: {settings['description']}")
        for motor_name in self.motors_config.keys():
            try:
                # Apply the official LeRobot settings except for speed
                self.motor_bus.write("Mode", 0, motor_name)  # Position Control
                self.motor_bus.write("P_Coefficient", 16, motor_name)  # Smooth movement
                self.motor_bus.write("I_Coefficient", 0, motor_name)
                self.motor_bus.write("D_Coefficient", 32, motor_name)
                # Unlock EPROM
                self.motor_bus.write("Lock", 0, motor_name)
                # The key: Acceleration controls speed!
                self.motor_bus.write("Maximum_Acceleration", settings["max_accel"], motor_name)
                self.motor_bus.write("Acceleration", settings["acceleration"], motor_name)
                print(f"  {motor_name}: Speed set to {settings['description']}")
            except Exception as e:
                print(f"  {motor_name}: {e}")

    def torque_off(self):
        """Disable torque for manual movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.DISABLED.value)
        print("Torque OFF - move robot freely")

    def torque_on(self):
        """Enable torque for controlled movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
        print("Torque ON - robot under control")

    def get_positions(self):
        """Get current positions"""
        positions = {}
        for motor_name in self.motors_config.keys():
            pos = self.motor_bus.read("Present_Position", motor_name)
            if hasattr(pos, '__len__') and len(pos) == 1:
                pos = pos[0]
            positions[motor_name] = int(pos)
        return positions

    def move_to_position_correct(self, positions):
        """Move using ONLY Goal_Position; speed is controlled by Acceleration"""
        for motor_name, position in positions.items():
            self.motor_bus.write("Goal_Position", int(position), motor_name)

    def move_to_position_fast(self, positions):
        """Move FAST using the old Goal_Time approach for super fast movements"""
        time_ms = 200  # Very fast movement time
        for motor_name, position in positions.items():
            self.motor_bus.write("Goal_Time", time_ms, motor_name)
            self.motor_bus.write("Goal_Position", int(position), motor_name)

    def play_sequence(self, name, speed_mode="slow"):
        """Play back a recorded sequence with proper speed control"""
        file_path = Path("sequences") / f"{name}.json"
        if not file_path.exists():
            print(f"Sequence file not found: {file_path}")
            return
        with open(file_path, 'r') as f:
            data = json.load(f)
        sequence = data["sequence"]

        print(f"\n{'='*60}")
        print(f"PROPER SLOW PLAYBACK: {name}")
        print(f"{'='*60}")
        print(f"Positions: {len(sequence)}")

        # Set the speed mode
        self.set_speed_mode(speed_mode)

        # Show what we're going to do
        for step in sequence:
            pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in step["positions"].items()])
            duration = step["duration"]
            time_desc = "START" if duration == 0 and step["position"] == 1 else f"SMOOTH (target: {duration}s)"
            print(f"  {step['position']}. {pos_str} ({time_desc})")

        input(f"\nPress ENTER to start PROPER SLOW playback (Speed: {speed_mode})...")
        self.torque_on()
        print("\nEXECUTING PROPER SLOW MOVEMENT...")
        print("=" * 50)
        try:
            for step in sequence:
                pos = step["positions"]
                duration = step["duration"]
                position_num = step["position"]
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                if position_num == 1:
                    print(f"Position {position_num}: {pos_str} (moving to start)")
                    self.move_to_position_correct(pos)
                    time.sleep(1.5)  # Short pause for start position only
                else:
                    if duration == 0:
                        print(f"Position {position_num}: {pos_str} (SUPER FAST)")
                        self.move_to_position_fast(pos)  # Use the fast Goal_Time method
                        time.sleep(0.4)  # Minimal pause for fast movements
                    else:
                        print(f"Position {position_num}: {pos_str} (SMOOTH ~{duration}s)")
                        self.move_to_position_correct(pos)  # Use the smooth method
                        time.sleep(0.8)  # Minimal pause for smooth movements
            print("\nPROPER SLOW SEQUENCE COMPLETE!")
        except KeyboardInterrupt:
            print("\nPLAYBACK STOPPED")


def main():
    parser = argparse.ArgumentParser(description="Proper Slow Position Sequence Player")
    parser.add_argument("--mode", choices=["play"], required=True, help="Only playback mode")
    parser.add_argument("--name", required=True, help="Sequence name")
    parser.add_argument("--speed", choices=["very_slow", "slow", "medium", "fast"], default="slow", help="Movement speed")
    parser.add_argument("--port", default="COM15", help="Serial port")
    args = parser.parse_args()

    player = ProperSlowSequencer(args.port)
    try:
        player.connect()
        if args.mode == "play":
            player.play_sequence(args.name, args.speed)
    except Exception as e:
        print(f"ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        player.disconnect()
    return 0


if __name__ == "__main__":
    exit(main())
