Sujay Samuel
Published © GPL3+

PHROG - anti pest bot!

A robot that helps people with mobility impairments monitor and control pests

Beginner • Work in progress • 9 hours • 219
PHROG - anti pest bot!

Things used in this project

Story

Read more

Code

Python application (without ML)

Python
import asyncio
import tkinter as tk
from tkinter import messagebox
from bleak import BleakClient, BleakScanner
import cv2
from threading import Thread
from PIL import Image, ImageTk
import threading

# BLE constants
# Text searched for inside each scanned device's advertised name (see find_device).
DEVICE_NAME = "MY_LBS2"
# Presumably the UUID of the robot's custom GATT service; not referenced elsewhere in this script.
LBS_SERVICE_UUID = "00001523-1212-efde-1523-785feabcd123"
# Presumably the UUID of the LED characteristic; not referenced elsewhere in this script.
LBS_LED_UUID = "00001525-1212-efde-1523-785feabcd123"
# Characteristic that control_car() writes a single command byte to.
Motor_Control_UUID = "00001526-1212-efde-1523-785feabcd123"

# Camera URL
# Address handed to cv2.VideoCapture for the live view; the IP is hard-coded, so adjust it to your network.
CAMERA_URL = "http://192.168.1.6:81/stream"

class CarControllerApp:
    """Tkinter control panel for the PHROG robot.

    The window offers a connect button, five drive buttons, a pest-control
    placeholder button, a live camera view and a quit button.

    Threading model:
      * Every BLE coroutine runs on ONE long-lived asyncio event loop hosted
        by a background thread. Bleak objects are tied to the event loop they
        were created on, so calling ``asyncio.run`` once per operation (each
        call creates and then closes a new loop) breaks the connection.
      * Camera frames are captured on a second background thread, so a slow
        or unreachable stream never stalls BLE traffic or the UI.
      * Tk widgets and message boxes are only touched from the Tk main
        thread. Background threads queue callbacks and frames; ``_poll``
        (scheduled with ``root.after``) picks them up.
    """

    # Interval, in milliseconds, between main-thread polls for queued UI work.
    POLL_INTERVAL_MS = 30

    def __init__(self, root):
        """Build the widgets and start the background BLE event loop.

        Args:
            root: The Tk window that hosts the user interface.
        """
        self.root = root
        self.root.title("PHROG Controller")
        self.client = None
        self.cap = None
        self.stream_thread = None
        self.device = None

        # State shared between the worker threads and the Tk main thread.
        self._lock = threading.Lock()
        self._ui_calls = []  # (callable, args) pairs waiting for the main thread
        self._latest_frame = None  # most recent camera frame as a PIL image
        self._stop_stream = threading.Event()

        # One event loop for the whole lifetime of the app; all BLE work goes here.
        self._loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._loop_thread.start()

        # Connect button
        self.connect_button = tk.Button(root, text="Connect to PHROG", command=self.connect_to_car)
        self.connect_button.pack(pady=10)

        # Control buttons
        self.control_frame = tk.Frame(root)
        self.control_frame.pack(pady=10)

        self.forward_button = tk.Button(self.control_frame, text="Forward", command=lambda: self.control_car(0x01))
        self.forward_button.grid(row=0, column=1)

        self.left_button = tk.Button(self.control_frame, text="Left", command=lambda: self.control_car(0x03))
        self.left_button.grid(row=1, column=0)

        self.stop_button = tk.Button(self.control_frame, text="Stop", command=lambda: self.control_car(0x00))
        self.stop_button.grid(row=1, column=1)

        self.right_button = tk.Button(self.control_frame, text="Right", command=lambda: self.control_car(0x04))
        self.right_button.grid(row=1, column=2)

        self.reverse_button = tk.Button(self.control_frame, text="Reverse", command=lambda: self.control_car(0x02))
        self.reverse_button.grid(row=2, column=1)

        # Pest Control button
        self.pest_control_button = tk.Button(self.control_frame, text="Pest Control", command=self.pest_control)
        self.pest_control_button.grid(row=3, column=1)

        # Video stream frame
        self.video_frame = tk.Frame(root)
        self.video_frame.pack(pady=10)

        self.video_label = tk.Label(self.video_frame)
        self.video_label.pack()

        self.quit_button = tk.Button(root, text="Quit", command=self.quit)
        self.quit_button.pack(pady=10)

        # Closing the window must run the same cleanup as the Quit button.
        self.root.protocol("WM_DELETE_WINDOW", self.quit)
        self._poll_id = self.root.after(self.POLL_INTERVAL_MS, self._poll)

    @staticmethod
    def _name_matches(name, target):
        """Return True if *target* occurs in *name*; a missing or empty name never matches."""
        return bool(name) and target in name

    def _submit(self, coro):
        """Schedule *coro* on the background BLE loop and return its concurrent Future."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def _run_on_ui(self, func, *args):
        """Queue ``func(*args)`` for execution on the Tk main thread."""
        with self._lock:
            self._ui_calls.append((func, args))

    def _poll(self):
        """Main-thread tick: run queued UI callbacks and show the newest camera frame."""
        try:
            with self._lock:
                pending, self._ui_calls = self._ui_calls, []
                frame, self._latest_frame = self._latest_frame, None
            for func, args in pending:
                func(*args)
            if frame is not None:
                img_tk = ImageTk.PhotoImage(image=frame)  # Convert to ImageTk PhotoImage
                self.video_label.config(image=img_tk)  # Update Tkinter label
                self.video_label.image = img_tk  # Keep a reference to the image
        finally:
            # Keep polling even if one callback raised.
            self._poll_id = self.root.after(self.POLL_INTERVAL_MS, self._poll)

    async def find_device(self):
        """Scan for BLE devices and return the first whose name contains DEVICE_NAME.

        Returns:
            The matching device object, or None when no device matches.
        """
        print("Searching for device...")
        devices = await BleakScanner.discover()
        for device in devices:
            # device.name is None for devices that advertise no name.
            if self._name_matches(device.name, DEVICE_NAME):
                print(f"Found device: {device.name} ({device.address})")
                return device
        print("Device not found")
        return None

    async def connect_to_car_async(self):
        """Find the robot, connect to it and start the camera view.

        Runs on the background BLE loop. The outcome is reported through
        message boxes that are queued for the Tk main thread.
        """
        try:
            self.device = await self.find_device()
            if not self.device:
                self._run_on_ui(messagebox.showerror, "Error", "Car not found!")
                return
            client = BleakClient(self.device.address)
            await client.connect()
        except Exception as e:  # UI boundary: report any BLE failure to the user
            self._run_on_ui(messagebox.showerror, "Error", f"Failed to connect: {str(e)}")
            return

        # Publish the client only once it is actually connected.
        self.client = client
        self._run_on_ui(messagebox.showinfo, "Success", "Connected to car!")
        self.start_video_stream()  # Start video stream after connecting to the car

    def connect_to_car(self):
        """Start connecting in the background; returns immediately so the UI stays responsive."""
        self._submit(self.connect_to_car_async())

    def run_ble_task(self):
        """Run the connect sequence on the BLE loop and block until it finishes.

        Kept for callers that want a blocking connect; do not call it from the
        Tk main thread, which would freeze the window while scanning.
        """
        self._submit(self.connect_to_car_async()).result()

    def control_car(self, command):
        """Send a one-byte drive command to the robot.

        Args:
            command: Integer 0-255 written to the motor-control characteristic
                (the buttons send 0x00 stop, 0x01 forward, 0x02 reverse,
                0x03 left, 0x04 right).
        """
        if self.client and self.client.is_connected:
            future = self._submit(
                self.client.write_gatt_char(Motor_Control_UUID, bytearray([command]))
            )
            future.add_done_callback(self._report_write_error)
        else:
            messagebox.showerror("Error", "Not connected to car!")

    def _report_write_error(self, future):
        """Done-callback for command writes: show a failed write to the user."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            self._run_on_ui(messagebox.showerror, "Error", f"Failed to send command: {error}")

    def pest_control(self):
        """Placeholder for the pest-control action; currently only logs to stdout."""
        # Placeholder for future pest control functionality
        print("Pest control initiated!")
        # Here you can add code to initiate pest control actions, such as triggering the camera,
        # activating sensors, spraying repellent, etc.

    def start_video_stream(self):
        """Start the camera capture thread unless one is already running."""
        if self.stream_thread is not None and self.stream_thread.is_alive():
            return
        self._stop_stream.clear()
        self.stream_thread = Thread(target=self.update_video_stream, daemon=True)
        self.stream_thread.start()

    def update_video_stream(self):
        """Capture-thread body: read frames from CAMERA_URL until stopped or the stream fails.

        The stream is opened here rather than in start_video_stream so that a
        slow network open cannot block the caller. Frames are converted to
        640x480 RGB PIL images and handed to the main thread via
        ``_latest_frame``; no Tk call is made from this thread.
        """
        self.cap = cv2.VideoCapture(CAMERA_URL)
        try:
            if not self.cap.isOpened():
                print("Failed to open camera stream")
                return
            while not self._stop_stream.is_set() and self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    print("Failed to grab frame")
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert to RGB
                frame = cv2.resize(frame, (640, 480))  # Resize frame
                image = Image.fromarray(frame)  # Convert to PIL Image
                with self._lock:
                    self._latest_frame = image
        finally:
            # Released by the thread that reads from it, so release never races with read().
            self.cap.release()

    def quit(self):
        """Stop the camera thread, disconnect from the robot and close the window."""
        self.root.after_cancel(self._poll_id)

        self._stop_stream.set()
        if self.stream_thread is not None:
            # read() may be blocked on the network; do not wait forever (the thread is a daemon).
            self.stream_thread.join(timeout=1.0)

        if self.client is not None:
            try:
                self._submit(self.client.disconnect()).result(timeout=5.0)
            except Exception as e:  # best effort: closing the app must not fail on BLE errors
                print(f"Disconnect failed: {e}")

        self._loop.call_soon_threadsafe(self._loop.stop)
        self.root.destroy()

if __name__ == "__main__":
    # Script entry point: create the top-level window, attach the controller
    # UI to it, then hand control to the Tk event loop until the window closes.
    main_window = tk.Tk()
    controller = CarControllerApp(main_window)
    main_window.mainloop()

BLE Firmware

C/C++
/*
 * Copyright (c) 2023 Nordic Semiconductor ASA
 *
 * SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
 */

#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
#include <zephyr/bluetooth/bluetooth.h>
#include <zephyr/bluetooth/gap.h>
#include <zephyr/bluetooth/uuid.h>
#include <zephyr/bluetooth/conn.h>
#include<zephyr/drivers/gpio.h>
#include <hal/nrf_gpio.h>
#include <dk_buttons_and_leds.h>
/* STEP 7 - Include the header file of the MY LBS custom service */
#include "my_lbs.h"


/* Outputs driven by app_motor_cb(); all four are on GPIO port 1.
 * NOTE(review): presumably these feed a dual H-bridge motor driver - confirm against the wiring. */
#define GPIO_PIN_13  NRF_GPIO_PIN_MAP(1,13)
#define GPIO_PIN_12  NRF_GPIO_PIN_MAP(1,12)
#define GPIO_PIN_11  NRF_GPIO_PIN_MAP(1,11)
#define GPIO_PIN_10  NRF_GPIO_PIN_MAP(1,10)
/* Output toggled by app_naph_cb() (port 1, pin 8). */
#define NAPH_PIN     NRF_GPIO_PIN_MAP(1,8)
/* Output toggled by app_mosq_cb() (port 1, pin 6). */
#define MOSQ_PIN     NRF_GPIO_PIN_MAP(1,6)

/* Advertising parameters passed to bt_le_adv_start() in main(): connectable,
 * undirected advertising using the identity address, at roughly a 500 ms interval. */
static struct bt_le_adv_param *adv_param = BT_LE_ADV_PARAM(
	(BT_LE_ADV_OPT_CONNECTABLE |
	 BT_LE_ADV_OPT_USE_IDENTITY), /* Connectable advertising and use identity address */
	800, /* Min Advertising Interval 500ms (800*0.625ms) */
	801, /* Max Advertising Interval 500.625ms (801*0.625ms) */
	NULL); /* Set to NULL for undirected advertising */

/* Register this file as a logging module at INFO level (used by LOG_INF / LOG_ERR below). */
LOG_MODULE_REGISTER(Lesson4_Exercise1, LOG_LEVEL_INF);

/* Advertised device name, taken from Kconfig; the length excludes the terminating NUL. */
#define DEVICE_NAME CONFIG_BT_DEVICE_NAME
#define DEVICE_NAME_LEN (sizeof(DEVICE_NAME) - 1)

/* RUN_STATUS_LED is toggled by the main loop; CON_STATUS_LED follows the connection state. */
#define RUN_STATUS_LED DK_LED1
#define CON_STATUS_LED DK_LED2

/* STEP 8.1 - Specify the LED to control */
#define USER_LED DK_LED3

/* STEP 9.1 - Specify the button to monitor */
#define USER_BUTTON DK_BTN1_MSK

/* Sleep time, in milliseconds, between toggles of RUN_STATUS_LED. */
#define RUN_LED_BLINK_INTERVAL 1000

/* Last known state of USER_BUTTON; written by button_changed(), read by app_button_cb(). */
static bool app_button_state;

/* Advertising packet: general-discoverable / no-BR/EDR flags plus the complete device name. */
static const struct bt_data ad[] = {
	BT_DATA_BYTES(BT_DATA_FLAGS, (BT_LE_AD_GENERAL | BT_LE_AD_NO_BREDR)),
	BT_DATA(BT_DATA_NAME_COMPLETE, DEVICE_NAME, DEVICE_NAME_LEN),

};

/* Scan response packet: the 128-bit UUID BT_UUID_LBS_VAL (defined outside this file, presumably in my_lbs.h). */
static const struct bt_data sd[] = {
	BT_DATA_BYTES(BT_DATA_UUID128_ALL, BT_UUID_LBS_VAL),
};

/* STEP 8.2 - LED callback registered with the MY LBS service (led_cb).
 * Applies the given state to USER_LED: true switches it on, false off. */
static void app_led_cb(bool led_state)
{
	const bool requested = led_state;

	dk_set_led(USER_LED, requested);
}

/* STEP 9.2 - Button callback registered with the MY LBS service (button_cb).
 * Returns the button state most recently recorded by button_changed(). */
static bool app_button_cb(void)
{
	const bool pressed = app_button_state;

	return pressed;
}

/* Callback registered with the MY LBS service as naph_cb.
 * Drives NAPH_PIN high when valve_state is true and low when it is false. */
static void app_naph_cb(bool valve_state)
{
	if (!valve_state) {
		nrf_gpio_pin_clear(NAPH_PIN);
		return;
	}

	nrf_gpio_pin_set(NAPH_PIN);
}

/* Callback registered with the MY LBS service as mosq_cb.
 * Drives MOSQ_PIN high when valve_state is true and low when it is false. */
static void app_mosq_cb(bool valve_state)
{
	if (!valve_state) {
		nrf_gpio_pin_clear(MOSQ_PIN);
		return;
	}

	nrf_gpio_pin_set(MOSQ_PIN);
}

/* Callback registered with the MY LBS service as motor_cb.
 * Maps a drive command to the levels of the four motor pins:
 *   0 = stop, 1 = forward, 2 = reverse, 3 = left, 4 = right.
 * Any other value leaves the pins untouched. */
static void app_motor_cb(int motor_state)
{
	/* Pins are always written in this order: 13, 12, 11, 10. */
	static const uint32_t motor_pins[4] = {
		GPIO_PIN_13, GPIO_PIN_12, GPIO_PIN_11, GPIO_PIN_10,
	};
	/* One row per command; each column is the level for the pin above. */
	static const bool pin_levels[5][4] = {
		{false, false, false, false}, /* 0: Stop */
		{true,  false, true,  false}, /* 1: Forward */
		{false, true,  false, true }, /* 2: Reverse */
		{false, false, true,  false}, /* 3: Left */
		{true,  false, false, false}, /* 4: Right */
	};

	if (motor_state < 0 || motor_state > 4) {
		return;
	}

	for (int i = 0; i < 4; i++) {
		if (pin_levels[motor_state][i]) {
			nrf_gpio_pin_set(motor_pins[i]);
		} else {
			nrf_gpio_pin_clear(motor_pins[i]);
		}
	}
}


/* STEP 10 - Declare a variable app_callbacks of type my_lbs_cb and initialize its members
 * with the application callback functions defined above (LED, button, motor and the two
 * valve callbacks). It is handed to my_lbs_init() in main(). */
static struct my_lbs_cb app_callbacks = {
	.led_cb = app_led_cb,
	.button_cb = app_button_cb,
	.motor_cb = app_motor_cb,
	.naph_cb = app_naph_cb,
	.mosq_cb = app_mosq_cb,
	
};

static void button_changed(uint32_t button_state, uint32_t has_changed)
{
	if (has_changed & USER_BUTTON) {
		uint32_t user_button_state = button_state & USER_BUTTON;
		app_button_state = user_button_state ? true : false;
	}
}
/* Connection callback: on success, logs the event and switches CON_STATUS_LED on;
 * on failure (non-zero err), only logs the error code. */
static void on_connected(struct bt_conn *conn, uint8_t err)
{
	if (err == 0) {
		printk("Connected\n");
		dk_set_led_on(CON_STATUS_LED);
	} else {
		printk("Connection failed (err %u)\n", err);
	}
}

/* Disconnection callback: logs the reason, puts every actuator into its off
 * state and switches CON_STATUS_LED off.
 *
 * Once the link is gone no further commands can arrive, so without the
 * fail-safe below the motor and valve outputs would stay in whatever state
 * was commanded last (e.g. the robot would keep driving). */
static void on_disconnected(struct bt_conn *conn, uint8_t reason)
{
	printk("Disconnected (reason %u)\n", reason);

	/* Fail safe: stop the motors (command 0) and drive both valve pins low. */
	app_motor_cb(0);
	app_naph_cb(false);
	app_mosq_cb(false);

	dk_set_led_off(CON_STATUS_LED);
}

/* Connection event handlers; registered with bt_conn_cb_register() in main(). */
struct bt_conn_cb connection_callbacks = {
	.connected = on_connected,
	.disconnected = on_disconnected,
};

/* Initializes the DK buttons with button_changed() as the handler.
 * Returns the result of dk_buttons_init(); a non-zero result is also logged. */
static int init_button(void)
{
	const int rc = dk_buttons_init(button_changed);

	if (rc != 0) {
		printk("Cannot init buttons (err: %d)\n", rc);
	}

	return rc;
}

int main(void)
{
	int blink_status = 0;
	int err;
    

	nrf_gpio_cfg_output(GPIO_PIN_13);
	nrf_gpio_cfg_output(GPIO_PIN_12);
	nrf_gpio_cfg_output(GPIO_PIN_11);
	nrf_gpio_cfg_output(GPIO_PIN_10);
	LOG_INF("Starting Lesson 4 - Exercise 1 \n");

	err = dk_leds_init();
	if (err) {
		LOG_ERR("LEDs init failed (err %d)\n", err);
		return -1;
	}

	err = init_button();
	if (err) {
		printk("Button init failed (err %d)\n", err);
		return -1;
	}

	err = bt_enable(NULL);
	if (err) {
		LOG_ERR("Bluetooth init failed (err %d)\n", err);
		return -1;
	}
	bt_conn_cb_register(&connection_callbacks);

	/* STEP 11 - Pass your application callback functions stored in app_callbacks to the MY LBS service */
	err = my_lbs_init(&app_callbacks);
	if (err) {
		printk("Failed to init LBS (err:%d)\n", err);
		return -1;
	}
	LOG_INF("Bluetooth initialized\n");
	err = bt_le_adv_start(adv_param, ad, ARRAY_SIZE(ad), sd, ARRAY_SIZE(sd));
	if (err) {
		LOG_ERR("Advertising failed to start (err %d)\n", err);
		return -1;
	}

	LOG_INF("Advertising successfully started\n");

	for (;;) {
		dk_set_led(RUN_STATUS_LED, (++blink_status) % 2);
		k_sleep(K_MSEC(RUN_LED_BLINK_INTERVAL));
	}
}

Python script (with ML)

Python
from keras.models import load_model  # TensorFlow is required for Keras to work
import cv2  # Install opencv-python
import numpy as np

# Webcam classification loop: grabs frames from the default camera, shows each
# one in a window, runs it through the Keras model and prints the top class
# with its confidence. Press Esc in the image window to stop.

# Disable scientific notation for clarity
np.set_printoptions(suppress=True)

# Load the model
model = load_model("keras_Model.h5", compile=False)

# Load the labels; the context manager closes the file once the lines are read
with open("labels.txt", "r") as labels_file:
    class_names = labels_file.readlines()

# CAMERA can be 0 or 1 based on default camera of your computer
camera = cv2.VideoCapture(0)

try:
    while True:
        # Grab the webcamera's image.
        ret, image = camera.read()

        # A failed read returns no image; stop instead of crashing in cv2.resize.
        if not ret:
            print("Failed to grab frame")
            break

        # Resize the raw image into (224-height,224-width) pixels
        image = cv2.resize(image, (224, 224), interpolation=cv2.INTER_AREA)

        # Show the image in a window
        cv2.imshow("Webcam Image", image)

        # Make the image a numpy array and reshape it to the models input shape.
        image = np.asarray(image, dtype=np.float32).reshape(1, 224, 224, 3)

        # Normalize the image array from [0, 255] to [-1, 1]
        image = (image / 127.5) - 1

        # Run the model and pick the highest-scoring class
        prediction = model.predict(image)
        index = np.argmax(prediction)
        class_name = class_names[index]
        confidence_score = prediction[0][index]

        # Print prediction and confidence score.
        # class_name[2:] drops the first two characters of the label line
        # (presumably an index and a space, e.g. "0 Name" - TODO confirm);
        # strip() removes the line ending so the last label, which may lack
        # a trailing newline, prints the same way as the others.
        print("Class:", class_name[2:].strip())
        print("Confidence Score:", f"{confidence_score * 100:.0f}", "%")

        # Listen to the keyboard for presses.
        keyboard_input = cv2.waitKey(1)

        # 27 is the ASCII for the esc key on your keyboard.
        if keyboard_input == 27:
            break
finally:
    # Always free the camera and close the preview window, even after an error.
    camera.release()
    cv2.destroyAllWindows()

Credits

Sujay Samuel
2 projects • 2 followers
Newbie Embedded Engineer. Highly interested in the fields of Embedded systems, VLSI and Machine Learning

Comments