# Streamlit app that detects motorbike violations (no helmet / riding in the
# marked red lane) with a YOLO model, reads the licence plate of offending
# delivery bikes with an OCR model, and e-mails a notification with a snapshot.

import io
import os
import re
import smtplib
import tempfile
import time
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import cv2
import numpy as np
import streamlit as st
import torch
from huggingface_hub import hf_hub_download
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoModel, AutoProcessor
from ultralytics import YOLO

# Email credentials.
# SECURITY(review): the fallback values below were hard-coded in the original
# source and are kept only so existing deployments keep working. Set the
# FROM_EMAIL / EMAIL_PASSWORD / TO_EMAIL environment variables instead, rotate
# the app password, and then delete the literals.
FROM_EMAIL = os.environ.get("FROM_EMAIL", "Fares5675@gmail.com")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "cawxqifzqiwjufde")  # App-Specific Password
TO_EMAIL = os.environ.get("TO_EMAIL", "Fares5675@gmail.com")
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 465

# Arabic dictionary for converting license plate text
arabic_dict = {
    "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤",
    "5": "٥", "6": "٦", "7": "٧", "8": "٨", "9": "٩",
    "A": "ا", "B": "ب", "J": "ح", "D": "د", "R": "ر",
    "S": "س", "X": "ص", "T": "ط", "E": "ع", "G": "ق",
    "K": "ك", "L": "ل", "Z": "م", "N": "ن", "H": "ه",
    "U": "و", "V": "ي", " ": " ",
}

# Color mapping for different classes (tuples are in OpenCV's BGR order)
class_colors = {
    0: (0, 255, 0),      # Green (Helmet)
    1: (255, 0, 0),      # Blue (License Plate)
    2: (0, 0, 255),      # Red (MotorbikeDelivery)
    3: (255, 255, 0),    # Cyan (MotorbikeSport)
    4: (255, 0, 255),    # Magenta (No Helmet)
    5: (0, 255, 255),    # Yellow (Person)
}

# Patterns used by filter_license_plate_text, compiled once.
_NON_PLATE_CHARS = re.compile(r'[^A-Z0-9]+')
_PLATE_PATTERN = re.compile(r'(\d{3,4})\s*([A-Z]{2})')

# YOLO detector; assigned in main() before any inference function is called.
model = None


@st.cache_resource
def _load_ocr_model():
    """Load the OCR processor and model once per server process.

    Streamlit re-executes the script on every interaction; caching avoids
    reloading the weights each time.
    """
    # NOTE(review): falls back to CPU so the app can start without a GPU; the
    # remote `chat` implementation may still require CUDA - confirm.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    ocr_processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
    ocr_model = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to(device)
    return ocr_processor, ocr_model


@st.cache_resource
def _load_yolo_model():
    """Download (if needed) and load the YOLO weights once per server process."""
    model_file = hf_hub_download(repo_id="TheKnight115/Yolov8m", filename="yolov8_Medium.pt")
    return YOLO(model_file)


# Load the OCR model
processor, model_ocr = _load_ocr_model()

# Define lane area coordinates (example coordinates)
red_lane = np.array([[2, 1583], [1, 1131], [1828, 1141], [1912, 1580]], np.int32)


# YOLO inference function
def run_yolo(image):
    """Run the global YOLO model on ``image`` and return its results."""
    return model(image)


# Function to process YOLO results and draw bounding boxes
def process_results(results, image):
    """Draw every detection of ``results[0]`` onto ``image`` (in place).

    Each box is outlined in its class colour (white for unknown classes) and
    labelled with the class name and confidence. Returns the same image.
    """
    for box in results[0].boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        conf = float(box.conf[0])
        cls = int(box.cls[0])
        label = model.names[cls]
        color = class_colors.get(cls, (255, 255, 255))

        # Draw rectangle and label
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        cv2.putText(image, f"{label} {conf:.2f}", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return image


# Process uploaded images
def process_image(uploaded_file):
    """Detect objects in an uploaded image, display it and offer a download.

    Shows an error instead of crashing when the upload is empty or cannot be
    decoded as an image.
    """
    raw = np.frombuffer(uploaded_file.read(), np.uint8)
    image = cv2.imdecode(raw, cv2.IMREAD_COLOR) if raw.size else None
    if image is None:
        st.error("Could not decode the uploaded image.")
        return

    results = run_yolo(image)
    processed_image = process_results(results, image)
    processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB)
    st.image(processed_image_rgb, caption='Detected Image', use_column_width=True)

    # Encode the PNG in memory so concurrent sessions do not overwrite a
    # shared file on disk.
    buffer = io.BytesIO()
    Image.fromarray(processed_image_rgb).save(buffer, format="PNG")
    st.download_button(
        label="Download Processed Image",
        data=buffer.getvalue(),
        file_name="processed_image.png",
        mime="image/png",
    )


def _merge_violations(violations_dict, plate_text, violation_type):
    """Record ``violation_type`` for ``plate_text``; return what to report.

    Returns the full, de-duplicated list of violations for the plate when at
    least one of them is new, or ``None`` when nothing new was seen (so no
    e-mail needs to be sent). Insertion order is kept so the comparison does
    not depend on set ordering.
    """
    known = violations_dict.get(plate_text, [])
    merged = list(known)
    for violation in violation_type:
        if violation not in merged:
            merged.append(violation)
    if len(merged) == len(known):
        return None
    violations_dict[plate_text] = merged  # e.g. {"1234 AB": ["No Helmet", "In Red Lane"]}
    return merged


def _read_plate_text(license_crop):
    """Run OCR on a BGR licence-plate crop and return the filtered plate text."""
    license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
    temp_image_path = 'license_plate.png'
    license_plate_pil.save(temp_image_path)
    license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
    return filter_license_plate_text(license_plate_text)


def _inspect_delivery_bike(frame, bbox, violations_dict, violation_image_path,
                           font_path, margin_y):
    """Check one delivery-bike detection for violations and annotate ``frame``.

    Runs the detector again on the bike crop (extended ``margin_y`` pixels
    upwards), looks for 'No_Helmet' and 'License_plate' boxes, and tests
    whether the bottom-centre of the bike box lies inside ``red_lane``. When a
    violation and a readable plate are both present, the plate is recorded in
    ``violations_dict``, an e-mail is sent for new violations, and the plate
    text is drawn below the bike. Returns the (possibly new) frame array.
    """
    x1, y1, x2, y2 = bbox
    violation_type = []

    motorbike_crop = frame[max(0, y1 - margin_y):y2, x1:x2]

    # pointPolygonTest takes a floating-point point.
    delivery_center = (float((x1 + x2) // 2), float(y2))
    if cv2.pointPolygonTest(red_lane, delivery_center, False) >= 0:
        violation_type.append("In Red Lane")

    if motorbike_crop.size == 0:
        # Degenerate box: nothing to run the detector or OCR on.
        return frame

    # Perform detection within the cropped motorbike region. All sub-boxes are
    # collected first so the outcome does not depend on the order in which the
    # helmet and plate boxes happen to be returned.
    plate_crops = []
    sub_results = model(motorbike_crop)
    for result in sub_results[0].boxes:
        sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
        sub_label = model.names[int(result.cls)]
        sub_color = (255, 0, 0)  # BGR

        if sub_label == 'License_plate':
            # Copy before drawing so the OCR input carries no overlay.
            plate = motorbike_crop[max(0, sub_y1):sub_y2, max(0, sub_x1):sub_x2]
            if plate.size:
                plate_crops.append(plate.copy())
        elif sub_label == 'No_Helmet' and "No Helmet" not in violation_type:
            violation_type.append("No Helmet")

        # Draw bounding box around sub-detected objects (No_Helmet, License_plate, etc.)
        cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
        cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)

    # Apply OCR only if a violation is detected
    if not violation_type:
        return frame

    for license_crop in plate_crops:
        cv2.imwrite(violation_image_path, frame)
        filtered_text = _read_plate_text(license_crop)
        if not filtered_text:
            continue

        to_report = _merge_violations(violations_dict, filtered_text, violation_type)
        if to_report is not None:
            send_email(filtered_text, violation_image_path, ', '.join(to_report))

        # Draw OCR text (English and Arabic) on the original frame
        arabic_text = convert_to_arabic(filtered_text)
        frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path,
                              font_size=30, color=(255, 255, 255))
        frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path,
                              font_size=30, color=(0, 255, 0))
    return frame


# Process and save uploaded videos
@st.cache_data
def process_video_and_save(uploaded_file):
    """Annotate an uploaded video with detections and violation info.

    Writes the upload to ``uploaded_video.mp4``, processes it frame by frame
    and writes the result to ``output_violation.mp4``.

    Returns:
        The output video path, or ``None`` when the upload cannot be opened.
    """
    # Path for Arabic font
    font_path = "alfont_com_arial-1.ttf"
    # Path for saving the violation snapshot attached to e-mails
    violation_image_path = 'violation.jpg'
    # Dictionary to track violations per license plate
    violations_dict = {}
    # Paths for the saved upload and the processed output
    video_path = "uploaded_video.mp4"
    output_video_path = 'output_violation.mp4'

    # Save the uploaded video file to this path
    with open(video_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Error opening video file.")
        return None

    # Codec and output settings
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Fall back to a sane rate if the container reports no FPS (0).
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    margin_y = 50

    try:
        # Process frames
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # End of video

            # Draw the red lane polygon on each frame
            cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)

            # Perform detection using YOLO on the current frame
            results = model.track(frame)

            # Process each detection in the results
            for box in results[0].boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
                label = model.names[int(box.cls)]  # Class name (MotorbikeDelivery, Helmet, etc.)
                color = (255, 0, 0)  # Fixed colour for bounding boxes (BGR)
                confidence = box.conf[0].item()

                # Draw bounding box and label around the detected object
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
                cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

                # Only delivery motorbikes are checked for violations
                if label == 'MotorbikeDelivery' and confidence >= 0.4:
                    frame = _inspect_delivery_bike(
                        frame, (x1, y1, x2, y2), violations_dict,
                        violation_image_path, font_path, margin_y,
                    )

            # Write the processed frame to the output video
            out.write(frame)
    finally:
        # Release resources even if detection, OCR or e-mail raises.
        cap.release()
        out.release()

    if not os.path.exists(output_video_path):
        st.error("Error: Processed video was not created.")
    return output_video_path  # Return the path of the processed video


# Live video feed processing
def live_video_feed():
    """Stream annotated webcam frames until the user presses "Stop".

    The button is created once, before the loop: creating it inside the loop
    would register the same widget repeatedly in a single script run. Pressing
    it makes Streamlit rerun the script, which interrupts the running loop; the
    rerun then sees the pressed button and stops without opening the camera.
    """
    stframe = st.empty()
    if st.button("Stop"):
        st.stop()

    video = cv2.VideoCapture(0)
    if not video.isOpened():
        video.release()
        st.error("Unable to access the webcam.")
        return

    try:
        while True:
            ret, frame = video.read()
            if not ret:
                st.error("Failed to capture frame.")
                break

            # Run YOLO on the captured frame
            results = run_yolo(frame)
            annotated_frame = process_results(results, frame)
            annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

            # Display the frame with detections
            stframe.image(annotated_frame_rgb, channels="RGB", use_column_width=True)
    finally:
        # Free the camera even when the run is interrupted by a rerun.
        video.release()
    st.stop()


# Function to filter license plate text
def filter_license_plate_text(license_plate_text):
    """Extract a plate of the form ``"<3-4 digits> <2 letters>"`` from OCR text.

    Everything except uppercase A-Z and digits is discarded first. Returns the
    normalised plate string, or ``None`` when the input is empty or no plate
    pattern is found.
    """
    if not license_plate_text:
        return None
    cleaned = _NON_PLATE_CHARS.sub("", license_plate_text)
    match = _PLATE_PATTERN.search(cleaned)
    return f"{match.group(1)} {match.group(2)}" if match else None


# Function to convert license plate text to Arabic
def convert_to_arabic(license_plate_text):
    """Map each character through ``arabic_dict``; unknown characters are kept."""
    return "".join(arabic_dict.get(char, char) for char in license_plate_text)


# Function to send email notification with image attachment
def send_email(license_text, violation_image_path, violation_type):
    """E-mail a violation notice, attaching the snapshot if it exists.

    Args:
        license_text: Plate text inserted into the message body.
        violation_image_path: Path of the snapshot to attach (skipped when the
            file does not exist).
        violation_type: Description of the violation(s). Both the labels
            produced by the video pipeline ("No Helmet", "In Red Lane", or a
            comma-separated combination) and the identifier forms
            ('no_helmet', 'in_red_lane', 'no_helmet_in_red_lane') are accepted.
    """
    # Normalise so "In Red Lane, No Helmet" and 'no_helmet_in_red_lane' are
    # recognised alike.
    normalized = re.sub(r'[^a-z]+', '_', str(violation_type).lower())
    no_helmet = 'no_helmet' in normalized
    in_red_lane = 'in_red_lane' in normalized

    if no_helmet and in_red_lane:
        subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر'
        body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    elif no_helmet:
        subject = 'تنبيه مخالفة: عدم ارتداء خوذة'
        body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    elif in_red_lane:
        subject = 'تنبيه مخالفة: دخول المسار الأيسر'
        body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    else:
        # Unknown violation text: still send a generic notice instead of
        # failing on unbound subject/body.
        subject = 'تنبيه مخالفة'
        body = f"تم تسجيل مخالفة ({violation_type}) على الدراجة النارية التي تحمل لوحة ({license_text})."

    msg = MIMEMultipart()
    msg['From'] = FROM_EMAIL
    msg['To'] = TO_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain', 'utf-8'))

    if os.path.exists(violation_image_path):
        with open(violation_image_path, 'rb') as attachment_file:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment_file.read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition',
                        f'attachment; filename={os.path.basename(violation_image_path)}')
        msg.attach(part)

    with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
        server.login(FROM_EMAIL, EMAIL_PASSWORD)
        server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
    print("Email with attachment sent successfully!")


def draw_text_pil(img, text, position, font_path, font_size, color):
    """Draw ``text`` on a BGR image using PIL and return a new BGR array.

    PIL is used so that a TrueType font can be loaded from ``font_path``; if
    the font cannot be loaded, PIL's default font is used instead. ``color``
    is applied to the RGB copy of the image.
    """
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    try:
        font = ImageFont.truetype(font_path, size=font_size)
    except OSError:
        print(f"Font file not found at {font_path}. Using default font.")
        font = ImageFont.load_default()
    draw.text(position, text, font=font, fill=color)
    return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)


# Streamlit app main function
def main():
    """Build the UI and dispatch to image, video or live-feed processing."""
    global model
    model = _load_yolo_model()

    st.title("Motorbike Violation Detection")
    input_type = st.selectbox("Select Input Type", ("Image", "Video", "Live Feed"))

    if input_type == "Image":
        uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            process_image(uploaded_file)
    elif input_type == "Video":
        uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mov"])
        if uploaded_file is not None:
            output_path = process_video_and_save(uploaded_file)
            # The download button lives here, outside the cached function.
            # Skip it when processing failed (None) or produced no file.
            if output_path and os.path.exists(output_path):
                with open(output_path, "rb") as video_file:
                    st.download_button(
                        label="Download Processed Video",
                        data=video_file,
                        file_name="processed_video.mp4",
                        mime="video/mp4",
                    )
    elif input_type == "Live Feed":
        live_video_feed()


if __name__ == "__main__":
    main()