# Import required libraries
import cv2 # For image processing
import numpy as np # For numerical operations (arrays, masks, etc.)
import math # For trigonometric calculations
import RPi.GPIO as GPIO # To interface with Raspberry Pi GPIO
import time # For timing operations like delays
import threading # To run speed monitoring in parallel
import os # For checking and accessing file system paths
# Define GPIO pin assignments for steering, drive motor, and speed sensor
STEERING_PIN = 19 # GPIO pin controlling the steering servo
DRIVE_PIN = 18 # GPIO pin for motor speed (hardware PWM)
SPEED_SENSOR_PIN = 17 # Input pin from wheel encoder (hall effect)
# Initialize GPIO pin modes and PWM control
GPIO.setmode(GPIO.BCM) # Use BCM pin numbering
#GPIO.setup(DRIVE_PIN, GPIO.OUT)
GPIO.setup(STEERING_PIN, GPIO.OUT) # Steering pin set as output
GPIO.setup(SPEED_SENSOR_PIN, GPIO.IN) # Speed sensor as input
#drive_pwm = GPIO.PWM(DRIVE_PIN, 50)
steering_pwm = GPIO.PWM(STEERING_PIN, 50) # 50Hz PWM for steering servo
#drive_pwm.start(7.5)
steering_pwm.start(7.5) # Start at 7.5% duty (neutral steering)
# Setup for hardware PWM control via Linux sysfs interface
PWM_CHIP = "/sys/class/pwm/pwmchip0" # Path to PWM chip
PWM_CHANNEL = f"{PWM_CHIP}/pwm0" # Channel 0 under that chip
# If PWM channel is not yet exported (enabled), export it manually
if not os.path.exists(PWM_CHANNEL):
with open(f"{PWM_CHIP}/export", 'w') as f:
f.write("0")
time.sleep(0.1) # Give system time to create the path
# Set PWM frequency to 50Hz (i.e., 20,000,000 nanoseconds period)
with open(f"{PWM_CHANNEL}/period", 'w') as f:
f.write("20000000") # 20ms period for 50Hz
# Set initial duty cycle to 7.5% (corresponds to neutral throttle)
with open(f"{PWM_CHANNEL}/duty_cycle", 'w') as f:
f.write(str(int(20000000 * 0.075))) # 7.5% of 20ms = 1.5ms pulse width
# Enable the hardware PWM channel to begin sending signals
with open(f"{PWM_CHANNEL}/enable", 'w') as f:
f.write("1")
# Initialize variables for wheel speed tracking
pulse_count = 0 # Number of pulses received in current interval
current_rps = 0 # Rotations per second, computed from pulses
target_rps = 2.3 # Target RPS for cruise control
wheel_magnet_count = 20 # Pulses per wheel revolution
# Callback function for counting encoder pulses
def count_pulse(channel):
global pulse_count
pulse_count += 1 # Increment pulse count every falling edge
# Register the encoder pin to trigger count_pulse() on every falling edge
GPIO.add_event_detect(SPEED_SENSOR_PIN, GPIO.FALLING, callback=count_pulse)
# Background thread to compute real-time speed (RPS)
def speed_monitor():
global pulse_count, current_rps
while True:
time.sleep(0.2) # Sample speed every 0.2s
current_rps = pulse_count / wheel_magnet_count # Compute RPS
pulse_count = 0 # Reset pulse count for next window
# Launch the speed monitor in a background thread
threading.Thread(target=speed_monitor, daemon=True).start()
# Convert input BGR image to HSV color space for easier color filtering
def convert_to_HSV(frame):
return cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Detect edges by filtering out blue lane lines, then applying Canny edge detection
def detect_edges(hsv):
lower_blue = np.array([90, 120, 0], dtype=np.uint8) # Lower HSV bound for blue
upper_blue = np.array([150, 255, 255], dtype=np.uint8) # Upper HSV bound for blue
mask = cv2.inRange(hsv, lower_blue, upper_blue) # Mask out only blue areas
return cv2.Canny(mask, 50, 100) # Apply edge detection on mask
# Restrict detection area to lower part of image (road area) using a polygon mask
def region_of_interest(edges):
height, width = edges.shape # Get image dimensions
mask = np.zeros_like(edges) # Start with a black mask
polygon = np.array([[(0, height), (0, height//1.2), # Define a trapezoidal ROI
(width, height//1.2), (width, height)]], np.int32)
cv2.fillPoly(mask, polygon, 255) # Fill ROI area with white
return cv2.bitwise_and(edges, mask) # Keep only ROI part of edges
# Use Hough Transform to extract straight line segments from edge map
def detect_line_segments(cropped_edges):
return cv2.HoughLinesP(
cropped_edges, 1, np.pi / 180, 10, np.array([]), # Angle resolution = 1, threshold = 10
minLineLength=5, maxLineGap=0) # Keep small segments, no gap tolerance
# Average left and right lane lines based on slope and position
def average_slope_intercept(frame, line_segments):
height, width, _ = frame.shape
left_fit, right_fit = [], [] # Store slope/intercepts separately
if line_segments is None: # If no lines found, return nothing
return []
for segment in line_segments:
for x1, y1, x2, y2 in segment:
if x1 == x2: # Skip vertical lines (infinite slope)
continue
slope = (y2 - y1) / (x2 - x1) # Calculate slope
intercept = y1 - slope * x1 # Calculate y-intercept
if slope < 0 and x1 < width * 2 / 3 and x2 < width * 2 / 3:
left_fit.append((slope, intercept)) # Likely a left lane line
elif slope >= 0 and x1 > width / 3 and x2 > width / 3:
right_fit.append((slope, intercept)) # Likely a right lane line
lane_lines = []
if left_fit:
lane_lines.append(make_points(frame, np.average(left_fit, axis=0))) # Average left lines
if right_fit:
lane_lines.append(make_points(frame, np.average(right_fit, axis=0))) # Average right lines
return lane_lines
# Convert line slope and intercept into actual pixel coordinates to draw
def make_points(frame, line):
slope, intercept = line
height, width, _ = frame.shape
y1 = height # Bottom of the image
y2 = int(height / 2) # Middle of the image
if slope == 0:
slope = 0.1 # Avoid division by zero
x1 = int((y1 - intercept) / slope) # Calculate x for y1
x2 = int((y2 - intercept) / slope) # Calculate x for y2
return [[x1, y1, x2, y2]]
# Draw detected lane lines on the image
def display_lines(frame, lines, color=(0, 255, 0), width=6):
line_image = np.zeros_like(frame) # Blank image to draw on
if lines:
for x1, y1, x2, y2 in [line[0] for line in lines]:
cv2.line(line_image, (x1, y1), (x2, y2), color, width) # Draw each line
return cv2.addWeighted(frame, 0.8, line_image, 1, 1) # Overlay lines on original frame
# Compute the steering angle based on lane line positions
def get_steering_angle(frame, lane_lines):
height, width, _ = frame.shape
if len(lane_lines) == 2:
_, _, lx2, _ = lane_lines[0][0] # x2 of left lane
_, _, rx2, _ = lane_lines[1][0] # x2 of right lane
x_offset = (lx2 + rx2) / 2 - width / 2 # Offset from image center
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1 # Estimate curve direction
else:
x_offset = 0 # No line detected, go straight
y_offset = height / 2 # Fixed vertical reference
angle = math.atan2(x_offset, y_offset) # Calculate angle in radians
return int(angle * 180.0 / math.pi) + 90 # Convert to degrees (center = 90)
# Draw the steering direction on the output image
def display_heading_line(frame, angle, color=(0, 0, 255), width=5):
heading_image = np.zeros_like(frame)
height, width_frame, _ = frame.shape
rad = math.radians(angle)
x1 = width_frame // 2
y1 = height
x2 = int(x1 - height / 2 / math.tan(rad)) # Project heading line
y2 = height // 2
cv2.line(heading_image, (x1, y1), (x2, y2), color, width)
return cv2.addWeighted(frame, 0.8, heading_image, 1, 1) # Overlay onto original frame
# Convert the steering angle into a PWM value for servo motor control
def angle_to_pwm(angle):
angle = max(60, min(120, angle)) # Clamp angle range
if angle >= 90:
return 7.5 - (angle - 90) / 0.7 * (1.6 / 30)
else:
return 7.5 - (angle - 90) / 1.5 * (1.5 / 30)
# PD controller configuration
last_error = 0 # Store previous speed error for derivative term
last_time = time.time() # Time of last update for derivative calculation
Kp = 0.4 # Proportional gain: amplifies current speed error
Kd = Kp * 0.75 # Derivative gain: dampens change rate of error
base_speed = 7.5 # Base PWM duty cycle (neutral driving value)
# Compute PWM throttle using PD control (based on difference between target and actual speed)
# NOTE: In the final version, this PD control is computed but not used (hardware PWM is fixed)
def compute_throttle_pwm(angle):
global last_error, last_time
now = time.time() # Get current time
dt = now - last_time # Time elapsed since last update
error = target_rps - current_rps # Compute speed error
if error < 0: # Avoid negative error (overspeeding)
error = 0.01
P = Kp * error # Proportional component
D = Kd * (error - last_error) / dt if dt > 0 else 0 # Derivative component
last_error = error # Update last error
last_time = now # Update last time
return min(7.99, max(7.9, base_speed + P + D)) # Return clamped PWM value
# Detect red regions in the frame using HSV color filtering
# Combines two HSV ranges to detect both ends of red spectrum
def detect_red_stop(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # Convert image to HSV color space
# Red can appear in two HSV ranges: low (0-10) and high (160-180)
lower_red1 = np.array([0, 100, 100])
upper_red1 = np.array([10, 255, 255])
lower_red2 = np.array([160, 100, 100])
upper_red2 = np.array([180, 255, 255])
# Create masks for both red ranges
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
# Combine both red masks
red_mask = cv2.bitwise_or(mask1, mask2)
# Return number of red pixels (normalized)
return np.sum(red_mask) / 255
# Video stream and smoothing buffers
video = cv2.VideoCapture(0) # Initialize webcam stream
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320) # Set frame width
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240) # Set frame height
# Buffers to store previous PWM values for smoothing
steering_buffer = []
drive_buffer = []
STEERING_BUFFER_SIZE = 15
DRIVE_BUFFER_SIZE = 15
# Initialize stop detection state
red_detected_count = 0 # How many stops encountered
red_detected = False # Whether red is currently detected
# Main control loop
try:
while True:
ret, frame = video.read() # Capture a frame from camera
if not ret:
continue # Skip if frame not valid
# Function to set drive motor PWM using sysfs
def set_hardware_pwm(duty_percent):
duty_ns = int(20000000 * duty_percent / 100.0) # Convert to nanoseconds
with open(f"{PWM_CHANNEL}/duty_cycle", 'w') as f:
f.write(str(duty_ns))
# ---------- Image Processing Pipeline ----------
hsv = convert_to_HSV(frame) # Convert to HSV
edges = detect_edges(hsv) # Edge detection (blue lines)
roi = region_of_interest(edges) # Apply region mask
segments = detect_line_segments(roi) # Extract line segments
lanes = average_slope_intercept(frame, segments) # Fit left/right lane lines
angle = get_steering_angle(frame, lanes) # Determine steering angle
lane_img = display_lines(frame, lanes) # Draw lane lines
final_img = display_heading_line(lane_img, angle) # Draw heading line
# ---------- Compute PWM values ----------
target_steering_pwm = angle_to_pwm(angle) # Convert angle to PWM for steering
target_drive_pwm = compute_throttle_pwm(angle) # Compute throttle (though unused)
# ---------- Apply Moving Average ----------
steering_buffer.append(target_steering_pwm)
drive_buffer.append(target_drive_pwm)
if len(steering_buffer) > STEERING_BUFFER_SIZE:
steering_buffer.pop(0)
if len(drive_buffer) > DRIVE_BUFFER_SIZE:
drive_buffer.pop(0)
average_steering_pwm = sum(steering_buffer) / len(steering_buffer)
average_drive_pwm = sum(drive_buffer) / len(drive_buffer)
# ---------- Stop Box Detection ----------
red_area = detect_red_stop(frame)
if red_area > 15000: # Large red area detected
if not red_detected:
red_detected = True
red_detected_count += 1
if red_detected_count == 1: # First stop box
print("First stop detected: pausing 3 seconds")
time.sleep(1)
set_hardware_pwm(7.5) # Pause motor
time.sleep(3)
set_hardware_pwm(average_drive_pwm)
elif red_detected_count == 2: # Second stop box
print("Second stop detected: stopping permanently")
time.sleep(0.5)
set_hardware_pwm(7.5) # Final stop
break
else:
if red_area < 100: # Reset if red disappears
red_detected = False
# ---------- Apply PWM to Motors ----------
steering_pwm.ChangeDutyCycle(average_steering_pwm) # Steering servo
set_hardware_pwm(average_drive_pwm) # Drive motor
print(average_drive_pwm)
# ---------- Show Camera Debug View ----------
cv2.imshow("Lane Following", final_img)
cv2.imshow("roi", roi)
if cv2.waitKey(1) & 0xFF == 27: # ESC key to exit
break
# Shutdown and Cleanup
finally:
video.release()
cv2.destroyAllWindows()
steering_pwm.ChangeDutyCycle(7.5) # Reset steering to center
set_hardware_pwm(7.5) # Reset drive motor
time.sleep(1)
steering_pwm.stop()
#drive_pwm.stop()
GPIO.cleanup() # Release all GPIO resources
Comments