# Outgoing-mail settings used when a violation notification is sent.
# All values are intentionally blank here and must be filled in before use.
FROM_EMAIL: str = ""      # sender address (also used as the SMTP login name)
EMAIL_PASSWORD: str = ""  # App-Specific Password for the sender account
TO_EMAIL: str = ""        # recipient address for violation notifications
SMTP_SERVER: str = ""     # SMTP host name
SMTP_PORT: int = 465      # port passed to smtplib.SMTP_SSL

# Character-by-character lookup used to render recognised license plate text
# in Arabic script. Characters without an entry are left to the caller.
arabic_dict = {
    # Western digits -> Arabic-Indic digits
    "0": "٠",
    "1": "١",
    "2": "٢",
    "3": "٣",
    "4": "٤",
    "5": "٥",
    "6": "٦",
    "7": "٧",
    "8": "٨",
    "9": "٩",
    # Latin plate letters -> Arabic letters
    "A": "ا",
    "B": "ب",
    "J": "ح",
    "D": "د",
    "R": "ر",
    "S": "س",
    "X": "ص",
    "T": "ط",
    "E": "ع",
    "G": "ق",
    "K": "ك",
    "L": "ل",
    "Z": "م",
    "N": "ن",
    "H": "ه",
    "U": "و",
    "V": "ي",
    # The separator between digits and letters is kept as-is
    " ": " ",
}

# Color mapping for different classes.
# Tuples are in OpenCV's BGR channel order (e.g. (255, 0, 0) is blue).
class_colors = {
    0: (0, 255, 0),    # Green (Helmet)
    1: (255, 0, 0),    # Blue (License Plate)
    2: (0, 0, 255),    # Red (MotorbikeDelivery)
    3: (255, 255, 0),  # Cyan (MotorbikeSport)
    4: (255, 0, 255),  # Magenta (No Helmet)
    5: (0, 255, 255),  # Yellow (Person)
}

# Load the OCR model.
# NOTE(review): the model is moved to 'cuda' unconditionally at import time,
# so this presumably requires a CUDA device to be present - confirm.
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')

# Define lane area coordinates (example coordinates), as a polygon in pixel
# coordinates of the video frame.
red_lane = np.array([[2, 1583], [1, 1131], [1828, 1141], [1912, 1580]], np.int32)


# YOLO inference function
def run_yolo(image):
    """Run the global YOLO ``model`` on ``image`` and return its results."""
    results = model(image)
    return results


# Function to process YOLO results and draw bounding boxes
def process_results(results, image):
    """Draw every detection of ``results[0]`` onto ``image`` and return it.

    The image is modified in place; each box is outlined in the color
    registered for its class in ``class_colors`` (white when the class has
    no entry) and labelled with the class name and confidence.
    """
    boxes = results[0].boxes
    for box in boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        conf = box.conf[0]
        cls = int(box.cls[0])
        label = model.names[cls]
        color = class_colors.get(cls, (255, 255, 255))

        # Draw rectangle and label
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        cv2.putText(image, f"{label} {conf:.2f}", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return image


# Process uploaded images
def process_image(uploaded_file):
    """Detect objects in an uploaded image, show it and offer it for download.

    Args:
        uploaded_file: file-like object whose ``read()`` returns the encoded
            image bytes (the Streamlit upload).
    """
    image = cv2.imdecode(np.frombuffer(uploaded_file.read(), np.uint8), 1)
    if image is None:
        # cv2.imdecode returns None when the bytes are not a decodable image.
        st.error("Error: could not decode the uploaded image.")
        return

    results = run_yolo(image)
    processed_image = process_results(results, image)
    processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB)
    st.image(processed_image_rgb, caption='Detected Image', use_column_width=True)

    # Create a download button for the processed image
    im_pil = Image.fromarray(processed_image_rgb)
    im_pil.save("processed_image.png")
    with open("processed_image.png", "rb") as file:
        st.download_button(
            label="Download Processed Image",
            data=file,
            file_name="processed_image.png",
            mime="image/png"
        )


def _merge_violations(violations_dict, plate, observed):
    """Merge ``observed`` violations into ``violations_dict[plate]``.

    Returns the full, sorted list of violations for ``plate`` when at least
    one of them was not recorded before, otherwise ``None``. Comparing as
    sets avoids treating a mere reordering as a new violation, and storing a
    fresh list avoids aliasing the caller's mutable list.
    """
    known = set(violations_dict.get(plate, ()))
    merged = known | set(observed)
    if merged == known:
        return None
    violations_dict[plate] = sorted(merged)
    return violations_dict[plate]


# Process and save uploaded videos
@st.cache_data
def process_video_and_save(uploaded_file):
    """Annotate an uploaded video with detections and violation information.

    Every frame gets the red-lane polygon and all detections drawn on it.
    For each 'MotorbikeDelivery' detection (confidence >= 0.4) the cropped
    rider region is analysed again; when a helmet or lane violation is
    present and a license plate is found, the plate is read by OCR, the
    violation is e-mailed (once per new violation per plate) and the plate
    text is drawn on the frame in Latin and Arabic script.

    Args:
        uploaded_file: Streamlit upload exposing ``getbuffer()``.

    Returns:
        Path of the written output video, or ``None`` when the input could
        not be opened or the output file was not created.
    """
    # Path for Arabic font
    font_path = "alfont_com_arial-1.ttf"

    # Path for saving violation images
    violation_image_path = 'violation.jpg'

    # Dictionary to track violations per license plate
    violations_dict = {}

    # Paths for the uploaded and the processed video
    video_path = "uploaded_video.mp4"
    output_video_path = 'output_violation.mp4'

    # Save the uploaded video file to this path
    with open(video_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Error opening video file.")
        return None

    # Codec and output settings
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Extra pixels included above the motorbike box when cropping
    margin_y = 50

    try:
        # Process frames
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # End of video

            # Draw the red lane polygon on each frame
            cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)

            # Perform detection using YOLO on the current frame
            results = model.track(frame)

            # Process each detection in the results
            for box in results[0].boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())  # Bounding box coordinates
                label = model.names[int(box.cls)]  # Class name (MotorbikeDelivery, Helmet, etc.)
                color = (255, 0, 0)  # Fixed color for bounding boxes (blue in BGR)
                confidence = box.conf[0].item()

                # Initialize flags and variables for the violations
                helmet_violation = False
                lane_violation = False
                violation_type = []

                # Draw bounding box around detected object
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
                # Add label to the box (e.g., 'MotorbikeDelivery')
                cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

                # Only MotorbikeDelivery detections are checked for violations
                if not (label == 'MotorbikeDelivery' and confidence >= 0.4):
                    continue

                # The crop is a view into the frame, so drawing on it also
                # draws on the frame.
                motorbike_crop = frame[max(0, y1 - margin_y):y2, x1:x2]

                # The bottom-centre of the box is tested against the lane polygon
                delivery_center = ((x1 + x2) // 2, (y2))
                in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
                if in_red_lane >= 0:
                    lane_violation = True
                    violation_type.append("In Red Lane")

                # Perform detection within the cropped motorbike region
                sub_results = model(motorbike_crop)
                for result in sub_results[0].boxes:
                    sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
                    sub_label = model.names[int(result.cls)]
                    sub_color = (255, 0, 0)  # Color for sub-object boxes (blue in BGR)

                    # Draw bounding box around sub-detected objects (No_Helmet, License_plate, etc.)
                    cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
                    cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)

                    if sub_label == 'No_Helmet':
                        helmet_violation = True
                        violation_type.append("No Helmet")
                        continue

                    if sub_label != 'License_plate':
                        continue

                    # Apply OCR only if a violation is detected
                    if not (helmet_violation or lane_violation):
                        continue

                    license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
                    if license_crop.size == 0:
                        continue  # degenerate box, nothing to read

                    # Perform OCR on the license plate
                    cv2.imwrite(violation_image_path, frame)
                    license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
                    temp_image_path = 'license_plate.png'
                    license_plate_pil.save(temp_image_path)
                    license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
                    filtered_text = filter_license_plate_text(license_plate_text)

                    if not filtered_text:
                        continue

                    # Record the violations for this plate and notify only
                    # when something new was observed.
                    updated_violations = _merge_violations(violations_dict, filtered_text, violation_type)
                    if updated_violations is not None:
                        send_email(filtered_text, violation_image_path, ', '.join(updated_violations))

                    # Draw OCR text (English and Arabic) on the original frame
                    arabic_text = convert_to_arabic(filtered_text)
                    frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path,
                                          font_size=30, color=(255, 255, 255))
                    frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path,
                                          font_size=30, color=(0, 255, 0))

            # Write the processed frame to the output video
            out.write(frame)
    finally:
        # Release resources when done, even if processing failed part-way
        cap.release()
        out.release()

    if not os.path.exists(output_video_path):
        st.error("Error: Processed video was not created.")
        return None

    return output_video_path  # Return the path of the processed video


# Live video feed processing
def live_video_feed():
    """Show the webcam stream with YOLO detections until "Stop" is pressed."""
    stframe = st.empty()

    # The button is created exactly once per script run: creating it inside
    # the frame loop would register the same widget again on every iteration.
    if st.button("Stop"):
        st.stop()

    video = cv2.VideoCapture(0)
    if not video.isOpened():
        st.error("Unable to access the webcam.")
        return

    try:
        while True:
            ret, frame = video.read()
            if not ret:
                st.error("Failed to capture frame.")
                break

            # Run YOLO on the captured frame
            results = run_yolo(frame)
            annotated_frame = process_results(results, frame)
            annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

            # Display the frame with detections
            stframe.image(annotated_frame_rgb, channels="RGB", use_column_width=True)
    finally:
        # Free the camera even when the script run is interrupted
        video.release()

    st.stop()


# Function to filter license plate text
def filter_license_plate_text(license_plate_text):
    """Extract a plate of the form '<3-4 digits> <2 capital letters>'.

    Everything except ``A-Z`` and ``0-9`` is removed first. Returns the
    normalised plate string, or ``None`` when the text is empty or does not
    contain such a pattern.
    """
    if not license_plate_text:
        return None
    license_plate_text = re.sub(r'[^A-Z0-9]+', "", license_plate_text)
    match = re.search(r'(\d{3,4})\s*([A-Z]{2})', license_plate_text)
    return f"{match.group(1)} {match.group(2)}" if match else None


# Function to convert license plate text to Arabic
def convert_to_arabic(license_plate_text):
    """Map each character through ``arabic_dict``; unknown characters are kept."""
    return "".join(arabic_dict.get(char, char) for char in license_plate_text)


# Function to send email notification with image attachment
def send_email(license_text, violation_image_path, violation_type):
    """Send a violation notification, attaching the violation image if present.

    Args:
        license_text: plate text inserted into the message body.
        violation_image_path: path of the image to attach (skipped when the
            file does not exist).
        violation_type: description of the violation(s). Both the legacy
            identifiers ('no_helmet', 'in_red_lane', 'no_helmet_in_red_lane')
            and the joined labels produced by the video pipeline
            (e.g. 'In Red Lane, No Helmet') are accepted.

    Raises:
        ValueError: if ``violation_type`` names neither known violation.
    """
    # Normalise so that "No Helmet" and "no_helmet" are treated alike.
    normalized = violation_type.lower().replace(" ", "_")
    no_helmet = "no_helmet" in normalized
    in_red_lane = "red_lane" in normalized

    if no_helmet and in_red_lane:
        subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر'
        body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    elif no_helmet:
        subject = 'تنبيه مخالفة: عدم ارتداء خوذة'
        body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    elif in_red_lane:
        subject = 'تنبيه مخالفة: دخول المسار الأيسر'
        body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    else:
        raise ValueError(f"Unknown violation type: {violation_type!r}")

    msg = MIMEMultipart()
    msg['From'] = FROM_EMAIL
    msg['To'] = TO_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    if os.path.exists(violation_image_path):
        with open(violation_image_path, 'rb') as attachment_file:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment_file.read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition',
                        f'attachment; filename={os.path.basename(violation_image_path)}')
        msg.attach(part)

    with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
        server.login(FROM_EMAIL, EMAIL_PASSWORD)
        server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
        print("Email with attachment sent successfully!")


def draw_text_pil(img, text, position, font_path, font_size, color):
    """Return a copy of BGR image ``img`` with ``text`` drawn via Pillow.

    Pillow is used instead of ``cv2.putText`` so that a TrueType font can be
    loaded from ``font_path``; when the font cannot be loaded Pillow's
    default font is used. Drawing happens on the RGB conversion of the
    image, so ``color`` is interpreted as RGB.
    """
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    try:
        font = ImageFont.truetype(font_path, size=font_size)
    except IOError:
        print(f"Font file not found at {font_path}. Using default font.")
        font = ImageFont.load_default()
    draw.text(position, text, font=font, fill=color)
    img_np = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
    return img_np


# Streamlit app main function
def main():
    """Load the YOLO weights and dispatch to the selected input mode."""
    global model
    # NOTE(review): filename is an empty string in the source; it presumably
    # has to name the weights file inside the repository - confirm.
    model_file = hf_hub_download(repo_id="TheKnight115/Yolov8m", filename="")
    model = YOLO(model_file)

    st.title("Motorbike Violation Detection")
    input_type = st.selectbox("Select Input Type", ("Image", "Video", "Live Feed"))

    if input_type == "Image":
        uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            process_image(uploaded_file)
    elif input_type == "Video":
        uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mov"])
        if uploaded_file is not None:
            output_path = process_video_and_save(uploaded_file)
            # The download button lives here, outside the cached function.
            # Processing reports its own error and yields no path on failure.
            if output_path and os.path.exists(output_path):
                with open(output_path, "rb") as video_file:
                    st.download_button(
                        label="Download Processed Video",
                        data=video_file,
                        file_name="processed_video.mp4",
                        mime="video/mp4"
                    )
    elif input_type == "Live Feed":
        live_video_feed()


if __name__ == "__main__":
    main()