from transformers import DetrForObjectDetection, DetrImageProcessor
import torch
from PIL import Image
import matplotlib.pyplot as plt
import gradio as gr
import cv2
import supervision as sv
import numpy as np
from pytube import YouTube
import uuid
import os
from moviepy.editor import VideoFileClip

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
CHECKPOINT = 'facebook/detr-resnet-50'
CHECKPOINT_ACCIDENT_DETECTION = 'hilmantm/detr-traffic-accident-detection'
CONFIDENCE_THRESHOLD = 0.5
IOU_THRESHOLD = 0.8
NMS_THRESHOLD = 0.5
VIDEO_PATH = os.path.join("video")
VIDEO_INFERENCE = False

fdic = {
    "family": "Impact",
    "style": "italic",
    "size": 15,
    "color": "yellow",
    "weight": "bold"
}

# The image processor comes from the base DETR checkpoint; the fine-tuned
# accident-detection weights are loaded on top of it.
image_processor = DetrImageProcessor.from_pretrained(CHECKPOINT)
model = DetrForObjectDetection.from_pretrained(CHECKPOINT_ACCIDENT_DETECTION)
model.to(DEVICE)


def inference_from_image(pil_image):
    box_annotator = sv.BoxAnnotator()

    # Gradio passes a PIL image, which is already in RGB order
    image = np.array(pil_image)

    # inference
    with torch.no_grad():
        # load image and predict
        inputs = image_processor(images=image, return_tensors='pt').to(DEVICE)
        outputs = model(**inputs)

        # post-process
        target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
        results = image_processor.post_process_object_detection(
            outputs=outputs,
            threshold=CONFIDENCE_THRESHOLD,
            target_sizes=target_sizes
        )[0]

    if results['scores'].shape[0] != 0 or results['labels'].shape[0] != 0:
        # annotate
        detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=NMS_THRESHOLD)
        labels = [
            f"{model.config.id2label[class_id]} {confidence:0.2f}"
            for _, confidence, class_id, _ in detections
        ]
        frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
        return Image.fromarray(frame)
    else:
        print("No object detected")
        return None


def convert_to_h264(file_path, output_file):
    clip = VideoFileClip(file_path)
    clip.write_videofile(output_file, codec="libx264")
    clip.close()


def inference_from_video(url):
    box_annotator = sv.BoxAnnotator()

    # Define the YouTube video URL
    video_url = url

    # Create a YouTube object and get the video stream
    yt = YouTube(video_url)
    yt_stream = yt.streams.filter(progressive=True, file_extension='mp4').first()

    # Download the video to a uniquely named folder
    unique_id = uuid.uuid4().hex[:6].upper()
    video_folder = os.path.join(VIDEO_PATH, unique_id)
    video_filename = os.path.join(video_folder, f"{unique_id}.mp4")
    result_video_filename = os.path.join(video_folder, f"{unique_id}_result.mp4")
    result_video_filename_temp = os.path.join(video_folder, f"{unique_id}_result_temp.mp4")
    os.makedirs(video_folder, exist_ok=True)
    yt_stream.download(filename=video_filename)

    # Load the video
    cap = cv2.VideoCapture(video_filename)

    # Get the video frame dimensions
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create a VideoWriter object
    out = cv2.VideoWriter(result_video_filename_temp, cv2.VideoWriter_fourcc(*'mp4v'), 30, (frame_width, frame_height))

    while True:
        ret, image = cap.read()
        if not ret:
            break

        # OpenCV decodes frames as BGR; convert to RGB for the model
        rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # inference
        with torch.no_grad():
            # load frame and predict
            inputs = image_processor(images=rgb_frame, return_tensors='pt').to(DEVICE)
            outputs = model(**inputs)

            # post-process
            target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
            results = image_processor.post_process_object_detection(
                outputs=outputs,
                threshold=CONFIDENCE_THRESHOLD,
                target_sizes=target_sizes
            )[0]

        print("transformer result", results)
        if results['scores'].shape[0] != 0 or results['labels'].shape[0] != 0:
            # annotate on the original BGR frame so the output video keeps correct colors
            detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=NMS_THRESHOLD)
            labels = [
                f"{model.config.id2label[class_id]} {confidence:0.2f}"
                for _, confidence, class_id, _ in detections
            ]
            frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
            out.write(frame)
        else:
            out.write(image)

    cap.release()
    out.release()

    # Re-encode to H.264 so browsers can play the result, then delete the temp file
    convert_to_h264(result_video_filename_temp, result_video_filename)
    os.remove(result_video_filename_temp)

    return result_video_filename


def testing(file):
    # Debug helper: re-encodes a previously generated result video for a fixed ID
    unique_id = "39EE5A"
    video_folder = os.path.join(VIDEO_PATH, unique_id)
    video_filename = os.path.join(video_folder, f"{unique_id}.mp4")
    result_video_filename = os.path.join(video_folder, f"{unique_id}_result.mp4")
    result_video_filename_temp = os.path.join(video_folder, f"{unique_id}_result_temp.mp4")
    convert_to_h264(result_video_filename_temp, result_video_filename)
    os.remove(result_video_filename_temp)
    return result_video_filename


with gr.Blocks() as demo:
    gr.Markdown(
        """
        # Hello!!!
        Welcome to the live demonstration of our traffic accident detection application!
        Experience cutting-edge technology designed to enhance road safety. The model is trained on a
        diverse, multi-label dataset with both 'accident' and 'vehicle' labels. This dual-label dataset
        enhances the model's capacity to understand and interpret complex traffic scenarios and helps
        minimize accident detection errors in congested traffic or traffic-jam conditions, making it a
        potent tool for accident detection and analysis. Let's explore the capabilities of our
        innovative solution together.
        """)

    gr.Markdown("## Traffic Accident Detection from CCTV POV")
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(label="Input image", type="pil")
        with gr.Column():
            output_image = gr.Image(label="Output image with predicted accident", type="pil")
    detect_image_btn = gr.Button(value="Detect Accident")
    detect_image_btn.click(fn=inference_from_image, inputs=[input_image], outputs=[output_image])

    if VIDEO_INFERENCE:
        gr.Markdown("## Detect Accident from Video")
        with gr.Row():
            with gr.Column():
                inp = gr.Textbox(label="Youtube URL", placeholder="You should upload video to youtube and drop the link here")
            with gr.Column():
                output_video = gr.Video(label="Output video with predicted accident", format="mp4")
        detect_video_btn = gr.Button(value="Detect Accident")
        detect_video_btn.click(fn=inference_from_video, inputs=[inp], outputs=[output_video])

demo.launch(debug=True)
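
# ---------------------------------------------------------------------------
# Standalone usage sketch (hypothetical file names, not part of the hosted app):
# the detector can also be called directly, without launching the Gradio UI.
# Uncomment to run a one-off prediction on a local CCTV frame.
#
#   sample = Image.open("cctv_frame.jpg")            # assumed local sample image
#   annotated = inference_from_image(sample)
#   if annotated is not None:
#       annotated.save("cctv_frame_annotated.jpg")   # hypothetical output path
# ---------------------------------------------------------------------------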