# Modules
import cv2
import numpy as np
import math
import time
import os
import RPi.GPIO as GPIO
import matplotlib.pyplot as plt
"""
This code is based on:
User raja_961, “Autonomous Lane-Keeping Car Using Raspberry
Pi and OpenCV”. Instructables. URL:
https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
sing-Raspberry-Pi-and/
Team graDudes, https://www.hackster.io/439751/gradudes-9d9a6e
Team M.E.G.G, https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89
"""
encoder_path = "/sys/module/gpiod_driver/parameters/encoder_val" # To access encoder values from driver
encoder_values = []
max_rotate = 5000000
min_rotate = 8500000
delta = 0.0001
curr_speed = 0
# GPIO setup
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# Steering Motor Pins
steering_enable = 19
in1, in2 = 17, 27
# Speed Motor Pins
speed_enable = 18
in3, in4 = 23, 24
# Configure GPIO pins
GPIO.setup(in1, GPIO.OUT)
GPIO.setup(in2, GPIO.OUT)
GPIO.setup(in3, GPIO.OUT)
GPIO.setup(in4, GPIO.OUT)
GPIO.setup(speed_enable, GPIO.OUT)
GPIO.setup(steering_enable, GPIO.OUT)
# Initialize PWMs
gpio_servo_direction = (GPIO.HIGH, GPIO.LOW)
GPIO.output(in1, gpio_servo_direction[0])
GPIO.output(in2, gpio_servo_direction[1])
steering = GPIO.PWM(steering_enable, 50) # 50Hz for servo
GPIO.output(in3, GPIO.HIGH)
GPIO.output(in4, GPIO.LOW)
speed = GPIO.PWM(speed_enable, 50) # 50Hz for motor
NEUTRAL_STEERING = 7.5 # servo neutral duty
FORWARD_SPEED = 7.4 # motor constant speed duty
STEERING_TRIM = -0.1 # adjust mechanical bias
Kp = 0.06 # proportional steering gain
Kd = 0.6 # derivative steering gain ###
# Stop-sign detection state
passed_first = False
passed_second = False
red_prev = False
cooldown = 30 # frames to ignore after a stop
last_stop_tick = -999 # frame index of last stop
# Start PWMs: steering centered, motor off until move()
steering.start(NEUTRAL_STEERING + STEERING_TRIM)
speed.start(7) # start stopped
curr_speed = 7
# for stop sign detection
def isRedFloorVisible(frame):
# Gets color of image
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Color bounds
lower1 = np.array([0,10,30], np.uint8) # dark grey
upper1 = np.array([10,255,255], np.uint8) # light red
lower2 = np.array([170,10,30], np.uint8) # another gray
upper2 = np.array([180,255,255], np.uint8) # light blue
# Masks image (makes 2 seperate then combines)
mask1 = cv2.inRange(hsv, lower1, upper1)
mask2 = cv2.inRange(hsv, lower2, upper2)
mask = cv2.bitwise_or(mask1, mask2) # isolates red and blue
# Counts percentage of red on the screen after masking
pct = np.count_nonzero(mask) * 100.0 / mask.size
return (pct > 10), mask
# image processing
# Masks only the blue part of the image to see lanes
def detect_mask(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_blue = np.array([90,50,50]) # lower boundary
upper_blue = np.array([130,255,255]) # upper boundary
return cv2.inRange(hsv, lower_blue, upper_blue)
# detects and returns a picture with edges
def detect_edges(mask):
blur = cv2.GaussianBlur(mask, (5,5), 0)
return cv2.Canny(blur, 50, 150)
# isolates the image so the pi only looks at lower region of image
# this is to only look at a small portion of the lanes to make it easier
def region_of_interest(edges):
h, w = edges.shape
mask = np.zeros_like(edges)
poly = np.array([[(0,h),(0,h//2),(w,h//2),(w,h)]], np.int32)
cv2.fillPoly(mask, poly, 255)
return cv2.bitwise_and(edges, mask)
# Gives line segments that are in the direction of the lanes using the ROI edges picture
def detect_line_segments(roi):
return cv2.HoughLinesP(roi, rho=1, theta=np.pi/180,
threshold=20, minLineLength=20, maxLineGap=15)
# Gets the mathematical formulas (slope and intercept) for the left and right lines
def average_slope_intercept(frame, segments):
lines = []
# If there are no segments, just return the lines
if segments is None:
return lines
# Get image height & width, and then find vertical line in middle of image
# This division is to divide the picture in 2 to seperate left and right of image
h, w, _ = frame.shape
mid = w * 0.5
# This next block of code gets the slope and intercept for each line
left_fits, right_fits = [], []
# Iterates through each line segment
for seg in segments:
for x1,y1,x2,y2 in seg:
if x2==x1: continue # if line is vertical, skip
slope = (y2-y1)/(x2-x1) # calculates slope
intercept = y1 - slope*x1 # calculates intercept
# Checks if segment is in left side or right side
# Puts in respective array afterwards
if slope<0 and x2<mid:
left_fits.append((slope, intercept))
elif slope>0 and x1>mid:
right_fits.append((slope, intercept))
# Gets average slope and intercept for each line segment to get an average line
if left_fits:
ls, li = np.mean(left_fits, axis=0)
lines.append([[int((h-li)/ls), h, int((h/2-li)/ls), h//2]]) # Puts left line in final lines array
if right_fits:
rs, ri = np.mean(right_fits, axis=0)
lines.append([[int((h-ri)/rs), h, int((h/2-ri)/rs), h//2]]) # Puts right line in final lines array
return lines
# Calculates the angle of new trajectory based off the lines
def get_steering_angle(frame, lines):
# Same thing as last function, divides image into 2
h, w, _ = frame.shape
mid_x = w/2
# Gets center of image
lookahead = h/2
#
if len(lines)==2:
_,_,l2,_ = lines[0][0]
_,_,r2,_ = lines[1][0]
x_offset = ((l2+r2)/2) - mid_x
elif len(lines)==1:
x1,_,x2,_ = lines[0][0]
x_offset = x2 - x1
else:
x_offset = 0
angle = math.degrees(math.atan2(x_offset, lookahead)) + 90
return angle
# returns how much to adjust the speed by
def manage_speed():
# Opens encoder using driver and gets value of speed
f = open(encoder_path, "r")
encode_val = int(f.readline())
f.close()
# Puts in array of speeds
encoder_values.append(encode_val)
print("Calculated speed")
print(encode_val)
# Logic to change speed based on encoder value
if encode_val <= max_rotate:
print("reduce speed")
ret_val = -delta
elif encode_val >= min_rotate:
print("increase speed")
ret_val = delta
else:
print("maintain speed")
ret_val = 0 # Default if within range
return ret_val
# Puts lines displayed onto the webcam image on our computer
def display_lines(frame, lines):
img = np.zeros_like(frame)
for l in lines:
x1,y1,x2,y2 = l[0]
cv2.line(img,(x1,y1),(x2,y2),(0,255,0),4)
return cv2.addWeighted(frame,0.8,img,1,1)
# Puts the heading line (where the car is supposed to go) on our computer webcam image
def display_heading_line(frame, angle):
img = np.zeros_like(frame)
h, w, _ = frame.shape
# Cool math stuff to calculate heading
length = h//2
rad = math.radians(angle-90)
x1,y1 = w//2, h
x2 = int(x1 + length*math.sin(rad))
y2 = int(y1 - length*math.cos(rad))
# Displays line
cv2.line(img,(x1,y1),(x2,y2),(0,0,255),4)
return cv2.addWeighted(frame,0.8,img,1,1)
# Plots p, d and error vs frame number
def plot_pd(p_vals, d_vals, error, show_img=False):
fig, ax = plt.subplots()
t = np.arange(len(p_vals))
ax.plot(t, p_vals, '-', label="P values")
ax.plot(t, d_vals, '-', label="D values")
ax.plot(t, error, '--r', label="Error")
ax.set_xlabel("Frame #") # x axis label
# y axis labels
ax.set_ylabel("PD Output")
ax.set_ylabel("Error (deg)")
plt.title("PD Control Data")
fig.legend()
fig.tight_layout()
out = os.path.expanduser('~/pd_plot.png')
try:
plt.savefig(out) # saves the plot png
print(f"Saved PD plot to {out}")
except Exception as e:
print(f"Error saving PD plot: {e}")
if show_img:
plt.show()
plt.clf()
# Plots motor speed and steering duty vs frame #
def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
# Plot the speed steering and the error
fig, ax1 = plt.subplots()
t_ax = np.arange(len(speed_pwms))
ax1.plot(t_ax, speed_pwms, '-', label="Speed")
ax1.plot(t_ax, turn_pwms, '-', label="Steering")
ax2 = ax1.twinx()
ax1.plot(t_ax, error / np.max(error), '--r', label="Error")
# axis labels
ax1.set_xlabel("Frames")
ax1.set_ylabel("Speed and Steer Duty Cycle")
ax2.set_ylabel("Percent Error Value")
plt.title("Speed and Steer Duty Cycle, and error v.s. time")
fig.legend()
out = os.path.expanduser('~/pwm_plot.png')
try:
# saves the pwm plot to current directory
plt.savefig(out)
print(f"Saved PWM plot to {out}")
except Exception as e:
print(f"Error saving PWM plot: {e}")
if show_img:
plt.show()
plt.clf()
def stop(): # stops motors and re-centers steering
speed.ChangeDutyCycle(0)
steering.ChangeDutyCycle(NEUTRAL_STEERING + STEERING_TRIM)
def move(): # starts moving forward at base speed
speed.ChangeDutyCycle(FORWARD_SPEED)
steering.ChangeDutyCycle(NEUTRAL_STEERING + STEERING_TRIM)
# main loop
if __name__=='__main__':
cap = cv2.VideoCapture(0) # opens camera
cap.set(cv2.CAP_PROP_FRAME_WIDTH,160) # video width
cap.set(cv2.CAP_PROP_FRAME_HEIGHT,120) # video height
print("Warming up camera...")
time.sleep(10) # waits 10s to ensure the camera is ready
# for PD plotting
proportional_vals = []
derivative_vals = []
error_vals = []
last_error = 0.0
last_time = time.time()
# for speed/steering plotting
speed_pwms = []
turn_pwms = []
move() # start driving
curr_speed = FORWARD_SPEED
frame_count = 0
red_prev = False
while True:
ret, frame = cap.read() #capture a frame
if not ret: # breaks if failed to capture
break
# checks for red stop sign every 15 frames
if frame_count % 15 == 0:
detected, _ = isRedFloorVisible(frame)
if detected and not red_prev and (frame_count - last_stop_tick > cooldown):
if not passed_first:
stop()
time.sleep(4) # wait at 1st stop sign for 4s
passed_first=True
last_stop_tick=frame_count
move() # start moving again
curr_speed = FORWARD_SPEED # set speed back
elif not passed_second:
stop() # stop the car permanently
passed_second=True
break
red_prev = detected
# Lane detection
mask = detect_mask(frame)
edges = detect_edges(mask)
roi = region_of_interest(edges)
segs = detect_line_segments(roi)
lanes = average_slope_intercept(frame, segs)
# Steering pd calculation
now = time.time() ###
dt = max(now - last_time, 1e3) ###
angle = get_steering_angle(frame, lanes)
error = angle - 90
proportional = Kp * error
derivative = Kd * (error - last_error) / dt ###
# log for plotting
proportional_vals.append(proportional)
derivative_vals.append(derivative)
error_vals.append(error)
# pwm plotting
speed_pwms.append(curr_speed)
# Compute and apply steering
turn = NEUTRAL_STEERING + STEERING_TRIM + proportional + derivative
# update with speed encoder every 10 ticks
if frame_count % 10 == 0:
# adjust speed
temp_speed = manage_speed() + curr_speed
# if the change in speed is not 0, update the speed
if temp_speed != curr_speed:
speed.ChangeDutyCycle(temp_speed)
curr_speed = temp_speed
turn = max(6.0, min(9.0, turn)) # servo limits
turn_pwms.append(turn)
steering.ChangeDutyCycle(turn)
# visualization
disp = display_lines(frame, lanes)
disp = display_heading_line(disp, angle)
cv2.imshow('Lane+Heading', disp)
if cv2.waitKey(1)==27:
break
frame_count += 1 # increment frame
# After loop ends, save PD plot
plot_pd(proportional_vals, derivative_vals, error_vals)
plot_pwm(speed_pwms, turn_pwms, error_vals)
# Cleanup
stop()
steering.stop()
speed.stop()
GPIO.cleanup()
cap.release()
cv2.destroyAllWindows()
Comments