Aaron JuarezBen WelchmanJunho KimJoshua Kang
Published

Joseph's Youngsters

An autonomous car using a Raspberry Pi with camera vision and real-time encoder feedback to stay in its lane.

IntermediateShowcase (no instructions)218
Joseph's Youngsters

Things used in this project

Hardware components

Raspberry Pi 5
Raspberry Pi 5
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Optical Encoder
×1
Portable Charger
×1
Screw
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

Tape, Painters Tape
Tape, Painters Tape
Tape, Duct
Tape, Duct
Multitool, Screwdriver
Multitool, Screwdriver

Story

Read more

Code

encoder_overlay.dts

C/C++
Modified device tree source file for our optical encoder.
/dts-v1/;
/plugin/;

 / {

     compatible = "brcm,bcm2835"; // Specifies compatibility with Pi
     fragment@0 {
         target-path = "/";
         __overlay__ {
             encoder_device {
                 compatible = "encoder,elec424,youngsters";  // Match the .c file's match table
                 encoder-gpios = <&gpio 20 0>;    // GPIO 20 as input, no flags
             };
         };
     };
 };

encoder.c

C/C++
Driver code for our optical encoder.
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/of.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/ktime.h>
#include <linux/moduleparam.h>

// Encoder pin info and timestamp tracking
static unsigned int irq_number;
static struct gpio_desc *encoder_gpio;
static volatile u64 prev_time_ns;
static volatile u64 curr_time_ns;

// Time between encoder pulses, in ms (used to calculate speed)
static int speed_ms = 0;
module_param(speed_ms, int, 0644);

// === IRQ Handler ===
static irqreturn_t encoder_irq_handler(int irq, void *dev_id) {
    int signal_level = gpiod_get_value(encoder_gpio);
    curr_time_ns = ktime_get_ns();

    unsigned long elapsed_ns = curr_time_ns - prev_time_ns;

    // Update speed if signal is high and debounce threshold passed
    if (signal_level == 1 && elapsed_ns > 1000000) {
        prev_time_ns = curr_time_ns;
        speed_ms = (int)(elapsed_ns / 1000000);  // Convert ns -> ms
        printk(KERN_INFO "encoder_irq_handler: interval = %d ms\n", speed_ms);
    }

    return IRQ_HANDLED;
}

// === Driver Probe ===
// Call when device is initialized
static int encoder_probe(struct platform_device *pdev) {
    struct device *dev = &pdev->dev;

    printk(KERN_INFO "encoder_probe: Initializing...\n");

    // Gets GPIO descriptor from device tree
    encoder_gpio = gpiod_get(dev, "encoder", GPIOD_IN);
    if (IS_ERR(encoder_gpio)) {
        printk(KERN_ERR "encoder_probe: Failed to acquire GPIO\n");
        return PTR_ERR(encoder_gpio);
    }

    gpiod_set_debounce(encoder_gpio, 1000000);  // 1 ms debounce

    // Gets the IRQ number
    irq_number = gpiod_to_irq(encoder_gpio);
    if (request_irq(irq_number, encoder_irq_handler, IRQF_TRIGGER_RISING, "encoder_irq", NULL)) {
        printk(KERN_ERR "encoder_probe: IRQ request failed\n");
        return -1;
    }

    // Time tracking
    prev_time_ns = ktime_get_ns();
    printk(KERN_INFO "encoder_probe: Success\n");
    return 0;
}

// === Driver Remove ===
// Called for when module unloaded
static void encoder_remove(struct platform_device *pdev) {
    free_irq(irq_number, NULL);
    printk(KERN_INFO "encoder_remove: Module unloaded\n");
}

// === Device Tree Match Table ===
// To tell kernel which device tree entries are supported
static const struct of_device_id encoder_of_match[] = {
    { .compatible = "encoder,elec424,youngsters", },
    { }
};
MODULE_DEVICE_TABLE(of, encoder_of_match);

// === Platform Driver ===
// Registers encoder driver with kernel
static struct platform_driver encoder_driver = {
    .probe  = encoder_probe,
    .remove = encoder_remove,
    .driver = {
        .name = "encoder_driver",
        .owner = THIS_MODULE,
        .of_match_table = of_match_ptr(encoder_of_match),
    },
};

module_platform_driver(encoder_driver); // platform driver registered

MODULE_LICENSE("GPL v2");
MODULE_AUTHOR("Joseph's Youngsters");
MODULE_DESCRIPTION("Optical Encoder Driver");
MODULE_ALIAS("platform:encoder_driver");

run.py

Python
Our main run script that handles everything from lane detection, steering and speed control, encoder feedback, and stop zone detection.
import time
import cv2
import numpy as np
import math
import sys
import os
import RPi.GPIO as GPIO
import matplotlib.pyplot as plt

# === Team & Setup ===
# Ben Welchman, Junho Kim, Aaron Juarez

GPIO.setmode(GPIO.BCM)

# === GPIO SETUP ===
throttle_pin = 19  # ESC throttle
steer_pin = 21  # servo steering

GPIO.setup(throttle_pin, GPIO.OUT)
GPIO.setup(steer_pin,    GPIO.OUT)

pwm_esc = GPIO.PWM(throttle_pin, 50)
pwm_serv = GPIO.PWM(steer_pin,    50)

pwm_esc.start(7.5)  # neutral throttle
pwm_serv.start(7.5) # straight steering

# === Encoder & Speed PD Settings ===
mods = [m for m in os.listdir("/sys/module") if m.startswith("encoder")]
encoder_path = f"/sys/module/{mods[0]}/parameters/speed_ms"
print("Using encoder_path:", encoder_path)

desired_ms = 15.0  # target interval between pulses
kp, kd = 0.1, 0.045
base_speed = 8.1   # nominal running duty
throttle_min = 7.7   # full stop duty
throttle_max = 8.3   # full forward duty
_prev_err = None
_prev_time = None

# === Steering PD Settings ===
kp_s, kd_s = 0.3, 0.5
zero_turn = 7.5
servo_min = 6.0
servo_max = 9.0

# === Camera Settings (resolution size) ===
frame_w, frame_h = 250, 120
cam_idx = 0

# === Lane‑line EMA ===
ema_alpha = 0.2
prev_left = None
prev_right = None

# === Stop‑sign for Red‑floor ===
RED_THRESHOLD = 750  # pixel threshold for stopping at red light

# === Variables for plotting ===
p_vals = []
d_vals = []
err_vals = []
speed_vals = []
steer_vals = []

def read_encoder():
    try:
        with open(encoder_path, "r") as f:
            return float(f.read().strip())  # read speed in ms
    except Exception as e:
        print(f"[read_encoder] ERROR: {e}", file=sys.stderr)
        return None

def manage_speed():
    """
    Adjust throttle duty cycle using PD control of encoder interval.
    """
    global _prev_err, _prev_time
    enc = read_encoder()
    now = time.time()
    if enc is None:
        return base_speed

    err = enc - desired_ms  # positive if too slow
    p_term = kp * err

    if _prev_err is None:
        d_term = 0.0
    else:
        dt = now - _prev_time 
        d_term = kd * ((err - _prev_err) / dt) if dt > 0 else 0.0

    _prev_err, _prev_time = err, now

    thr = base_speed + p_term + d_term # adjust throttle with PD control
    thr = max(throttle_min, min(throttle_max, thr))

    # append for plotting
    err_vals.append(err)
    p_vals.append(p_term)
    d_vals.append(d_term)
    speed_vals.append(thr)

    # print for debugging
    print(f"[manage_speed] enc={enc:.1f}ms err={err:.1f} p={p_term:.3f} d={d_term:.3f} → thr={thr:.3f}")
    
    return thr

def detect_red_light(frame):
    h, w, _ = frame.shape
    y0 = int(h * 0.95)
    roi = frame[y0:, :] # checks lower part of frame
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    
    # red pixels
    r1 = cv2.inRange(hsv, (0,120,120),   (10,255,255))
    r2 = cv2.inRange(hsv, (160,120,120), (180,255,255))
    mask = cv2.bitwise_or(r1, r2) # combines two red hue ranges
    return cv2.countNonZero(mask)

def detect_edges(frame):
    # Apply hsv for better color contrast, thus better lane detection
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # blue for tape lanes
    mask = cv2.inRange(hsv, (90,120,0), (120,255,255))
    return cv2.Canny(mask, 50, 100)

def region_of_interest(edges):
    h, w = edges.shape
    mask = np.zeros_like(edges)
    poly = np.array([[(0, int(h*0.1)), (w, int(h*0.1)), (w,h), (0,h)]], np.int32)
    
    # try to get the the lane detection near car (not other regions)
    cv2.fillPoly(mask, poly, 255)
    return edges & mask

def detect_line_segments(cropped):
    return cv2.HoughLinesP( # finds straight lines in cropped frame
        cropped, 1, np.pi/180, 10,
        np.array([]), minLineLength=5, maxLineGap=150
    )

def make_points(frame, line):
    h, _, _ = frame.shape
    m, b = line # uses lane's start and end to map lines
    if m == 0: m = 0.1
    y1, y2  = h, h//2
    x1 = int((y1 - b) / m)
    x2 = int((y2 - b) / m)
    return [[x1, y1, x2, y2]]

def average_slope_intercept(frame, segs):
    global prev_left, prev_right
    
    # if line detection fails (mainly due to camera frame rate)
    if segs is None or len(segs) == 0: # uses previous lines
        lanes = []
        if prev_left is not None:  lanes.append(make_points(frame, prev_left))
        if prev_right is not None: lanes.append(make_points(frame, prev_right))
        return lanes

    h, w, _ = frame.shape
    left_fit = []
    right_fit = []
    boundary = w * (1/3) # divides frame into thirds

    for seg in segs:
        x1, y1, x2, y2 = seg[0]
        if x1 == x2: continue # skip vertical lines
        slope = (y2 - y1) / (x2 - x1)
        intercept = y1 - slope * x1

        if abs(slope) > 0.3 and x1 < boundary and x2 < boundary:
            left_fit.append((slope, intercept))
        elif abs(slope) > 0.3 and x1 > 2*boundary and x2 > 2*boundary:
            right_fit.append((slope, intercept))

    lanes = [] # smooths result using exp moving average
    if left_fit:
        avg_left = np.average(left_fit, axis=0)
        prev_left = avg_left if prev_left is None else ema_alpha*avg_left + (1-ema_alpha)*prev_left
    if right_fit:
        avg_right = np.average(right_fit, axis=0)
        prev_right = avg_right if prev_right is None else ema_alpha*avg_right + (1-ema_alpha)*prev_right
    # averaged slopes and intercepts into line coordinates
    if prev_left is not None:  lanes.append(make_points(frame, prev_left))
    if prev_right is not None: lanes.append(make_points(frame, prev_right))
    return lanes

def display_lines(frame, lanes):
    overlay = np.zeros_like(frame) # draws green line for lanes
    
    for ln in lanes:
        x1, y1, x2, y2 = ln[0]
        cv2.line(overlay, (x1,y1), (x2,y2), (0,255,0), 6)
        
    return cv2.addWeighted(frame, 0.8, overlay, 1, 1)

def get_steering_angle(frame, lanes):
    # calculate angle between center of car and canter of lane
    h, w, _ = frame.shape
    if len(lanes) == 2:
        _,_,lx2,_ = lanes[0][0]
        _,_,rx2,_ = lanes[1][0]
        off = ((lx2 + rx2) / 2) - (w / 2) # midpoint of lane
    elif len(lanes) == 1:
        x1, _, x2, _ = lanes[0][0]
        off = x2 - x1
    else:
        off = 0 # no lane detected
    return int(math.degrees(math.atan2(off, h/2))) + 90

def display_heading_line(frame, angle):
    # draws red line showing car direction
    overlay = np.zeros_like(frame)
    h, w, _ = frame.shape
    r = math.radians(angle)
    x1, y1 = w//2, h
    x2 = int(x1 - (h/2)/math.tan(r)) # projects angle up
    y2 = h//2
    cv2.line(overlay, (x1,y1), (x2,y2), (0,0,255), 5)
    return cv2.addWeighted(frame, 0.8, overlay, 1, 1)

def main():
    cap = cv2.VideoCapture(cam_idx)  # set camera idx and frame
    cap.set(cv2.CAP_PROP_FRAME_WIDTH,  frame_w)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_h)

    stopped, stop0 = False, 0
    last_s, last_t = 0, time.time()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break # exit if frame not captured
            frame = cv2.flip(frame, 1)

            # stop at red‑light
            red_count = detect_red_light(frame)
            print(f"[red_count] {red_count} (th={RED_THRESHOLD})")
            if red_count > RED_THRESHOLD and not stopped:
                pwm_esc.ChangeDutyCycle(throttle_min)
                time.sleep(3) # first stop
                stopped, stop0 = True, time.time()

            edges = detect_edges(frame) # lane detection
            roi = region_of_interest(edges)
            segs = detect_line_segments(roi)
            lanes = average_slope_intercept(frame, segs)
            angle = get_steering_angle(frame, lanes) # angle calc
            vis = display_heading_line(display_lines(frame, lanes), angle)
            cv2.imshow("heading", vis)

            # throttle control
            thr = manage_speed()
            print(f"[throttle PWM] {thr:.2f}%")
            pwm_esc.ChangeDutyCycle(thr)

            # steering PD
            now = time.time()
            dt  = now - last_t
            e_s = -(angle - 90)
            raw = kp_s * e_s + kd_s * (e_s - last_s) / dt
            raw = max(min(raw, 0.5), -1.0)
            duty = zero_turn + raw * 2.6  # adjust to scale steer sensitivity
            duty = max(servo_min, min(servo_max, duty))
            pwm_serv.ChangeDutyCycle(duty)
            last_s, last_t = e_s, now

            # append steering duty for plotting
            steer_vals.append(duty)

            key = cv2.waitKey(1) & 0xFF
            if key == 27 or key == ord('s'):
                break

    except KeyboardInterrupt:
        print("\nprogram stopped...")

    finally:
        start_frame = 50
        frames = range(start_frame, len(err_vals))

        # plot 1
        fig, ax1 = plt.subplots()
        ax2 = ax1.twinx()
        ax1.plot(frames, steer_vals[start_frame:], label='Steering Duty Cycle')
        ax1.plot(frames, speed_vals[start_frame:], label='Speed Duty Cycle')
        ax2.plot(frames, err_vals[start_frame:],   linestyle='--', label='Error')
        ax1.set_xlabel('Frame Number')
        ax1.set_ylabel('Duty Cycle')
        ax2.set_ylabel('Error')
        lines1, labels1 = ax1.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
        plt.title('Duty Cycles & Error vs Frame Number (from frame 10)')
        plt.savefig('plt1.png')
        plt.close()

        # plot 2
        fig, ax1 = plt.subplots()
        ax2 = ax1.twinx()
        ax1.plot(frames, p_vals[start_frame:], linestyle='-',  label='Proportional Response')
        ax1.plot(frames, d_vals[start_frame:], linestyle='-.', label='Derivative Response')
        ax2.plot(frames, err_vals[start_frame:], color='gray', linestyle='--', label='Error')
        ax1.set_xlabel('Frame Number')
        ax1.set_ylabel('PD Response')
        ax2.set_ylabel('Error')
        lines1, labels1 = ax1.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
        plt.title('PD Responses & Error vs Frame Number (from frame 10)')
        plt.savefig('plt2.png')
        plt.close()

        # cleanup
        pwm_esc.ChangeDutyCycle(throttle_min)
        pwm_serv.ChangeDutyCycle(zero_turn)
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()

Credits

Aaron Juarez
1 project • 1 follower
Ben Welchman
1 project • 0 followers
Junho Kim
1 project • 1 follower
Joshua Kang
1 project • 0 followers

Comments