Raul Alvarez-Torrico
Published © GPL3+

BVLOS Drone Aiding Cattle Disease Diagnostics and Treatment

Transports biological samples for diagnosis, and prescribed medicine, to and from remote areas.

Intermediate · Work in progress · Over 2 days

Things used in this project

Hardware components

NXP KIT-HGDRONEK66 ×1
NXP RDDRONE-FMUK66 ×1
LiPo battery, 5100 mAh, 3S, 8C ×1

Software apps and online services

OpenCV
PX4 QGroundControl
PX4
ROS (Robot Operating System)
TensorFlow

Story


Schematics

Block diagram


Code

Offboard navigation ROS 2 node

C/C++
/****************************************************************************
 *
 * Copyright 2020 PX4 Development Team. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 * list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its contributors
 * may be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 ****************************************************************************/

/**
 * @brief Offboard control example
 * @file offboard_control.cpp
 * @addtogroup examples
 * @author Mickey Cowden <info@cowden.tech>
 * @author Nuno Marques <nuno.marques@dronesolutions.io>

 * The TrajectorySetpoint message and the OFFBOARD mode in general are under an ongoing update.
 * Please refer to PR: https://github.com/PX4/PX4-Autopilot/pull/16739 for more info.
 * As per PR: https://github.com/PX4/PX4-Autopilot/pull/17094, the format
 * of the TrajectorySetpoint message shall change.
 */

#include <px4_msgs/msg/offboard_control_mode.hpp>
#include <px4_msgs/msg/trajectory_setpoint.hpp>
#include <px4_msgs/msg/vehicle_command.hpp>
#include <px4_msgs/msg/vehicle_control_mode.hpp>
#include <rclcpp/rclcpp.hpp>
#include <stdint.h>

// Subscribers
#include <px4_msgs/msg/sensor_combined.hpp>
#include <px4_msgs/msg/vehicle_local_position.hpp>
#include "std_msgs/msg/string.hpp"
#include "geometry_msgs/msg/point.hpp"
// #include <geometry_msgs/Point.h>
#include <px4_msgs/msg/vehicle_local_position.hpp>
#include <px4_msgs/msg/vehicle_attitude.hpp>
#include <geometry_msgs/msg/pose_stamped.hpp>

#include <tf2/LinearMath/Quaternion.h>
#include <tf2/LinearMath/Matrix3x3.h>

#define RAD2DEG 57.295779513

// using std::placeholders::_1;
using namespace std::placeholders;

#include <string>
#include <stdlib.h>     /* abs */
#include <iomanip>	/* For printing two-decimal floats (std) */

#include <chrono>
#include <iostream>

using namespace std::chrono;
using namespace std::chrono_literals;
using namespace px4_msgs::msg;

#define DES_START_ALTITUDE  50.0
#define MIN_AVOID_ALTITUDE  30.0

unsigned int image_height = 1080;
unsigned int image_width = 1920;
unsigned int x_ax_pos = image_height/2 - 1; // 539 NOTE: Perhaps the "- 1" is not needed
unsigned int y_ax_pos = image_width/2 - 1; // 959

unsigned int MAX_X_VEL = 5;
unsigned int MAX_Y_VEL = 3;

static const char *states_str[] =
        { "STATE_IDLE", "STATE_TAKING_OFF", "STATE_FLYING_MISSION", "STATE_AVOIDING", "STATE_LANDING" };

class OffboardControl : public rclcpp::Node
{
public:
	OffboardControl() : Node("offboard_control")
	{

		offboard_control_mode_publisher_ = this->create_publisher<OffboardControlMode>("/fmu/in/offboard_control_mode", 10);
		trajectory_setpoint_publisher_ = this->create_publisher<TrajectorySetpoint>("/fmu/in/trajectory_setpoint", 10);
		vehicle_command_publisher_ = this->create_publisher<VehicleCommand>("/fmu/in/vehicle_command", 10);

		offboard_setpoint_counter_ = 0;

		vehicle_state = STATE_TAKING_OFF;	
		last_vehicle_state = STATE_IDLE;
		is_avoiding = false;
		avoid_coord_flag = false;

		auto timer_callback = [this]() -> void {

			if(vehicle_state != last_vehicle_state) {
				// std::cout << "\n";
				// std::cout << "New Vehicle state:" << states_str[vehicle_state]  << std::endl;
				// std::cout << "\n"   << std::endl;
				last_vehicle_state = vehicle_state;

				if(vehicle_state == STATE_TAKING_OFF || vehicle_state == STATE_FLYING_MISSION) {
					offb_ctrl_mode_msg.position = false;
					offb_ctrl_mode_msg.velocity = false;
					offb_ctrl_mode_msg.acceleration = false;
					offb_ctrl_mode_msg.attitude = false;
					offb_ctrl_mode_msg.body_rate = false;
					traj_setpoint_msg.position = {NAN, NAN, NAN};
					traj_setpoint_msg.velocity = {NAN, NAN, NAN};
					traj_setpoint_msg.yaw = yaw; // [-PI:PI]
				}
			}

			switch(vehicle_state) {
				case STATE_TAKING_OFF:
					// Arm the vehicle
					this->arm();
					this->publish_vehicle_command2(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 4, 4); // Mission
					////// this->publish_vehicle_command2(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 4, 2); // Takeoff
					vehicle_state = STATE_FLYING_MISSION;
				break;
				case STATE_FLYING_MISSION:
					if(vehicle_local_pos.z <= -MIN_AVOID_ALTITUDE && avoid_coord_flag) {
						vehicle_state = STATE_AVOIDING;
						is_avoiding = true;
					}

				break;
				case STATE_AVOIDING:
					// Map camera-frame avoidance coordinates to drone velocities
					// (drone X <- camera X, drone Z <- camera Y), normalized by the
					// half-frame size and scaled by the maximum velocities.
					x_vel = (avoid_coord.x/y_ax_pos) * MAX_X_VEL;
					y_vel = (avoid_coord.y/x_ax_pos) * MAX_Y_VEL;

					// Compute avoidance velocities in NED coord.
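					// x_vel is along the camera/body-right axis; with heading 'yaw' (0 = North,
					// positive toward East) that axis is (N, E) = (-sin(yaw), cos(yaw)) in NED,
					// which gives the two projections below.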
					desired_vel_e = x_vel*cos(yaw);
					desired_vel_n = -x_vel*sin(yaw);

					if(vehicle_local_pos.z < (-MIN_AVOID_ALTITUDE)) { // Command vertical avoidance velocity while above the min. altitude
						desired_vel_d = -y_vel;
					} else { // Do not go below min. altitude
						desired_vel_d = 0.0;
					}
					
					offb_ctrl_mode_msg.velocity = true;
					traj_setpoint_msg.velocity = {desired_vel_n, desired_vel_e, desired_vel_d};

					// Set last known yaw angle from Mission mode
					offb_ctrl_mode_msg.attitude = true;
					traj_setpoint_msg.yaw = yaw; // [-PI:PI]


					if (offboard_setpoint_counter_ == 10) {
						// Change to Offboard mode after 10 setpoints
						this->publish_vehicle_command(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 6); // Offboard
					}

					//offboard_control_mode needs to be paired with trajectory_setpoint
					publish_offboard_control_mode();
					publish_trajectory_setpoint();

					// stop the counter after reaching 11
					if (offboard_setpoint_counter_ < 11) {
						offboard_setpoint_counter_++;
					}

					if(!avoid_coord_flag) {
						vehicle_state = STATE_FLYING_MISSION;
						is_avoiding = false;
						this->publish_vehicle_command2(VehicleCommand::VEHICLE_CMD_DO_SET_MODE, 1, 4, 4); // Mission
						offboard_setpoint_counter_ = 0;
					}

				break;
				case STATE_LANDING:
				break;
				default: break;
			}
		};
		timer_ = this->create_wall_timer(100ms, timer_callback);

		auto print_timer_callback = [this]() -> void {
			std::cout << std::fixed;	// For printing two-decimal floats
    		std::cout << std::setprecision(2); // For printing two-decimal floats
			std::cout << "\n\n\n\n";
			std::cout << "++STORED VEHICLE LOCAL POSITION DATA"   << std::endl;
			std::cout << "============================="   << std::endl;
			std::cout << "vehicle_local_pos.timestamp: " << vehicle_local_pos.timestamp << std::endl;
			std::cout << "vehicle_local_pos pos (" << vehicle_local_pos.x<< ", " << vehicle_local_pos.y<< ", " << vehicle_local_pos.z << ")" << std::endl;
			std::cout << "vehicle_local_pos vel (" << vehicle_local_pos.vx<< ", " << vehicle_local_pos.vy<< ", " << vehicle_local_pos.vz << ")" << std::endl;
			std::cout << "avoid_coord: (" << avoid_coord.x << ", " << avoid_coord.y << ", " << avoid_coord.z << ")" << std::endl;
			std::cout << "avoid_coord_flag: " << avoid_coord_flag << std::endl;
			std::cout << "yaw: " << yaw * RAD2DEG << " | alpha: " << alpha * RAD2DEG << std::endl;
			// std::cout << "att_yaw_ned: " << att_yaw_ned * RAD2DEG << std::endl;
			std::cout << "att(" << att_roll*RAD2DEG << ", " << att_pitch*RAD2DEG << ", " << att_yaw*RAD2DEG << ")" << std::endl;
			std::cout << "desired_vel_e : " << desired_vel_e << " | desired_vel_n: " << desired_vel_n << std::endl;// desired_vel_e 
			std::cout << "Vehicle state:" << states_str[vehicle_state]  << std::endl;
		};
		print_timer_ = this->create_wall_timer(1000ms, print_timer_callback);

		// Subscribers
		rmw_qos_profile_t qos_profile = rmw_qos_profile_sensor_data;
		auto qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 5), qos_profile);

		vlp_subscription_ = this->create_subscription<px4_msgs::msg::VehicleLocalPosition>("/fmu/out/vehicle_local_position", qos,
		[this](const px4_msgs::msg::VehicleLocalPosition::UniquePtr msg) {
			vehicle_local_pos.timestamp = msg->timestamp;
			vehicle_local_pos.x = msg->x;	// North position in NED earth-fixed frame, (metres)
			vehicle_local_pos.y = msg->y;	// East position in NED earth-fixed frame, (metres)
			vehicle_local_pos.z = msg->z;	// Down position (negative altitude) in NED earth-fixed frame, (metres)
			vehicle_local_pos.vx = msg->vx;	// North velocity in NED earth-fixed frame, (metres/sec)
			vehicle_local_pos.vy = msg->vy;	// East velocity in NED earth-fixed frame, (metres/sec)
			vehicle_local_pos.vz = msg->vz;	// Down velocity in NED earth-fixed frame, (metres/sec)
			vehicle_local_pos.heading = msg->heading; // Euler yaw angle transforming the tangent plane relative to NED earth-fixed frame, -PI..+PI,  (radians)

			yaw = vehicle_local_pos.heading;
			// Compute alpha (X coordinate angle WRT East in ENU system)
			alpha = M_PI/2 - yaw;
			alpha_deg = alpha * RAD2DEG;
		});

		avoid_coord_subscription_ = this->create_subscription<geometry_msgs::msg::Point>("/avoid_coord", qos,
		[this](const geometry_msgs::msg::Point::UniquePtr msg) {
			avoid_coord = *msg;

			if(avoid_coord.x == 0 && avoid_coord.y == 0 && avoid_coord.z == 0) {
				avoid_coord_flag = false;
			}
			else {
				avoid_coord_flag = true;
			}
		});

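		// NOTE: PX4 publishes px4_msgs::msg::VehicleAttitude on /fmu/out/vehicle_attitude;
		// a geometry_msgs::msg::PoseStamped subscription will not match that type, so
		// poseCallback is unlikely to fire when connected through the uXRCE-DDS bridge.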
		mPoseSub = this->create_subscription<geometry_msgs::msg::PoseStamped>(
                    "/fmu/out/vehicle_attitude", qos,
                    std::bind(&OffboardControl::poseCallback, this, _1) );
	}

	void arm();
	void disarm();

private:
	rclcpp::TimerBase::SharedPtr timer_;
	rclcpp::TimerBase::SharedPtr print_timer_; // Periodic printing

	rclcpp::Publisher<OffboardControlMode>::SharedPtr offboard_control_mode_publisher_;
	rclcpp::Publisher<TrajectorySetpoint>::SharedPtr trajectory_setpoint_publisher_;
	rclcpp::Publisher<VehicleCommand>::SharedPtr vehicle_command_publisher_;

	std::atomic<uint64_t> timestamp_;   //!< common synced timestamp

	uint64_t offboard_setpoint_counter_;   //!< counter for the number of setpoints sent

	void publish_offboard_control_mode();
	void publish_trajectory_setpoint();
	void publish_vehicle_command(uint16_t command, float param1 = 0.0, float param2 = 0.0);
	void publish_vehicle_command2(uint16_t command, float param1 = 0.0, float param2 = 0.0, float param3 = 0.0);
	
	// Subscribers
	rclcpp::Subscription<px4_msgs::msg::SensorCombined>::SharedPtr subscription_;
	rclcpp::Subscription<px4_msgs::msg::VehicleLocalPosition>::SharedPtr vlp_subscription_;
	rclcpp::Subscription<px4_msgs::msg::VehicleAttitude>::SharedPtr va_subscription_;
	rclcpp::Subscription<std_msgs::msg::String>::SharedPtr mod_image_size_subscription_;
	rclcpp::Subscription<geometry_msgs::msg::Point>::SharedPtr avoid_coord_subscription_;
	rclcpp::Subscription<geometry_msgs::msg::PoseStamped>::SharedPtr mPoseSub;

	// rclcpp::Subscription<geometry_msgs::msg::Point>::SharedPtr avoid_coord_subscription_;

	// Vehicle data readings
	uint64_t vehicle_ts;
	// Position in local NED frame
	px4_msgs::msg::VehicleLocalPosition vehicle_local_pos;

	// string mod_image_size;
	std_msgs::msg::String::UniquePtr mod_image_size;
	// rclcpp::Subscription<std_msgs::msg::String>::SharedPtr mod_image_size;

	// geometry_msgs::msg::Point::UniquePtr avoid_coord;
	geometry_msgs::msg::Point avoid_coord;

	enum States {
		STATE_IDLE,
		STATE_TAKING_OFF,
		STATE_FLYING_MISSION,
		STATE_AVOIDING,
		STATE_LANDING
	};

	States vehicle_state;
	States last_vehicle_state;

	OffboardControlMode offb_ctrl_mode_msg{};
	TrajectorySetpoint traj_setpoint_msg{};
	bool avoid_coord_flag;

	bool is_avoiding;

	// geometry_msgs::PoseStamped local_pose;
	double roll, pitch, yaw;
	double yaw_deg, alpha, alpha_deg;

	double att_roll, att_pitch, att_yaw, att_yaw_ned;

	double x_vel;
	double y_vel;
	double desired_vel_n;
	double desired_vel_e;
	double desired_vel_d;

	void poseCallback(const geometry_msgs::msg::PoseStamped::SharedPtr msg) {
        // Camera position in map frame
        double tx = msg->pose.position.x;
        double ty = msg->pose.position.y;
        double tz = msg->pose.position.z;

        // Orientation quaternion
        tf2::Quaternion q(
                    msg->pose.orientation.x,
                    msg->pose.orientation.y,
                    msg->pose.orientation.z,
                    msg->pose.orientation.w);

        // 3x3 Rotation matrix from quaternion
        tf2::Matrix3x3 m(q);

        // Roll Pitch and Yaw from rotation matrix
        //double roll, pitch, yaw;
        m.getRPY(roll, pitch, yaw);

		std::cout << "poseCallback exec" << std::endl;

        // Output the measure
        RCLCPP_INFO(get_logger(), "Received pose in '%s' frame : X: %.2f Y: %.2f Z: %.2f - R: %.2f P: %.2f Y: %.2f - Timestamp: %u.%u sec ",
                 msg->header.frame_id.c_str(),
                 tx, ty, tz,
                 roll * RAD2DEG, pitch * RAD2DEG, yaw * RAD2DEG,
                    msg->header.stamp.sec,msg->header.stamp.nanosec);
    }
};

/**
 * @brief Send a command to Arm the vehicle
 */
void OffboardControl::arm()
{
	publish_vehicle_command(VehicleCommand::VEHICLE_CMD_COMPONENT_ARM_DISARM, 1.0);

	RCLCPP_INFO(this->get_logger(), "Arm command sent");
}

/**
 * @brief Send a command to Disarm the vehicle
 */
void OffboardControl::disarm()
{
	publish_vehicle_command(VehicleCommand::VEHICLE_CMD_COMPONENT_ARM_DISARM, 0.0);

	RCLCPP_INFO(this->get_logger(), "Disarm command sent");
}

/**
 * @brief Publish the offboard control mode.
 *        The active control fields (velocity, attitude) are set by the state machine.
 */
void OffboardControl::publish_offboard_control_mode()
{
	offb_ctrl_mode_msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	offboard_control_mode_publisher_->publish(offb_ctrl_mode_msg);
}

/**
 * @brief Publish a trajectory setpoint
 *        Sends the setpoint prepared by the state machine (avoidance
 *        velocities and the last known yaw from Mission mode).
 */
void OffboardControl::publish_trajectory_setpoint()
{
	traj_setpoint_msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	trajectory_setpoint_publisher_->publish(traj_setpoint_msg);
}

/**
 * @brief Publish vehicle commands
 * @param command   Command code (matches VehicleCommand and MAVLink MAV_CMD codes)
 * @param param1    Command parameter 1
 * @param param2    Command parameter 2
 */
void OffboardControl::publish_vehicle_command(uint16_t command, float param1, float param2)
{
	VehicleCommand msg{};
	msg.param1 = param1;
	msg.param2 = param2;
	msg.command = command;
	msg.target_system = 1;
	msg.target_component = 1;
	msg.source_system = 1;
	msg.source_component = 1;
	msg.from_external = true;
	msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	vehicle_command_publisher_->publish(msg);
}

void OffboardControl::publish_vehicle_command2(uint16_t command, float param1, float param2, float param3)
{
	VehicleCommand msg{};
	msg.param1 = param1;
	msg.param2 = param2;
	msg.param3 = param3;
	msg.command = command;
	msg.target_system = 1;
	msg.target_component = 1;
	msg.source_system = 1;
	msg.source_component = 1;
	msg.from_external = true;
	msg.timestamp = this->get_clock()->now().nanoseconds() / 1000;
	vehicle_command_publisher_->publish(msg);
}

int main(int argc, char *argv[])
{
	std::cout << "Starting offboard control node..." << std::endl;
	setvbuf(stdout, NULL, _IONBF, BUFSIZ);
	rclcpp::init(argc, argv);
	rclcpp::spin(std::make_shared<OffboardControl>());

	rclcpp::shutdown();
	return 0;
}
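
To run the offboard controller together with the object recognition node below, a ROS 2 Python launch file can start both. The sketch that follows is only an illustration: the package and executable names (offboard_pkg/offboard_control and avoid_pkg/image_publisher) are assumptions and must be adapted to the actual workspace, and the PX4 uXRCE-DDS agent also has to be running so the /fmu/... topics exist.

# Hypothetical launch file (sketch): starts the offboard navigation node and the
# object recognition node together. The package and executable names below are
# assumptions, not taken from the project sources; adjust them to your workspace.
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
    return LaunchDescription([
        # C++ offboard navigation node (assumed package/executable names)
        Node(package='offboard_pkg', executable='offboard_control', output='screen'),
        # Python object recognition / image publisher node (assumed names)
        Node(package='avoid_pkg', executable='image_publisher', output='screen'),
    ])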

Object recognition ROS 2 node

Python
#! /usr/bin/env python3

import rclpy
from rclpy.node import Node

import cv2
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image, CompressedImage # Image is the message type
from std_msgs.msg import String
from geometry_msgs.msg import Point, Quaternion, Pose, PoseArray
import rospkg

from detector import ObjectDetectorLite, ObjectDetectorLiteExtDelegate
from visualization_utils import draw_bounding_boxes_on_image_array
from utils import load_image
import time

DEBUG_FLAG = True

DESIRED_IMAGE_WIDTH = 320 #300
DESIRED_IMAGE_HEIGHT = 320 #300

# A dictionary of general parameters derived from the camera image size,
# which will be populated later with the 'get_image_params' function
params = {'image_height':None, 'image_width': None,'resized_height':None,'resized_width': None,
        'x_ax_pos':None, 'y_ax_pos':None, 'width_scaling_factor':None, 'height_scaling_factor':None}

target_poses = PoseArray()

# (resized_width, resized_height) = (300, 300)
(resized_width, resized_height) = (320, 320)

# WARNING: file will be saved in /home/pi/.ros (hidden dir) if full path not specified
# check it with cwd = os.getcwd()
# Get an instance of RosPack with the default search paths
rospack = rospkg.RosPack()
# package_dir = rospack.get_path('avoid_pkg')
package_dir = '..'

folder = '/home/user/Videos/' # Put here your test videos

# Put here your tflite model
detect_file_dir = package_dir + '/scripts/Colab/detect_8600_steps_hgames3.tflite'
label_file_dir = package_dir + '/scripts/Colab/labelmap_hgames.txt'

# --------- Open video file
video_file = 'Avion_frontal_2_12fps.mp4' # Select the test video
# video_file = 'video_composit.mp4' # flakey

comp_obstacle = Point()
avoid_coord = Point()
image_size = Point()

# A dictionary of two empty buffers (arrays) for the Moving Average Filter
filt_buffer = {'width':[], 'height':[]}
NUM_FILT_POINTS            = 20 # Number of filtering points for the Moving Average Filter

bridge = CvBridge()

class ImagePublisher(Node):
    """
    Create an ImagePublisher class, which is a subclass of the Node class.
    """
    def __init__(self):
        """
        Class constructor to set up the node
        """
        # Initiate the Node class's constructor and give it a name
        super().__init__('image_publisher')
            
        # -------------------- Create publishers --------------------
        # to the video_frames topic. The queue size is 10 messages.
        self.publisher_ = self.create_publisher(Image, 'video_frames', 10)
        # resz_im_frame_pub = rospy.Publisher("/resz_frame/compressed", CompressedImage, queue_size=10)
        self.resz_im_frame_pub = self.create_publisher(Image, "/resz_frame/compressed", 10)
        # self.inf_tgt_im_frame_pub = self.create_publisher(CompressedImage, "/inf_tgt_frame/compressed", 10)
        self.inf_tgt_im_frame_pub = self.create_publisher(Image, "/inf_tgt_frame/compressed", 10)
        # cap_im_frame_pub = rospy.Publisher("/cap_frame/compressed", CompressedImage, queue_size=10)
        self.cap_im_frame_pub = self.create_publisher(Image, "/cap_frame/compressed", 10)
        # self.resz_frame_size_pub = rospy.Publisher("/modified_image/size", String, queue_size=10)
        self.resz_frame_size_pub = self.create_publisher(Point, "/modified_image/size", 10)
        # target_poses_pub = rospy.Publisher("/target_poses", PoseArray, queue_size=10)
        self.target_poses_pub = self.create_publisher(PoseArray, "/target_poses", 10)
        # avoid_coord_pub = rospy.Publisher("/avoid_coord", Point, queue_size=10)
        self.avoid_coord_pub = self.create_publisher(Point, "/avoid_coord", 10)
        
        # Timer period in seconds (0 runs the callback as fast as possible;
        # 0.1 would give roughly 10 Hz)
        timer_period = 0 #0.1    # seconds
            
        # Create the timer
        self.timer = self.create_timer(timer_period, self.timer_callback)
                 
        # Used to convert between ROS and OpenCV images
        self.bridge = CvBridge()

        self.detector = ObjectDetectorLite(model_path=detect_file_dir, label_path=label_file_dir)
        # Object detector with external delegate for the NPU:
        # self.detector = ObjectDetectorLiteExtDelegate(model_path=detect_file_dir, label_path=label_file_dir)

        self.input_size = self.detector.get_input_size()
        print('self.input_size: ', self.input_size[0])

        # Create a VideoCapture object
        # The argument '0' gets the default webcam.
        # self.cap = cv2.VideoCapture(0)

        ## ----- NOTE: Navqplus doesn't work with VideoCapture(0), so we are using
        # a video file instead:
        VIDEO_STREAM = folder + video_file
        # filename, file_extension = os.path.splitext(VIDEO_STREAM)
        # VIDEO_STREAM_OUT = filename + "_result" + file_extension
        # print(VIDEO_STREAM_OUT)

        self.cap = cv2.VideoCapture(VIDEO_STREAM)
        (grabbed, cap_frame) = self.cap.read()
        if not grabbed:
            print ("Init - Not grabbed.")
            return

        print('-- Camera opened successfully')

        # Compute general parameters
        get_image_params(self.cap)
        # Override size:
        params['resized_width'] = self.input_size[0]
        params['resized_height'] = self.input_size[1]
        print(f"-- Original image width, height: {params['image_width']}, {params['image_height']}")
        print(f"-- Resized image width, height: {params['resized_width']}, {params['resized_height']}")

        cap_fps = self.cap.get(cv2.CAP_PROP_FPS)
        print("Frames per second using video.get(cv2.CAP_PROP_FPS) : {0}".format(cap_fps))
        cap_period = 1/cap_fps
        print("Frames period (s) : {0}".format(cap_period))

        detection_time = None
     
    def timer_callback(self):
        """
        Timer callback: grabs a frame, runs detection and publishes the
        avoidance coordinate and the debug image topics.
        """
        global target_poses

        # Capture frame-by-frame
        # This method returns True/False as well
        # as the video frame.
        print("-------------------------")
        cur_frame_num = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
        ret, cap_frame = self.cap.read()

        if not ret:
           print('no video')
           self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
           ret, cap_frame = self.cap.read()

        # start_time = time.time()

        # Resize to make processing faster
        dimension = (params['resized_width'], params['resized_height'])
        # dimension = (resized_width, resized_height)
        resz_frame = cv2.resize(cap_frame, dimension, interpolation = cv2.INTER_AREA)

        # image = load_image(resz_frame)
        # tgt_left, tgt_right, tgt_top, tgt_bottom = load_image2(resz_frame)

        image = load_image(resz_frame, dimension)
        tgt_left, tgt_right, tgt_top, tgt_bottom = load_image2(resz_frame, dimension)

        # print(f"image.shape: {image.shape}") # (300, 300, 3)
        # print(f"tgt_left: {tgt_left}, tgt_right: {tgt_right}, tgt_top: {tgt_top}, tgt_bottom: {tgt_bottom}") # 0, 300, 0, 300

        fr_height, fr_width, _ = resz_frame.shape
        im_height, im_width, _ = image.shape # Image shape

        start_time = time.time()
        boxes, scores, classes = self.detector.detect(image, 0.13) #0.02 #0.21 #0.13
        # print('++++++++++ boxes: ', boxes)
        # print('++++++++++ scores: ', scores)
        # print('++++++++++ classes: ', classes)
        delta_time = time.time() - start_time
        detection_time = round(delta_time, 3)
        print("tflite detection time: " + str(detection_time ))

        print("boxes_: ", boxes)

        for label, score in zip(classes, scores):
            print(label, score)
                    
        center_points = None
        if len(boxes) > 0:
            draw_bounding_boxes_on_image_array(image, boxes, display_str_list=classes)
            # print("classes: ", classes)
            # center_points = draw_bounding_boxes_on_image_array2(image, boxes, display_str_list=classes)

        # image= cv2.rectangle(image, (0, 0), (300, 300), color=(255, 0, 255), thickness=2) # Red
        image = cv2.rectangle(image, (0, 0), dimension, color=(255, 0, 255), thickness=2) # Magenta frame border
        try:
            # self.inf_tgt_im_frame_pub.publish(bridge.cv2_to_compressed_imgmsg(image))
            self.inf_tgt_im_frame_pub.publish(bridge.cv2_to_imgmsg(image))
        except CvBridgeError as e:
            print('Exception inf_tgt_im_frame_pub: ', e)

        # delta_time = time.time() - start_time
        # detection_time = round(delta_time, 3)
        # print("Detection time: " + str(detection_time ))
        
        boxes_shape = boxes.shape
        num_det_objects = boxes_shape[0]

        print(f"num_det_objects: {num_det_objects}")
        waypoints = []
        comp_obstacle_w = 0
        comp_obstacle_h = 0
        for i in range(num_det_objects):
            (ymin, xmin, ymax, xmax) = (boxes[i, 0], boxes[i, 1], boxes[i, 2], boxes[i, 3])
            (left, right, top, bottom) = (xmin * im_width, xmax * im_width, ymin * im_height, ymax * im_height)
            tgt_cx = int(round(left + (right-left)/2))
            tgt_cy = int(round(top + (bottom-top)/2))

            orig_left = int(round(left/params['width_scaling_factor']))
            orig_right = int(round(right/params['width_scaling_factor']))
            orig_top = int(round(top/params['height_scaling_factor']))
            orig_bottom = int(round(bottom/params['height_scaling_factor']))

            # (orig_left, orig_right, orig_top, orig_bottom) = (xmin * fr_width, xmax * fr_width, ymin * fr_height, ymax * fr_height)

            orig_cx = round(tgt_cx/params['width_scaling_factor'])
            orig_cy = round(tgt_cy/params['height_scaling_factor'])
            comp_obstacle_w += orig_cx
            comp_obstacle_h += orig_cy

            # point_msg = Point(orig_cx, orig_cy, 0)
            point_msg = Point()
            point_msg.x = float(orig_cx)
            point_msg.y = float(orig_cy)
            point_msg.z = float(0)
            # quaternion_msg = Quaternion(0, 0, 0, 0)
            quaternion_msg = Quaternion()
            quaternion_msg.x = float(0)
            quaternion_msg.y = float(0)
            quaternion_msg.z = float(0)
            quaternion_msg.w = float(0)

            pose_msg = Pose()
            pose_msg.position = point_msg
            pose_msg.orientation = quaternion_msg
            waypoints.append(pose_msg)

            # print(f" ymin: {ymin}, xmin: {xmin}, , ymax: {ymax}, xmax: {xmax}")
            # print(f" left: {left}, right: {right}, top: {top}, bottom: {bottom}")
            # print(f" orig_left: {orig_left}, orig_right: {orig_right}, orig_top: {orig_top}, orig_bottom: {orig_bottom}")

            # Print centroids of all detected objects in image (inference image)
            image = cv2.circle(image, (tgt_cx, tgt_cy), radius=2, color=(0, 255, 0), thickness=2) # Green
            
            # Print centroids of all detected objects in cap_frame (original size frame)
            cap_frame = cv2.circle(cap_frame, (orig_cx, orig_cy), radius=5, color=(0, 0, 255), thickness=4) # Red
            # Print enclosing rectangles of detected objects in cap_frame (original size frame)
            cap_frame = cv2.rectangle(cap_frame, (int(orig_left), int(orig_top)), (int(orig_right), int(orig_bottom)), color=(0, 0, 255), thickness=4) # Red

        if num_det_objects > 0:
            comp_obstacle_w = comp_obstacle_w/num_det_objects
            comp_obstacle_h = comp_obstacle_h/num_det_objects
            # Apply Moving Average filter to target camera coordinates
            tgt_cam_coord = {'width':comp_obstacle_w, 'height':comp_obstacle_h}
            tgt_filt_cam_coord = moving_average_filter(tgt_cam_coord)
        else:
            comp_obstacle_w = params['y_ax_pos']
            comp_obstacle_h = params['x_ax_pos']
            flush_filter_buffer()
            tgt_filt_cam_coord = {'width':params['y_ax_pos'], 'height':params['x_ax_pos']}

        print(f">>> comp_obstacle wh: {comp_obstacle_w}, {comp_obstacle_h}")
        
        # Compute obstacle Cartesian centroid without filtering
        # comp_obstacle.x = comp_obstacle_w - params['y_ax_pos']
        # comp_obstacle.y = params['x_ax_pos'] - comp_obstacle_h

        # Compute obstacle Cartesian centroid with filtering
        comp_obstacle.x = float(tgt_filt_cam_coord['width'] - params['y_ax_pos'])
        comp_obstacle.y = float(params['x_ax_pos'] - tgt_filt_cam_coord['height'])
        print(f">>> comp_obstacle xy: {comp_obstacle.x}, {comp_obstacle.y}")
        
        # Compute avoid_coord as same-magnitude mirror of the comp_obstacle
        # avoid_coord.x = -comp_obstacle.x
        # avoid_coord.y = -comp_obstacle.y

        # Compute avoid_coord as magnitude-complement mirror of the comp_obstacle
        if comp_obstacle.y != 0 and comp_obstacle.x != 0:
            # Compute the slope of the line defined by (0, 0) and comp_obstacle
            m = comp_obstacle.y / comp_obstacle.x 
            
            if comp_obstacle.x > 0: # If the 'x' coordinate is positive
                mirror_x = -params['y_ax_pos'] # Make mirror_x negative (opposite side in the Cartesian plane)
            elif comp_obstacle.x < 0: # If the 'x' coordinate is negative
                mirror_x = params['y_ax_pos']+1 # Make mirror_x positive(opposite side in the Cartesian plane)
            mirror_y = m * mirror_x # Compute mirror_y

            if mirror_y < -params['x_ax_pos']: # If mirror_y is below the bottom of the frame
                mirror_y = -params['x_ax_pos']-1 # Make mirror_y = to the bottom of the frame
                mirror_x = mirror_y / m # And compute a new mirror_x
            elif mirror_y > params['x_ax_pos']: # If mirror_y is over the top of the frame
                mirror_y = params['x_ax_pos'] # Make mirror_y = to the top of the frame
                mirror_x = mirror_y / m # And compute a new mirror_x

            # Add comp_obstacle to mirror, to find the magnitude-complement
            avoid_coord.x = float(mirror_x + comp_obstacle.x)
            avoid_coord.y = float(mirror_y + comp_obstacle.y)
        else:
            avoid_coord.x = float(0)
            avoid_coord.y = float(0)

            # # Draw avoid point
            # print(f"... avoid limit : {x}, {y}")
            # cap_frame = cv2.circle(cap_frame, (int(x), int(y)), radius=20, color=(255, 0, 255), thickness=4)

        print(f">>> avoid_coord xy: {avoid_coord.x}, {avoid_coord.y}")

        avoid_coord_w = avoid_coord.x + params['y_ax_pos']
        avoid_coord_h = params['x_ax_pos'] - avoid_coord.y
        # print(f">>> avoid_coord wh: {avoid_coord_w}, {avoid_coord_h}")

        # Draw Cartesian center
        cap_frame = cv2.circle(cap_frame, (params['y_ax_pos'], params['x_ax_pos']), radius=4, color=(255, 0, 255), thickness=4) # Magenta
        # Draw obstacle position vector
        cap_frame = cv2.line(cap_frame, (params['y_ax_pos'], params['x_ax_pos']),
        (int(comp_obstacle_w), int(comp_obstacle_h)), color=(255, 0, 255), thickness=4) # Magenta
        # Draw avoid position vector
        cap_frame = cv2.line(cap_frame, (params['y_ax_pos'], params['x_ax_pos']),
        (int(avoid_coord_w), int(avoid_coord_h)), color=(0, 255, 0), thickness=4) # Green

        # FOR DEBUG: Left - right inference image limits
        resz_frame = cv2.line(resz_frame, (tgt_left, 0), (tgt_left, resized_height), color=(0, 0, 255), thickness=2) # Red
        resz_frame = cv2.line(resz_frame, (tgt_right, 0), (tgt_right, resized_height), color=(0, 0, 255), thickness=2) # Red
        resz_frame = cv2.line(resz_frame, (0, tgt_top), (resized_width, tgt_top), color=(0, 0, 255), thickness=2) # Red
        resz_frame = cv2.line(resz_frame, (0, tgt_bottom), (resized_width, tgt_bottom), color=(0, 0, 255), thickness=2) # Red

        delta_time = time.time() - start_time
        detection_time = round(delta_time, 3)
        print("Detection & process time: " + str(detection_time ))

        try:
            self.avoid_coord_pub.publish(avoid_coord)
        except CvBridgeError as e:
            print('Exception avoid_coord_pub: ', e)
        
        if DEBUG_FLAG:
            # Downsize original image frame for topic publication
            pcent = 0.25 #0.25
            dim = (int(pcent*params['image_width']), int(pcent*params['image_height']))
            cap_frame_pub = cv2.resize(cap_frame, dim, interpolation = cv2.INTER_AREA)
            try:
                self.cap_im_frame_pub.publish(bridge.cv2_to_imgmsg(cap_frame_pub))
            except CvBridgeError as e:
                print('Exception cap_im_frame_pub: ', e)
            
            # poses = PoseArray()
            target_poses.header.frame_id = 'frame_' + str(int(cur_frame_num))
            print(f"frame_id: {target_poses.header.frame_id}")
            target_poses.poses = waypoints
            try:
                self.target_poses_pub.publish(target_poses)
            except CvBridgeError as e:
                print('Exception target_poses_pub: ', e)

            # Publish detection frame
            try:
                self.resz_im_frame_pub.publish(bridge.cv2_to_imgmsg(resz_frame))
            except CvBridgeError as e:
                print('Exception resz_im_frame_pub: ', e)

            # try:
            #     inf_tgt_im_frame_pub.publish(bridge.cv2_to_compressed_imgmsg(image))
            # except CvBridgeError as e:
            #     print('Exception inf_tgt_im_frame_pub: ', e)
            
            # Publish resz_frame_width
            resz_frame_height, resz_frame_width, channels = resz_frame.shape
            try:
                # str_bytes = bytes(str(resz_frame_width) + "," +  str(resz_frame_height), 'utf-8')
                # str_bytes = str(resz_frame_width) + "," +  str(resz_frame_height)
                # mess = bytes("22,33", 'utf-8')
                image_size.x = float(resz_frame_width)
                image_size.y = float(resz_frame_height)
                self.resz_frame_size_pub.publish(image_size)
            except CvBridgeError as e:
                print('Exception resz_frame_size_pub: ', e)
            
            # if ret == True:
            #     # Publish the image.
            #     # The 'cv2_to_imgmsg' method converts an OpenCV
            #     # image to a ROS 2 image message
            #     cap_frame = cv2.resize(cap_frame, (100, 100), interpolation = cv2.INTER_AREA)
            #     self.publisher_.publish(self.bridge.cv2_to_imgmsg(cap_frame))
 
        # Display the message on the console
        # self.get_logger().info('Publishing video frame')

        delta_time = time.time() - start_time
        detection_time = round(delta_time, 3)
        print("Detection & process & pub time: " + str(detection_time ))

def get_image_params(vid_cam):
    """ Computes useful general parameters derived from the camera image size."""

    # Grab a frame and get its size
    is_grabbed, frame = vid_cam.read()
    params['image_height'], params['image_width'], _ = frame.shape

    # Compute the scaling factor to scale the image to a desired size
    if params['image_height'] != DESIRED_IMAGE_HEIGHT:
        params['width_scaling_factor'] = round((DESIRED_IMAGE_WIDTH / params['image_width']), 4) # Rounded scaling factor
        params['height_scaling_factor'] = round((DESIRED_IMAGE_HEIGHT / params['image_height']), 4) # Rounded scaling factor
    else:
        params['width_scaling_factor'] = 1
        params['height_scaling_factor'] = 1

    print("params['width_scaling_factor']: ", params['width_scaling_factor'])

    # Compute resized width and height and resize the image
    params['resized_width'] = int(round(params['image_width'] * params['width_scaling_factor']))
    params['resized_height'] = int(round(params['image_height'] * params['height_scaling_factor']))
    dimension = (params['resized_width'], params['resized_height'])
    frame = cv2.resize(frame, dimension, interpolation = cv2.INTER_AREA)

    # Compute the position for the X and Y Cartesian coordinates in camera pixel units
    params['x_ax_pos'] = int(round(params['image_height']/2 - 1)) # NOTE: Perhaps the "- 1" is not needed
    params['y_ax_pos'] = int(round(params['image_width']/2 - 1)) 

    return

def load_image2(frame, new_size=(300, 300)):
  # Get the dimensions
  height, width, _ = frame.shape # Image shape
  new_width, new_height = new_size # Target shape 

  # Calculate the target image coordinates
  left = (width - new_width) // 2
  top = (height - new_height) // 2
  right = (width + new_width) // 2
  bottom = (height + new_height) // 2
  # print(f"-- frame.shape: {frame.shape}")
  # print(f"-- new_size: {new_size}")
  # print(f"-- left: {left}, top: {top}, right: {right}, bottom: {bottom}")

  # image = frame[left: right, top: bottom, :]
  # return image
  return left, right, top, bottom

def moving_average_filter(coord):
    """ Applies Low-Pass Moving Average Filter to a pair of coordinates."""

    # Append new coordinates to filter buffers
    filt_buffer['width'].append(coord['width'])
    filt_buffer['height'].append(coord['height'])

    # If the filters were full already with a number of NUM_FILT_POINTS values, 
    # discard the oldest value (FIFO buffer)
    if len(filt_buffer['width']) > NUM_FILT_POINTS:
        filt_buffer['width'] = filt_buffer['width'][1:]
        filt_buffer['height'] = filt_buffer['height'][1:]
    
    # Compute filtered camera coordinates
    N = len(filt_buffer['width']) # Get the number of values in buffers (will be < NUM_FILT_POINTS at the start)

    # Sum all values for each coordinate
    w_sum = sum( filt_buffer['width'] )
    h_sum = sum( filt_buffer['height'] )
    # Compute the average
    w_filt = int(round(w_sum / N))
    h_filt = int(round(h_sum / N))

    # Return filtered coordinates as a dictionary
    return {'width':w_filt, 'height':h_filt}

def flush_filter_buffer():
    global filt_buffer

    filt_buffer = {'width':[], 'height':[]}

def main(args=None):
    
    # Initialize the rclpy library
    rclpy.init(args=args)
    
    # Create the node
    image_publisher = ImagePublisher()
    
    # Spin the node so the callback function is called.
    rclpy.spin(image_publisher)
    
    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    image_publisher.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()
    
if __name__ == '__main__':
    main()
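
The avoidance coordinate published on /avoid_coord is the "magnitude-complement mirror" of the composite obstacle centroid computed in timer_callback above. The following standalone sketch (not part of the node) reimplements just that geometry so it can be checked by hand; the 1920×1080 frame size and the example centroid are arbitrary illustration values.

# Standalone sketch of the magnitude-complement mirror used in timer_callback.
# Frame size and the example obstacle centroid are arbitrary illustration values.
def mirror_avoid_coord(obst_x, obst_y, x_ax_pos, y_ax_pos):
    """Return the avoidance point for an obstacle centroid given in Cartesian
    pixel coordinates centered on the image (x right, y up)."""
    if obst_x == 0 or obst_y == 0:
        return 0.0, 0.0  # No avoidance when the centroid sits on an axis

    m = obst_y / obst_x  # Slope of the line through (0, 0) and the obstacle

    # Mirror the point to the opposite horizontal side of the frame
    mirror_x = -y_ax_pos if obst_x > 0 else y_ax_pos + 1
    mirror_y = m * mirror_x

    # Clamp the mirror to the top/bottom of the frame if necessary
    if mirror_y < -x_ax_pos:
        mirror_y = -x_ax_pos - 1
        mirror_x = mirror_y / m
    elif mirror_y > x_ax_pos:
        mirror_y = x_ax_pos
        mirror_x = mirror_y / m

    # Adding the obstacle vector back gives the magnitude-complement mirror
    return mirror_x + obst_x, mirror_y + obst_y


if __name__ == '__main__':
    # 1920x1080 frame: x_ax_pos = 539 (half height), y_ax_pos = 959 (half width)
    ax, ay = mirror_avoid_coord(200.0, 100.0, 539, 959)
    print(ax, ay)  # Obstacle up-right of center -> avoid point down-left of center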

Tensorflow Lite detector

Python
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from tflite_runtime.interpreter import Interpreter, load_delegate

from visualization_utils import *
from utils import *
import cv2
import numpy as np


class ObjectDetectorLite:
    def __init__(self, model_path, label_path):
        """
            Builds the TensorFlow Lite interpreter and loads the model and labels
        """

        # Load label_map
        self._load_label(label_path)

        # Define lite graph and Load Tensorflow Lite model into memory
        self.interpreter = Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Get input size
        input_shape = self.input_details[0]['shape']
        print(input_shape)
        print("type:", self.input_details[0]['dtype'])
        self.size = input_shape[:2] if len(input_shape) == 3 else input_shape[1:3]

        self.float_input = (self.input_details[0]['dtype'] == np.float32)

    def get_input_size(self):
        return self.size

    def detect(self, image, threshold=0.1):
        """
            Runs detection on the frame with the given confidence threshold.
            Returns detected boxes (normalized coordinates), confidence scores and class labels
        """
        # image = cv2.resize(image, (320, 320), interpolation = cv2.INTER_AREA) # For my own tflite
        # image = image.astype(np.float32) # For my own tflite
        # Add a batch dimension
        # frame = np.expand_dims(image, axis=0)
        frame = image

        # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
        input_mean = 127.5
        input_std = 127.5
        if self.float_input:
            frame = (np.float32(frame) - input_mean) / input_std

        # Check if the input type is quantized, then rescale input data to uint8
        if self.input_details[0]['dtype'] == np.uint8:
            input_scale, input_zero_point = self.input_details[0]["quantization"]
            frame = frame / input_scale + input_zero_point
        frame = np.expand_dims(frame, axis=0).astype(self.input_details[0]["dtype"])
        
        # run model
        self.interpreter.set_tensor(self.input_details[0]['index'], frame)
        self.interpreter.invoke()

        # get results
        # boxes = self.interpreter.get_tensor(self.output_details[0]['index'])
        # classes = self.interpreter.get_tensor(self.output_details[1]['index'])
        # scores = self.interpreter.get_tensor(self.output_details[2]['index'])
        # num = self.interpreter.get_tensor(self.output_details[3]['index'])

        # eIQ 
        # boxes = np.squeeze(self.interpreter.get_tensor(self.output_details[0]['index']))
        # classes = np.squeeze(self.interpreter.get_tensor(self.output_details[1]['index']))
        # scores = np.squeeze(self.interpreter.get_tensor(self.output_details[2]['index']))
        # num = np.squeeze(self.interpreter.get_tensor(self.output_details[3]['index']))

        # COLAB: Retrieve detection results
        boxes = self.interpreter.get_tensor(self.output_details[1]['index'])#[0] # Bounding box coordinates of detected objects
        classes = self.interpreter.get_tensor(self.output_details[3]['index'])#[0] # Class index of detected objects
        scores = self.interpreter.get_tensor(self.output_details[0]['index'])#[0] # Confidence of detected objects

        if self.output_details[0]['dtype'] == np.float32:
            print('++++++++++ float32 boxes: ', boxes)
            print('++++++++++ float32 scores: ', scores)
            print('++++++++++ float32 classes: ', classes)

            # Find detected boxes coordinates
            return self._boxes_coordinates(image,
                                            np.squeeze(boxes[0]),
                                            # np.squeeze(classes[0]+1).astype(np.int32),
                                            np.squeeze(classes[0]),
                                            np.squeeze(scores[0]),
                                            min_score_thresh=threshold)
        
        elif self.output_details[0]['dtype'] == np.uint8:
            boxes_ = boxes[0]*1.0/255.0
            classes_ = classes[0]*3.0/255.0
            scores_ = scores[0]*1.0/255.0
            print('++++++++++ uint8 boxes_: ', boxes_)
            print('++++++++++ uint8 scores_: ', scores_)
            print('++++++++++ uint8 classes_: ', classes_)

            return self._boxes_coordinates(image,
                                            np.squeeze(boxes_),
                                            np.squeeze(classes_),
                                            np.squeeze(scores_),
                                            min_score_thresh=threshold)
        
        # # boxes[0] = boxes[0]*1.0/255.0
        # # classes[0] = classes[0]*3.0/255.0
        # # scores[0] = (scores[0]*1.0/255.0).astype(np.int32)
        # # print('++++++++++ boxes[0]: ', boxes[0])
        # # print('++++++++++ scores[0]: ', scores[0])
        # # print('++++++++++ classes[0]: ', classes[0])

        # boxes_ = boxes[0]*1.0/255.0
        # classes_ = classes[0]*3.0/255.0
        # scores_ = scores[0]*1.0/255.0
        # print('++++++++++ boxes[0]: ', boxes_)
        # print('++++++++++ scores[0]: ', scores_)
        # print('++++++++++ classes[0]: ', classes_)

        # # Find detected boxes coordinates
        # # return self._boxes_coordinates(image,
        # #                                 np.squeeze(boxes[0]),
        # #                                 np.squeeze(classes[0]+1).astype(np.int32),
        # #                                 np.squeeze(scores[0]),
        # #                                 min_score_thresh=threshold)

        # return self._boxes_coordinates(image,
        #                                 np.squeeze(boxes_),
        #                                 np.squeeze(classes_),
        #                                 np.squeeze(scores_),
        #                                 min_score_thresh=threshold)
    
    def close(self):
        pass
      
    def _boxes_coordinates(self,
                        image,
                        boxes,
                        classes,
                        scores,
                        max_boxes_to_draw=20,
                        min_score_thresh=.5):
        """
        This function groups boxes that correspond to the same location
        and creates a display string for each detection 

        Args:
          image: uint8 numpy array with shape (img_height, img_width, 3)
          boxes: a numpy array of shape [N, 4]
          classes: a numpy array of shape [N]
          scores: a numpy array of shape [N] or None.  If scores=None, then
            this function assumes that the boxes to be plotted are groundtruth
            boxes and plot all boxes as black with no classes or scores.
          max_boxes_to_draw: maximum number of boxes to visualize.  If None, draw
            all boxes.
          min_score_thresh: minimum score threshold for a box to be visualized
        """

        if not max_boxes_to_draw:
            max_boxes_to_draw = boxes.shape[0]
        number_boxes = min(max_boxes_to_draw, boxes.shape[0])
        detected_boxes = []
        probabilities = []
        categories = []
        print('self.category_index:', self.category_index)
        for i in range(number_boxes):
            if scores is None or scores[i] > min_score_thresh:
                box = tuple(boxes[i].tolist())
                detected_boxes.append(box)
                probabilities.append(scores[i])
                categories.append(self.category_index[classes[i]])
        return np.array(detected_boxes), probabilities, categories

    def _load_label(self, path):
        """
            Loads labels
        """
        categories = load_labelmap(path)
        self.category_index = create_category_index(categories)

class ObjectDetectorLiteExtDelegate:
    def __init__(self, model_path, label_path):
        """
            Builds the TensorFlow Lite interpreter (with the external NPU delegate) and loads the model and labels
        """

        # Load label_map
        self._load_label(label_path)

        # Loading external delegate from /lib/libvx_delegate.so with args: {} num threads: None
        # load external delegate
        ext_delegate = '/lib/libvx_delegate.so'
        ext_delegate_options = ''
        # if args.ext_delegate is not None:
        print("Loading external delegate from {} with args: {}".format(ext_delegate, ext_delegate_options))
        ext_delegate = [ load_delegate(ext_delegate, ext_delegate_options) ]

        self.interpreter = Interpreter(
                model_path=model_path, experimental_delegates=ext_delegate, num_threads=None)
        self.interpreter.allocate_tensors()

        # Define lite graph and Load Tensorflow Lite model into memory
        # self.interpreter = Interpreter(model_path=model_path)
        # self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Get input size
        input_shape = self.input_details[0]['shape']
        print(input_shape)
        print("type:", self.input_details[0]['dtype'])
        self.size = input_shape[:2] if len(input_shape) == 3 else input_shape[1:3]

        self.float_input = (self.input_details[0]['dtype'] == np.float32)

    def get_input_size(self):
        return self.size

    def detect(self, image, threshold=0.1):
        """
            Runs detection on the frame with the given confidence threshold.
            Returns detected boxes (normalized coordinates), confidence scores and class labels
        """
        # image = cv2.resize(image, (640, 640), interpolation = cv2.INTER_AREA) # For my own tflite
        # image = image.astype(np.float32) # For my own tflite
        # Add a batch dimension
        # frame = np.expand_dims(image, axis=0)
        frame = image
        # frame = np.expand_dims(frame, axis=0).astype(self.input_details[0]["dtype"])

        # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
        # input_mean = 127.5
        # input_std = 127.5
        # if self.float_input:
        #     frame = (np.float32(frame) - input_mean) / input_std

        # Check if the input type is quantized, then rescale input data to uint8
        if self.input_details[0]['dtype'] == np.uint8:
            input_scale, input_zero_point = self.input_details[0]["quantization"]
            frame = frame / input_scale + input_zero_point
        frame = np.expand_dims(frame, axis=0).astype(self.input_details[0]["dtype"])

        # output = interpreter.get_tensor(output_details["index"])[0]
        
        # run model
        self.interpreter.set_tensor(self.input_details[0]['index'], frame)
        self.interpreter.invoke()

        # get results
        # boxes = self.interpreter.get_tensor(self.output_details[0]['index'])
        # classes = self.interpreter.get_tensor(self.output_details[1]['index'])
        # scores = self.interpreter.get_tensor(self.output_details[2]['index'])
        # num = self.interpreter.get_tensor(self.output_details[3]['index'])

        # COLAB: Retrieve detection results
        boxes = self.interpreter.get_tensor(self.output_details[1]['index'])#[0] # Bounding box coordinates of detected objects
        classes = self.interpreter.get_tensor(self.output_details[3]['index'])#[0] # Class index of detected objects
        scores = self.interpreter.get_tensor(self.output_details[0]['index'])#[0] # Confidence of detected objects
        
        if self.output_details[0]['dtype'] == np.float32:
            print('++++++++++ float32 boxes: ', boxes)
            print('++++++++++ float32 scores: ', scores)
            print('++++++++++ float32 classes: ', classes)

            # Find detected boxes coordinates
            return self._boxes_coordinates(image,
                                            np.squeeze(boxes[0]),
                                            # np.squeeze(classes[0]+1).astype(np.int32),
                                            np.squeeze(classes[0]),
                                            np.squeeze(scores[0]),
                                            min_score_thresh=threshold)
        
        elif self.output_details[0]['dtype'] == np.uint8:
            boxes_ = boxes[0]*1.0/255.0
            classes_ = classes[0]*3.0/255.0
            scores_ = scores[0]*1.0/255.0
            print('++++++++++ uint8 boxes_: ', boxes_)
            print('++++++++++ uint8 scores_: ', scores_)
            print('++++++++++ uint8 classes_: ', classes_)

            return self._boxes_coordinates(image,
                                            np.squeeze(boxes_),
                                            np.squeeze(classes_),
                                            np.squeeze(scores_),
                                            min_score_thresh=threshold)

    def close(self):
        pass
      
    def _boxes_coordinates(self,
                        image,
                        boxes,
                        classes,
                        scores,
                        max_boxes_to_draw=20,
                        min_score_thresh=.5):
        """
        This function groups boxes that correspond to the same location
        and creates a display string for each detection 

        Args:
          image: uint8 numpy array with shape (img_height, img_width, 3)
          boxes: a numpy array of shape [N, 4]
          classes: a numpy array of shape [N]
          scores: a numpy array of shape [N] or None.  If scores=None, then
            this function assumes that the boxes to be plotted are groundtruth
            boxes and plot all boxes as black with no classes or scores.
          max_boxes_to_draw: maximum number of boxes to visualize.  If None, draw
            all boxes.
          min_score_thresh: minimum score threshold for a box to be visualized
        """

        if not max_boxes_to_draw:
            max_boxes_to_draw = boxes.shape[0]
        number_boxes = min(max_boxes_to_draw, boxes.shape[0])
        detected_boxes = []
        probabilities = []
        categories = []

        for i in range(number_boxes):
            if scores is None or scores[i] > min_score_thresh:
                box = tuple(boxes[i].tolist())
                detected_boxes.append(box)
                probabilities.append(scores[i])
                categories.append(self.category_index[classes[i]])
        return np.array(detected_boxes), probabilities, categories

    def _load_label(self, path):
        """
            Loads labels
        """
        categories = load_labelmap(path)
        self.category_index = create_category_index(categories)

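Outside the ROS graph, the detector class can be smoke-tested on a single frame. The sketch below is only an illustration; the model, label and image file names are placeholders for whatever files are actually available.

# Standalone smoke test for ObjectDetectorLite (sketch; file names are placeholders).
import cv2
from detector import ObjectDetectorLite

detector = ObjectDetectorLite(model_path='detect.tflite', label_path='labelmap.txt')
in_height, in_width = detector.get_input_size()   # TFLite input is [1, height, width, 3]

frame = cv2.imread('test_frame.jpg')              # Any BGR test image
frame = cv2.resize(frame, (int(in_width), int(in_height)), interpolation=cv2.INTER_AREA)

boxes, scores, classes = detector.detect(frame, 0.13)
for box, score, label in zip(boxes, scores, classes):
    print(label, score, box)                      # box: normalized (ymin, xmin, ymax, xmax)
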
Detector utilities

Python
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Image and label map utility functions."""
import logging
import numpy as np


def create_category_index(categories):
  """Creates dictionary of COCO compatible categories keyed by category id.

  Args:
    categories: a list of dicts, each of which has the following keys:
      'id': (required) an integer id uniquely identifying this category.
      'name': (required) string representing category name
        e.g., 'cat', 'dog'.

  Returns:
    category_index: a dict containing the same entries as categories, but keyed
      by the 'id' field of each category.
  """
  category_index = {}
  for index, cat in enumerate(categories):
    category_index[index] = cat
  return category_index


def load_labelmap(path):

  with open(path, 'r') as fid:
    label_map = list(map(str.strip, fid.readlines()))
  return label_map


def get_label_map_dict(label_map_path):
  """Reads a label map and returns a dictionary of label names to id.

  Args:
    label_map_path: path to label_map.

  Returns:
    A dictionary mapping label names to id.
  """
  label_map = load_labelmap(label_map_path)
  label_map_dict = {}
  for idx, name in enumerate(label_map):
    label_map_dict[name] = idx
  return label_map_dict


def load_image(frame, new_size=(300, 300)):
  # Get the dimensions
  height, width, _ = frame.shape # Image shape
  new_width, new_height = new_size # Target shape 

  # Calculate the target image coordinates
  left = (width - new_width) // 2
  top = (height - new_height) // 2
  right = (width + new_width) // 2
  bottom = (height + new_height) // 2
  # print(f"-- frame.shape: {frame.shape}")
  # print(f"-- new_size: {new_size}")
  # print(f"LOAD: left: {left}, top: {top}, right: {right}, bottom: {bottom}")

  image = frame[top: bottom, left: right, :] # Rows (height) first, then columns (width)
  return image
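
For reference, load_labelmap reads one class name per line and create_category_index keys those names by their zero-based line index, which is what _boxes_coordinates looks up. A minimal sketch with a made-up label file:

# Illustration only: a made-up three-line label file and the index built from it.
from utils import load_labelmap, create_category_index

with open('labelmap_example.txt', 'w') as f:       # Hypothetical label file
    f.write('cow\nperson\ndrone\n')

categories = load_labelmap('labelmap_example.txt')      # ['cow', 'person', 'drone']
category_index = create_category_index(categories)      # {0: 'cow', 1: 'person', 2: 'drone'}
print(category_index)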

Visualization utilities

Python
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""## Visualization Utilities"""

import numpy as np
from PIL import Image, ImageDraw, ImageFont

def draw_bounding_boxes_on_image_array(image,
                                       boxes,
                                       color='red',
                                       thickness=4,
                                       display_str_list=()):
  """Draws bounding boxes on image (numpy array).
  Args:
    image: a numpy array object.
    boxes: a 2 dimensional numpy array of [N, 4]: (ymin, xmin, ymax, xmax).
           The coordinates are in normalized format between [0, 1].
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str_list: a list of strings for each bounding box.
  Raises:
    ValueError: if boxes is not a [N, 4] array
  """
  image_pil = Image.fromarray(image)
  draw_bounding_boxes_on_image(image_pil, boxes, color, thickness,
                               display_str_list)
  np.copyto(image, np.array(image_pil))
  

def draw_bounding_boxes_on_image(image,
                                 boxes,
                                 color='red',
                                 thickness=4,
                                 display_str_list=()):
  """Draws bounding boxes on image.
  Args:
    image: a PIL.Image object.
    boxes: a 2 dimensional numpy array of [N, 4]: (ymin, xmin, ymax, xmax).
           The coordinates are in normalized format between [0, 1].
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str_list: a list of strings for each bounding box.
                           
  Raises:
    ValueError: if boxes is not a [N, 4] array
  """
  boxes_shape = boxes.shape
  if not boxes_shape:
    return
  if len(boxes_shape) != 2 or boxes_shape[1] != 4:
    raise ValueError('Input must be of size [N, 4]')
  for i in range(boxes_shape[0]):
    draw_bounding_box_on_image(image, boxes[i, 0], boxes[i, 1], boxes[i, 2],
                               boxes[i, 3], color, thickness, display_str_list[i])
        
def draw_bounding_box_on_image(image,
                               ymin,
                               xmin,
                               ymax,
                               xmax,
                               color='red',
                               thickness=4,
                               display_str=None,
                               use_normalized_coordinates=True):
  """Adds a bounding box to an image.
  Bounding box coordinates can be specified in either absolute (pixel) or
  normalized coordinates by setting the use_normalized_coordinates argument.
  Args:
    image: a PIL.Image object.
    ymin: ymin of bounding box.
    xmin: xmin of bounding box.
    ymax: ymax of bounding box.
    xmax: xmax of bounding box.
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str: string to display in box
    use_normalized_coordinates: If True (default), treat coordinates
      ymin, xmin, ymax, xmax as relative to the image.  Otherwise treat
      coordinates as absolute.
  """
  draw = ImageDraw.Draw(image)
  im_width, im_height = image.size
  print(f"use_normalized_coordinates: {use_normalized_coordinates}")
  if use_normalized_coordinates:
    (left, right, top, bottom) = (xmin * im_width, xmax * im_width,
                                  ymin * im_height, ymax * im_height)
  else:
    (left, right, top, bottom) = (xmin, xmax, ymin, ymax)
  draw.line([(left, top), (left, bottom), (right, bottom),
             (right, top), (left, top)], width=thickness, fill=color)
  # c_x = int(round(left + (right-left)/2))
  # c_y = int(round(top + (bottom-top)/2))
  # draw.ellipse((c_x-2, c_y-2, c_x+2, c_y+2), fill = color, outline ='green')
  try:
    font = ImageFont.truetype('arial.ttf', 24)
  except IOError:
    font = ImageFont.load_default()
  
  display_str_height = font.getsize(display_str)[1]

  if top > display_str_height:
    text_bottom = top
  else:
    text_bottom = bottom + display_str_height
    
  text_width, text_height = font.getsize(display_str)
  margin = np.ceil(0.05 * text_height)
  draw.rectangle(
      [(left, text_bottom - text_height - 2 * margin), (left + text_width,
                                                        text_bottom)],
      fill=color)

  draw.text(
      (left + margin, text_bottom - text_height - margin),
      display_str,
      fill='black',
      font=font)
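
The drawing helpers above expect a normalized [N, 4] box array and one display string per box. The short check below is a sketch only; it assumes a Pillow version that still provides ImageFont.getsize (older than Pillow 10), since draw_bounding_box_on_image relies on it.

# Minimal check of draw_bounding_boxes_on_image_array on a blank frame (sketch).
# Assumes Pillow < 10, where ImageFont.getsize is still available.
import numpy as np
from visualization_utils import draw_bounding_boxes_on_image_array

image = np.zeros((320, 320, 3), dtype=np.uint8)   # Blank RGB frame
boxes = np.array([[0.25, 0.25, 0.75, 0.75]])      # One normalized (ymin, xmin, ymax, xmax) box
draw_bounding_boxes_on_image_array(image, boxes, display_str_list=['cow'])
print(image.sum() > 0)                            # True once the box and label are drawn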

Credits

Raul Alvarez-Torrico
7 projects • 12 followers
BEng in electronics, founder of TecBolivia (physical computing & educational robotics in Bolivia). WSNs, robotics, artificial intelligence.
