Raul Alvarez-Torrico
Published © GPL3+

BVLOS Drone Aiding Cattle Disease Diagnostics and Treatment

Transports biological samples for diagnosis from remote areas and delivers prescribed medicine back to them.

Intermediate · Work in progress · Over 2 days · 58

Things used in this project

Story


Schematics

Block diagram


Code

Offboard navigation ROS 2 node

C/C++
/****************************************************************************
 *
 * Copyright 2020 PX4 Development Team. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 * list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its contributors
 * may be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 ****************************************************************************/

/**
 * @brief Offboard control example
 * @file offboard_control.cpp
 * @addtogroup examples
 * @author Mickey Cowden <info@cowden.tech>
 * @author Nuno Marques <nuno.marques@dronesolutions.io>

 * The TrajectorySetpoint message and the OFFBOARD mode in general are under an ongoing update.
 * Please refer to PR: https://github.com/PX4/PX4-Autopilot/pull/16739 for more info.
 * As per PR: https://github.com/PX4/PX4-Autopilot/pull/17094, the format
 * of the TrajectorySetpoint message shall change.
 */

#include <px4_msgs/msg/offboard_control_mode.hpp>
#include <px4_msgs/msg/trajectory_setpoint.hpp>
#include <px4_msgs/msg/vehicle_command.hpp>
#include <px4_msgs/msg/vehicle_control_mode.hpp>
#include <rclcpp/rclcpp.hpp>
#include <stdint.h>

// Subscribers
#include <px4_msgs/msg/sensor_combined.hpp>
#include <px4_msgs/msg/vehicle_local_position.hpp>
#include "std_msgs/msg/string.hpp"
#include "geometry_msgs/msg/point.hpp"
// #include <geometry_msgs/Point.h>
#include <px4_msgs/msg/vehicle_attitude.hpp>
#include <geometry_msgs/msg/pose_stamped.hpp>

#include <tf2/LinearMath/Quaternion.h>
#include <tf2/LinearMath/Matrix3x3.h>

#define RAD2DEG 57.295779513

// using std::placeholders::_1;
using namespace std::placeholders;

#include <string>
#include <stdlib.h>     /* abs */
#include <cmath>        /* cos, sin, M_PI, NAN */
#include <iomanip>	/* For printing two-decimal floats (std) */

#include <chrono>
#include <iostream>

using namespace std::chrono;
using namespace std::chrono_literals;
using namespace px4_msgs::msg;

#define DES_START_ALTITUDE  50.0
#define MIN_AVOID_ALTITUDE  30.0

unsigned int image_height = 1080;
unsigned int image_width = 1920;
unsigned int x_ax_pos = image_height/2 - 1; // 539 NOTE: Perhaps the "- 1" is not needed
unsigned int y_ax_pos = image_width/2 - 1; // 959

unsigned int MAX_X_VEL = 5;
unsigned int MAX_Y_VEL = 3;

static const char *states_str[] =
        { "STATE_IDLE", "STATE_TAKING_OFF", "STATE_FLYING_MISSION", "STATE_AVOIDING", "STATE_LANDING" };

class OffboardControl : public rclcpp::Node
{
public:
	OffboardControl() : Node("offboard_control")
	{

		offboard_control_mode_publisher_ = this->create_publisher<OffboardControlMode>("/fmu/in/offboard_control_mode", 10);
		trajectory_setpoint_publisher_ = this->create_publisher<TrajectorySetpoint>("/fmu/in/trajectory_setpoint", 10);
		vehicle_command_publisher_ = this->create_publisher<VehicleCommand>("/fmu/in/vehicle_command", 10);

		offboard_setpoint_counter_ = 0;

		vehicle_state = STATE_TAKING_OFF;	
		last_vehicle_state = STATE_IDLE;
		is_avoiding = false;
		avoid_coord_flag = false;

		auto timer_callback = [this]() -> void {

			if(vehicle_state != last_vehicle_state) {
				// std::cout << "\n";
				// std::cout << "New Vehicle state:" << states_str[vehicle_state]  << std::endl;
				// std::cout << "\n"   << std::endl;
				last_vehicle_state = vehicle_state;

				if(vehicle_state == STATE_TAKING_OFF || vehicle_state == STATE_FLYING_MISSION) {
					offb_ctrl_mode_msg.position = false;
					offb_ctrl_mode_msg.velocity = false;
					offb_ctrl_mode_msg.acceleration = false;
					offb_ctrl_mode_msg.attitude = false;
					offb_ctrl_mode_msg.body_rate = false;
					traj_setpoint_msg.position = {NAN, NAN, NAN};
					traj_setpoint_msg.velocity = {NAN, NAN, NAN};
					traj_setpoint_msg.yaw = yaw; // [-PI:PI]
				}
			}

			switch(vehicle_state) {
				case STATE_TAKING_OFF:
					// Arm the vehicle
					this->arm();
					this->publish_vehicle_command2(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 4, 4); // Mission
					////// this->publish_vehicle_command2(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 4, 2); // Takeoff
					vehicle_state = STATE_FLYING_MISSION;
				break;
				case STATE_FLYING_MISSION:
					if(vehicle_local_pos.z <= -MIN_AVOID_ALTITUDE && avoid_coord_flag) {
						vehicle_state = STATE_AVOIDING;
						is_avoiding = true;
					}

				break;
				case STATE_AVOIDING:
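					// The avoid coordinate is a Cartesian pixel offset from the image
					// centre. Normalizing it by the half-width (y_ax_pos) and half-height
					// (x_ax_pos) and scaling by MAX_X_VEL / MAX_Y_VEL gives speeds
					// proportional to how far the free-space point is from the centre.
					// The lateral speed is then rotated by the last mission yaw into
					// NED north/east components; the vertical component is only
					// commanded while the vehicle is above MIN_AVOID_ALTITUDE.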
					// Drone < Camera
					// X < X | Z < Y
					x_vel = (avoid_coord.x/y_ax_pos) * MAX_X_VEL;
					y_vel = (avoid_coord.y/x_ax_pos) * MAX_Y_VEL;

					// Compute avoidance velocities in NED coord.
					desired_vel_e = x_vel*cos(yaw);
					desired_vel_n = -x_vel*sin(yaw);

					if(vehicle_local_pos.z < (-MIN_AVOID_ALTITUDE)) { // Command vertical velocity only while above the min. altitude
						desired_vel_d = -y_vel;
					} else { // Do not go below min. altitude
						desired_vel_d = 0.0;
					}
					
					offb_ctrl_mode_msg.velocity = true;
					traj_setpoint_msg.velocity = {desired_vel_n, desired_vel_e, desired_vel_d};

					// Set last known yaw angle from Mission mode
					offb_ctrl_mode_msg.attitude = true;
					traj_setpoint_msg.yaw = yaw; // [-PI:PI]


					if (offboard_setpoint_counter_ == 10) {
						// Change to Offboard mode after 10 setpoints
						this->publish_vehicle_command(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 6); // Offboard
					}

					//offboard_control_mode needs to be paired with trajectory_setpoint
					publish_offboard_control_mode();
					publish_trajectory_setpoint();

					// stop the counter after reaching 11
					if (offboard_setpoint_counter_ < 11) {
						offboard_setpoint_counter_++;
					}

					if(!avoid_coord_flag) {
						vehicle_state = STATE_FLYING_MISSION;
						is_avoiding = false;
						this->publish_vehicle_command2(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 4, 4); // Mission
						offboard_setpoint_counter_ = 0;
					}

				break;
				case STATE_LANDING:
				break;
				default: break;
			}
		};
		timer_ = this->create_wall_timer(100ms, timer_callback);

		auto print_timer_callback = [this]() -> void {
			std::cout << std::fixed;	// For printing two-decimal floats
    		std::cout << std::setprecision(2); // For printing two-decimal floats
			std::cout << "\n\n\n\n";
			std::cout << "++STORED VEHICLE LOCAL POSITION DATA"   << std::endl;
			std::cout << "============================="   << std::endl;
			std::cout << "vehicle_local_pos.timestamp: " << vehicle_local_pos.timestamp << std::endl;
			std::cout << "vehicle_local_pos pos (" << vehicle_local_pos.x<< ", " << vehicle_local_pos.y<< ", " << vehicle_local_pos.z << ")" << std::endl;
			std::cout << "vehicle_local_pos vel (" << vehicle_local_pos.vx<< ", " << vehicle_local_pos.vy<< ", " << vehicle_local_pos.vz << ")" << std::endl;
			std::cout << "avoid_coord: (" << avoid_coord.x << ", " << avoid_coord.y << ", " << avoid_coord.z << ")" << std::endl;
			std::cout << "avoid_coord_flag: " << avoid_coord_flag << std::endl;
			std::cout << "yaw: " << yaw * RAD2DEG << " | alpha: " << alpha * RAD2DEG << std::endl;
			// std::cout << "att_yaw_ned: " << att_yaw_ned * RAD2DEG << std::endl;
			std::cout << "att(" << att_roll*RAD2DEG << ", " << att_pitch*RAD2DEG << ", " << att_yaw*RAD2DEG << ")" << std::endl;
			std::cout << "desired_vel_e : " << desired_vel_e << " | desired_vel_n: " << desired_vel_n << std::endl;// desired_vel_e 
			std::cout << "Vehicle state:" << states_str[vehicle_state]  << std::endl;
		};
		print_timer_ = this->create_wall_timer(1000ms, print_timer_callback);

		// Subscribers
		rmw_qos_profile_t qos_profile = rmw_qos_profile_sensor_data;
		auto qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 5), qos_profile);
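		// PX4 topics bridged by the uXRCE-DDS agent are published best-effort,
		// so these subscriptions must use a compatible (sensor-data) QoS profile;
		// the default reliable QoS would never match and no samples would arrive.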

		vlp_subscription_ = this->create_subscription<px4_msgs::msg::VehicleLocalPosition>("/fmu/out/vehicle_local_position", qos,
		[this](const px4_msgs::msg::VehicleLocalPosition::UniquePtr msg) {
			vehicle_local_pos.timestamp = msg->timestamp;
			vehicle_local_pos.x = msg->x;	// North position in NED earth-fixed frame, (metres)
			vehicle_local_pos.y = msg->y;	// East position in NED earth-fixed frame, (metres)
			vehicle_local_pos.z = msg->z;	// Down position (negative altitude) in NED earth-fixed frame, (metres)
			vehicle_local_pos.vx = msg->vx;	// North velocity in NED earth-fixed frame, (metres/sec)
			vehicle_local_pos.vy = msg->vy;	// East velocity in NED earth-fixed frame, (metres/sec)
			vehicle_local_pos.vz = msg->vz;	// Down velocity in NED earth-fixed frame, (metres/sec)
			vehicle_local_pos.heading = msg->heading; // Euler yaw angle transforming the tangent plane relative to NED earth-fixed frame, -PI..+PI,  (radians)

			yaw = vehicle_local_pos.heading;
			// Compute alpha (X coordinate angle WRT East in ENU system)
			alpha = M_PI/2 - yaw;
			alpha_deg = alpha * RAD2DEG;
		});

		avoid_coord_subscription_ = this->create_subscription<geometry_msgs::msg::Point>("/avoid_coord", qos,
		[this](const geometry_msgs::msg::Point::UniquePtr msg) {
			avoid_coord = *msg;

			if(avoid_coord.x == 0 && avoid_coord.y == 0 && avoid_coord.z == 0) {
				avoid_coord_flag = false;
			}
			else {
				avoid_coord_flag = true;
			}
		});

		mPoseSub = this->create_subscription<geometry_msgs::msg::PoseStamped>(
                    "/fmu/out/vehicle_attitude", qos,
                    std::bind(&OffboardControl::poseCallback, this, _1) );
	}

	void arm();
	void disarm();

private:
	rclcpp::TimerBase::SharedPtr timer_;
	rclcpp::TimerBase::SharedPtr print_timer_; // Periodic printing

	rclcpp::Publisher<OffboardControlMode>::SharedPtr offboard_control_mode_publisher_;
	rclcpp::Publisher<TrajectorySetpoint>::SharedPtr trajectory_setpoint_publisher_;
	rclcpp::Publisher<VehicleCommand>::SharedPtr vehicle_command_publisher_;

	std::atomic<uint64_t> timestamp_;   //!< common synced timestamp

	uint64_t offboard_setpoint_counter_;   //!< counter for the number of setpoints sent

	void publish_offboard_control_mode();
	void publish_trajectory_setpoint();
	void publish_vehicle_command(uint16_t command, float param1 = 0.0, float param2 = 0.0);
	void publish_vehicle_command2(uint16_t command, float param1 = 0.0, float param2 = 0.0, float param3 = 0.0);
	
	// Subscribers
	rclcpp::Subscription<px4_msgs::msg::SensorCombined>::SharedPtr subscription_;
	rclcpp::Subscription<px4_msgs::msg::VehicleLocalPosition>::SharedPtr vlp_subscription_;
	rclcpp::Subscription<px4_msgs::msg::VehicleAttitude>::SharedPtr va_subscription_;
	rclcpp::Subscription<std_msgs::msg::String>::SharedPtr mod_image_size_subscription_;
	rclcpp::Subscription<geometry_msgs::msg::Point>::SharedPtr avoid_coord_subscription_;
	rclcpp::Subscription<geometry_msgs::msg::PoseStamped>::SharedPtr mPoseSub;

	// rclcpp::Subscription<geometry_msgs::msg::Point>::SharedPtr avoid_coord_subscription_;

	// Vehicle data readings
	uint64_t vehicle_ts;
	// Position in local NED frame
	px4_msgs::msg::VehicleLocalPosition vehicle_local_pos;

	// string mod_image_size;
	std_msgs::msg::String::UniquePtr mod_image_size;
	// rclcpp::Subscription<std_msgs::msg::String>::SharedPtr mod_image_size;

	// geometry_msgs::msg::Point::UniquePtr avoid_coord;
	geometry_msgs::msg::Point avoid_coord;

	enum States {
		STATE_IDLE,
		STATE_TAKING_OFF,
		STATE_FLYING_MISSION,
		STATE_AVOIDING,
		STATE_LANDING
	};

	States vehicle_state;
	States last_vehicle_state;

	OffboardControlMode offb_ctrl_mode_msg{};
	TrajectorySetpoint traj_setpoint_msg{};
	bool avoid_coord_flag;

	bool is_avoiding;

	// geometry_msgs::PoseStamped local_pose;
	double roll, pitch, yaw;
	double yaw_deg, alpha, alpha_deg;

	double att_roll, att_pitch, att_yaw, att_yaw_ned;

	double x_vel;
	double y_vel;
	double desired_vel_n;
	double desired_vel_e;
	double desired_vel_d;

	void poseCallback(const geometry_msgs::msg::PoseStamped::SharedPtr msg) {
        // Camera position in map frame
        double tx = msg->pose.position.x;
        double ty = msg->pose.position.y;
        double tz = msg->pose.position.z;

        // Orientation quaternion
        tf2::Quaternion q(
                    msg->pose.orientation.x,
                    msg->pose.orientation.y,
                    msg->pose.orientation.z,
                    msg->pose.orientation.w);

        // 3x3 Rotation matrix from quaternion
        tf2::Matrix3x3 m(q);

        // Roll Pitch and Yaw from rotation matrix
        //double roll, pitch, yaw;
        m.getRPY(roll, pitch, yaw);

		std::cout << "posaCallback exec" << std::endl;

        // Output the measure
        RCLCPP_INFO(get_logger(), "Received pose in '%s' frame : X: %.2f Y: %.2f Z: %.2f - R: %.2f P: %.2f Y: %.2f - Timestamp: %u.%u sec ",
                 msg->header.frame_id.c_str(),
                 tx, ty, tz,
                 roll * RAD2DEG, pitch * RAD2DEG, yaw * RAD2DEG,
                    msg->header.stamp.sec,msg->header.stamp.nanosec);
    }
};

/**
 * @brief Send a command to Arm the vehicle
 */
void OffboardControl::arm()
{
	publish_vehicle_command(VehicleCommand::VEHICLE_CMD_COMPONENT_ARM_DISARM, 1.0);

	RCLCPP_INFO(this->get_logger(), "Arm command send");
}

/**
 * @brief Send a command to Disarm the vehicle
 */
void OffboardControl::disarm()
{
	publish_vehicle_command(VehicleCommand::VEHICLE_CMD_COMPONENT_ARM_DISARM, 0.0);

	RCLCPP_INFO(this->get_logger(), "Disarm command send");
}

/**
 * @brief Publish the offboard control mode.
 *        The control-mode flags (position/velocity/attitude) are set by the
 *        state machine before this is called.
 */
void OffboardControl::publish_offboard_control_mode()
{
	offb_ctrl_mode_msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	offboard_control_mode_publisher_->publish(offb_ctrl_mode_msg);
}

/**
 * @brief Publish a trajectory setpoint
 *        Sends the setpoint prepared by the state machine (velocity and yaw
 *        during obstacle avoidance).
 */
void OffboardControl::publish_trajectory_setpoint()
{
	traj_setpoint_msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	trajectory_setpoint_publisher_->publish(traj_setpoint_msg);
}

/**
 * @brief Publish vehicle commands
 * @param command   Command code (matches VehicleCommand and MAVLink MAV_CMD codes)
 * @param param1    Command parameter 1
 * @param param2    Command parameter 2
 */
void OffboardControl::publish_vehicle_command(uint16_t command, float param1, float param2)
{
	VehicleCommand msg{};
	msg.param1 = param1;
	msg.param2 = param2;
	msg.command = command;
	msg.target_system = 1;
	msg.target_component = 1;
	msg.source_system = 1;
	msg.source_component = 1;
	msg.from_external = true;
	msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	vehicle_command_publisher_->publish(msg);
}

void OffboardControl::publish_vehicle_command2(uint16_t command, float param1, float param2, float param3)
{
	VehicleCommand msg{};
	msg.param1 = param1;
	msg.param2 = param2;
	msg.param3 = param3;
	msg.command = command;
	msg.target_system = 1;
	msg.target_component = 1;
	msg.source_system = 1;
	msg.source_component = 1;
	msg.from_external = true;
	msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	vehicle_command_publisher_->publish(msg);
}

int main(int argc, char *argv[])
{
	std::cout << "Starting offboard control node..." << std::endl;
	setvbuf(stdout, NULL, _IONBF, BUFSIZ);
	rclcpp::init(argc, argv);
	rclcpp::spin(std::make_shared<OffboardControl>());

	rclcpp::shutdown();
	return 0;
}

Object recognition ROS 2 node

Python
#! /usr/bin/env python3

import rclpy
from rclpy.node import Node

import cv2
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image, CompressedImage # Image is the message type
from std_msgs.msg import String
from geometry_msgs.msg import Point, Quaternion, Pose, PoseArray
import rospkg

from detector import ObjectDetectorLite, ObjectDetectorLiteExtDelegate
from visualization_utils import draw_bounding_boxes_on_image_array
from utils import load_image
import time

DEBUG_FLAG = True

DESIRED_IMAGE_WIDTH = 320 #300
DESIRED_IMAGE_HEIGHT = 320 #300

# A dictionary of general parameters derived from the camera image size,
# which will be populated later with the 'get_image_params' function
params = {'image_height':None, 'image_width': None,'resized_height':None,'resized_width': None,
        'x_ax_pos':None, 'y_ax_pos':None, 'width_scaling_factor':None, 'height_scaling_factor':None}
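
# 'x_ax_pos' and 'y_ax_pos' place a Cartesian origin at the image centre, so a
# detection centroid can be expressed as a signed (x, y) offset from the middle
# of the frame (used below to build comp_obstacle and avoid_coord).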

target_poses = PoseArray()

# (resized_width, resized_height) = (300, 300)
(resized_width, resized_height) = (320, 320)

# WARNING: file will be saved in /home/pi/.ros (hidden dir) if full path not specified
# check it with cwd = os.getcwd()
# Get an instance of RosPack with the default search paths
rospack = rospkg.RosPack()
# package_dir = rospack.get_path('avoid_pkg')
package_dir = '..'

folder = '/home/user/Videos/' # Put here your test videos

# Put here your tflite model
detect_file_dir = package_dir + '/scripts/Colab/detect_8600_steps_hgames3.tflite'
label_file_dir = package_dir + '/scripts/Colab/labelmap_hgames.txt'

# --------- Open video file
video_file = 'Avion_frontal_2_12fps.mp4' # Select the test video
# video_file = 'video_composit.mp4' # flakey

comp_obstacle = Point()
avoid_coord = Point()
image_size = Point()

# A dictionary of two empty buffers (arrays) for the Moving Average Filter
filt_buffer = {'width':[], 'height':[]}
NUM_FILT_POINTS            = 20 # Number of filtering points for the Moving Average Filter
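# Averaging the last NUM_FILT_POINTS centroids smooths detection jitter, at the
# cost of roughly (NUM_FILT_POINTS - 1) / 2 frames of lag when the obstacle moves.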

bridge = CvBridge()

class ImagePublisher(Node):
    """
    Create an ImagePublisher class, which is a subclass of the Node class.
    """
    def __init__(self):
        """
        Class constructor to set up the node
        """
        # Initiate the Node class's constructor and give it a name
        super().__init__('image_publisher')
            
        # -------------------- Create publishers --------------------
        # to the video_frames topic. The queue size is 10 messages.
        self.publisher_ = self.create_publisher(Image, 'video_frames', 10)
        # resz_im_frame_pub = rospy.Publisher("/resz_frame/compressed", CompressedImage, queue_size=10)
        self.resz_im_frame_pub = self.create_publisher(Image, "/resz_frame/compressed", 10)
        # self.inf_tgt_im_frame_pub = self.create_publisher(CompressedImage, "/inf_tgt_frame/compressed", 10)
        self.inf_tgt_im_frame_pub = self.create_publisher(Image, "/inf_tgt_frame/compressed", 10)
        # cap_im_frame_pub = rospy.Publisher("/cap_frame/compressed", CompressedImage, queue_size=10)
        self.cap_im_frame_pub = self.create_publisher(Image, "/cap_frame/compressed", 10)
        # self.resz_frame_size_pub = rospy.Publisher("/modified_image/size", String, queue_size=10)
        self.resz_frame_size_pub = self.create_publisher(Point, "/modified_image/size", 10)
        # target_poses_pub = rospy.Publisher("/target_poses", PoseArray, queue_size=10)
        self.target_poses_pub = self.create_publisher(PoseArray, "/target_poses", 10)
        # avoid_coord_pub = rospy.Publisher("/avoid_coord", Point, queue_size=10)
        self.avoid_coord_pub = self.create_publisher(Point, "/avoid_coord", 10)
        
        # Timer period in seconds (0 runs the callback as fast as the executor
        # allows; 0.1 would give roughly 10 Hz)
        timer_period = 0 #0.1    # seconds
            
        # Create the timer
        self.timer = self.create_timer(timer_period, self.timer_callback)
                 
        # Used to convert between ROS and OpenCV images
        self.bridge = CvBridge()

        self.detector = ObjectDetectorLite(model_path=detect_file_dir, label_path=label_file_dir)
        # Object detector with external delegate for the NPU:
        # self.detector = ObjectDetectorLiteExtDelegate(model_path=detect_file_dir, label_path=label_file_dir)

        self.input_size = self.detector.get_input_size()
        print('self.input_size: ', self.input_size[0])

        # Create a VideoCapture object
        # The argument '0' gets the default webcam.
        # self.cap = cv2.VideoCapture(0)

        ## ----- NOTE: Navqplus doesn't work with VideoCapture(0), so we are using
        # a video file instead:
        VIDEO_STREAM = folder + video_file
        # filename, file_extension = os.path.splitext(VIDEO_STREAM)
        # VIDEO_STREAM_OUT = filename + "_result" + file_extension
        # print(VIDEO_STREAM_OUT)

        self.cap = cv2.VideoCapture(VIDEO_STREAM)
        (grabbed, cap_frame) = self.cap.read()
        if not grabbed:
            print ("Init - Not grabbed.")
            return

        print('-- Camera opened successfully')

        # Compute general parameters
        get_image_params(self.cap)
        # Override size:
        params['resized_width'] = self.input_size[0]
        params['resized_height'] = self.input_size[1]
        print(f"-- Original image width, height: {params['image_width']}, {params['image_height']}")
        print(f"-- Resized image width, height: {params['resized_width']}, {params['resized_height']}")

        cap_fps = self.cap.get(cv2.CAP_PROP_FPS)
        print("Frames per second using video.get(cv2.CAP_PROP_FPS) : {0}".format(cap_fps))
        cap_period = 1/cap_fps
        print("Frames period (s) : {0}".format(cap_period))

        detection_time = None
     
    def timer_callback(self):
        """
        Callback function.
        This function gets called every 0.1 seconds.
        """
        global target_poses

        # Capture frame-by-frame
        # This method returns True/False as well
        # as the video frame.
        print("-------------------------")
        cur_frame_num = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
        ret, cap_frame = self.cap.read()

        if not ret:
           print('no video')
           self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
           ret, cap_frame = self.cap.read()

        # start_time = time.time()

        # Resize to make processing faster
        dimension = (params['resized_width'], params['resized_height'])
        # dimension = (resized_width, resized_height)
        resz_frame = cv2.resize(cap_frame, dimension, interpolation = cv2.INTER_AREA)

        # image = load_image(resz_frame)
        # tgt_left, tgt_right, tgt_top, tgt_bottom = load_image2(resz_frame)

        image = load_image(resz_frame, dimension)
        tgt_left, tgt_right, tgt_top, tgt_bottom = load_image2(resz_frame, dimension)

        # print(f"image.shape: {image.shape}") # (300, 300, 3)
        # print(f"tgt_left: {tgt_left}, tgt_right: {tgt_right}, tgt_top: {tgt_top}, tgt_bottom: {tgt_bottom}") # 0, 300, 0, 300

        fr_height, fr_width, _ = resz_frame.shape
        im_height, im_width, _ = image.shape # Image shape

        start_time = time.time()
        boxes, scores, classes = self.detector.detect(image, 0.13) #0.02 #0.21 #0.13
        # print('++++++++++ boxes: ', boxes)
        # print('++++++++++ scores: ', scores)
        # print('++++++++++ classes: ', classes)
        delta_time = time.time() - start_time
        detection_time = round(delta_time, 3)
        print("tflite detection time: " + str(detection_time ))

        print("boxes_: ", boxes)

        for label, score in zip(classes, scores):
            print(label, score)
                    
        center_points = None
        if len(boxes) > 0:
            draw_bounding_boxes_on_image_array(image, boxes, display_str_list=classes)
            # print("classes: ", classes)
            # center_points = draw_bounding_boxes_on_image_array2(image, boxes, display_str_list=classes)

        # image= cv2.rectangle(image, (0, 0), (300, 300), color=(255, 0, 255), thickness=2) # Red
        image = cv2.rectangle(image, (0, 0), dimension, color=(255, 0, 255), thickness=2) # Magenta
        try:
            # self.inf_tgt_im_frame_pub.publish(bridge.cv2_to_compressed_imgmsg(image))
            self.inf_tgt_im_frame_pub.publish(bridge.cv2_to_imgmsg(image))
        except CvBridgeError as e:
            print('Exception inf_tgt_im_frame_pub: ', e)

        # delta_time = time.time() - start_time
        # detection_time = round(delta_time, 3)
        # print("Detection time: " + str(detection_time ))
        
        boxes_shape = boxes.shape
        num_det_objects = boxes_shape[0]

        print(f"num_det_objects: {num_det_objects}")
        waypoints = []
        comp_obstacle_w = 0
        comp_obstacle_h = 0
        for i in range(num_det_objects):
            (ymin, xmin, ymax, xmax) = (boxes[i, 0], boxes[i, 1], boxes[i, 2], boxes[i, 3])
            (left, right, top, bottom) = (xmin * im_width, xmax * im_width, ymin * im_height, ymax * im_height)
            tgt_cx = int(round(left + (right-left)/2))
            tgt_cy = int(round(top + (bottom-top)/2))

            orig_left = int(round(left/params['width_scaling_factor']))
            orig_right = int(round(right/params['width_scaling_factor']))
            orig_top = int(round(top/params['height_scaling_factor']))
            orig_bottom = int(round(bottom/params['height_scaling_factor']))

            # (orig_left, orig_right, orig_top, orig_bottom) = (xmin * fr_width, xmax * fr_width, ymin * fr_height, ymax * fr_height)

            orig_cx = round(tgt_cx/params['width_scaling_factor'])
            orig_cy = round(tgt_cy/params['height_scaling_factor'])
            comp_obstacle_w += orig_cx
            comp_obstacle_h += orig_cy

            # point_msg = Point(orig_cx, orig_cy, 0)
            point_msg = Point()
            point_msg.x = float(orig_cx)
            point_msg.y = float(orig_cy)
            point_msg.z = float(0)
            # quaternion_msg = Quaternion(0, 0, 0, 0)
            quaternion_msg = Quaternion()
            quaternion_msg.x = float(0)
            quaternion_msg.y = float(0)
            quaternion_msg.z = float(0)
            quaternion_msg.w = float(0)

            pose_msg = Pose()
            pose_msg.position = point_msg
            pose_msg.orientation = quaternion_msg
            waypoints.append(pose_msg)

            # print(f" ymin: {ymin}, xmin: {xmin}, , ymax: {ymax}, xmax: {xmax}")
            # print(f" left: {left}, right: {right}, top: {top}, bottom: {bottom}")
            # print(f" orig_left: {orig_left}, orig_right: {orig_right}, orig_top: {orig_top}, orig_bottom: {orig_bottom}")

            # Print centroids of all detected objects in image (inference image)
            image = cv2.circle(image, (tgt_cx, tgt_cy), radius=2, color=(0, 255, 0), thickness=2) # Green
            
            # Print centroids of all detected objects in cap_frame (original size frame)
            cap_frame = cv2.circle(cap_frame, (orig_cx, orig_cy), radius=5, color=(0, 0, 255), thickness=4) # Red
            # Print enclosing rectangles of detected objects in cap_frame (original size frame)
            cap_frame = cv2.rectangle(cap_frame, (int(orig_left), int(orig_top)), (int(orig_right), int(orig_bottom)), color=(0, 0, 255), thickness=4) # Red

        if num_det_objects > 0:
            comp_obstacle_w = comp_obstacle_w/num_det_objects
            comp_obstacle_h = comp_obstacle_h/num_det_objects
            # Apply Moving Average filter to target camera coordinates
            tgt_cam_coord = {'width':comp_obstacle_w, 'height':comp_obstacle_h}
            tgt_filt_cam_coord = moving_average_filter(tgt_cam_coord)
        else:
            comp_obstacle_w = params['y_ax_pos']
            comp_obstacle_h = params['x_ax_pos']
            flush_filter_buffer()
            tgt_filt_cam_coord = {'width':params['y_ax_pos'], 'height':params['x_ax_pos']}

        print(f">>> comp_obstacle wh: {comp_obstacle_w}, {comp_obstacle_h}")
        
        # Compute obstacle Cartesian centroid without filtering
        # comp_obstacle.x = comp_obstacle_w - params['y_ax_pos']
        # comp_obstacle.y = params['x_ax_pos'] - comp_obstacle_h

        # Compute obstacle Cartesian centroid with filtering
        comp_obstacle.x = float(tgt_filt_cam_coord['width'] - params['y_ax_pos'])
        comp_obstacle.y = float(params['x_ax_pos'] - tgt_filt_cam_coord['height'])
        print(f">>> comp_obstacle xy: {comp_obstacle.x}, {comp_obstacle.y}")
        
        # Compute avoid_coord as same-magnitude mirror of the comp_obstacle
        # avoid_coord.x = -comp_obstacle.x
        # avoid_coord.y = -comp_obstacle.y

        # Compute avoid_coord as magnitude-complement mirror of the comp_obstacle
        if comp_obstacle.y != 0 and comp_obstacle.x != 0:
            # Compute the slope of the line defined by (0, 0) and comp_obstacle
            m = comp_obstacle.y / comp_obstacle.x

            if comp_obstacle.x > 0: # If the 'x' coordinate is positive
                mirror_x = -params['y_ax_pos'] # Make mirror_x negative (opposite side in the Cartesian plane)
            elif comp_obstacle.x < 0: # If the 'x' coordinate is negative
                mirror_x = params['y_ax_pos']+1 # Make mirror_x positive (opposite side in the Cartesian plane)
            mirror_y = m * mirror_x # Compute mirror_y

            if mirror_y < -params['x_ax_pos']: # If mirror_y is down the bottom of the frame
                mirror_y = -params['x_ax_pos']-1 # Make mirror_y = to the bottom of the frame
                mirror_x = mirror_y / m # And compute a new mirror_x
            elif mirror_y > params['x_ax_pos']: # If mirror_y is over the top of the frame
                mirror_y = params['x_ax_pos'] # Make mirror_y = to the top of the frame
                mirror_x = mirror_y / m # And compute a new mirror_x

            # Add comp_obstacle to mirror, to find the magnitude-complement
            avoid_coord.x = float(mirror_x + comp_obstacle.x)
            avoid_coord.y = float(mirror_y + comp_obstacle.y)
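            # Worked example with a 1920x1080 frame (y_ax_pos = 959, x_ax_pos = 539):
            # an obstacle centroid at (400, 200) gives m = 0.5; x > 0, so
            # mirror_x = -959 and mirror_y = -479.5 (inside the frame, no clamping),
            # and the published avoid point is (-559, -279.5): on the opposite side
            # of the centre, and farther out the closer the obstacle is to the centre.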
        else:
            avoid_coord.x = float(0)
            avoid_coord.y = float(0)

            # # Draw avoid point
            # print(f"... avoid limit : {x}, {y}")
            # cap_frame = cv2.circle(cap_frame, (int(x), int(y)), radius=20, color=(255, 0, 255), thickness=4)

        print(f">>> avoid_coord xy: {avoid_coord.x}, {avoid_coord.y}")

        avoid_coord_w = avoid_coord.x + params['y_ax_pos']
        avoid_coord_h = params['x_ax_pos'] - avoid_coord.y
        # print(f">>> avoid_coord wh: {avoid_coord_w}, {avoid_coord_h}")

        # Draw Cartesian center
        cap_frame = cv2.circle(cap_frame, (params['y_ax_pos'], params['x_ax_pos']), radius=4, color=(255, 0, 255), thickness=4) # Magenta
        # Draw obstacle position vector
        cap_frame = cv2.line(cap_frame, (params['y_ax_pos'], params['x_ax_pos']),
        (int(comp_obstacle_w), int(comp_obstacle_h)), color=(255, 0, 255), thickness=4) # Magenta
        # Draw avoid position vector
        cap_frame = cv2.line(cap_frame, (params['y_ax_pos'], params['x_ax_pos']),
        (int(avoid_coord_w), int(avoid_coord_h)), color=(0, 255, 0), thickness=4) # Green

        # FOR DEBUG: Left - right inference image limits
        resz_frame = cv2.line(resz_frame, (tgt_left, 0), (tgt_left, resized_height), color=(0, 0, 255), thickness=2) # Red
        resz_frame = cv2.line(resz_frame, (tgt_right, 0), (tgt_right, resized_height), color=(0, 0, 255), thickness=2) # Red
        resz_frame = cv2.line(resz_frame, (0, tgt_top), (resized_width, tgt_top), color=(0, 0, 255), thickness=2) # Red
        resz_frame = cv2.line(resz_frame, (0, tgt_bottom), (resized_width, tgt_bottom), color=(0, 0, 255), thickness=2) # Red

        delta_time = time.time() - start_time
        detection_time = round(delta_time, 3)
        print("Detection & process time: " + str(detection_time ))

        try:
            self.avoid_coord_pub.publish(avoid_coord)
        except CvBridgeError as e:
            print('Exception avoid_coord_pub: ', e)
        
        if DEBUG_FLAG:
            # Downsize original image frame for topic publication
            pcent = 0.25 #0.25
            dim = (int(pcent*params['image_width']), int(pcent*params['image_height']))
            cap_frame_pub = cv2.resize(cap_frame, dim, interpolation = cv2.INTER_AREA)
            try:
                self.cap_im_frame_pub.publish(bridge.cv2_to_imgmsg(cap_frame_pub))
            except CvBridgeError as e:
                print('Exception cap_im_frame_pub: ', e)
            
            # poses = PoseArray()
            target_poses.header.frame_id = 'frame_' + str(int(cur_frame_num))
            print(f"frame_id: {target_poses.header.frame_id}")
            target_poses.poses = waypoints
            try:
                self.target_poses_pub.publish(target_poses)
            except CvBridgeError as e:
                print('Exception target_poses_pub: ', e)

            # Publish detection frame
            try:
                self.resz_im_frame_pub.publish(bridge.cv2_to_imgmsg(resz_frame))
            except CvBridgeError as e:
                print('Exception resz_im_frame_pub: ', e)

            # try:
            #     inf_tgt_im_frame_pub.publish(bridge.cv2_to_compressed_imgmsg(image))
            # except CvBridgeError as e:
            #     print('Exception inf_tgt_im_frame_pub: ', e)
            
            # Publish resz_frame_width
            resz_frame_height, resz_frame_width, channels = resz_frame.shape
            try:
                # str_bytes = bytes(str(resz_frame_width) + "," +  str(resz_frame_height), 'utf-8')
                # str_bytes = str(resz_frame_width) + "," +  str(resz_frame_height)
                # mess = bytes("22,33", 'utf-8')
                image_size.x = float(resz_frame_width)
                image_size.y = float(resz_frame_height)
                self.resz_frame_size_pub.publish(image_size)
            except CvBridgeError as e:
                print('Exception resz_frame_size_pub: ', e)
            
            # if ret == True:
            #     # Publish the image.
            #     # The 'cv2_to_imgmsg' method converts an OpenCV
            #     # image to a ROS 2 image message
            #     cap_frame = cv2.resize(cap_frame, (100, 100), interpolation = cv2.INTER_AREA)
            #     self.publisher_.publish(self.bridge.cv2_to_imgmsg(cap_frame))
 
        # Display the message on the console
        # self.get_logger().info('Publishing video frame')

        delta_time = time.time() - start_time
        detection_time = round(delta_time, 3)
        print("Detection & process & pub time: " + str(detection_time ))

def get_image_params(vid_cam):
    """ Computes useful general parameters derived from the camera image size."""

    # Grab a frame and get its size
    is_grabbed, frame = vid_cam.read()
    params['image_height'], params['image_width'], _ = frame.shape

    # Compute the scaling factor to scale the image to a desired size
    if params['image_height'] != DESIRED_IMAGE_HEIGHT:
        params['width_scaling_factor'] = round((DESIRED_IMAGE_WIDTH / params['image_width']), 4) # Rounded scaling factor
        params['height_scaling_factor'] = round((DESIRED_IMAGE_HEIGHT / params['image_height']), 4) # Rounded scaling factor
    else:
        params['width_scaling_factor'] = 1
        params['height_scaling_factor'] = 1

    print("params['width_scaling_factor']: ", params['width_scaling_factor'])

    # Compute resized width and height and resize the image
    params['resized_width'] = int(round(params['image_width'] * params['width_scaling_factor']))
    params['resized_height'] = int(round(params['image_height'] * params['height_scaling_factor']))
    dimension = (params['resized_width'], params['resized_height'])
    frame = cv2.resize(frame, dimension, interpolation = cv2.INTER_AREA)

    # Compute the position for the X and Y Cartesian coordinates in camera pixel units
    params['x_ax_pos'] = int(round(params['image_height']/2 - 1)) # NOTE: Perhaps the "- 1" is not needed
    params['y_ax_pos'] = int(round(params['image_width']/2 - 1)) 

    return

def load_image2(frame, new_size=(300, 300)):
  # Get the dimensions
  height, width, _ = frame.shape # Image shape
  new_width, new_height = new_size # Target shape 

  # Calculate the target image coordinates
  left = (width - new_width) // 2
  top = (height - new_height) // 2
  right = (width + new_width) // 2
  bottom = (height + new_height) // 2
  # print(f"-- frame.shape: {frame.shape}")
  # print(f"-- new_size: {new_size}")
  # print(f"-- left: {left}, top: {top}, right: {right}, bottom: {bottom}")

  # image = frame[left: right, top: bottom, :]
  # return image
  return left, right, top, bottom

def moving_average_filter(coord):
    """ Applies Low-Pass Moving Average Filter to a pair of coordinates."""

    # Append new coordinates to filter buffers
    filt_buffer['width'].append(coord['width'])
    filt_buffer['height'].append(coord['height'])

    # If the filters were full already with a number of NUM_FILT_POINTS values, 
    # discard the oldest value (FIFO buffer)
    if len(filt_buffer['width']) > NUM_FILT_POINTS:
        filt_buffer['width'] = filt_buffer['width'][1:]
        filt_buffer['height'] = filt_buffer['height'][1:]
    
    # Compute filtered camera coordinates
    N = len(filt_buffer['width']) # Get the number of values in buffers (will be < NUM_FILT_POINTS at the start)

    # Sum all values for each coordinate
    w_sum = sum( filt_buffer['width'] )
    h_sum = sum( filt_buffer['height'] )
    # Compute the average
    w_filt = int(round(w_sum / N))
    h_filt = int(round(h_sum / N))

    # Return filtered coordinates as a dictionary
    return {'width':w_filt, 'height':h_filt}

def flush_filter_buffer():
    global filt_buffer

    filt_buffer = {'width':[], 'height':[]}

def main(args=None):
    
    # Initialize the rclpy library
    rclpy.init(args=args)
    
    # Create the node
    image_publisher = ImagePublisher()
    
    # Spin the node so the callback function is called.
    rclpy.spin(image_publisher)
    
    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    image_publisher.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()
    
if __name__ == '__main__':
    main()

Tensorflow Lite detector

Python
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from tflite_runtime.interpreter import Interpreter, load_delegate

from visualization_utils import *
from utils import *
import cv2
import numpy as np
class ObjectDetectorLite:
    def __init__(self, model_path, label_path):
        """
            Builds Tensorflow graph, load model and labels
        """

        # Load label_map
        self._load_label(label_path)

        # Define lite graph and Load Tensorflow Lite model into memory
        self.interpreter = Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Get input size
        input_shape = self.input_details[0]['shape']
        print(input_shape)
        print("type:", self.input_details[0]['dtype'])
        self.size = input_shape[:2] if len(input_shape) == 3 else input_shape[1:3]

        self.float_input = (self.input_details[0]['dtype'] == np.float32)

    def get_input_size(self):
        return self.size

    def detect(self, image, threshold=0.1):
        """
            Predicts person in frame with threshold level of confidence
            Returns list with top-left, bottom-right coordinates and list with labels, confidence in %
        """
        # image = cv2.resize(image, (320, 320), interpolation = cv2.INTER_AREA) # For my own tflite
        # image = image.astype(np.float32) # For my own tflite
        # Add a batch dimension
        # frame = np.expand_dims(image, axis=0)
        frame = image

        # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
        input_mean = 127.5
        input_std = 127.5
        if self.float_input:
            frame = (np.float32(frame) - input_mean) / input_std

        # Check if the input type is quantized, then rescale input data to uint8
        if self.input_details[0]['dtype'] == np.uint8:
            input_scale, input_zero_point = self.input_details[0]["quantization"]
            frame = frame / input_scale + input_zero_point
        frame = np.expand_dims(frame, axis=0).astype(self.input_details[0]["dtype"])
        
        # run model
        self.interpreter.set_tensor(self.input_details[0]['index'], frame)
        self.interpreter.invoke()

        # get results
        # boxes = self.interpreter.get_tensor(self.output_details[0]['index'])
        # classes = self.interpreter.get_tensor(self.output_details[1]['index'])
        # scores = self.interpreter.get_tensor(self.output_details[2]['index'])
        # num = self.interpreter.get_tensor(self.output_details[3]['index'])

        # eIQ 
        # boxes = np.squeeze(self.interpreter.get_tensor(self.output_details[0]['index']))
        # classes = np.squeeze(self.interpreter.get_tensor(self.output_details[1]['index']))
        # scores = np.squeeze(self.interpreter.get_tensor(self.output_details[2]['index']))
        # num = np.squeeze(self.interpreter.get_tensor(self.output_details[3]['index']))

        # COLAB: Retrieve detection results
        boxes = self.interpreter.get_tensor(self.output_details[1]['index'])#[0] # Bounding box coordinates of detected objects
        classes = self.interpreter.get_tensor(self.output_details[3]['index'])#[0] # Class index of detected objects
        scores = self.interpreter.get_tensor(self.output_details[0]['index'])#[0] # Confidence of detected objects

        if self.output_details[0]['dtype'] == np.float32:
            print('++++++++++ float32 boxes: ', boxes)
            print('++++++++++ float32 scores: ', scores)
            print('++++++++++ float32 classes: ', classes)

            # Find detected boxes coordinates
            return self._boxes_coordinates(image,
                                            np.squeeze(boxes[0]),
                                            # np.squeeze(classes[0]+1).astype(np.int32),
                                            np.squeeze(classes[0]),
                                            np.squeeze(scores[0]),
                                            min_score_thresh=threshold)
        
        elif self.output_details[0]['dtype'] == np.uint8:
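            # NOTE (assumption): the 1/255 and 3/255 factors below dequantize the
            # outputs of this particular exported model; a more general approach
            # would read scale/zero-point from self.output_details[i]['quantization'].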
            boxes_ = boxes[0]*1.0/255.0
            classes_ = classes[0]*3.0/255.0
            scores_ = scores[0]*1.0/255.0
            print('++++++++++ uint8 boxes_: ', boxes_)
            print('++++++++++ uint8 scores_: ', scores_)
            print('++++++++++ uint8 classes_: ', classes_)

            return self._boxes_coordinates(image,
                                            np.squeeze(boxes_),
                                            np.squeeze(classes_),
                                            np.squeeze(scores_),
                                            min_score_thresh=threshold)
        
        # # boxes[0] = boxes[0]*1.0/255.0
        # # classes[0] = classes[0]*3.0/255.0
        # # scores[0] = (scores[0]*1.0/255.0).astype(np.int32)
        # # print('++++++++++ boxes[0]: ', boxes[0])
        # # print('++++++++++ scores[0]: ', scores[0])
        # # print('++++++++++ classes[0]: ', classes[0])

        # boxes_ = boxes[0]*1.0/255.0
        # classes_ = classes[0]*3.0/255.0
        # scores_ = scores[0]*1.0/255.0
        # print('++++++++++ boxes[0]: ', boxes_)
        # print('++++++++++ scores[0]: ', scores_)
        # print('++++++++++ classes[0]: ', classes_)

        # # Find detected boxes coordinates
        # # return self._boxes_coordinates(image,
        # #                                 np.squeeze(boxes[0]),
        # #                                 np.squeeze(classes[0]+1).astype(np.int32),
        # #                                 np.squeeze(scores[0]),
        # #                                 min_score_thresh=threshold)

        # return self._boxes_coordinates(image,
        #                                 np.squeeze(boxes_),
        #                                 np.squeeze(classes_),
        #                                 np.squeeze(scores_),
        #                                 min_score_thresh=threshold)
    
    def close(self):
        pass
      
    def _boxes_coordinates(self,
                        image,
                        boxes,
                        classes,
                        scores,
                        max_boxes_to_draw=20,
                        min_score_thresh=.5):
        """
        This function groups boxes that correspond to the same location
        and creates a display string for each detection 

        Args:
          image: uint8 numpy array with shape (img_height, img_width, 3)
          boxes: a numpy array of shape [N, 4]
          classes: a numpy array of shape [N]
          scores: a numpy array of shape [N] or None.  If scores=None, then
            this function assumes that the boxes to be plotted are groundtruth
            boxes and plot all boxes as black with no classes or scores.
          max_boxes_to_draw: maximum number of boxes to visualize.  If None, draw
            all boxes.
          min_score_thresh: minimum score threshold for a box to be visualized
        """

        if not max_boxes_to_draw:
            max_boxes_to_draw = boxes.shape[0]
        number_boxes = min(max_boxes_to_draw, boxes.shape[0])
        detected_boxes = []
        probabilities = []
        categories = []
        print('self.category_index:', self.category_index)
        for i in range(number_boxes):
            if scores is None or scores[i] > min_score_thresh:
                box = tuple(boxes[i].tolist())
                detected_boxes.append(box)
                probabilities.append(scores[i])
                categories.append(self.category_index[classes[i]])
        return np.array(detected_boxes), probabilities, categories

    def _load_label(self, path):
        """
            Loads labels
        """
        categories = load_labelmap(path)
        self.category_index = create_category_index(categories)

class ObjectDetectorLiteExtDelegate:
    def __init__(self, model_path, label_path):
        """
            Builds Tensorflow graph, load model and labels
        """

        # Load label_map
        self._load_label(label_path)

        # Loading external delegate from /lib/libvx_delegate.so with args: {} num threads: None
        # load external delegate
        ext_delegate = '/lib/libvx_delegate.so'
        ext_delegate_options = ''
        # if args.ext_delegate is not None:
        print("Loading external delegate from {} with args: {}".format(ext_delegate, ext_delegate_options))
        ext_delegate = [ load_delegate(ext_delegate, ext_delegate_options) ]

        self.interpreter = Interpreter(
                model_path=model_path, experimental_delegates=ext_delegate, num_threads=None)
        self.interpreter.allocate_tensors()

        # Define lite graph and Load Tensorflow Lite model into memory
        # self.interpreter = Interpreter(model_path=model_path)
        # self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Get input size
        input_shape = self.input_details[0]['shape']
        print(input_shape)
        print("type:", self.input_details[0]['dtype'])
        self.size = input_shape[:2] if len(input_shape) == 3 else input_shape[1:3]

        self.float_input = (self.input_details[0]['dtype'] == np.float32)

    def get_input_size(self):
        return self.size

    def detect(self, image, threshold=0.1):
        """
            Predicts person in frame with threshold level of confidence
            Returns list with top-left, bottom-right coordinates and list with labels, confidence in %
        """
        # image = cv2.resize(image, (640, 640), interpolation = cv2.INTER_AREA) # For my own tflite
        # image = image.astype(np.float32) # For my own tflite
        # Add a batch dimension
        # frame = np.expand_dims(image, axis=0)
        frame = image
        # frame = np.expand_dims(frame, axis=0).astype(self.input_details[0]["dtype"])

        # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
        # input_mean = 127.5
        # input_std = 127.5
        # if self.float_input:
        #     frame = (np.float32(frame) - input_mean) / input_std

        # Check if the input type is quantized, then rescale input data to uint8
        if self.input_details[0]['dtype'] == np.uint8:
            input_scale, input_zero_point = self.input_details[0]["quantization"]
            frame = frame / input_scale + input_zero_point
        frame = np.expand_dims(frame, axis=0).astype(self.input_details[0]["dtype"])

        # output = interpreter.get_tensor(output_details["index"])[0]
        
        # run model
        self.interpreter.set_tensor(self.input_details[0]['index'], frame)
        self.interpreter.invoke()

        # get results
        # boxes = self.interpreter.get_tensor(self.output_details[0]['index'])
        # classes = self.interpreter.get_tensor(self.output_details[1]['index'])
        # scores = self.interpreter.get_tensor(self.output_details[2]['index'])
        # num = self.interpreter.get_tensor(self.output_details[3]['index'])

        # COLAB: Retrieve detection results
        boxes = self.interpreter.get_tensor(self.output_details[1]['index'])#[0] # Bounding box coordinates of detected objects
        classes = self.interpreter.get_tensor(self.output_details[3]['index'])#[0] # Class index of detected objects
        scores = self.interpreter.get_tensor(self.output_details[0]['index'])#[0] # Confidence of detected objects
        
        if self.output_details[0]['dtype'] == np.float32:
            print('++++++++++ float32 boxes: ', boxes)
            print('++++++++++ float32 scores: ', scores)
            print('++++++++++ float32 classes: ', classes)

            # Find detected boxes coordinates
            return self._boxes_coordinates(image,
                                            np.squeeze(boxes[0]),
                                            # np.squeeze(classes[0]+1).astype(np.int32),
                                            np.squeeze(classes[0]),
                                            np.squeeze(scores[0]),
                                            min_score_thresh=threshold)
        
        elif self.output_details[0]['dtype'] == np.uint8:
            boxes_ = boxes[0]*1.0/255.0
            classes_ = classes[0]*3.0/255.0
            scores_ = scores[0]*1.0/255.0
            print('++++++++++ uint8 boxes_: ', boxes_)
            print('++++++++++ uint8 scores_: ', scores_)
            print('++++++++++ uint8 classes_: ', classes_)

            return self._boxes_coordinates(image,
                                            np.squeeze(boxes_),
                                            np.squeeze(classes_),
                                            np.squeeze(scores_),
                                            min_score_thresh=threshold)

    def close(self):
        pass
      
    def _boxes_coordinates(self,
                        image,
                        boxes,
                        classes,
                        scores,
                        max_boxes_to_draw=20,
                        min_score_thresh=.5):
        """
        This function groups boxes that correspond to the same location
        and creates a display string for each detection 

        Args:
          image: uint8 numpy array with shape (img_height, img_width, 3)
          boxes: a numpy array of shape [N, 4]
          classes: a numpy array of shape [N]
          scores: a numpy array of shape [N] or None.  If scores=None, then
            this function assumes that the boxes to be plotted are groundtruth
            boxes and plot all boxes as black with no classes or scores.
          max_boxes_to_draw: maximum number of boxes to visualize.  If None, draw
            all boxes.
          min_score_thresh: minimum score threshold for a box to be visualized
        """

        if not max_boxes_to_draw:
            max_boxes_to_draw = boxes.shape[0]
        number_boxes = min(max_boxes_to_draw, boxes.shape[0])
        detected_boxes = []
        probabilities = []
        categories = []

        for i in range(number_boxes):
            if scores is None or scores[i] > min_score_thresh:
                box = tuple(boxes[i].tolist())
                detected_boxes.append(box)
                probabilities.append(scores[i])
                categories.append(self.category_index[classes[i]])
        return np.array(detected_boxes), probabilities, categories

    def _load_label(self, path):
        """
            Loads labels
        """
        categories = load_labelmap(path)
        self.category_index = create_category_index(categories)

Detector utilities

Python
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Image and label map utility functions."""
import logging
import numpy as np


def create_category_index(categories):
  """Creates dictionary of COCO compatible categories keyed by category id.

  Args:
    categories: a list of dicts, each of which has the following keys:
      'id': (required) an integer id uniquely identifying this category.
      'name': (required) string representing category name
        e.g., 'cat', 'dog'.

  Returns:
    category_index: a dict containing the same entries as categories, but keyed
      by the 'id' field of each category.
  """
  category_index = {}
  for index, cat in enumerate(categories):
    category_index[index] = cat
  return category_index


def load_labelmap(path):

  with open(path, 'r') as fid:
    label_map = list(map(str.strip, fid.readlines()))
  return label_map


def get_label_map_dict(label_map_path):
  """Reads a label map and returns a dictionary of label names to id.

  Args:
    label_map_path: path to label_map.

  Returns:
    A dictionary mapping label names to id.
  """
  label_map = load_labelmap(label_map_path)
  label_map_dict = {}
  # load_labelmap returns a plain list of label names, so ids are assigned by
  # position (matching create_category_index above)
  for item_id, name in enumerate(label_map):
    label_map_dict[name] = item_id
  return label_map_dict


def load_image(frame, new_size=(300, 300)):
  # Get the dimensions
  height, width, _ = frame.shape # Image shape
  new_width, new_height = new_size # Target shape 

  # Calculate the target image coordinates
  left = (width - new_width) // 2
  top = (height - new_height) // 2
  right = (width + new_width) // 2
  bottom = (height + new_height) // 2
  # print(f"-- frame.shape: {frame.shape}")
  # print(f"-- new_size: {new_size}")
  # print(f"LOAD: left: {left}, top: {top}, right: {right}, bottom: {bottom}")

  image = frame[top: bottom, left: right, :]  # rows (vertical) first, then columns (horizontal)
  return image

Visualization utilities

Python
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""## Visualization Utilities"""

import numpy as np
from PIL import Image, ImageDraw, ImageFont

def draw_bounding_boxes_on_image_array(image,
                                       boxes,
                                       color='red',
                                       thickness=4,
                                       display_str_list=()):
  """Draws bounding boxes on image (numpy array).
  Args:
    image: a numpy array object.
    boxes: a 2 dimensional numpy array of [N, 4]: (ymin, xmin, ymax, xmax).
           The coordinates are in normalized format between [0, 1].
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str_list: a list of strings for each bounding box.
  Raises:
    ValueError: if boxes is not a [N, 4] array
  """
  image_pil = Image.fromarray(image)
  draw_bounding_boxes_on_image(image_pil, boxes, color, thickness,
                               display_str_list)
  np.copyto(image, np.array(image_pil))
  

def draw_bounding_boxes_on_image(image,
                                 boxes,
                                 color='red',
                                 thickness=4,
                                 display_str_list=()):
  """Draws bounding boxes on image.
  Args:
    image: a PIL.Image object.
    boxes: a 2 dimensional numpy array of [N, 4]: (ymin, xmin, ymax, xmax).
           The coordinates are in normalized format between [0, 1].
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str_list: a list of strings for each bounding box.
                           
  Raises:
    ValueError: if boxes is not a [N, 4] array
  """
  boxes_shape = boxes.shape
  if not boxes_shape:
    return
  if len(boxes_shape) != 2 or boxes_shape[1] != 4:
    raise ValueError('Input must be of size [N, 4]')
  for i in range(boxes_shape[0]):
    draw_bounding_box_on_image(image, boxes[i, 0], boxes[i, 1], boxes[i, 2],
                               boxes[i, 3], color, thickness, display_str_list[i])
        
def draw_bounding_box_on_image(image,
                               ymin,
                               xmin,
                               ymax,
                               xmax,
                               color='red',
                               thickness=4,
                               display_str=None,
                               use_normalized_coordinates=True):
  """Adds a bounding box to an image.
  Bounding box coordinates can be specified in either absolute (pixel) or
  normalized coordinates by setting the use_normalized_coordinates argument.
  Args:
    image: a PIL.Image object.
    ymin: ymin of bounding box.
    xmin: xmin of bounding box.
    ymax: ymax of bounding box.
    xmax: xmax of bounding box.
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str: string to display in box
    use_normalized_coordinates: If True (default), treat coordinates
      ymin, xmin, ymax, xmax as relative to the image.  Otherwise treat
      coordinates as absolute.
  """
  draw = ImageDraw.Draw(image)
  im_width, im_height = image.size
  print(f"use_normalized_coordinates: {use_normalized_coordinates}")
  if use_normalized_coordinates:
    (left, right, top, bottom) = (xmin * im_width, xmax * im_width,
                                  ymin * im_height, ymax * im_height)
  else:
    (left, right, top, bottom) = (xmin, xmax, ymin, ymax)
  draw.line([(left, top), (left, bottom), (right, bottom),
             (right, top), (left, top)], width=thickness, fill=color)
  # c_x = int(round(left + (right-left)/2))
  # c_y = int(round(top + (bottom-top)/2))
  # draw.ellipse((c_x-2, c_y-2, c_x+2, c_y+2), fill = color, outline ='green')
  try:
    font = ImageFont.truetype('arial.ttf', 24)
  except IOError:
    font = ImageFont.load_default()
  
  display_str_height = font.getsize(display_str)[1]

  if top > display_str_height:
    text_bottom = top
  else:
    text_bottom = bottom + display_str_height
    
  text_width, text_height = font.getsize(display_str)
  margin = np.ceil(0.05 * text_height)
  draw.rectangle(
      [(left, text_bottom - text_height - 2 * margin), (left + text_width,
                                                        text_bottom)],
      fill=color)

  draw.text(
      (left + margin, text_bottom - text_height - margin),
      display_str,
      fill='black',
      font=font)

Credits

Raul Alvarez-Torrico
7 projects • 11 followers
BEng in electronics, founder of TecBolivia (physical computing & educational robotics in Bolivia). WSNs, robotics, artificial intelligence.
