Spaces:
Sleeping
Sleeping
File size: 6,980 Bytes
6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 6550da2 1289698 1cf4588 6550da2 1289698 6550da2 1289698 6550da2 1289698 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# processor.py
import cv2
import numpy as np
import os
from ultralytics import YOLO
from transformers import AutoModel, AutoProcessor
from PIL import Image, ImageDraw, ImageFont
import re
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
# Outgoing-mail settings.
# The account, its password and the recipient are read from the process
# environment rather than being written into this file; each is None when
# the corresponding variable is not set.
FROM_EMAIL = os.environ.get("FROM_EMAIL")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD")
TO_EMAIL = os.environ.get("TO_EMAIL")

# SMTP endpoint used by send_email() (opened with smtplib.SMTP_SSL).
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 465
# Character-level lookup table used by convert_to_arabic(): maps each Latin
# digit / capital letter that can appear in a filtered plate string to the
# Arabic character shown next to it. A space maps to itself.
arabic_dict = {
    # Digits
    "0": "٠",
    "1": "١",
    "2": "٢",
    "3": "٣",
    "4": "٤",
    "5": "٥",
    "6": "٦",
    "7": "٧",
    "8": "٨",
    "9": "٩",
    # Letters
    "A": "ا",
    "B": "ب",
    "J": "ح",
    "D": "د",
    "R": "ر",
    "S": "س",
    "X": "ص",
    "T": "ط",
    "E": "ع",
    "G": "ق",
    "K": "ك",
    "L": "ل",
    "Z": "م",
    "N": "ن",
    "H": "ه",
    "U": "و",
    "V": "ي",
    # Separator
    " ": " ",
}
# Per-class colour, keyed by detector class id. The colour names below match
# the tuples when they are read in blue-green-red channel order.
class_colors = {
    # Helmet -> green
    0: (0, 255, 0),
    # License Plate -> blue
    1: (255, 0, 0),
    # MotorbikeDelivery -> red
    2: (0, 0, 255),
    # MotorbikeSport -> cyan
    3: (255, 255, 0),
    # No Helmet -> magenta
    4: (255, 0, 255),
    # Person -> yellow
    5: (0, 255, 255),
}
# Load models
# All three objects are created at import time, so importing this module
# performs the model loading below as a side effect.
# Both from_pretrained() calls pass trust_remote_code=True for the
# "stepfun-ai/GOT-OCR2_0" checkpoint.
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
# The OCR model is moved to the 'cuda' device unconditionally.
# NOTE(review): presumably this fails on a host without a CUDA device - confirm.
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')
# Detector used by process_frame() for both the full frame and the per-bike crop.
model = YOLO('yolov8_Medium.pt') # Update path as needed
# Four-vertex polygon describing the restricted ("red") lane; process_frame()
# tests points against it with cv2.pointPolygonTest.
red_lane = np.array(
    [
        [2, 1583],
        [1, 1131],
        [1828, 1141],
        [1912, 1580],
    ],
    dtype=np.int32,
)

# Violations already reported in this process: filtered plate text -> list of
# violation labels recorded for that plate.
violations_dict = dict()
def filter_license_plate_text(text):
    """Pull a ``"DDDD LL"`` plate string out of raw OCR text.

    Every character other than ``A-Z`` and ``0-9`` is removed first. The
    first run of four digits directly followed by two capital letters in the
    remaining text is then returned as ``"<digits> <letters>"``.

    Returns ``None`` when no such run exists.
    """
    cleaned = re.sub(r'[^A-Z0-9]+', "", text)
    found = re.search(r'(\d{4})\s*([A-Z]{2})', cleaned)
    if found is None:
        return None
    digits, letters = found.groups()
    return f"{digits} {letters}"
def convert_to_arabic(text):
    """Return *text* with each character replaced via ``arabic_dict``.

    Characters that have no entry in the table are copied through unchanged.
    """
    converted = []
    for symbol in text:
        converted.append(arabic_dict.get(symbol, symbol))
    return "".join(converted)
def send_email(license_text, violation_image_path, violation_type):
    """Send a violation notice by e-mail, attaching the evidence image.

    Args:
        license_text: Plate text interpolated into the message body.
        violation_image_path: Path of the image to attach. The attachment is
            skipped when no file exists at that path.
        violation_type: Comma-separated violation labels, for example
            ``"No Helmet"`` or ``"No Helmet, In Red Lane"``. The labels may
            appear in any order and may repeat; an unrecognised combination
            selects the generic subject and body.

    Returns:
        None. Errors raised while connecting, logging in or sending are
        printed and not re-raised, so a mail failure does not stop the caller.
    """
    helmet_and_lane = frozenset(("No Helmet", "In Red Lane"))
    lane_only = frozenset(("In Red Lane",))
    helmet_only = frozenset(("No Helmet",))

    # Look templates up by the *set* of labels. A plain string key only
    # matched one spelling, so "In Red Lane, No Helmet" silently fell back
    # to the generic message.
    labels = frozenset(
        label.strip()
        for label in (violation_type or "").split(",")
        if label.strip()
    )

    subject = {
        helmet_and_lane: 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر',
        lane_only: 'تنبيه مخالفة: دخول المسار الأيسر',
        helmet_only: 'تنبيه مخالفة: عدم ارتداء خوذة',
    }.get(labels, 'تنبيه مخالفة')
    body = {
        helmet_and_lane: f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
        lane_only: f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
        helmet_only: f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
    }.get(labels, f"تم تغريم دراجة نارية التي تحمل لوحة ({license_text})")

    msg = MIMEMultipart()
    msg['From'] = FROM_EMAIL
    msg['To'] = TO_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    if os.path.exists(violation_image_path):
        with open(violation_image_path, 'rb') as attachment_file:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment_file.read())
        encoders.encode_base64(part)
        # Passing the name as a header parameter lets the email package
        # quote/encode it, so names containing spaces or non-ASCII
        # characters still produce a valid header.
        part.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(violation_image_path),
        )
        msg.attach(part)

    # Best effort: report the failure and carry on with frame processing.
    try:
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
            server.login(FROM_EMAIL, EMAIL_PASSWORD)
            server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
    except Exception as e:
        print(f"Failed to send email: {e}")
def draw_text_pil(img, text, position, font_path, font_size, color):
    """Draw *text* on an OpenCV image with Pillow and return the new image.

    The input is converted BGR -> RGB, the text is drawn on the Pillow copy,
    and the result is converted back RGB -> BGR.

    Args:
        img: Image array accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.
        text: String to render.
        position: Coordinates handed to Pillow's ``ImageDraw.text``.
        font_path: TrueType font file; Pillow's default font is used when it
            cannot be loaded.
        font_size: Size passed to ``ImageFont.truetype``.
        color: Fill value passed to Pillow (applied on the RGB copy).

    Returns:
        A new array holding the annotated image.
    """
    canvas = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    try:
        typeface = ImageFont.truetype(font_path, size=font_size)
    except IOError:
        # Missing or unreadable font file: fall back to the built-in font.
        typeface = ImageFont.load_default()
    ImageDraw.Draw(canvas).text(position, text, font=typeface, fill=color)
    annotated_rgb = np.array(canvas)
    return cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)
def process_frame(frame, font_path, violation_image_path='violation.jpg'):
    """Detect delivery-motorbike violations in one frame and annotate it.

    For every ``MotorbikeDelivery`` detection with confidence >= 0.4:

    1. The bottom-centre of its box is tested against ``red_lane``.
    2. The detector is run again on the bike crop (extended 50 px upwards)
       to look for ``No_Helmet`` and ``License_plate`` boxes.
    3. If there is any violation, the frame is written to
       *violation_image_path*. If a plate crop was found, it is OCR'd.
    4. A plate that filters to a valid string is recorded in
       ``violations_dict``. An e-mail is sent the first time the plate is
       seen and again whenever a new violation label is added for it.
    5. The plate text and its Arabic form are drawn under the box.

    Args:
        frame: Image array for the current frame.
        font_path: Font file forwarded to ``draw_text_pil``.
        violation_image_path: Where the evidence frame is written.

    Returns:
        The frame, with plate text drawn for every reported motorbike.
    """
    # Order used when joining labels. It matches the combined key
    # ('No Helmet, In Red Lane') that send_email() selects its template by.
    label_order = ("No Helmet", "In Red Lane")
    # File the plate crop is written to and from which the OCR model reads it.
    temp_image_path = 'license_plate.png'

    results = model.track(frame)
    for box in results[0].boxes:
        label = model.names[int(box.cls)]
        confidence = box.conf[0].item()
        if label != 'MotorbikeDelivery' or confidence < 0.4:
            continue

        x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
        # Extend the crop upwards by 50 px (clamped at the top edge).
        motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2]
        if motorbike_crop.size == 0:
            # Degenerate box: nothing to run the second detection pass on.
            continue

        violation_types = []
        delivery_center = ((x1 + x2) // 2, y2)
        if cv2.pointPolygonTest(red_lane, delivery_center, False) >= 0:
            violation_types.append("In Red Lane")

        # Reset per motorbike so a plate found for an earlier box is never
        # attributed to this one.
        license_crop = None
        sub_results = model(motorbike_crop)
        for sub_box in sub_results[0].boxes:
            sub_label = model.names[int(sub_box.cls)]
            if sub_label == 'No_Helmet':
                # Several No_Helmet boxes still count as one violation.
                if "No Helmet" not in violation_types:
                    violation_types.append("No Helmet")
            elif sub_label == 'License_plate':
                sub_x1, sub_y1, sub_x2, sub_y2 = map(int, sub_box.xyxy[0].cpu().numpy())
                license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]

        if not violation_types:
            continue

        cv2.imwrite(violation_image_path, frame)

        if license_crop is None or license_crop.size == 0:
            # No plate detected for this bike, so there is nothing to OCR.
            continue

        license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
        license_plate_pil.save(temp_image_path)
        license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
        filtered_text = filter_license_plate_text(license_plate_text)
        if not filtered_text:
            continue

        known = set(violations_dict.get(filtered_text, ()))
        merged = known | set(violation_types)
        if merged != known:
            # Deterministic order: known labels first, anything else sorted.
            ordered = [name for name in label_order if name in merged]
            ordered.extend(sorted(merged.difference(label_order)))
            violations_dict[filtered_text] = ordered
            send_email(filtered_text, violation_image_path, ', '.join(ordered))

        arabic_text = convert_to_arabic(filtered_text)
        frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, 30, (255, 255, 255))
        frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, 30, (0, 255, 0))
    return frame
|