donutsorelse
Published © GPL3+

Candy Throwing Robot Arm

A robot arm that detects kids and throws candy at them.

IntermediateFull instructions provided12 hours436

Things used in this project

Hardware components

Raspberry Pi 3 Model B
Raspberry Pi 3 Model B
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Ultrasonic Sensor - HC-SR04 (Generic)
Ultrasonic Sensor - HC-SR04 (Generic)
×1
usb speaker
×1
SO-ARM100
Seeed Studio SO-ARM100
×1

Story

Read more

Code

Smoother Playback

Python
This is the playback script that ran the robot a little bit more nicely.
import argparse
import json
import time
from pathlib import Path
from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, TorqueMode
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig


class ProperSlowSequencer:
    def __init__(self, port="COM15"):   # this is found via the find_motors_bus_port script
        self.port = port
        self.motor_bus = None
        self.motors_config = {
            "shoulder_pan": [1, "sts3215"],
            "shoulder_lift": [2, "sts3215"],
            "elbow_flex": [3, "sts3215"],
            "wrist_flex": [4, "sts3215"],
            "wrist_roll": [5, "sts3215"],
            "gripper": [6, "sts3215"],
        }
        
    def connect(self):
        print(f"Connecting to robot on {self.port}...")
        config = FeetechMotorsBusConfig(port=self.port, motors=self.motors_config)
        self.motor_bus = FeetechMotorsBus(config)
        self.motor_bus.connect()
        
        # REMOVE ALL LIMITS first
        print("REMOVING ALL LIMITS...")
        for motor_name in self.motors_config.keys():
            try:
                self.motor_bus.write("Min_Angle_Limit", 0, motor_name)
                self.motor_bus.write("Max_Angle_Limit", 4095, motor_name)
                print(f"   {motor_name}: Limits removed")
            except Exception as e:
                print(f"   {motor_name}: {e}")
        
        print("Connected and ALL limits removed")
        
    def disconnect(self):
        if self.motor_bus:
            self.motor_bus.disconnect()
            print("Disconnected")
    
    def set_speed_mode(self, speed_mode="slow"):
        """Set movement speed using Acceleration parameter (the correct way!)"""
        speed_settings = {
            "very_slow": {"acceleration": 50, "max_accel": 50, "description": "Very Slow (50)"},
            "slow": {"acceleration": 100, "max_accel": 100, "description": "Slow (100)"},
            "medium": {"acceleration": 150, "max_accel": 150, "description": "Medium (150)"},
            "fast": {"acceleration": 254, "max_accel": 254, "description": "Fast (254 - Official LeRobot)"},
        }
        
        if speed_mode not in speed_settings:
            speed_mode = "slow"
        
        settings = speed_settings[speed_mode]
        
        print(f"\n SETTING SPEED: {settings['description']}")
        
        for motor_name in self.motors_config.keys():
            try:
                # Apply EXACT official LeRobot settings except for speed
                self.motor_bus.write("Mode", 0, motor_name)  # Position Control
                self.motor_bus.write("P_Coefficient", 16, motor_name)  # Smooth movement
                self.motor_bus.write("I_Coefficient", 0, motor_name)
                self.motor_bus.write("D_Coefficient", 32, motor_name)
                
                # Unlock EPROM
                self.motor_bus.write("Lock", 0, motor_name)
                
                # THE KEY: Acceleration controls speed!
                self.motor_bus.write("Maximum_Acceleration", settings["max_accel"], motor_name)
                self.motor_bus.write("Acceleration", settings["acceleration"], motor_name)
                
                print(f"   {motor_name}: Speed set to {settings['description']}")
            except Exception as e:
                print(f"   {motor_name}: {e}")
    
    def torque_off(self):
        """Disable torque for manual movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.DISABLED.value)
        print("Torque OFF - move robot freely")
    
    def torque_on(self):
        """Enable torque for controlled movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
        print("Torque ON - robot under control")
    
    def get_positions(self):
        """Get current positions"""
        positions = {}
        for motor_name in self.motors_config.keys():
            pos = self.motor_bus.read("Present_Position", motor_name)
            if hasattr(pos, '__len__') and len(pos) == 1:
                pos = pos[0]
            positions[motor_name] = int(pos)
        return positions
    
    def move_to_position_correct(self, positions):
        """Move using CORRECT approach - ONLY Goal_Position, speed controlled by Acceleration"""
        for motor_name, position in positions.items():
            self.motor_bus.write("Goal_Position", int(position), motor_name)
    
    def move_to_position_fast(self, positions):
        """Move FAST using old Goal_Time approach for super fast movements"""
        time_ms = 200  # Very fast movement time
        for motor_name, position in positions.items():
            self.motor_bus.write("Goal_Time", time_ms, motor_name)
            self.motor_bus.write("Goal_Position", int(position), motor_name)
    
    def play_sequence(self, name, speed_mode="slow"):
        """Play back a recorded sequence with proper speed control"""
        file_path = Path("sequences") / f"{name}.json"
        
        if not file_path.exists():
            print(f"Sequence file not found: {file_path}")
            return
        
        with open(file_path, 'r') as f:
            data = json.load(f)
        
        sequence = data["sequence"]
        
        print(f"\n{'='*60}")
        print(f"PROPER SLOW PLAYBACK: {name}")
        print(f"{'='*60}")
        print(f"Positions: {len(sequence)}")
        
        # Set the speed mode
        self.set_speed_mode(speed_mode)
        
        # Show what we're going to do
        for step in sequence:
            pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in step["positions"].items()])
            duration = step["duration"]
            time_desc = "START" if duration == 0 and step["position"] == 1 else f"SMOOTH (target: {duration}s)"
            print(f"   {step['position']}. {pos_str} ({time_desc})")
        
        input(f"\nPress ENTER to start PROPER SLOW playback (Speed: {speed_mode})...")
        
        self.torque_on()
        
        print(f"\n EXECUTING PROPER SLOW MOVEMENT...")
        print("=" * 50)
        
        try:
            for i, step in enumerate(sequence):
                pos = step["positions"]
                duration = step["duration"]
                position_num = step["position"]
                
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                
                if position_num == 1:
                    print(f"Position {position_num}: {pos_str} (moving to start)")
                    self.move_to_position_correct(pos)
                    time.sleep(1.5)  # Short pause for start position only
                else:
                    if duration == 0:
                        print(f"Position {position_num}: {pos_str} (SUPER FAST)")
                        self.move_to_position_fast(pos)  # Use old fast method
                        time.sleep(0.4)  # Minimal pause for fast movements
                    else:
                        print(f"Position {position_num}: {pos_str} (SMOOTH ~{duration}s)")
                        self.move_to_position_correct(pos)  # Use new smooth method
                        time.sleep(0.8)  # Minimal pause for smooth movements
            
            print(f"\n PROPER SLOW SEQUENCE COMPLETE!")
            
        except KeyboardInterrupt:
            print(f"\n  PLAYBACK STOPPED")


def main():
    parser = argparse.ArgumentParser(description="Proper Slow Position Sequence Player")
    parser.add_argument("--mode", choices=["play"], required=True, help="Only playback mode")
    parser.add_argument("--name", required=True, help="Sequence name")
    parser.add_argument("--speed", choices=["very_slow", "slow", "medium", "fast"], default="slow", help="Movement speed")
    parser.add_argument("--port", default="COM15", help="Serial port")
    
    args = parser.parse_args()
    
    player = ProperSlowSequencer(args.port)
    
    try:
        player.connect()
        
        if args.mode == "play":
            player.play_sequence(args.name, args.speed)
        
    except Exception as e:
        print(f" ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    
    finally:
        player.disconnect()
    
    return 0


if __name__ == "__main__":
    exit(main())

Full Candy Throwing Robot

Python
This uses an ultrasonic sensor to see when someone is in range. When someone approaches, we use a frame from a webcam and send it to the openai computer vision api. We parse the response for whether to run the robot arm sequence to throw candy and playback the spoken response part.
import cv2
import time
import json
import subprocess
import threading
import base64
import requests
from pathlib import Path
import RPi.GPIO as GPIO
from openai import OpenAI
import numpy as np
from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, TorqueMode
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig
import json
from pathlib import Path



ROBOT_SEQUENCE = "throw_candy_at_kids" # this is your sequence name that you want to use
ROBOT_SPEED = "slow"  # very_slow, slow, medium, fast
ROBOT_PORT = "/dev/ttyACM0"  # Change based on your Pi setup

# Ultrasonic sensor pins
TRIG_PIN = 18
ECHO_PIN = 24

# Detection settings
DETECTION_DISTANCE_FEET = 10
DETECTION_DISTANCE_CM = DETECTION_DISTANCE_FEET * 30.48  # Convert to cm

# Camera settings
CAMERA_INDEX = 0  # Usually 0 for USB webcam

# OpenAI API settings (set your API key)
OPENAI_API_KEY = "your-openai-api-key-here"  # CHANGE THIS!

# Text-to-speech settings
USE_ESPEAK = True  # Set to False to use festival instead

# =============================================================================
# CANDY DISPENSER CLASS
# =============================================================================

class CandyDispenser:
    def __init__(self):
        self.client = OpenAI(api_key=OPENAI_API_KEY)
        self.camera = None
        self.setup_gpio()
        self.setup_camera()
        self.robot_busy = False
        
    def setup_gpio(self):
        """Setup GPIO pins for ultrasonic sensor"""
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(TRIG_PIN, GPIO.OUT)
        GPIO.setup(ECHO_PIN, GPIO.IN)
        print(f"GPIO setup complete - Ultrasonic sensor on pins {TRIG_PIN}/{ECHO_PIN}")
    
    def setup_camera(self):
        """Setup webcam"""
        try:
            self.camera = cv2.VideoCapture(CAMERA_INDEX)
            if not self.camera.isOpened():
                raise Exception("Camera not found")
            print(f"Camera setup complete on index {CAMERA_INDEX}")
        except Exception as e:
            print(f"Camera setup failed: {e}")
            self.camera = None
    
    def measure_distance(self):
        """Measure distance using ultrasonic sensor"""
        try:
            # Send trigger pulse
            GPIO.output(TRIG_PIN, True)
            time.sleep(0.00001)  # 10 microseconds
            GPIO.output(TRIG_PIN, False)
            
            # Measure echo time
            start_time = time.time()
            while GPIO.input(ECHO_PIN) == 0:
                pulse_start = time.time()
                if pulse_start - start_time > 0.1:  # Timeout after 100ms
                    return float('inf')
            
            while GPIO.input(ECHO_PIN) == 1:
                pulse_end = time.time()
                if pulse_end - pulse_start > 0.1:  # Timeout after 100ms
                    return float('inf')
            
            # Calculate distance
            pulse_duration = pulse_end - pulse_start
            distance_cm = pulse_duration * 17150  # Speed of sound calculation
            
            return distance_cm
            
        except Exception as e:
            print(f"Distance measurement error: {e}")
            return float('inf')
            
    def play_sequence_direct(
        self,
        name=ROBOT_SEQUENCE,
        speed_mode=ROBOT_SPEED,
        port=ROBOT_PORT,
        sequence_folder="sequences"
    ):
        """Directly play a LeRobot sequence using internal code, no subprocess or input()"""
        # This is nearly a copy of your playback code, but refactored to be self-contained.
        motors_config = {
            "shoulder_pan": [1, "sts3215"],
            "shoulder_lift": [2, "sts3215"],
            "elbow_flex": [3, "sts3215"],
            "wrist_flex": [4, "sts3215"],
            "wrist_roll": [5, "sts3215"],
            "gripper": [6, "sts3215"],
        }

        file_path = Path(sequence_folder) / f"{name}.json"
        if not file_path.exists():
            print(f"Sequence file not found: {file_path}")
            return False

        with open(file_path, "r") as f:
            data = json.load(f)
        sequence = data["sequence"]

        # Connect to motors
        config = FeetechMotorsBusConfig(port=port, motors=motors_config)
        motor_bus = FeetechMotorsBus(config)
        motor_bus.connect()

        # REMOVE ALL LIMITS
        print("REMOVING ALL LIMITS...")
        for motor_name in motors_config.keys():
            try:
                motor_bus.write("Min_Angle_Limit", 0, motor_name)
                motor_bus.write("Max_Angle_Limit", 4095, motor_name)
                print(f"   {motor_name}: Limits removed")
            except Exception as e:
                print(f"   {motor_name}: {e}")
        print("Connected and ALL limits removed")

        # Set speed mode (same as your script)
        speed_settings = {
            "very_slow": {"acceleration": 50, "max_accel": 50, "description": "Very Slow (50)"},
            "slow": {"acceleration": 100, "max_accel": 100, "description": "Slow (100)"},
            "medium": {"acceleration": 150, "max_accel": 150, "description": "Medium (150)"},
            "fast": {"acceleration": 254, "max_accel": 254, "description": "Fast (254 - Official LeRobot)"},
        }
        if speed_mode not in speed_settings:
            speed_mode = "slow"
        settings = speed_settings[speed_mode]

        print(f"\nSETTING SPEED: {settings['description']}")
        for motor_name in motors_config.keys():
            try:
                motor_bus.write("Mode", 0, motor_name)  # Position Control
                motor_bus.write("P_Coefficient", 16, motor_name)  # Smooth movement
                motor_bus.write("I_Coefficient", 0, motor_name)
                motor_bus.write("D_Coefficient", 32, motor_name)
                motor_bus.write("Lock", 0, motor_name)  # Unlock EPROM
                motor_bus.write("Maximum_Acceleration", settings["max_accel"], motor_name)
                motor_bus.write("Acceleration", settings["acceleration"], motor_name)
                print(f"   {motor_name}: Speed set to {settings['description']}")
            except Exception as e:
                print(f"   {motor_name}: {e}")

        # Torque ON
        motor_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
        print("Torque ON - robot under control")

        # Run sequence!
        try:
            print(f"\nPLAYING SEQUENCE: {name}")
            for i, step in enumerate(sequence):
                pos = step["positions"]
                duration = step["duration"]
                position_num = step["position"]
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                if position_num == 1:
                    print(f"Position {position_num}: {pos_str} (moving to start)")
                    for motor_name, position in pos.items():
                        motor_bus.write("Goal_Position", int(position), motor_name)
                    time.sleep(1.5)
                else:
                    if duration == 0:
                        print(f"Position {position_num}: {pos_str} (SUPER FAST)")
                        for motor_name, position in pos.items():
                            motor_bus.write("Goal_Time", 200, motor_name)
                            motor_bus.write("Goal_Position", int(position), motor_name)
                        time.sleep(0.4)
                    else:
                        print(f"Position {position_num}: {pos_str} (SMOOTH ~{duration}s)")
                        for motor_name, position in pos.items():
                            motor_bus.write("Goal_Position", int(position), motor_name)
                        time.sleep(0.8)
            print(f"\nPROPER SLOW SEQUENCE COMPLETE!")
            return True
        except Exception as e:
            print(f"ERROR during playback: {e}")
            return False
        finally:
            try:
                motor_bus.disconnect()
                print("Disconnected")
            except Exception as e:
                print(f"Error on disconnect: {e}")
    
    def capture_image(self, delay_seconds=2):
        """Capture an image from the webcam, save, and return base64 string."""
        if not self.camera or not self.camera.isOpened():
            raise RuntimeError("Could not open camera")

        self.camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.75)
        self.camera.read()  # Dummy read to trigger auto exposure

        time.sleep(delay_seconds)

        for _ in range(5):  # Flush buffer
            self.camera.read()

        ret, frame = self.camera.read()

        if not ret:
            raise RuntimeError("Failed to capture image from camera")

        # Brightness boost (optional)
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)
        v = cv2.add(v, 50)
        v = np.clip(v, 0, 255)
        final_hsv = cv2.merge((h, s, v))
        frame = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)

        # Save image
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        image_path = f"/home/donuts/lerobot/captures/visitor_{timestamp}.jpg"
        Path("/home/donuts/lerobot/captures").mkdir(parents=True, exist_ok=True)
        cv2.imwrite(str(image_path), frame)

        # Encode to base64
        retval, buffer = cv2.imencode('.jpg', frame)
        if not retval:
            raise RuntimeError("Failed to encode image")

        encoded_image = base64.b64encode(buffer).decode('utf-8')
        return encoded_image

    def analyze_with_chatgpt(self, image_base64):
        """Send image to OpenAI's Vision model using the new responses endpoint"""
        try:
            response = self.client.responses.create(
                model="gpt-4o",  # or "gpt-4.1" if you have access
                input=[{
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": """Analyze this image for a candy dispenser robot. 

RESPOND IN EXACTLY THIS FORMAT:
ACTION: [CANDY or NO_CANDY]
RESPONSE: [Your brief, funny response]

Rules:
- If you see a child/kid (anyone who looks under 16), respond with ACTION: CANDY
- If you see an adult or no person, respond with ACTION: NO_CANDY
- Keep RESPONSE to 1-2 sentences maximum
- Be funny if possible
- For kids: comment on their costume if applicable or just comment about something you see like attire or appearance (dont be mean)
- For adults: playfully deny them candy in a funny way
- If there are kids and adults, comment on the kids and do give them candy
- If there is no one there, you cant identify the image, or something of that sort, give no candy and comment according to what you see as best you can in the same fun way

Examples:
ACTION: CANDY
RESPONSE: Hey there little vampire! Nice cape! Here's some candy to fuel your spooky adventures!

ACTION: NO_CANDY  
RESPONSE: Sorry grown-up, no sugar rush for you today!

Be sure to stick to that format, but be creative when possible with your response, and comment on things that make it clear you're actually seeing who you're looking at.""" },
                        {
                            "type": "input_image",
                            "image_url": f"data:image/jpeg;base64,{image_base64}",
                            "detail": "high"
                        }
                    ],
                }]
            )

            return response.output_text.strip()

        except Exception as e:
            print(f"ChatGPT analysis error: {e}")
            return "ACTION: NO_CANDY\nRESPONSE: Oops! My robot brain is having a glitch. Try again in a moment!"

    def parse_chatgpt_response(self, response):
        """Parse ChatGPT response into action and speech"""
        try:
            lines = response.split('\n')
            action_line = None
            response_line = None
            
            for line in lines:
                if line.startswith('ACTION:'):
                    action_line = line.split('ACTION:')[1].strip()
                elif line.startswith('RESPONSE:'):
                    response_line = line.split('RESPONSE:')[1].strip()
            
            if not action_line or not response_line:
                print(f"Failed to parse response: {response}")
                return "NO_CANDY", "Sorry, I'm having trouble thinking right now!"
            
            should_give_candy = action_line.upper() == "CANDY"
            return "CANDY" if should_give_candy else "NO_CANDY", response_line
            
        except Exception as e:
            print(f"Response parsing error: {e}")
            return "NO_CANDY", "My circuits are a bit scrambled right now!"
    
    def speak_text(self, text):
        """Use OpenAI's TTS to generate high-quality voice and play it"""
        try:
            response = self.client.audio.speech.create(
                model="tts-1",  # Or use "tts-1-hd" for higher fidelity
                voice="echo",   # Other options: alloy, echo, fable, nova, shimmer, onyx
                input=text
            )

            audio_path = "/tmp/response.mp3"
            with open(audio_path, "wb") as f:
                f.write(response.content)

            subprocess.run(["mpg123", audio_path], check=True)
            print(f"Spoke: {text}")
        except Exception as e:
            print(f"TTS error: {e}")
            print(f"Would have said: {text}")

    def handle_visitor(self):
        """Handle a detected visitor"""
        print("Visitor detected! Analyzing...")
        
        image_base64 = self.capture_image()

        # Analyze with ChatGPT
        print("Analyzing image with ChatGPT...")
        chatgpt_response = self.analyze_with_chatgpt(image_base64)
        print(f"ChatGPT response: {chatgpt_response}")
        
        # Parse response
        action, speech_text = self.parse_chatgpt_response(chatgpt_response)
        
        # Speak the response
        self.speak_text(speech_text)
        
        # Dispense candy if appropriate
        if action == "CANDY":
            print("Giving candy!")
            success = self.play_sequence_direct()

            if not success:
                self.speak_text("Oops! My arm got stuck. The candy is stuck too, but you deserve it anyway!")
        else:
            print("No candy this time")
        
    
    def run(self):
        """Main loop"""
        print(f"\nHALLOWEEN CANDY DISPENSER ACTIVE")
        print(f"Sequence: {ROBOT_SEQUENCE} | Speed: {ROBOT_SPEED} | Port: {ROBOT_PORT}")
        print(f"Detection range: {DETECTION_DISTANCE_FEET} feet")
        print(f"Watching for trick-or-treaters...")
        print("Press Ctrl+C to stop\n")
        
        last_detection_time = 0
        cooldown_period = 50  # seconds between detections
        
        try:
            while True:
                distance_cm = self.measure_distance()
                current_time = time.time()
                
                if distance_cm <= DETECTION_DISTANCE_CM:
                    if current_time - last_detection_time > cooldown_period:
                        print(f"Person detected at {distance_cm:.1f}cm!")
                        last_detection_time = current_time
                        
                        # Handle visitor in separate thread to avoid blocking
                        visitor_thread = threading.Thread(target=self.handle_visitor)
                        visitor_thread.start()
                
                time.sleep(0.1)  # Check 10 times per second
                
        except KeyboardInterrupt:
            print("\nShutting down candy dispenser...")
        finally:
            self.cleanup()
    
    def cleanup(self):
        """Clean up resources"""
        try:
            if self.camera:
                self.camera.release()
            GPIO.cleanup()
            print("Cleanup complete")
        except Exception as e:
            print(f"Cleanup error: {e}")

# =============================================================================
# MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    # Check if OpenAI API key is set
    if OPENAI_API_KEY == "your-openai-api-key-here":
        print("ERROR: Please set your OpenAI API key in the OPENAI_API_KEY variable!")
        exit(1)
    
    # Create and run candy dispenser
    dispenser = CandyDispenser()
    dispenser.run()

Simple Position Recorder

Python
Record one position at a time along with how long it should take to get to that position.
import argparse
import json
import time
from pathlib import Path
from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, TorqueMode
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig


class SimplePositionSequencer:
    def __init__(self, port="COM15"):
        self.port = port
        self.motor_bus = None
        self.motors_config = {
            "shoulder_pan": [1, "sts3215"],
            "shoulder_lift": [2, "sts3215"],
            "elbow_flex": [3, "sts3215"],
            "wrist_flex": [4, "sts3215"],
            "wrist_roll": [5, "sts3215"],
            "gripper": [6, "sts3215"],
        }
        
    def connect(self):
        """Connect and completely disable ALL limits"""
        print(f"Connecting to robot on {self.port}...")
        config = FeetechMotorsBusConfig(port=self.port, motors=self.motors_config)
        self.motor_bus = FeetechMotorsBus(config)
        self.motor_bus.connect()
        
        # COMPLETELY REMOVE ALL LIMITS
        print("REMOVING ALL LIMITS...")
        for motor_name in self.motors_config.keys():
            try:
                self.motor_bus.write("Min_Angle_Limit", 0, motor_name)
                self.motor_bus.write("Max_Angle_Limit", 4095, motor_name)
                print(f"   {motor_name}: Limits removed")
            except Exception as e:
                print(f"   {motor_name}: {e}")
        
        print("Connected and ALL limits removed")
        
    def disconnect(self):
        if self.motor_bus:
            self.motor_bus.disconnect()
            print("Disconnected")
    
    def torque_off(self):
        """Disable torque for manual movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.DISABLED.value)
        print("Torque OFF - move robot freely")
    
    def torque_on(self):
        """Enable torque for controlled movement"""
        self.motor_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
        print("Torque ON - robot under control")
    
    def get_positions(self):
        """Get current positions"""
        positions = {}
        for motor_name in self.motors_config.keys():
            pos = self.motor_bus.read("Present_Position", motor_name)
            if hasattr(pos, '__len__') and len(pos) == 1:
                pos = pos[0]
            positions[motor_name] = int(pos)
        return positions
    
    def move_to_position(self, positions, duration_seconds):
        """Move to position with timing - SMOOTH MOVEMENTS"""
        # Adjust timing for much smoother, slower movements
        if duration_seconds == 0:
            time_ms = 800  # Keep fast movements quick but not too fast
        else:
            # For timed movements, make them MUCH slower and smoother
            time_ms = int(duration_seconds * 1000)  
        
        for motor_name, position in positions.items():
            self.motor_bus.write("Goal_Time", time_ms, motor_name)
            self.motor_bus.write("Goal_Position", int(position), motor_name)
    
    def record_sequence(self, name):
        """Record a position sequence"""
        print(f"\n{'='*60}")
        print(f"RECORDING SEQUENCE: {name}")
        print(f"{'='*60}")
        print("INSTRUCTIONS:")
        print("1. Move robot to STARTING position")
        print("2. Press ENTER to record starting position")
        print("3. Move to next position")
        print("4. Enter seconds to reach position (0=fast, decimals OK)")
        print("5. Press ENTER to record position")
        print("6. Repeat steps 3-5 for each position")
        print("7. Press CTRL+C when done")
        
        self.torque_off()
        
        sequence = []
        position_num = 1
        
        try:
            # Starting position
            input(f"\nMove to STARTING position, press ENTER...")
            pos = self.get_positions()
            pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
            print(f"START: {pos_str}")
            
            sequence.append({
                "position": position_num,
                "positions": pos.copy(),
                "duration": 0.0  # Starting position has no timing
            })
            position_num += 1
            
            # Subsequent positions
            while True:
                print(f"\nMove to position {position_num}")
                duration_input = input(f"Enter seconds to reach position {position_num} (0=fast): ").strip()
                
                try:
                    duration = float(duration_input)
                    if duration < 0:
                        print("Duration cannot be negative")
                        continue
                except ValueError:
                    print("Invalid number")
                    continue
                
                # Record position
                pos = self.get_positions()
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                time_desc = "FAST" if duration == 0 else f"{duration}s"
                print(f"Position {position_num}: {pos_str} ({time_desc})")
                
                sequence.append({
                    "position": position_num,
                    "positions": pos.copy(),
                    "duration": duration
                })
                position_num += 1
                
        except KeyboardInterrupt:
            print(f"\nRecording finished! Captured {len(sequence)} positions")
        
        # AUTOMATICALLY ADD STARTING POSITION AS FINAL POSITION
        if len(sequence) > 1:  # Only if we have more than just the start position
            start_position = sequence[0]["positions"].copy()
            sequence.append({
                "position": position_num,
                "positions": start_position,
                "duration": 1.0  # 1 second to return to start
            })
            print(f"Added return to START position as final step (1.0s)")
        
        # Save sequence
        Path("sequences").mkdir(exist_ok=True)
        file_path = Path("sequences") / f"{name}.json"
        
        data = {
            "name": name,
            "recorded_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "total_positions": len(sequence),
            "sequence": sequence
        }
        
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)
        
        print(f"Saved to: {file_path}")
        
        # Re-enable torque
        self.torque_on()
    
    def play_sequence(self, name):
        """Play back a recorded sequence"""
        file_path = Path("sequences") / f"{name}.json"
        
        if not file_path.exists():
            print(f"Sequence file not found: {file_path}")
            return
        
        with open(file_path, 'r') as f:
            data = json.load(f)
        
        sequence = data["sequence"]
        
        print(f"\n{'='*60}")
        print(f"PLAYING SEQUENCE: {name}")
        print(f"{'='*60}")
        print(f"Positions: {len(sequence)}")
        
        # Show what we're going to do
        for step in sequence:
            pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in step["positions"].items()])
            duration = step["duration"]
            time_desc = "START" if duration == 0 and step["position"] == 1 else ("FAST" if duration == 0 else f"{duration}s")
            print(f"   {step['position']}. {pos_str} ({time_desc})")
        
        input("\nPress ENTER to start playback...")
        
        self.torque_on()
        
        # REMOVE LIMITS AGAIN before playback
        print("Ensuring limits are removed for playback...")
        for motor_name in self.motors_config.keys():
            try:
                self.motor_bus.write("Min_Angle_Limit", 0, motor_name)
                self.motor_bus.write("Max_Angle_Limit", 4095, motor_name)
            except:
                pass
        
        print(f"\nEXECUTING...")
        print("=" * 50)
        
        try:
            for i, step in enumerate(sequence):
                pos = step["positions"]
                duration = step["duration"]
                position_num = step["position"]
                
                pos_str = " | ".join([f"{motor}:{position:4d}" for motor, position in pos.items()])
                
                if position_num == 1:
                    # Go to starting position instantly
                    print(f"Position {position_num}: {pos_str} (moving to start)")
                    self.move_to_position(pos, 0)
                    time.sleep(1.2)  # Give a bit more time to reach start
                else:
                    if duration == 0:
                        print(f"Position {position_num}: {pos_str} (FAST)")
                        self.move_to_position(pos, 0)
                        time.sleep(1.0)  # Wait for fast movement 
                    else:
                        print(f"Position {position_num}: {pos_str} ({duration}s)")
                        self.move_to_position(pos, duration)
                        # Wait for the actual movement time (4x longer) plus buffer
                        actual_time = duration * 4.0
                        time.sleep(actual_time + 1.0)  # Wait for movement + larger buffer
            
            print(f"\nSEQUENCE COMPLETE!")
            
        except KeyboardInterrupt:
            print(f"\nPLAYBACK STOPPED")


def main():
    parser = argparse.ArgumentParser(description="Simple Position Sequence Recorder")
    parser.add_argument("--mode", choices=["record", "play"], required=True)
    parser.add_argument("--name", required=True, help="Sequence name")
    parser.add_argument("--port", default="COM15", help="Serial port")
    
    args = parser.parse_args()
    
    sequencer = SimplePositionSequencer(args.port)
    
    try:
        sequencer.connect()
        
        if args.mode == "record":
            sequencer.record_sequence(args.name)
        elif args.mode == "play":
            sequencer.play_sequence(args.name)
        
    except Exception as e:
        print(f"ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    
    finally:
        sequencer.disconnect()
    
    return 0


if __name__ == "__main__":
    exit(main())

Credits

donutsorelse
22 projects • 22 followers
I make different stuff every week of all kinds. Usually I make funny yet useful inventions.

Comments