savakholin's picture
Duplicate from kisa-misa/YOLOv8-real-time
8b7211f
from glob import glob
import hydra
import argparse
import time
from pathlib import Path
import math
import cv2
import torch
import torch.backends.cudnn as cudnn
from numpy import random
from ultralytics.yolo.engine.predictor import BasePredictor
from ultralytics.yolo.utils import DEFAULT_CONFIG, ROOT, ops
from ultralytics.yolo.utils.checks import check_imgsz
from ultralytics.yolo.utils.plotting import Annotator, colors, save_one_box
import pandas as pd
import cv2
from ultralytics.yolo.v8.detect.deep_sort_pytorch.utils.parser import get_config
from ultralytics.yolo.v8.detect.deep_sort_pytorch.deep_sort import DeepSort
from collections import deque
import numpy as np
import csv
import matplotlib.pyplot as plt
import seaborn as sns
import gradio as gr
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
deq = {}
indices = [0] * 100
c = 0
num = 1
#f = open('/pulse.csv', "w+")
#f.close()
#with open('/pulse.csv', 'a') as f:
# create the csv writer
# writer = csv.writer(f)
# header = ['time', 'pulse']
# writer.writerow(header)
deepsort = None
object_counter = {}
speed_line_queue = {}
def estimatespeed(Location1, Location2, h, w):
#Euclidean Distance Formula
d_pixel = math.sqrt(math.pow(Location2[0] - Location1[0], 2) + math.pow(Location2[1] - Location1[1], 2))
# defining thr pixels per meter
ppm = max(h, w) // 10
d_meters = d_pixel/ppm
time_constant = 15*3.6
#distance = speed/time
speed = d_meters * time_constant
return int(speed)
def init_tracker():
global deepsort
cfg_deep = get_config()
cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
deepsort= DeepSort(cfg_deep.DEEPSORT.REID_CKPT,
max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,
use_cuda=True)
##########################################################################################
def xyxy_to_xywh(*xyxy):
"""" Calculates the relative bounding box from absolute pixel values. """
bbox_left = min([xyxy[0].item(), xyxy[2].item()])
bbox_top = min([xyxy[1].item(), xyxy[3].item()])
bbox_w = abs(xyxy[0].item() - xyxy[2].item())
bbox_h = abs(xyxy[1].item() - xyxy[3].item())
x_c = (bbox_left + bbox_w / 2)
y_c = (bbox_top + bbox_h / 2)
w = bbox_w
h = bbox_h
return x_c, y_c, w, h
def compute_color_for_labels(label):
"""
Simple function that adds fixed color depending on the class
"""
if label == 7: #truck
color = (85,45,255)
elif label == 2: # Car
color = (222,82,175)
elif label == 3: # Motorcycle
color = (0, 204, 255)
elif label == 5: # Bus
color = (0, 149, 255)
else:
color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
return tuple(color)
def draw_border(img, pt1, pt2, color, thickness, r, d):
x1,y1 = pt1
x2,y2 = pt2
# Top left
cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness)
cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness)
cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness)
# Top right
cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness)
cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness)
cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness)
# Bottom left
cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness)
cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness)
cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness)
# Bottom right
cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness)
cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness)
cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness)
cv2.rectangle(img, (x1 + r, y1), (x2 - r, y2), color, -1, cv2.LINE_AA)
cv2.rectangle(img, (x1, y1 + r), (x2, y2 - r - d), color, -1, cv2.LINE_AA)
cv2.circle(img, (x1 +r, y1+r), 2, color, 12)
cv2.circle(img, (x2 -r, y1+r), 2, color, 12)
cv2.circle(img, (x1 +r, y2-r), 2, color, 12)
cv2.circle(img, (x2 -r, y2-r), 2, color, 12)
return img
def UI_box(x, img, color=None, label=None, line_thickness=None):
# Plots one bounding box on image img
tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness
color = color or [random.randint(0, 255) for _ in range(3)]
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
if label:
tf = max(tl - 1, 1) # font thickness
t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
img = draw_border(img, (c1[0], c1[1] - t_size[1] -3), (c1[0] + t_size[0], c1[1]+3), color, 1, 8, 2)
cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
def ccw(A,B,C):
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])
def draw_boxes(img, bbox, names,object_id,writer, writer2, identities=None, offset=(0, 0)):
height, width, _ = img.shape
# remove tracked point from buffer if object is lost
global c
for key in list(deq):
if key not in identities:
deq.pop(key)
weights = [0,0,int(6.72),int(1.638),0,30,0,int(18.75)]
speeds = [0] * 8
for i, box in enumerate(bbox):
obj_name = names[object_id[i]]
x1, y1, x2, y2 = [int(i) for i in box]
x1 += offset[0]
x2 += offset[0]
y1 += offset[1]
y2 += offset[1]
# code to find center of bottom edge
center = (int((x2+x1)/ 2), int((y2+y2)/2))
# get ID of object
id = int(identities[i]) if identities is not None else 0
# create new buffer for new object
if id not in deq:
deq[id] = deque(maxlen= 64)
if object_id[i] in [2, 3, 5, 7]:
c +=1
indices[id] = c
speed_line_queue[id] = []
color = compute_color_for_labels(object_id[i])
label = '{}{:d}'.format("", indices[id]) + ":"+ '%s' % (obj_name)
# add center to buffer
deq[id].appendleft(center)
if len(deq[id]) >= 2:
object_speed = estimatespeed(deq[id][1], deq[id][0], x2-x1, y2-y1)
speed_line_queue[id].append(object_speed)
if obj_name not in object_counter:
object_counter[obj_name] = 1
#motorcycle_weight = 1.638
#car_weight = 6.72
#truck_weight = 18.75
#bus_weight = 30
try:
spd = sum(speed_line_queue[id])//len(speed_line_queue[id])
speeds[object_id[i]] += spd
label = label + " v=" + str(spd) + " m=" + str(weights[object_id[i]])
writer2.writerow([str(indices[id]), obj_name, str(spd), str(weights[object_id[i]])])
except:
pass
UI_box(box, img, label=label, color=color, line_thickness=2)
#cv2.putText(img, f"{speeds}", (500, 50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA)
t = time.localtime()
current_time = time.strftime("%H:%M:%S %d.%m.%Y", t)
pulse = sum(np.multiply(speeds, weights))
# write a row to the csv file
writer.writerow([f"{current_time}", f"{pulse}"])
cv2.putText(img, f"pulse: {pulse}", (500, 50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA)
#for i, object_speed in enumerate(speeds):
# object_speed = sum(object_speed)*weights[i]
return img
class DetectionPredictor(BasePredictor):
def get_annotator(self, img):
return Annotator(img, line_width=self.args.line_thickness, example=str(self.model.names))
def preprocess(self, img):
img = torch.from_numpy(img).to(self.model.device)
img = img.half() if self.model.fp16 else img.float() # uint8 to fp16/32
img /= 255 # 0 - 255 to 0.0 - 1.0
return img
def postprocess(self, preds, img, orig_img):
preds = ops.non_max_suppression(preds,
self.args.conf,
self.args.iou,
classes = [2, 3, 5, 7],
agnostic=self.args.agnostic_nms,
max_det=self.args.max_det)
for i, pred in enumerate(preds):
shape = orig_img[i].shape if self.webcam else orig_img.shape
pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round()
return preds
def write_results(self, idx, preds, batch):
global num
p, im, im0 = batch
all_outputs = []
log_string = ""
if len(im.shape) == 3:
im = im[None] # expand for batch dim
self.seen += 1
im0 = im0.copy()
if self.webcam: # batch_size >= 1
log_string += f'{idx}: '
frame = self.dataset.count
else:
frame = getattr(self.dataset, 'frame', 0)
self.data_path = p
save_path = str(self.save_dir / p.name) # im.jpg
self.txt_path = str(self.save_dir / 'labels' / p.stem) + ('' if self.dataset.mode == 'image' else f'_{frame}')
log_string += '%gx%g ' % im.shape[2:] # print string
self.annotator = self.get_annotator(im0)
det = preds[idx]
all_outputs.append(det)
if len(det) == 0:
return log_string
count = 0
for c in det[:, 5].unique():
count += 1
n = (det[:, 5] == c).sum() # detections per class
cv2.putText(im0, f"{n} {self.model.names[int(c)]}", (11, count*50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA)
log_string += f"{n} {self.model.names[int(c)]}{'s' * (n > 1)}, "
# write
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh
xywh_bboxs = []
confs = []
oids = []
outputs = []
for *xyxy, conf, cls in reversed(det):
x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
xywh_obj = [x_c, y_c, bbox_w, bbox_h]
xywh_bboxs.append(xywh_obj)
confs.append([conf.item()])
oids.append(int(cls))
xywhs = torch.Tensor(xywh_bboxs)
confss = torch.Tensor(confs)
outputs = deepsort.update(xywhs, confss, oids, im0)
#with open('/pulse.csv', 'a') as f:
# create the csv writer
#writer = csv.writer(f)
if len(outputs) > 0:
bbox_xyxy = outputs[:, :4]
identities = outputs[:, -2]
object_id = outputs[:, -1]
#f2 = open('/vehicles_data.csv', "w+")
#f2.close()
#with open('/vehicles_data.csv', 'a') as f:
#writer2 = csv.writer(f)
#header = ['id', 'class', 'speed', 'weight']
#writer2.writerow(header)
img= draw_boxes(im0, bbox_xyxy, self.model.names, object_id,writer, writer2, identities)
cv2.imshow("window", img)
#df = pd.read_csv("/pulse.csv")
#df['time'] = pd.to_datetime(df['time'], format = '%H:%M:%S %d.%m.%Y')
#df.index = df['time']
#del df['time']
#try:
#fig, ax = plt.subplots()
#plt.clf()
#sns.lineplot(df)
#ax.set_xticklabels([t.get_text().split(".")[0] for t in ax.get_xticklabels()])
#ax.set_xticklabels([pd.to_datetime(t.get_text()).strftime('%H:%M:%S') for t in ax.get_xticklabels()])
#plt.ylabel('Pulse')
#plt.xlabel('time')
#plt.savefig(f'/time_series/figure_{num:010d}.png')
#num += 1
#except:
#log_string += f'An error occured while saving figure_{num:010d}.png, '
return [log_string, img]
@hydra.main(version_base=None, config_path=str(DEFAULT_CONFIG.parent), config_name=DEFAULT_CONFIG.name)
def predict(cfg):
init_tracker()
cfg.model = cfg.model or "yolov8n.pt"
cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2) # check image size
cfg.source = cfg.source if cfg.source is not None else ROOT / "assets"
predictor = DetectionPredictor(cfg)
predictor()
#model = Yolov4(weight_path="yolov4.weights", class_name_path='coco_classes.txt')
def gradio_wrapper(img):
result = predict(model="YOLOv8-real-time/ultralytics/yolo/v8/detect/yolov8x6.pt", source=img)
#print(np.shape(img))
return result
demo = gr.Interface(
fn=gradio_wrapper,
#gr.Image(source="webcam", streaming=True, flip=True),
inputs=gr.Image(source="webcam", streaming=True),
outputs="image",
live=True
)
demo.launch()