"""
This code is a simple implementation of an autonomous RC car using a Raspberry Pi and OpenCV for computer vision.
This code derove inspiration from the following instructables:
User raja_961, “Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV”. Instructables.
URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
This code also derives inspiration from previous ELEC 424 Final Projects:
Team graDudes “graDudes”
URL: https://www.hackster.io/439751/gradudes-9d9a6e
Team Fish Fear Us “Diet Cyber Truck”
URL: https://www.hackster.io/fish-fear-us/diet-cyber-truck-483a7d
Team 8 “Autonomous Lane Keeping RC Car”
URL: https://www.hackster.io/team-8-not-elecs/autonomous-lane-keeping-rc-car-b52ee7
"""
import RPi.GPIO as GPIO # import the GPIO library
import time # import the time library
import cv2 # import the OpenCV library
import numpy as np # import the NumPy library
import math # import the math library
import matplotlib.pyplot as plt # import the Matplotlib library
from collections import deque # import the deque class from the collections module
GPIO.setwarnings(False) # ignore warnings
GPIO.setmode(GPIO.BCM) # set the GPIO mode to BCM
speed_pin = 18 # throttle control
servo_pin = 19 # steering control
GPIO.setup(speed_pin, GPIO.OUT) # set the GPIO pin for speed control as output
GPIO.setup(servo_pin, GPIO.OUT) # set the GPIO pin for steering control as output
speed = GPIO.PWM(speed_pin, 50) # set the switching frequency to 50 Hz
servo = GPIO.PWM(servo_pin, 50)
NEUTRAL_THROTTLE = 7.5 # idle (no movement)
OP_SPEED = 7.9 # normal operating speed
DEBOUNCE_TIME = 0.5 # seconds
RAMP_STEP = 0.05 # throttle increment/decrement step
SYSFS_RPM_PATH = '/sys/devices/platform/speedEncoderDevice/rpm' # path to the RPM file
BOOST_RPM_THRESHOLD = 200 # RPM above which we “boost”
BOOST_EXIT_THRESHOLD = BOOST_RPM_THRESHOLD * 0.9 # RPM below which we stop boosting
BOOST_DUTY_CYCLE = 8.1 # duty-cycle % when boosting
# Steering PD gains
Kp_steer = 0.1 # 0.1 good
Kd_steer = 0.005 # 0.001 good
# Steering PWM limits
y_steer_neutral = 7.5 # neutral
STEER_DELTA = 2.0 # ±2.5% around neutral
last_steer_error = 0.0 # last steering error
TARGET_PERIOD = 1/50.0 # target period in seconds
MAX_TURN_ANGLE_FOR_BOOST = 30 # degrees
last_time = time.time()
p_vals = [] # P values
d_vals = [] # D values
errors = [] # error values
speed_pwms = [] # Speed PWM values
turn_pwms = [] # Steering PWM values
# Start PWMs
speed.start(NEUTRAL_THROTTLE)
servo.start(y_steer_neutral)
time.sleep(2) # wait for 2 seconds
speed_pwm = OP_SPEED # initial speed PWM
rpm_hist = deque(maxlen=5) # Initialize the deque with a fixed size of 5 to store the last 5 RPM values, helps smooth out the RPM readings
def read_rpm():
# Read the RPM from the sysfs file
try:
with open(SYSFS_RPM_PATH, 'r') as f: # open the file in read mode
return int(f.read().strip()) # read the RPM value and convert it to an integer
except Exception as e: # handle any exceptions that occur
print("RPM read failed:", e) # print the error message
r = 0 # if the read fails, set the RPM to 0
rpm_hist.append(r) # append the RPM value to the deque
return sum(rpm_hist) / len(rpm_hist) # return the average of the last 5 RPM values
def detect_edges(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # convert the frame to HSV color space
lower_blue = np.array([90, 50, 50]) # define the lower blue color range
upper_blue = np.array([130, 255, 255]) # define the upper blue color range
mask = cv2.inRange(hsv, lower_blue, upper_blue) # create a mask for the blue color
#cv2.imshow("Blue Mask", mask) # display the mask
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5,5)) # create a rectangular kernel
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) # close small gaps
mask = cv2.GaussianBlur(mask, (5,5), 0) # smooth edges
edges = cv2.Canny(mask, 50, 150) # detect edges using Canny edge detection
#cv2.imshow("Edges", edges) # display the edges
return edges # return the edges
def detect_stop(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # convert the frame to HSV color space
lower_red1 = np.array([0, 100, 100]) # define the lower1 red color range
upper_red1 = np.array([10, 255, 255]) # define the upper1 red color range
lower_red2 = np.array([160, 100, 100]) # define the lower2 red color range
upper_red2 = np.array([179, 255, 255]) # define the upper2 red color range
mask1 = cv2.inRange(hsv, lower_red1, upper_red1) # create a mask1 for the red color
mask2 = cv2.inRange(hsv, lower_red2, upper_red2) # create a mask2 for the red color
red_mask = cv2.bitwise_or(mask1, mask2) # combine the two masks
#cv2.imshow("Red Mask", red_mask) # display the mask
num_red_pixels = cv2.countNonZero(red_mask) # count the number of red pixels in the mask
# print(f"Number of red pixels: {num_red_pixels}") # print the number of red pixels
return num_red_pixels >= 10000 # return True if the number of red pixels is greater than 10000, indicating a stop sign
def region_of_interest(edges):
height, width = edges.shape # get the height and width of the edges
mask = np.zeros_like(edges) # create a mask of the same size as the edges
polygon = np.array([[
(0, height),
(0, height//2),
(width, height//2),
(width, height),
]], dtype=np.int32) # define the polygon for the region of interest
cv2.fillPoly(mask, polygon, 255) # fill the polygon with white color
masked_edges = cv2.bitwise_and(edges, mask) # apply the mask to the edges
cv2.imshow("Masked Edges", masked_edges) # display the masked edges
return masked_edges # return the masked edges
def detect_line_segments(cropped_edges):
rho = 1 # distance resolution in pixels
theta = np.pi / 180 # angular resolution in radians
min_threshold = 10 # minimum number of votes to consider a line segment
lines = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold, np.array([]), minLineLength=5, maxLineGap=0) # detect line segments using Hough transform
return lines # return the detected line segments
def average_slope_intercept(frame, lines):
if lines is None or len(lines)==0: # check if any lines were detected
return [] # if no lines were detected, return an empty list
height, width, _ = frame.shape # get the height and width of the frame
left_fit, right_fit = [], [] # initialize lists to store left and right line segments
# classify
for x1,y1,x2,y2 in lines[:,0]: # iterate through the detected lines
slope = (y2-y1)/(x2-x1) if x2!=x1 else 999 # calculate the slope of the line segment
if abs(slope) < 0.5: # ignore horizontal lines
continue
intercept = y1 - slope*x1 # calculate the y-intercept of the line segment
if slope<0 and max(x1,x2)< width//2: # left line
left_fit.append((slope,intercept)) # append the slope and intercept to the left_fit list
elif slope>0 and min(x1,x2)> width//2: # right line
right_fit.append((slope,intercept)) # append the slope and intercept to the right_fit list
lane_lines = [] # initialize a list to store the lane lines
if left_fit: # if any left lines were detected
avg = np.mean(left_fit, axis=0) # calculate the average slope and intercept
lane_lines.append(make_points(frame, avg)) # append the lane line to the lane_lines list
if right_fit: # if any right lines were detected
avg = np.mean(right_fit, axis=0) # calculate the average slope and intercept
lane_lines.append(make_points(frame, avg)) # append the lane line to the lane_lines list
return lane_lines # return the detected lane lines
def make_points(frame, line):
height, width, _ = frame.shape # get the height and width of the frame
slope, intercept = line # unpack the slope and intercept
y1 = height # set the starting point at the bottom of the frame
y2 = int(y1 * 0.6) # set the ending point at 60% of the height
if slope == 0: # if the slope is 0, set the slope to a small value
slope = 0.01
x1 = int((y1 - intercept) / slope) # calculate the x-coordinate of the starting point
x2 = int((y2 - intercept) / slope) # calculate the x-coordinate of the ending point
return np.array([x1, y1, x2, y2]) # return the points of the line
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=10):
line_image = np.zeros_like(frame) # create an empty image of the same size as the frame
if lines is not None: # check if any lines were detected
for line in lines: # iterate through the detected lines
x1, y1, x2, y2 = line.reshape(4) # reshape the line points
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width) # draw the line on the image
return cv2.addWeighted(frame, 0.8, line_image, 1, 1) # blend the original frame with the line image
def get_steering_angle(frame, lane_lines):
height, width, _ = frame.shape # get the height and width of the frame
# if we have two lines, do the normal midpoint method
if len(lane_lines) == 2:
left_x2 = lane_lines[0][2] # left line x2
right_x2 = lane_lines[1][2] # right line x2
mid = width // 2 # midpoint of the frame
x_offset = (left_x2 + right_x2)/2 - mid # calculate the offset from the midpoint
# ** if only one line, do a “hard” corrective turn **
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0] # unpack the line points
# slope = positive → right line; slope = negative → left line
slope = (lane_lines[0][3] - lane_lines[0][1]) / (lane_lines[0][2] - lane_lines[0][0] + 1e-6) # calculate the slope
if slope < 0:
# Detected *left* lane only → steer hard *right*
x_offset = + (width * 0.5)
else:
# Detected *right* lane only → steer hard *left*
x_offset = - (width * 0.5)
# no lines at all: go straight
else:
x_offset = 0
y_offset = height / 2 # y-coordinate of the center of the frame
angle_rad = math.atan2(x_offset, y_offset) # calculate the angle in radians
return int(math.degrees(angle_rad) + 90) # convert the angle to degrees and add 90 degrees to center the steering angle
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
heading_image = np.zeros_like(frame) # create an empty image of the same size as the frame
height, width, _ = frame.shape # get the height and width of the frame
steering_angle_radian = steering_angle / 180.0 * math.pi # convert the steering angle to radians
x1 = int(width / 2) # x-coordinate of the center of the frame
y1 = height # y-coordinate of the center of the frame
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian)) # calculate the x-coordinate of the end point
y2 = int(height / 2) # y-coordinate of the end point
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width) # draw the line on the image
heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1) # blend the original frame with the heading image
return heading_image # display the heading image
def plot_pd(p_vals, d_vals, error, show_img):
# Plot the PD
fig, ax1 = plt.subplots() # create a new figure and axis
t_ax = np.arange(len(p_vals)) # create an array of frame indices
ax1.plot(t_ax, p_vals, '-', label="P values") # plot the P values
ax1.plot(t_ax, d_vals, '-', label="D values") # plot the D values
ax2 = ax1.twinx() # create a second y-axis
ax2.plot(t_ax, error, '--r', label="Error") # plot the error values
ax1.set_xlabel("Frames") # set the x-axis label
ax1.set_ylabel("PD Value") # set the y-axis label for P and D values
ax2.set_ylim(-90, 90) # set the y-axis limits for the error values
ax2.set_ylabel("Error Value") # set the y-axis label for error values
plt.title("PD Values over time") # set the title of the plot
fig.legend() # add a legend to the plot
fig.tight_layout() # adjust the layout to prevent overlap
plt.savefig("./pd_plot.png") # save the plot to a file
#if show_img:
# plt.show()
plt.clf() # clear the current figure
def plot_pwm(speed_pwms, turn_pwms, error, show_img):
# Plot the PWM
fig, ax1 = plt.subplots() # create a new figure and axis
t_ax = np.arange(len(speed_pwms)) # create an array of frame indices
ax1.plot(t_ax, speed_pwms, '-', label="Speed PWM") # plot the speed PWM values
ax1.plot(t_ax, turn_pwms, '-', label="Steering PWM") # plot the steering PWM values
ax2 = ax1.twinx() # create a second y-axis
ax2.plot(t_ax, error, '--r', label="Error") # plot the error values
ax1.set_xlabel("Frames") # set the x-axis label
ax1.set_ylabel("PWM Values") # set the y-axis label for PWM values
ax2.set_ylabel("Error Value") # set the y-axis label for error values
plt.title("PWM Values over time") # set the title of the plot
fig.legend() # add a legend to the plot
plt.savefig("./pwm_plot.png") # save the plot to a file
#if show_img:
# plt.show()
plt.clf() # clear the current figure
video = cv2.VideoCapture(0) # open the camera
video.set(cv2.CAP_PROP_FRAME_WIDTH,320) # set the frame width to 320
video.set(cv2.CAP_PROP_FRAME_HEIGHT,120) # set the frame height to 120
last_stop_time = 0.0 # last time a stop sign was detected
STOP_COOLDOWN = 7.5 # seconds
STOP_DURATION = 3.0 # seconds
stopping = False # flag to indicate if the car is stopping
stop_start = 0.0 # start time of the stop
stop_count = 0 # number of stops made
last_time = time.time() # last time the loop was executed
above_start = None # start time of the boost
while True:
t_start = time.time() # start time of the loop
ret,frame = video.read() # read a frame from the camera
# frame = cv2.flip(frame,-1)
#cv2.imshow("raw frame", frame)
frame = cv2.resize(frame, (320, 120)) # resize the frame to 320x240, maybe also try cutting this in half to 160x120
if not ret: # check if the frame was read successfully
print("Camera frame grab failed")
break
#Calling the functions
edges = detect_edges(frame) # detect edges in the frame
roi = region_of_interest(edges) # define the region of interest
line_segments = detect_line_segments(roi) # detect line segments in the region of interest
lane_lines = average_slope_intercept(frame,line_segments) # calculate the average slope and intercept of the lane lines
lane_lines_image = display_lines(frame,lane_lines) # display the lane lines on the frame
steering_angle = get_steering_angle(frame, lane_lines) # calculate the steering angle based on the lane lines
heading_image = display_heading_line(lane_lines_image,steering_angle) # display the heading line on the frame
cv2.imshow("heading", heading_image) # display the heading image
if detect_stop(frame) and (t_start - last_stop_time) > STOP_COOLDOWN and not stopping: # detect a stop sign
stopping = True # set the stopping flag to True
stop_start = t_start # set the start time of the stop
last_stop_time = t_start # update the last stop time
stop_count += 1 # increment the stop count
print(f"{['First','Second'][stop_count-1]} stop: pausing for {STOP_DURATION}s") # print the stop message
if stopping: # if the car is stopping
if t_start - stop_start < STOP_DURATION: # if the stop duration has not expired
speed.ChangeDutyCycle(NEUTRAL_THROTTLE) # set the speed to neutral
servo.ChangeDutyCycle(y_steer_neutral) # set the steering to neutral
continue
# stop duration expired
stopping = False # reset stopping flag
last_time = t_start # reset last time
last_steer_error = steering_angle - 90 # reset last steering error
servo.ChangeDutyCycle(y_steer_neutral) # set the steering to neutral
if stop_count == 2: # if this is the second stop
print("Second stop complete: exiting loop") # exit the loop
speed.ChangeDutyCycle(NEUTRAL_THROTTLE) # disarm ESC
plot_pd(p_vals, d_vals, errors, show_img=True) # plot PD values
plot_pwm(speed_pwms, turn_pwms, errors, show_img=True) # plot PWM values
break
else:
print("First stop complete: resuming") # print the resume message
speed.ChangeDutyCycle(OP_SPEED) # set the speed to normal operating speed
# PD steering
now = time.time() # current time
last_time = now # last time the loop was executed
dev = steering_angle - 90 # calculate the deviation from the center
derivative = (dev - last_steer_error) / TARGET_PERIOD # calculate the derivative of the deviation
p_term = Kp_steer * dev # calculate the proportional term
d_term = Kd_steer * derivative # calculate the derivative term
pd = p_term + d_term # calculate the PD term
pd = max(-STEER_DELTA, min(STEER_DELTA, pd)) # limit the PD term to ±2.5%
servo_pwm = y_steer_neutral + pd # calculate the steering PWM value
servo_pwm = max(6.0, min(9.0, servo_pwm)) # limit the steering PWM value to 6.0-9.0
servo.ChangeDutyCycle(servo_pwm) # set the steering PWM value
print(f"Servo PWM = {servo_pwm}") # print the steering PWM value
target = speed_pwm # target speed PWM value
rpm = read_rpm() # read the RPM value
print(f"Raw history: {list(rpm_hist)}, Smoothed RPM: {rpm:.1f}") # print the smoothed RPM value
if abs(dev) < MAX_TURN_ANGLE_FOR_BOOST: # if the steering angle is within the boost range
if rpm < BOOST_RPM_THRESHOLD: # if the RPM is below the boost threshold
if above_start is None: # if the boost start time is not set
above_start = time.time() # set the boost start time
elif time.time() - above_start >= DEBOUNCE_TIME: # if the boost duration has expired
target = BOOST_DUTY_CYCLE # set the target speed to the boost duty cycle
else: # if the boost duration has not expired
target = speed_pwm # keep the target speed at the current speed
elif rpm > BOOST_EXIT_THRESHOLD: # if the RPM is above the boost exit threshold
above_start = None # reset the boost start time
target = OP_SPEED # set the target speed to normal operating speed
else: # if the RPM is within the boost range
target = speed_pwm # keep the target speed at the current speed
else: # if the steering angle is outside the boost range
above_start = None # reset the boost start time
target = OP_SPEED # set the target speed to normal operating speed
if speed_pwm < target: # if the current speed is less than the target speed
speed_pwm = min(speed_pwm + RAMP_STEP, target) # increase the speed PWM value
elif speed_pwm > target: # if the current speed is greater than the target speed
speed_pwm = max(speed_pwm - RAMP_STEP, target) # decrease the speed PWM value
print(f"Speed PWM = {speed_pwm}") # print the speed PWM value
speed.ChangeDutyCycle(speed_pwm) # set the speed PWM value
# Log for plotting
p_vals.append(p_term) # P term
d_vals.append(d_term) # D term
errors.append(dev) # error value
turn_pwms.append(servo_pwm) # Steering PWM value
speed_pwms.append(speed_pwm) # Speed PWM value
# Update state
last_steer_error = dev # update the last steering error
last_time = t_start # update the last time
if cv2.waitKey(1) == 27: # check if the ESC key is pressed
break
elapsed = time.time() - t_start # calculate the elapsed time
if elapsed < 1/50.0: # if the elapsed time is less than the target period
time.sleep(1/50.0 - elapsed) # sleep for the remaining time
video.release() # release the camera
cv2.destroyAllWindows() # destroy all OpenCV windows
speed.stop() # stop the speed PWM
servo.stop() # stop the steering PWM
GPIO.cleanup() # clean up the GPIO pins
Comments