File size: 6,588 Bytes
fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 0d3352f fbfc1c4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
import firebase_admin # type: ignore
from firebase_admin import credentials, firestore # type: ignore
from joblib import dump, load # type: ignore
import datetime
import re
from sklearn.feature_extraction.text import TfidfVectorizer # type: ignore
from sklearn.naive_bayes import MultinomialNB # type: ignore
import pandas as pd # type: ignore
# التهيئة مرة واحدة فقط
if not firebase_admin._apps:
# تأكد من وضع المسار الصحيح لملف التوثيق Firebase
cred = credentials.Certificate("D:/app-sentinel-7qnr19-firebase-adminsdk-kjmbe-f38e16a432.json")
firebase_admin.initialize_app(cred)
db = firestore.client()
# تحميل النموذج الحالي والمحول
try:
model = load('model.joblib')
vectorizer = load('vectorizer.joblib')
print("Model and vectorizer loaded successfully.")
except Exception as e:
model = None
vectorizer = None
print(f"Model and vectorizer not found. You need to train the model. Error: {e}")
# 1. وظيفة لتحليل النصوص وتصنيفها
def classify_and_store_message(message):
global model, vectorizer
try:
if not model or not vectorizer:
raise ValueError("Model or vectorizer not loaded. Train or load the model first.")
# تحويل الرسالة إلى سمات رقمية
message_vector = vectorizer.transform([message])
classification = model.predict(message_vector)[0]
# إعداد البيانات للتخزين
message_data = {
'text': message,
'classification': classification,
'timestamp': datetime.datetime.now()
}
# تخزين الرسالة في مجموعة Firestore حسب التصنيف
collection_name = classification.split('_')[0] # استخدام الجزء الأول من التصنيف كاسم المجموعة
db.collection(collection_name).add(message_data)
# تخزين الرسالة في مجموعة 'all_messages' لجميع الرسائل
db.collection('all_messages').add(message_data)
# تخزين الرسالة في مجموعة 'recently_analyzed_messages'
db.collection('recently_analyzed_messages').add(message_data)
print(f"Message classified as {classification} and stored in Firestore.")
return classification
except Exception as e:
print(f"Error classifying message: {e}")
return None
# 2. وظيفة لتحليل النصوص المدخلة
def analyze_input_text():
print("\n--- SMS Classification and Link Analysis Tool ---")
while True:
user_input = input("Enter a message to classify (or type 'exit' to quit): ").strip()
if user_input.lower() == 'exit':
print("Exiting the tool. Goodbye!")
break
# استخراج الروابط من النص المدخل
links = re.findall(r'(https?://[^\s]+)', user_input)
if links:
print(f"Detected links: {links}")
# تحليل الروابط (يمكن تطوير التحليل ليشمل أدوات أو خدمات خارجية)
for link in links:
# افتراض تحليل بسيط (يمكن تحسينه لاحقًا)
if "secure" in link or "safe" in link:
print(f"Link '{link}' appears safe.")
else:
print(f"Link '{link}' might be suspicious.")
else:
print("No links detected in the message.")
# تصنيف الرسالة
classification = classify_and_store_message(user_input)
if classification:
print(f"Message classified as: {classification}")
else:
print("Unable to classify the message. Please try again.")
# 3. دالة لتحديث النموذج مع بيانات جديدة
def update_model_with_new_data(new_messages, new_labels):
global model, vectorizer
try:
# تحميل البيانات الحالية
data = {
'message': new_messages,
'label': new_labels
}
df_new = pd.DataFrame(data)
# تحديث المحول والنموذج
if vectorizer is None or model is None:
vectorizer = TfidfVectorizer()
X_new = vectorizer.fit_transform(df_new['message'])
else:
X_new = vectorizer.transform(df_new['message'])
# جمع البيانات الجديدة مع القديمة وإعادة التدريب
y_new = df_new['label']
if model is None:
model = MultinomialNB()
model.partial_fit(X_new, y_new, classes=['spam_phishing', 'social_phishing', 'news_phishing', 'advertisement_phishing'])
# حفظ النموذج الجديد
dump(model, 'model.joblib')
dump(vectorizer, 'vectorizer.joblib')
print("Model updated and saved successfully.")
except Exception as e:
print(f"Error updating model: {e}")
# 4. دالة لاختبار النظام
def test_system():
test_messages = [
"Win a free vacation now! Visit https://spam-link.com",
"Breaking news: Major stock updates today.",
"Don't forget our meeting tomorrow at 10 AM.",
"Click here to secure your bank account: https://phishing-link.com",
"Exclusive offers just for you! Buy now at https://ad-link.com"
]
for msg in test_messages:
print(f"\nAnalyzing message: {msg}")
analyze_input_text(msg)
# 5. وظيفة للتصحيح اليدوي
def correct_classification(message_id, correct_label):
try:
# جلب الرسالة من Firestore
message_ref = db.collection('all_messages').document(message_id)
message_data = message_ref.get().to_dict()
if not message_data:
print("Message not found.")
return
# تحديث التصنيف في Firestore
message_ref.update({'classification': correct_label})
# إضافة البيانات إلى نموذج التدريب الجديد
update_model_with_new_data([message_data['text']], [correct_label])
print(f"Message classification corrected to {correct_label} and model updated.")
except Exception as e:
print(f"Error correcting classification: {e}")
# تشغيل تحليل النصوص
analyze_input_text()
|