import time
import cv2
import numpy as np
import math
import sys
import os
import RPi.GPIO as GPIO
import matplotlib.pyplot as plt
# === Team & Setup ===
# Ben Welchman, Junho Kim, Aaron Juarez
GPIO.setmode(GPIO.BCM)
# === GPIO SETUP ===
throttle_pin = 19 # ESC throttle
steer_pin = 21 # servo steering
GPIO.setup(throttle_pin, GPIO.OUT)
GPIO.setup(steer_pin, GPIO.OUT)
pwm_esc = GPIO.PWM(throttle_pin, 50)
pwm_serv = GPIO.PWM(steer_pin, 50)
pwm_esc.start(7.5) # neutral throttle
pwm_serv.start(7.5) # straight steering
# === Encoder & Speed PD Settings ===
mods = [m for m in os.listdir("/sys/module") if m.startswith("encoder")]
encoder_path = f"/sys/module/{mods[0]}/parameters/speed_ms"
print("Using encoder_path:", encoder_path)
desired_ms = 15.0 # target interval between pulses
kp, kd = 0.1, 0.045
base_speed = 8.1 # nominal running duty
throttle_min = 7.7 # full stop duty
throttle_max = 8.3 # full forward duty
_prev_err = None
_prev_time = None
# === Steering PD Settings ===
kp_s, kd_s = 0.3, 0.5
zero_turn = 7.5
servo_min = 6.0
servo_max = 9.0
# === Camera Settings (resolution size) ===
frame_w, frame_h = 250, 120
cam_idx = 0
# === Lane‑line EMA ===
ema_alpha = 0.2
prev_left = None
prev_right = None
# === Stop‑sign for Red‑floor ===
RED_THRESHOLD = 750 # pixel threshold for stopping at red light
# === Variables for plotting ===
p_vals = []
d_vals = []
err_vals = []
speed_vals = []
steer_vals = []
def read_encoder():
try:
with open(encoder_path, "r") as f:
return float(f.read().strip()) # read speed in ms
except Exception as e:
print(f"[read_encoder] ERROR: {e}", file=sys.stderr)
return None
def manage_speed():
"""
Adjust throttle duty cycle using PD control of encoder interval.
"""
global _prev_err, _prev_time
enc = read_encoder()
now = time.time()
if enc is None:
return base_speed
err = enc - desired_ms # positive if too slow
p_term = kp * err
if _prev_err is None:
d_term = 0.0
else:
dt = now - _prev_time
d_term = kd * ((err - _prev_err) / dt) if dt > 0 else 0.0
_prev_err, _prev_time = err, now
thr = base_speed + p_term + d_term # adjust throttle with PD control
thr = max(throttle_min, min(throttle_max, thr))
# append for plotting
err_vals.append(err)
p_vals.append(p_term)
d_vals.append(d_term)
speed_vals.append(thr)
# print for debugging
print(f"[manage_speed] enc={enc:.1f}ms err={err:.1f} p={p_term:.3f} d={d_term:.3f} → thr={thr:.3f}")
return thr
def detect_red_light(frame):
h, w, _ = frame.shape
y0 = int(h * 0.95)
roi = frame[y0:, :] # checks lower part of frame
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
# red pixels
r1 = cv2.inRange(hsv, (0,120,120), (10,255,255))
r2 = cv2.inRange(hsv, (160,120,120), (180,255,255))
mask = cv2.bitwise_or(r1, r2) # combines two red hue ranges
return cv2.countNonZero(mask)
def detect_edges(frame):
# Apply hsv for better color contrast, thus better lane detection
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # blue for tape lanes
mask = cv2.inRange(hsv, (90,120,0), (120,255,255))
return cv2.Canny(mask, 50, 100)
def region_of_interest(edges):
h, w = edges.shape
mask = np.zeros_like(edges)
poly = np.array([[(0, int(h*0.1)), (w, int(h*0.1)), (w,h), (0,h)]], np.int32)
# try to get the the lane detection near car (not other regions)
cv2.fillPoly(mask, poly, 255)
return edges & mask
def detect_line_segments(cropped):
return cv2.HoughLinesP( # finds straight lines in cropped frame
cropped, 1, np.pi/180, 10,
np.array([]), minLineLength=5, maxLineGap=150
)
def make_points(frame, line):
h, _, _ = frame.shape
m, b = line # uses lane's start and end to map lines
if m == 0: m = 0.1
y1, y2 = h, h//2
x1 = int((y1 - b) / m)
x2 = int((y2 - b) / m)
return [[x1, y1, x2, y2]]
def average_slope_intercept(frame, segs):
global prev_left, prev_right
# if line detection fails (mainly due to camera frame rate)
if segs is None or len(segs) == 0: # uses previous lines
lanes = []
if prev_left is not None: lanes.append(make_points(frame, prev_left))
if prev_right is not None: lanes.append(make_points(frame, prev_right))
return lanes
h, w, _ = frame.shape
left_fit = []
right_fit = []
boundary = w * (1/3) # divides frame into thirds
for seg in segs:
x1, y1, x2, y2 = seg[0]
if x1 == x2: continue # skip vertical lines
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - slope * x1
if abs(slope) > 0.3 and x1 < boundary and x2 < boundary:
left_fit.append((slope, intercept))
elif abs(slope) > 0.3 and x1 > 2*boundary and x2 > 2*boundary:
right_fit.append((slope, intercept))
lanes = [] # smooths result using exp moving average
if left_fit:
avg_left = np.average(left_fit, axis=0)
prev_left = avg_left if prev_left is None else ema_alpha*avg_left + (1-ema_alpha)*prev_left
if right_fit:
avg_right = np.average(right_fit, axis=0)
prev_right = avg_right if prev_right is None else ema_alpha*avg_right + (1-ema_alpha)*prev_right
# averaged slopes and intercepts into line coordinates
if prev_left is not None: lanes.append(make_points(frame, prev_left))
if prev_right is not None: lanes.append(make_points(frame, prev_right))
return lanes
def display_lines(frame, lanes):
overlay = np.zeros_like(frame) # draws green line for lanes
for ln in lanes:
x1, y1, x2, y2 = ln[0]
cv2.line(overlay, (x1,y1), (x2,y2), (0,255,0), 6)
return cv2.addWeighted(frame, 0.8, overlay, 1, 1)
def get_steering_angle(frame, lanes):
# calculate angle between center of car and canter of lane
h, w, _ = frame.shape
if len(lanes) == 2:
_,_,lx2,_ = lanes[0][0]
_,_,rx2,_ = lanes[1][0]
off = ((lx2 + rx2) / 2) - (w / 2) # midpoint of lane
elif len(lanes) == 1:
x1, _, x2, _ = lanes[0][0]
off = x2 - x1
else:
off = 0 # no lane detected
return int(math.degrees(math.atan2(off, h/2))) + 90
def display_heading_line(frame, angle):
# draws red line showing car direction
overlay = np.zeros_like(frame)
h, w, _ = frame.shape
r = math.radians(angle)
x1, y1 = w//2, h
x2 = int(x1 - (h/2)/math.tan(r)) # projects angle up
y2 = h//2
cv2.line(overlay, (x1,y1), (x2,y2), (0,0,255), 5)
return cv2.addWeighted(frame, 0.8, overlay, 1, 1)
def main():
cap = cv2.VideoCapture(cam_idx) # set camera idx and frame
cap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_w)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_h)
stopped, stop0 = False, 0
last_s, last_t = 0, time.time()
try:
while True:
ret, frame = cap.read()
if not ret:
break # exit if frame not captured
frame = cv2.flip(frame, 1)
# stop at red‑light
red_count = detect_red_light(frame)
print(f"[red_count] {red_count} (th={RED_THRESHOLD})")
if red_count > RED_THRESHOLD and not stopped:
pwm_esc.ChangeDutyCycle(throttle_min)
time.sleep(3) # first stop
stopped, stop0 = True, time.time()
edges = detect_edges(frame) # lane detection
roi = region_of_interest(edges)
segs = detect_line_segments(roi)
lanes = average_slope_intercept(frame, segs)
angle = get_steering_angle(frame, lanes) # angle calc
vis = display_heading_line(display_lines(frame, lanes), angle)
cv2.imshow("heading", vis)
# throttle control
thr = manage_speed()
print(f"[throttle PWM] {thr:.2f}%")
pwm_esc.ChangeDutyCycle(thr)
# steering PD
now = time.time()
dt = now - last_t
e_s = -(angle - 90)
raw = kp_s * e_s + kd_s * (e_s - last_s) / dt
raw = max(min(raw, 0.5), -1.0)
duty = zero_turn + raw * 2.6 # adjust to scale steer sensitivity
duty = max(servo_min, min(servo_max, duty))
pwm_serv.ChangeDutyCycle(duty)
last_s, last_t = e_s, now
# append steering duty for plotting
steer_vals.append(duty)
key = cv2.waitKey(1) & 0xFF
if key == 27 or key == ord('s'):
break
except KeyboardInterrupt:
print("\nprogram stopped...")
finally:
start_frame = 50
frames = range(start_frame, len(err_vals))
# plot 1
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.plot(frames, steer_vals[start_frame:], label='Steering Duty Cycle')
ax1.plot(frames, speed_vals[start_frame:], label='Speed Duty Cycle')
ax2.plot(frames, err_vals[start_frame:], linestyle='--', label='Error')
ax1.set_xlabel('Frame Number')
ax1.set_ylabel('Duty Cycle')
ax2.set_ylabel('Error')
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
plt.title('Duty Cycles & Error vs Frame Number (from frame 10)')
plt.savefig('plt1.png')
plt.close()
# plot 2
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.plot(frames, p_vals[start_frame:], linestyle='-', label='Proportional Response')
ax1.plot(frames, d_vals[start_frame:], linestyle='-.', label='Derivative Response')
ax2.plot(frames, err_vals[start_frame:], color='gray', linestyle='--', label='Error')
ax1.set_xlabel('Frame Number')
ax1.set_ylabel('PD Response')
ax2.set_ylabel('Error')
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
plt.title('PD Responses & Error vs Frame Number (from frame 10)')
plt.savefig('plt2.png')
plt.close()
# cleanup
pwm_esc.ChangeDutyCycle(throttle_min)
pwm_serv.ChangeDutyCycle(zero_turn)
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
Comments