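"""Gradio demo: traffic accident detection on CCTV footage with a DETR model fine-tuned for accident detection."""
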
from transformers import DetrForObjectDetection, DetrImageProcessor
import torch
from PIL import Image
import matplotlib.pyplot as plt
import gradio as gr
import cv2
import supervision as sv
import numpy as np
from pytube import YouTube
import uuid
import os
from moviepy.editor import VideoFileClip

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
CHECKPOINT = 'facebook/detr-resnet-50'
CHECKPOINT_ACCIDENT_DETECTION = 'hilmantm/detr-traffic-accident-detection'
CONFIDENCE_THRESHOLD = 0.5
IOU_THRESHOLD = 0.8  # defined for completeness; not used below
NMS_THRESHOLD = 0.5
VIDEO_PATH = "video"
VIDEO_INFERENCE = False

# Font settings for matplotlib text annotations (currently unused).
fdic = {
    "family": "Impact",
    "style": "italic",
    "size": 15,
    "color": "yellow",
    "weight": "bold",
}
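
# Pair the image processor from the base DETR checkpoint with the fine-tuned
# accident-detection weights published on the Hugging Face Hub.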
image_processor = DetrImageProcessor.from_pretrained(CHECKPOINT)
model = DetrForObjectDetection.from_pretrained(CHECKPOINT_ACCIDENT_DETECTION)
model.to(DEVICE)

def inference_from_image(pil_image):
    box_annotator = sv.BoxAnnotator()
    # PIL images are already RGB (the format DETR expects), so a plain numpy
    # conversion is enough; no BGR round-trip is needed.
    image = np.array(pil_image)

    # inference
    with torch.no_grad():
        # load image and predict
        inputs = image_processor(images=image, return_tensors='pt').to(DEVICE)
        outputs = model(**inputs)

        # post-process
        target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
        results = image_processor.post_process_object_detection(
            outputs=outputs,
            threshold=CONFIDENCE_THRESHOLD,
            target_sizes=target_sizes
        )[0]

    if results['scores'].shape[0] != 0:
        # annotate
        detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=NMS_THRESHOLD)
        labels = [
            f"{model.config.id2label[class_id]} {confidence:0.2f}"
            for _, confidence, class_id, _
            in detections
        ]
        frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
        result_image = Image.fromarray(frame)
        return result_image
    else:
        print("No object detected")
        return None
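
# The raw OpenCV output below uses the 'mp4v' codec, which browsers typically
# cannot play; re-encoding with libx264 (H.264) makes the result viewable in
# the Gradio video component.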
def convert_to_h264(file_path, output_file):
    clip = VideoFileClip(file_path)
    clip.write_videofile(output_file, codec="libx264")
    clip.close()
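
# Video pipeline: download a YouTube video, run DETR on every frame, write the
# annotated frames to a temp file, then re-encode the result for playback.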
def inference_from_video(url):
    box_annotator = sv.BoxAnnotator()

    # Define the YouTube video URL
    video_url = url

    # Create a YouTube object and get a progressive mp4 stream
    yt = YouTube(video_url)
    yt_stream = yt.streams.filter(progressive=True, file_extension='mp4').first()

    # Download the video to a file
    unique_id = uuid.uuid4().hex[:6].upper()
    video_folder = os.path.join(VIDEO_PATH, unique_id)
    video_filename = os.path.join(video_folder, f"{unique_id}.mp4")
    result_video_filename = os.path.join(video_folder, f"{unique_id}_result.mp4")
    result_video_filename_temp = os.path.join(video_folder, f"{unique_id}_result_temp.mp4")
    os.makedirs(video_folder, exist_ok=True)  # also creates VIDEO_PATH if it does not exist yet
    yt_stream.download(filename=video_filename)

    # Load the video
    cap = cv2.VideoCapture(video_filename)

    # Get the video frame dimensions and frame rate
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 30  # use the source frame rate; fall back to 30 if unreported

    # Define the codec and create a VideoWriter object
    out = cv2.VideoWriter(result_video_filename_temp, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

    while True:
        ret, image = cap.read()
        if not ret:
            break

        # OpenCV decodes frames as BGR, but DETR expects RGB; convert for inference only
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # inference
        with torch.no_grad():
            # load image and predict
            inputs = image_processor(images=rgb_image, return_tensors='pt').to(DEVICE)
            outputs = model(**inputs)

            # post-process
            target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
            results = image_processor.post_process_object_detection(
                outputs=outputs,
                threshold=CONFIDENCE_THRESHOLD,
                target_sizes=target_sizes
            )[0]

        print("transformer result", results)

        if results['scores'].shape[0] != 0:
            # annotate on the original BGR frame, which is what VideoWriter expects
            detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=NMS_THRESHOLD)
            labels = [
                f"{model.config.id2label[class_id]} {confidence:0.2f}"
                for _, confidence, class_id, _
                in detections
            ]
            frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
            out.write(frame)
        else:
            out.write(image)

    cap.release()
    out.release()

    convert_to_h264(result_video_filename_temp, result_video_filename)

    # delete temp file
    os.remove(result_video_filename_temp)

    return result_video_filename
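
# Development helper: reruns only the H.264 conversion step on the artifacts
# of a previous run (hard-coded id); not wired into the UI.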
def testing(file):
    unique_id = "39EE5A"
    video_folder = os.path.join(VIDEO_PATH, unique_id)
    video_filename = os.path.join(video_folder, f"{unique_id}.mp4")
    result_video_filename = os.path.join(video_folder, f"{unique_id}_result.mp4")
    result_video_filename_temp = os.path.join(video_folder, f"{unique_id}_result_temp.mp4")
    convert_to_h264(result_video_filename_temp, result_video_filename)
    os.remove(result_video_filename_temp)
    return result_video_filename
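
# Build the Gradio interface: the image-detection UI is always shown, while
# the video section is gated behind the VIDEO_INFERENCE flag.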
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # Hello!!!
        Welcome to the live demonstration of our traffic accident detection application!
        Experience cutting-edge technology designed to enhance road safety.
        The model was trained on a diverse, multilabel dataset that includes both 'accident' and 'vehicle' labels.
        This dual-label dataset helps the model interpret complex traffic scenarios and reduces accident-detection errors in congested or jammed traffic,
        making it a potent tool for accident detection and analysis. Let's explore the capabilities of our innovative solution together.
        """)
gr.Markdown("## Traffic Accident Detection from CCTV POV")
with gr.Row():
with gr.Column():
input_image = gr.Image(label="Input image", type="pil")
with gr.Column():
output_image = gr.Image(label="Output image with predicted accident", type="pil")
detect_image_btn = gr.Button(value="Detect Accident")
detect_image_btn.click(fn=inference_from_image, inputs=[input_image], outputs=[output_image])

    if VIDEO_INFERENCE:
        gr.Markdown("## Detect Accident from Video")
        with gr.Row():
            with gr.Column():
                inp = gr.Textbox(label="YouTube URL", placeholder="Upload your video to YouTube and paste the link here")
            with gr.Column():
                output_video = gr.Video(label="Output video with predicted accidents", format="mp4")
        detect_video_btn = gr.Button(value="Detect Accident")
        detect_video_btn.click(fn=inference_from_video, inputs=[inp], outputs=[output_video])
demo.launch(debug=True)