File size: 6,980 Bytes
6550da2
 
 
 
 
 
 
 
 
1289698
 
 
 
 
6550da2
1289698
 
 
 
6550da2
 
 
1289698
6550da2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1289698
6550da2
 
1289698
6550da2
1289698
6550da2
 
1289698
6550da2
 
1289698
 
 
6550da2
 
1289698
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6550da2
 
 
 
1289698
6550da2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1289698
6550da2
1289698
6550da2
 
1289698
 
6550da2
 
 
 
 
 
 
1289698
6550da2
1289698
6550da2
 
1289698
 
 
6550da2
1289698
 
6550da2
1289698
6550da2
 
1289698
1cf4588
6550da2
 
 
1289698
 
6550da2
1289698
 
 
 
 
 
6550da2
1289698
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# processor.py

import cv2
import numpy as np
import os
from ultralytics import YOLO
from transformers import AutoModel, AutoProcessor
from PIL import Image, ImageDraw, ImageFont
import re
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders

# Email credentials (Use environment variables for security)
FROM_EMAIL = os.getenv("FROM_EMAIL")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")
TO_EMAIL = os.getenv("TO_EMAIL")
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 465

# Arabic dictionary
arabic_dict = {
    "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥",
    "6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب",
    "J": "ح", "D": "د", "R": "ر", "S": "س", "X": "ص", "T": "ط",
    "E": "ع", "G": "ق", "K": "ك", "L": "ل", "Z": "م", "N": "ن",
    "H": "ه", "U": "و", "V": "ي", " ": " "
}
class_colors = {
    0: (0, 255, 0),    # Green (Helmet)
    1: (255, 0, 0),    # Blue (License Plate)
    2: (0, 0, 255),    # Red (MotorbikeDelivery)
    3: (255, 255, 0),  # Cyan (MotorbikeSport)
    4: (255, 0, 255),  # Magenta (No Helmet)
    5: (0, 255, 255),  # Yellow (Person)
}

# Load models
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')
model = YOLO('yolov8_Medium.pt')  # Update path as needed

# Define lane area
red_lane = np.array([[2,1583],[1,1131],[1828,1141],[1912,1580]], np.int32)

# Violation tracking
violations_dict = {}

def filter_license_plate_text(text):
    text = re.sub(r'[^A-Z0-9]+', "", text)
    match = re.search(r'(\d{4})\s*([A-Z]{2})', text)
    return f"{match.group(1)} {match.group(2)}" if match else None

def convert_to_arabic(text):
    return "".join(arabic_dict.get(char, char) for char in text)

def send_email(license_text, violation_image_path, violation_type):
    subject = {
        'No Helmet, In Red Lane': 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر',
        'In Red Lane': 'تنبيه مخالفة: دخول المسار الأيسر',
        'No Helmet': 'تنبيه مخالفة: عدم ارتداء خوذة'
    }.get(violation_type, 'تنبيه مخالفة')

    body = {
        'No Helmet, In Red Lane': f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
        'In Red Lane': f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
        'No Helmet': f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
    }.get(violation_type, f"تم تغريم دراجة نارية التي تحمل لوحة ({license_text})")

    msg = MIMEMultipart()
    msg['From'] = FROM_EMAIL
    msg['To'] = TO_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    if os.path.exists(violation_image_path):
        with open(violation_image_path, 'rb') as attachment_file:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment_file.read())
            encoders.encode_base64(part)
            part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}')
            msg.attach(part)

    try:
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
            server.login(FROM_EMAIL, EMAIL_PASSWORD)
            server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
    except Exception as e:
        print(f"Failed to send email: {e}")

def draw_text_pil(img, text, position, font_path, font_size, color):
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    try:
        font = ImageFont.truetype(font_path, size=font_size)
    except IOError:
        font = ImageFont.load_default()
    draw.text(position, text, font=font, fill=color)
    return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

def process_frame(frame, font_path, violation_image_path='violation.jpg'):
    results = model.track(frame)
    for box in results[0].boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
        label = model.names[int(box.cls)]
        color = class_colors.get(int(box.cls), (255, 255, 255))
        confidence = box.conf[0].item()

        if label == 'MotorbikeDelivery' and confidence >= 0.4:
            motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2]
            delivery_center = ((x1 + x2) // 2, y2)
            in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
            violation_types = []
            if in_red_lane >= 0:
                violation_types.append("In Red Lane")

            sub_results = model(motorbike_crop)
            for sub_box in sub_results[0].boxes:
                sub_x1, sub_y1, sub_x2, sub_y2 = map(int, sub_box.xyxy[0].cpu().numpy())
                sub_label = model.names[int(sub_box.cls)]
                if sub_label == 'No_Helmet':
                    violation_types.append("No Helmet")
                elif sub_label == 'License_plate':
                    license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
                    if violation_types:
                        cv2.imwrite(violation_image_path, frame)
                        license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
                        license_plate_pil.save('license_plate.png')
                        license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
                        filtered_text = filter_license_plate_text(license_plate_text)
                        if filtered_text:
                            if filtered_text not in violations_dict:
                                violations_dict[filtered_text] = violation_types
                                send_email(filtered_text, violation_image_path, ', '.join(violation_types))
                            else:
                                current = set(violations_dict[filtered_text])
                                new = set(violation_types)
                                updated = current | new
                                if updated != current:
                                    violations_dict[filtered_text] = list(updated)
                                    send_email(filtered_text, violation_image_path, ', '.join(updated))
                            arabic_text = convert_to_arabic(filtered_text)
                            frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, 30, (255, 255, 255))
                            frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, 30, (0, 255, 0))
    return frame