T5_final_project / processor.py
TheKnight115's picture
Update processor.py
1289698 verified
raw
history blame
6.98 kB
# processor.py
import cv2
import numpy as np
import os
from ultralytics import YOLO
from transformers import AutoModel, AutoProcessor
from PIL import Image, ImageDraw, ImageFont
import re
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
# Outgoing-mail settings. The sender address, its password and the recipient
# come from the process environment so that no secret lives in the source;
# each value is None when its variable is not set.
FROM_EMAIL = os.environ.get("FROM_EMAIL")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD")
TO_EMAIL = os.environ.get("TO_EMAIL")
# Host and port handed to smtplib.SMTP_SSL in send_email.
SMTP_SERVER, SMTP_PORT = 'smtp.gmail.com', 465
# Plate-character lookup used by convert_to_arabic: each Western digit and
# each listed Latin letter maps to the Arabic character drawn on the frame;
# a space maps to itself.
# Digits "0".."9" map to the Arabic-Indic digits U+0660..U+0669.
arabic_dict = {str(digit): chr(0x0660 + digit) for digit in range(10)}
arabic_dict.update({
    "A": "ا",
    "B": "ب",
    "J": "ح",
    "D": "د",
    "R": "ر",
    "S": "س",
    "X": "ص",
    "T": "ط",
    "E": "ع",
    "G": "ق",
    "K": "ك",
    "L": "ل",
    "Z": "م",
    "N": "ن",
    "H": "ه",
    "U": "و",
    "V": "ي",
    " ": " ",
})
# Colour per detector class id. The colour names below only hold when the
# tuples are read in blue-green-red order (e.g. (255, 0, 0) is blue).
class_colors = dict(enumerate([
    (0, 255, 0),    # 0: green   - Helmet
    (255, 0, 0),    # 1: blue    - License Plate
    (0, 0, 255),    # 2: red     - MotorbikeDelivery
    (255, 255, 0),  # 3: cyan    - MotorbikeSport
    (255, 0, 255),  # 4: magenta - No Helmet
    (0, 255, 255),  # 5: yellow  - Person
]))
# Load models once, at import time.
# NOTE: trust_remote_code=True allows Python code shipped in the Hub
# repository to run locally; only keep it for repositories you trust.
_GOT_OCR_REPO = "stepfun-ai/GOT-OCR2_0"
processor = AutoProcessor.from_pretrained(_GOT_OCR_REPO, trust_remote_code=True)
model_ocr = AutoModel.from_pretrained(_GOT_OCR_REPO, trust_remote_code=True)
model_ocr = model_ocr.to('cuda')  # the OCR model is placed on the CUDA device
# Detector weights are read from a path relative to the working directory.
model = YOLO('yolov8_Medium.pt')  # Update path as needed
# Lane polygon: four (x, y) pixel vertices tested with cv2.pointPolygonTest
# in process_frame. The coordinates are fixed, so they presumably match one
# specific camera resolution - verify before using other footage.
red_lane = np.array(
    [
        [2, 1583],
        [1, 1131],
        [1828, 1141],
        [1912, 1580],
    ],
    dtype=np.int32,
)
# Violation tracking: filtered plate text -> list of violation types that
# have already been reported for that plate.
violations_dict = dict()
def filter_license_plate_text(text):
    """Extract a plate of the form ``"<4 digits> <2 letters>"`` from OCR text.

    Every character other than ``A``-``Z`` and ``0``-``9`` is removed first
    (lowercase letters and whitespace included), then the first run of four
    digits immediately followed by two uppercase letters is taken.

    Args:
        text: Raw OCR output. Anything that is not a ``str`` (for example
            ``None`` from a failed OCR call) is treated as "no plate".

    Returns:
        The normalised plate such as ``"1234 AB"``, or ``None`` when no
        such sequence is present.
    """
    if not isinstance(text, str):
        # Previously a TypeError from re.sub; "no plate" is the safer answer.
        return None
    compact = re.sub(r'[^A-Z0-9]+', "", text)
    # Whitespace is already gone, so the `\s*` below can only match empty.
    match = re.search(r'(\d{4})\s*([A-Z]{2})', compact)
    if match is None:
        return None
    return f"{match.group(1)} {match.group(2)}"
def convert_to_arabic(text):
    """Return *text* with each character replaced via ``arabic_dict``.

    Characters that have no entry in ``arabic_dict`` are copied through
    unchanged.
    """
    converted = []
    for symbol in text:
        converted.append(arabic_dict.get(symbol, symbol))
    return "".join(converted)
def send_email(license_text, violation_image_path, violation_type):
    """Email a violation notice, attaching the violation image if it exists.

    Args:
        license_text: Plate text interpolated into the message body.
        violation_image_path: Path of the image to attach; the attachment
            is skipped when the file does not exist.
        violation_type: Comma-separated violation names. The names are
            matched regardless of order, spacing or repetition, so
            ``"In Red Lane, No Helmet"`` selects the same template as
            ``"No Helmet, In Red Lane"``. Unknown values get the generic
            subject and body.

    Returns:
        None. Delivery is best effort: failures are printed, not raised.
    """
    subjects = {
        'No Helmet, In Red Lane': 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر',
        'In Red Lane': 'تنبيه مخالفة: دخول المسار الأيسر',
        'No Helmet': 'تنبيه مخالفة: عدم ارتداء خوذة'
    }
    bodies = {
        'No Helmet, In Red Lane': f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
        'In Red Lane': f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
        'No Helmet': f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
    }

    # Map the caller's string onto a template key by comparing the *set* of
    # violation names, so ordering and duplicates cannot defeat the lookup.
    if isinstance(violation_type, str):
        requested = frozenset(
            piece.strip() for piece in violation_type.split(',') if piece.strip()
        )
    else:
        requested = frozenset()
    canonical = {frozenset(key.split(', ')): key for key in subjects}
    template_key = canonical.get(requested, violation_type)

    subject = subjects.get(template_key, 'تنبيه مخالفة')
    body = bodies.get(template_key, f"تم تغريم دراجة نارية التي تحمل لوحة ({license_text})")

    if not (FROM_EMAIL and EMAIL_PASSWORD and TO_EMAIL):
        # Without these the SMTP login below can only fail; say why instead.
        print("Failed to send email: FROM_EMAIL, EMAIL_PASSWORD and TO_EMAIL must be set")
        return

    msg = MIMEMultipart()
    msg['From'] = FROM_EMAIL
    msg['To'] = TO_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    if os.path.exists(violation_image_path):
        with open(violation_image_path, 'rb') as attachment_file:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment_file.read())
        encoders.encode_base64(part)
        # Passing filename as a parameter lets the email package quote and
        # encode it, so names with spaces or non-ASCII characters stay valid.
        part.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(violation_image_path),
        )
        msg.attach(part)

    try:
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
            server.login(FROM_EMAIL, EMAIL_PASSWORD)
            server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
    except Exception as e:
        # Deliberately broad: a mail failure must not stop frame processing.
        print(f"Failed to send email: {e}")
def draw_text_pil(img, text, position, font_path, font_size, color):
    """Draw *text* onto a BGR image using PIL and return a new BGR image.

    The image is converted BGR -> RGB, wrapped in a PIL image, drawn on,
    and converted back. *color* is applied to the RGB PIL image.

    Args:
        img: Image array accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.
        text: String to draw.
        position: ``(x, y)`` passed to ``ImageDraw.text``.
        font_path: TrueType font file; PIL's default font is used when
            loading it raises ``IOError``.
        font_size: Size passed to ``ImageFont.truetype``.
        color: Fill colour passed to ``ImageDraw.text``.

    Returns:
        The annotated image as a BGR array.
    """
    rgb_pixels = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    canvas = Image.fromarray(rgb_pixels)
    pen = ImageDraw.Draw(canvas)
    try:
        typeface = ImageFont.truetype(font_path, size=font_size)
    except IOError:
        # Font file missing or unreadable: fall back to PIL's built-in font.
        typeface = ImageFont.load_default()
    pen.text(position, text, font=typeface, fill=color)
    annotated_rgb = np.array(canvas)
    return cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)
def _ordered_violations(violation_types):
    """Return the unique violation names, "No Helmet" before "In Red Lane"."""
    kinds = set(violation_types)
    ordered = [name for name in ("No Helmet", "In Red Lane") if name in kinds]
    ordered.extend(sorted(kinds.difference(ordered)))
    return ordered


def process_frame(frame, font_path, violation_image_path='violation.jpg'):
    """Detect delivery-motorbike violations in one frame and annotate it.

    For every detection labelled ``'MotorbikeDelivery'`` with confidence of
    at least 0.4, the function:

    1. checks whether the bottom-centre of the box lies inside ``red_lane``;
    2. re-runs the detector on the motorbike crop (extended 50 px upwards)
       looking for ``'No_Helmet'`` and ``'License_plate'`` detections;
    3. if any violation was found, writes the frame to
       *violation_image_path*, OCRs the plate crop and, when a valid plate
       is read, records the violation in ``violations_dict`` and emails it
       (only the first time, or when a new violation type appears);
    4. draws the plate text and its Arabic form below the motorbike box.

    Args:
        frame: BGR image array for the current video frame.
        font_path: TrueType font used for the on-frame text.
        violation_image_path: Where the evidence frame is written.

    Returns:
        The frame, annotated when a plate was read, otherwise unchanged.
    """
    results = model.track(frame)
    for box in results[0].boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
        label = model.names[int(box.cls)]
        confidence = box.conf[0].item()
        if not (label == 'MotorbikeDelivery' and confidence >= 0.4):
            continue

        # Extend the crop upwards so the rider's head is included.
        motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2]
        if motorbike_crop.size == 0:
            # Degenerate box: nothing to run the detector on.
            continue

        delivery_center = ((x1 + x2) // 2, y2)
        violation_types = []
        if cv2.pointPolygonTest(red_lane, delivery_center, False) >= 0:
            violation_types.append("In Red Lane")

        # Reset per motorbike so a plate from a previous box is never reused.
        license_crop = None
        sub_results = model(motorbike_crop)
        for sub_box in sub_results[0].boxes:
            sub_x1, sub_y1, sub_x2, sub_y2 = map(int, sub_box.xyxy[0].cpu().numpy())
            sub_label = model.names[int(sub_box.cls)]
            if sub_label == 'No_Helmet':
                # Several detections still count as a single violation.
                if "No Helmet" not in violation_types:
                    violation_types.append("No Helmet")
            elif sub_label == 'License_plate':
                license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]

        if not violation_types:
            continue

        cv2.imwrite(violation_image_path, frame)

        if license_crop is None or license_crop.size == 0:
            # No readable plate region: the violation cannot be attributed.
            continue

        license_plate_path = 'license_plate.png'
        license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
        license_plate_pil.save(license_plate_path)
        # OCR reads the plate image that was just written to disk.
        license_plate_text = model_ocr.chat(processor, license_plate_path, ocr_type='ocr')
        filtered_text = filter_license_plate_text(license_plate_text)
        if not filtered_text:
            continue

        if filtered_text not in violations_dict:
            violations_dict[filtered_text] = violation_types
            send_email(
                filtered_text,
                violation_image_path,
                ', '.join(_ordered_violations(violation_types)),
            )
        else:
            current = set(violations_dict[filtered_text])
            updated = current | set(violation_types)
            if updated != current:
                # A new violation type for a known plate: store and re-notify.
                ordered = _ordered_violations(updated)
                violations_dict[filtered_text] = ordered
                send_email(filtered_text, violation_image_path, ', '.join(ordered))

        arabic_text = convert_to_arabic(filtered_text)
        frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, 30, (255, 255, 255))
        frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, 30, (0, 255, 0))
    return frame