Spaces:
Paused
Paused
from glob import glob | |
import hydra | |
import argparse | |
import time | |
from pathlib import Path | |
import math | |
import cv2 | |
import torch | |
import torch.backends.cudnn as cudnn | |
from numpy import random | |
from ultralytics.yolo.engine.predictor import BasePredictor | |
from ultralytics.yolo.utils import DEFAULT_CONFIG, ROOT, ops | |
from ultralytics.yolo.utils.checks import check_imgsz | |
from ultralytics.yolo.utils.plotting import Annotator, colors, save_one_box | |
import pandas as pd | |
import cv2 | |
from ultralytics.yolo.v8.detect.deep_sort_pytorch.utils.parser import get_config | |
from ultralytics.yolo.v8.detect.deep_sort_pytorch.deep_sort import DeepSort | |
from collections import deque | |
import numpy as np | |
import csv | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import gradio as gr | |
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1) | |
deq = {} | |
indices = [0] * 100 | |
c = 0 | |
num = 1 | |
#f = open('/pulse.csv', "w+") | |
#f.close() | |
#with open('/pulse.csv', 'a') as f: | |
# create the csv writer | |
# writer = csv.writer(f) | |
# header = ['time', 'pulse'] | |
# writer.writerow(header) | |
deepsort = None | |
object_counter = {} | |
speed_line_queue = {} | |
def estimatespeed(Location1, Location2, h, w): | |
#Euclidean Distance Formula | |
d_pixel = math.sqrt(math.pow(Location2[0] - Location1[0], 2) + math.pow(Location2[1] - Location1[1], 2)) | |
# defining thr pixels per meter | |
ppm = max(h, w) // 10 | |
d_meters = d_pixel/ppm | |
time_constant = 15*3.6 | |
#distance = speed/time | |
speed = d_meters * time_constant | |
return int(speed) | |
def init_tracker(): | |
global deepsort | |
cfg_deep = get_config() | |
cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml") | |
deepsort= DeepSort(cfg_deep.DEEPSORT.REID_CKPT, | |
max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE, | |
nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE, | |
max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET, | |
use_cuda=True) | |
########################################################################################## | |
def xyxy_to_xywh(*xyxy): | |
"""" Calculates the relative bounding box from absolute pixel values. """ | |
bbox_left = min([xyxy[0].item(), xyxy[2].item()]) | |
bbox_top = min([xyxy[1].item(), xyxy[3].item()]) | |
bbox_w = abs(xyxy[0].item() - xyxy[2].item()) | |
bbox_h = abs(xyxy[1].item() - xyxy[3].item()) | |
x_c = (bbox_left + bbox_w / 2) | |
y_c = (bbox_top + bbox_h / 2) | |
w = bbox_w | |
h = bbox_h | |
return x_c, y_c, w, h | |
def compute_color_for_labels(label): | |
""" | |
Simple function that adds fixed color depending on the class | |
""" | |
if label == 7: #truck | |
color = (85,45,255) | |
elif label == 2: # Car | |
color = (222,82,175) | |
elif label == 3: # Motorcycle | |
color = (0, 204, 255) | |
elif label == 5: # Bus | |
color = (0, 149, 255) | |
else: | |
color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette] | |
return tuple(color) | |
def draw_border(img, pt1, pt2, color, thickness, r, d): | |
x1,y1 = pt1 | |
x2,y2 = pt2 | |
# Top left | |
cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness) | |
cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness) | |
cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness) | |
# Top right | |
cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness) | |
cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness) | |
cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness) | |
# Bottom left | |
cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness) | |
cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness) | |
cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness) | |
# Bottom right | |
cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness) | |
cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness) | |
cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness) | |
cv2.rectangle(img, (x1 + r, y1), (x2 - r, y2), color, -1, cv2.LINE_AA) | |
cv2.rectangle(img, (x1, y1 + r), (x2, y2 - r - d), color, -1, cv2.LINE_AA) | |
cv2.circle(img, (x1 +r, y1+r), 2, color, 12) | |
cv2.circle(img, (x2 -r, y1+r), 2, color, 12) | |
cv2.circle(img, (x1 +r, y2-r), 2, color, 12) | |
cv2.circle(img, (x2 -r, y2-r), 2, color, 12) | |
return img | |
def UI_box(x, img, color=None, label=None, line_thickness=None): | |
# Plots one bounding box on image img | |
tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness | |
color = color or [random.randint(0, 255) for _ in range(3)] | |
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3])) | |
cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA) | |
if label: | |
tf = max(tl - 1, 1) # font thickness | |
t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0] | |
img = draw_border(img, (c1[0], c1[1] - t_size[1] -3), (c1[0] + t_size[0], c1[1]+3), color, 1, 8, 2) | |
cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA) | |
def ccw(A,B,C): | |
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0]) | |
def draw_boxes(img, bbox, names,object_id,writer, writer2, identities=None, offset=(0, 0)): | |
height, width, _ = img.shape | |
# remove tracked point from buffer if object is lost | |
global c | |
for key in list(deq): | |
if key not in identities: | |
deq.pop(key) | |
weights = [0,0,int(6.72),int(1.638),0,30,0,int(18.75)] | |
speeds = [0] * 8 | |
for i, box in enumerate(bbox): | |
obj_name = names[object_id[i]] | |
x1, y1, x2, y2 = [int(i) for i in box] | |
x1 += offset[0] | |
x2 += offset[0] | |
y1 += offset[1] | |
y2 += offset[1] | |
# code to find center of bottom edge | |
center = (int((x2+x1)/ 2), int((y2+y2)/2)) | |
# get ID of object | |
id = int(identities[i]) if identities is not None else 0 | |
# create new buffer for new object | |
if id not in deq: | |
deq[id] = deque(maxlen= 64) | |
if object_id[i] in [2, 3, 5, 7]: | |
c +=1 | |
indices[id] = c | |
speed_line_queue[id] = [] | |
color = compute_color_for_labels(object_id[i]) | |
label = '{}{:d}'.format("", indices[id]) + ":"+ '%s' % (obj_name) | |
# add center to buffer | |
deq[id].appendleft(center) | |
if len(deq[id]) >= 2: | |
object_speed = estimatespeed(deq[id][1], deq[id][0], x2-x1, y2-y1) | |
speed_line_queue[id].append(object_speed) | |
if obj_name not in object_counter: | |
object_counter[obj_name] = 1 | |
#motorcycle_weight = 1.638 | |
#car_weight = 6.72 | |
#truck_weight = 18.75 | |
#bus_weight = 30 | |
try: | |
spd = sum(speed_line_queue[id])//len(speed_line_queue[id]) | |
speeds[object_id[i]] += spd | |
label = label + " v=" + str(spd) + " m=" + str(weights[object_id[i]]) | |
writer2.writerow([str(indices[id]), obj_name, str(spd), str(weights[object_id[i]])]) | |
except: | |
pass | |
UI_box(box, img, label=label, color=color, line_thickness=2) | |
#cv2.putText(img, f"{speeds}", (500, 50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA) | |
t = time.localtime() | |
current_time = time.strftime("%H:%M:%S %d.%m.%Y", t) | |
pulse = sum(np.multiply(speeds, weights)) | |
# write a row to the csv file | |
writer.writerow([f"{current_time}", f"{pulse}"]) | |
cv2.putText(img, f"pulse: {pulse}", (500, 50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA) | |
#for i, object_speed in enumerate(speeds): | |
# object_speed = sum(object_speed)*weights[i] | |
return img | |
class DetectionPredictor(BasePredictor): | |
def get_annotator(self, img): | |
return Annotator(img, line_width=self.args.line_thickness, example=str(self.model.names)) | |
def preprocess(self, img): | |
img = torch.from_numpy(img).to(self.model.device) | |
img = img.half() if self.model.fp16 else img.float() # uint8 to fp16/32 | |
img /= 255 # 0 - 255 to 0.0 - 1.0 | |
return img | |
def postprocess(self, preds, img, orig_img): | |
preds = ops.non_max_suppression(preds, | |
self.args.conf, | |
self.args.iou, | |
classes = [2, 3, 5, 7], | |
agnostic=self.args.agnostic_nms, | |
max_det=self.args.max_det) | |
for i, pred in enumerate(preds): | |
shape = orig_img[i].shape if self.webcam else orig_img.shape | |
pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round() | |
return preds | |
def write_results(self, idx, preds, batch): | |
global num | |
p, im, im0 = batch | |
all_outputs = [] | |
log_string = "" | |
if len(im.shape) == 3: | |
im = im[None] # expand for batch dim | |
self.seen += 1 | |
im0 = im0.copy() | |
if self.webcam: # batch_size >= 1 | |
log_string += f'{idx}: ' | |
frame = self.dataset.count | |
else: | |
frame = getattr(self.dataset, 'frame', 0) | |
self.data_path = p | |
save_path = str(self.save_dir / p.name) # im.jpg | |
self.txt_path = str(self.save_dir / 'labels' / p.stem) + ('' if self.dataset.mode == 'image' else f'_{frame}') | |
log_string += '%gx%g ' % im.shape[2:] # print string | |
self.annotator = self.get_annotator(im0) | |
det = preds[idx] | |
all_outputs.append(det) | |
if len(det) == 0: | |
return log_string | |
count = 0 | |
for c in det[:, 5].unique(): | |
count += 1 | |
n = (det[:, 5] == c).sum() # detections per class | |
cv2.putText(im0, f"{n} {self.model.names[int(c)]}", (11, count*50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA) | |
log_string += f"{n} {self.model.names[int(c)]}{'s' * (n > 1)}, " | |
# write | |
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh | |
xywh_bboxs = [] | |
confs = [] | |
oids = [] | |
outputs = [] | |
for *xyxy, conf, cls in reversed(det): | |
x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy) | |
xywh_obj = [x_c, y_c, bbox_w, bbox_h] | |
xywh_bboxs.append(xywh_obj) | |
confs.append([conf.item()]) | |
oids.append(int(cls)) | |
xywhs = torch.Tensor(xywh_bboxs) | |
confss = torch.Tensor(confs) | |
outputs = deepsort.update(xywhs, confss, oids, im0) | |
#with open('/pulse.csv', 'a') as f: | |
# create the csv writer | |
#writer = csv.writer(f) | |
if len(outputs) > 0: | |
bbox_xyxy = outputs[:, :4] | |
identities = outputs[:, -2] | |
object_id = outputs[:, -1] | |
#f2 = open('/vehicles_data.csv', "w+") | |
#f2.close() | |
#with open('/vehicles_data.csv', 'a') as f: | |
#writer2 = csv.writer(f) | |
#header = ['id', 'class', 'speed', 'weight'] | |
#writer2.writerow(header) | |
img= draw_boxes(im0, bbox_xyxy, self.model.names, object_id,writer, writer2, identities) | |
cv2.imshow("window", img) | |
#df = pd.read_csv("/pulse.csv") | |
#df['time'] = pd.to_datetime(df['time'], format = '%H:%M:%S %d.%m.%Y') | |
#df.index = df['time'] | |
#del df['time'] | |
#try: | |
#fig, ax = plt.subplots() | |
#plt.clf() | |
#sns.lineplot(df) | |
#ax.set_xticklabels([t.get_text().split(".")[0] for t in ax.get_xticklabels()]) | |
#ax.set_xticklabels([pd.to_datetime(t.get_text()).strftime('%H:%M:%S') for t in ax.get_xticklabels()]) | |
#plt.ylabel('Pulse') | |
#plt.xlabel('time') | |
#plt.savefig(f'/time_series/figure_{num:010d}.png') | |
#num += 1 | |
#except: | |
#log_string += f'An error occured while saving figure_{num:010d}.png, ' | |
return [log_string, img] | |
def predict(cfg): | |
init_tracker() | |
cfg.model = cfg.model or "yolov8n.pt" | |
cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2) # check image size | |
cfg.source = cfg.source if cfg.source is not None else ROOT / "assets" | |
predictor = DetectionPredictor(cfg) | |
predictor() | |
#model = Yolov4(weight_path="yolov4.weights", class_name_path='coco_classes.txt') | |
def gradio_wrapper(img): | |
result = predict(model="YOLOv8-real-time/ultralytics/yolo/v8/detect/yolov8x6.pt", source=img) | |
#print(np.shape(img)) | |
return result | |
demo = gr.Interface( | |
fn=gradio_wrapper, | |
#gr.Image(source="webcam", streaming=True, flip=True), | |
inputs=gr.Image(source="webcam", streaming=True), | |
outputs="image", | |
live=True | |
) | |
demo.launch() |