from transformers import DetrForObjectDetection, DetrImageProcessor
import torch
from PIL import Image
import matplotlib.pyplot as plt
import gradio as gr
import cv2
import supervision as sv
import numpy as np
from pytube import YouTube
import uuid
import os
from moviepy.editor import VideoFileClip
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
CHECKPOINT = 'facebook/detr-resnet-50'
CHECKPOINT_ACCIDENT_DETECTION = 'hilmantm/detr-traffic-accident-detection'
CONFIDENCE_THRESHOLD = 0.5  # minimum score for a detection to be kept
IOU_THRESHOLD = 0.8  # defined but not used below
NMS_THRESHOLD = 0.5  # boxes overlapping more than this are suppressed by NMS
VIDEO_PATH = "video"  # working directory for downloaded and rendered videos
VIDEO_INFERENCE = False  # set to True to enable the YouTube video tab
# Font settings (currently unused in this app)
fdic = {
    "family": "Impact",
    "style": "italic",
    "size": 15,
    "color": "yellow",
    "weight": "bold"
}
image_processor = DetrImageProcessor.from_pretrained(CHECKPOINT)
model = DetrForObjectDetection.from_pretrained(CHECKPOINT_ACCIDENT_DETECTION)
model.to(DEVICE)
model.eval()  # inference only: disable dropout and other training-mode behavior
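# The fine-tuned checkpoint reuses the base DETR-ResNet-50 preprocessing, which
# is why the stock image processor is paired with it. To inspect the label set
# (the dataset described in the demo text uses 'accident' and 'vehicle' labels),
# one could run:
#   print(model.config.id2label)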
def inference_from_image(pil_image):
    box_annotator = sv.BoxAnnotator()
    # PIL images are already RGB, so no channel conversion is needed here
    image = np.array(pil_image)
    # inference
    with torch.no_grad():
        # load image and predict
        inputs = image_processor(images=image, return_tensors='pt').to(DEVICE)
        outputs = model(**inputs)
        # post-process: rescale boxes back to the original image size
        target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
        results = image_processor.post_process_object_detection(
            outputs=outputs,
            threshold=CONFIDENCE_THRESHOLD,
            target_sizes=target_sizes
        )[0]
    if results['scores'].shape[0] != 0:
        # annotate
        detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=NMS_THRESHOLD)
        labels = [
            f"{model.config.id2label[class_id]} {confidence:0.2f}"
            for _, confidence, class_id, _
            in detections
        ]
        frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
        result_image = Image.fromarray(frame)
        return result_image
    else:
        print("No object detected")
        return None
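# Example standalone usage (hypothetical file names, for illustration only):
#   annotated = inference_from_image(Image.open("sample.jpg"))
#   if annotated is not None:
#       annotated.save("sample_result.jpg")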
def convert_to_h264(file_path, output_file):
    # Re-encode with libx264: the raw 'mp4v' output from OpenCV is often not
    # playable in browsers, while H.264 is widely supported by the <video> tag
    clip = VideoFileClip(file_path)
    clip.write_videofile(output_file, codec="libx264")
    clip.close()
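# Hypothetical usage (illustrative file names only):
#   convert_to_h264("render_raw.mp4", "render_playable.mp4")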
def inference_from_video(url):
    box_annotator = sv.BoxAnnotator()
    # Create a YouTube object and pick the first progressive mp4 stream
    yt = YouTube(url)
    yt_stream = yt.streams.filter(progressive=True, file_extension='mp4').first()
    # Download the video into a uniquely named folder
    unique_id = uuid.uuid4().hex[:6].upper()
    video_folder = os.path.join(VIDEO_PATH, unique_id)
    video_filename = os.path.join(video_folder, f"{unique_id}.mp4")
    result_video_filename = os.path.join(video_folder, f"{unique_id}_result.mp4")
    result_video_filename_temp = os.path.join(video_folder, f"{unique_id}_result_temp.mp4")
    os.makedirs(video_folder, exist_ok=True)
    yt_stream.download(filename=video_filename)
    # Load the video
    cap = cv2.VideoCapture(video_filename)
    # Get the video frame dimensions and frame rate (fall back to 30 fps)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    # Define the codec and create a VideoWriter object
    out = cv2.VideoWriter(result_video_filename_temp, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
    while True:
        ret, image = cap.read()
        if not ret:
            break
        # OpenCV decodes frames as BGR; the model expects RGB input
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # inference
        with torch.no_grad():
            # load frame and predict
            inputs = image_processor(images=rgb_image, return_tensors='pt').to(DEVICE)
            outputs = model(**inputs)
            # post-process: rescale boxes back to the original frame size
            target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
            results = image_processor.post_process_object_detection(
                outputs=outputs,
                threshold=CONFIDENCE_THRESHOLD,
                target_sizes=target_sizes
            )[0]
        print("transformer result", results)
        if results['scores'].shape[0] != 0:
            # annotate the original BGR frame so colors stay correct on disk
            detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=NMS_THRESHOLD)
            labels = [
                f"{model.config.id2label[class_id]} {confidence:0.2f}"
                for _, confidence, class_id, _
                in detections
            ]
            frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
            out.write(frame)
        else:
            out.write(image)
    cap.release()
    out.release()
    # re-encode to H.264 for browser playback, then delete the temp file
    convert_to_h264(result_video_filename_temp, result_video_filename)
    os.remove(result_video_filename_temp)
    return result_video_filename
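# Example standalone usage (placeholder URL, for illustration only):
#   result_path = inference_from_video("https://www.youtube.com/watch?v=...")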
def testing(file):
    # Local debugging helper: re-encodes a previously rendered result using a
    # hard-coded run ID; the `file` argument is unused
    unique_id = "39EE5A"
    video_folder = os.path.join(VIDEO_PATH, unique_id)
    video_filename = os.path.join(video_folder, f"{unique_id}.mp4")
    result_video_filename = os.path.join(video_folder, f"{unique_id}_result.mp4")
    result_video_filename_temp = os.path.join(video_folder, f"{unique_id}_result_temp.mp4")
    convert_to_h264(result_video_filename_temp, result_video_filename)
    os.remove(result_video_filename_temp)
    return result_video_filename
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # Hello!!!
        Welcome to the live demonstration of our traffic accident detection application!
        Experience cutting-edge technology designed to enhance road safety.
        The model is trained on a diverse, multilabel dataset with both 'accident' and 'vehicle' labels.
        This dual-label dataset improves the model's ability to interpret complex traffic scenes
        and helps minimize false accident detections in congested or jammed traffic,
        making it a potent tool for accident detection and analysis. Let's explore the capabilities of our innovative solution together.
        """)
gr.Markdown("## Traffic Accident Detection from CCTV POV") | |
with gr.Row(): | |
with gr.Column(): | |
input_image = gr.Image(label="Input image", type="pil") | |
with gr.Column(): | |
output_image = gr.Image(label="Output image with predicted accident", type="pil") | |
detect_image_btn = gr.Button(value="Detect Accident") | |
detect_image_btn.click(fn=inference_from_image, inputs=[input_image], outputs=[output_image]) | |
    if VIDEO_INFERENCE:
        gr.Markdown("## Detect Accident from Video")
        with gr.Row():
            with gr.Column():
                inp = gr.Textbox(label="YouTube URL", placeholder="Upload your video to YouTube and paste the link here")
            with gr.Column():
                output_video = gr.Video(label="Output video with predicted accident", format="mp4")
        detect_video_btn = gr.Button(value="Detect Accident")
        detect_video_btn.click(fn=inference_from_video, inputs=[inp], outputs=[output_video])
demo.launch(debug=True)