|
import uuid |
|
import time |
|
from datetime import datetime |
|
import json |
|
import os |
|
import re |
|
import numpy as np |
|
from flask import Flask, render_template, request, jsonify, flash, redirect, url_for, abort, Response, stream_with_context |
|
from flask_sqlalchemy import SQLAlchemy |
|
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user |
|
from werkzeug.security import generate_password_hash, check_password_hash |
|
from werkzeug.utils import secure_filename |
|
from sqlalchemy import func |
|
from sqlalchemy.orm import relationship |
|
from g4f.client import Client |
|
import markdown |
|
from PIL import Image |
|
import imageio.v3 as iio |
|
import numpy as np |
|
import os |
|
import random |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
|
|
|
|
@app.template_filter('from_json')
def from_json(value):
    """Jinja filter: decode a JSON string, falling back to an empty list.

    Args:
        value: JSON text to decode.

    Returns:
        The decoded Python object, or ``[]`` when ``value`` is not valid
        JSON or is not a string/bytes value (for example ``None``).
    """
    try:
        return json.loads(value)
    except (TypeError, ValueError):
        # json.JSONDecodeError subclasses ValueError; TypeError covers None
        # and other non-string input. Anything else is a genuine bug and is
        # allowed to surface instead of being hidden by a bare ``except``.
        return []
|
|
|
def _arabic_ago(count, singular, dual, few_unit, many_unit):
    """Build the Arabic "... ago" phrase for ``count`` units.

    A fixed phrase is used for one and for two; counts up to 10 use
    ``few_unit`` and larger counts use ``many_unit``.
    """
    if count == 1:
        return singular
    if count == 2:
        return dual
    unit = few_unit if count <= 10 else many_unit
    return f"منذ {count} {unit}"


@app.template_filter('format_datetime')
def format_datetime(value):
    """Format a datetime as a relative Arabic "time ago" string.

    Args:
        value: Naive datetime to describe, compared against
            ``datetime.now()``.

    Returns:
        A relative phrase for anything younger than 30 days, otherwise an
        absolute ``HH:MM صباحاً/مساءً YYYY-MM-DD`` string. ``None`` yields an
        empty string, and timestamps that lie in the future (clock skew, or
        UTC values compared against local time) are reported as "now"
        instead of producing a negative day count.
    """
    if value is None:
        return ""

    diff = datetime.now() - value

    # timedelta normalises negatives to days < 0 with positive seconds, so a
    # future value must be caught on ``days`` before looking at ``seconds``.
    if diff.days < 0 or (diff.days == 0 and diff.seconds < 30):
        return "الآن"

    if diff.days == 0:
        if diff.seconds < 60:
            return f"منذ {diff.seconds} ثانية"
        if diff.seconds < 3600:
            return _arabic_ago(
                diff.seconds // 60,
                "منذ دقيقة واحدة", "منذ دقيقتين", "دقائق", "دقيقة",
            )
        return _arabic_ago(
            diff.seconds // 3600,
            "منذ ساعة واحدة", "منذ ساعتين", "ساعات", "ساعة",
        )

    if diff.days < 30:
        return _arabic_ago(
            diff.days,
            "منذ يوم واحد", "منذ يومين", "أيام", "يوماً",
        )

    clock = value.strftime("%I:%M %p").replace("AM", "صباحاً").replace("PM", "مساءً")
    return clock + " " + value.strftime("%Y-%m-%d")
|
|
|
# Session-signing key. The literal is kept only as a development fallback;
# set the SECRET_KEY environment variable in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')

# Database URL. Credentials should come from the environment rather than
# source control; the literal is the previous hard-coded value kept as a
# fallback.
# NOTE(review): the password/host part of this fallback looks like it was
# replaced by an e-mail obfuscation placeholder - confirm the real value.
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get(
    'SQLALCHEMY_DATABASE_URI',
    'mysql+pymysql://sql7760039:[email protected]/sql7760039',
)

app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

app.config['UPLOAD_FOLDER2'] = 'static/videos/video'
app.config['THUMBNAIL_FOLDER'] = 'static/videos/thumbnails'

from sqlalchemy.pool import NullPool

# Single assignment: previously a first dict ({'charset': 'utf8mb4'}) was
# immediately overwritten by the NullPool dict, so the charset never took
# effect. The charset is a driver option, so it is passed to the DBAPI
# connect() call through ``connect_args``.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'poolclass': NullPool,
    'connect_args': {'charset': 'utf8mb4'},
}
|
from sqlalchemy.orm import scoped_session, sessionmaker |
|
|
|
|
|
|
|
|
|
# Bind the SQLAlchemy extension to the app; the models below subclass db.Model.
db = SQLAlchemy(app)

# Flask-Login manager bound to the app.
login_manager = LoginManager(app)

# Endpoint name used by Flask-Login when an anonymous user hits a
# @login_required view.
login_manager.login_view = 'login'
|
|
|
class User(UserMixin, db.Model):
    """Account record; UserMixin supplies the Flask-Login user interface."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # The views in this file derive the address as "<username>@moltka.eg".
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Credential checked by the login view.
    password = db.Column(db.String(120), nullable=False)
    # Used by the dashboard and article views to select same-profession content.
    profession = db.Column(db.String(120), nullable=False)
    skills = db.Column(db.String(500), nullable=False)
    # Question/answer pair used by the password-reset views.
    security_question = db.Column(db.String(200), nullable=False)
    security_answer = db.Column(db.String(200), nullable=False)
    # Path relative to the static folder.
    profile_photo = db.Column(db.String(200), nullable=True, default='uploads/default-avatar.jpg')
|
|
|
|
|
|
|
class Comment(db.Model):
    """Comment on a post; a non-null parent_id makes it a reply to another comment."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False)
    # NULL for top-level comments.
    parent_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=True)
    # Column default is naive UTC; callers may pass an explicit value instead.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Self-referential one-to-many: ``replies`` is a dynamic query of child
    # comments, and the backref exposes ``parent`` on each child.
    replies = relationship('Comment', backref=db.backref('parent', remote_side=[id]), lazy='dynamic')
|
|
|
class Post(db.Model):
    """Feed post with optional text, image, video and background colour."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(120), nullable=True)
    content = db.Column(db.Text, nullable=True)
    # Media paths are stored relative to the static folder.
    image_url = db.Column(db.String(120), nullable=True)
    video_url = db.Column(db.String(120), nullable=True)
    background_color = db.Column(db.String(20), nullable=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # Copies of the author's e-mail and profession, used by the feed and
    # ownership checks in the views.
    user_email = db.Column(db.String(120), nullable=False)
    user_profession = db.Column(db.String(120), nullable=False)
    # Column default is naive local time (datetime.now).
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.now)

    def __repr__(self):
        """Return a short debug representation containing the title."""
        return f'<Post {self.title}>'
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: resolve the session's user id to a User.

    Args:
        user_id: Identifier stored in the session (a string).

    Returns:
        The matching ``User``, or ``None`` when the id is malformed or no
        such user exists. Flask-Login expects ``None`` rather than an
        exception for invalid ids, so a tampered or stale session simply
        becomes anonymous instead of producing a 500.
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
|
|
|
|
|
class Message(db.Model):
    """Direct message between two users, with optional file attachment."""

    id = db.Column(db.Integer, primary_key=True)
    sender_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    receiver_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    content = db.Column(db.Text)
    # Attachment location and the content type reported at upload time.
    file_url = db.Column(db.String(255))
    file_type = db.Column(db.String(50))
    # Column default is naive UTC.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Per-side "seen" flags toggled by the messaging views.
    watched_by_receiver = db.Column(db.Boolean, default=False)
    watched_by_sender = db.Column(db.Boolean, default=False)
    # Set on UPDATE only (no insert default), so it is NULL until the row is
    # first modified.
    last_updated = db.Column(db.DateTime, onupdate=datetime.utcnow)
|
|
|
|
|
|
|
|
|
class Article(db.Model):
    """Article with author details copied onto the row."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # Author snapshot stored as plain strings (no foreign key to user).
    user = db.Column(db.String(100), nullable=False)
    id_user = db.Column(db.String(100), nullable=False)
    photo_user = db.Column(db.String(100), nullable=False)
    # The article listing view filters on this column.
    profession_user = db.Column(db.String(100), nullable=False)
    # Column default is naive UTC.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
|
class Video(db.Model):
    """Uploaded video with uploader details, title and category."""

    id = db.Column(db.Integer, primary_key=True)
    video_path = db.Column(db.String(200), nullable=False)
    thumbnail_path = db.Column(db.String(200), nullable=True)
    # Uploader name and image are stored on the row alongside the user FK.
    uploader_name = db.Column(db.String(80), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    uploader_image = db.Column(db.String(200), nullable=True)
    # Column default is naive UTC.
    upload_time = db.Column(db.DateTime, default=datetime.utcnow)
    title = db.Column(db.String(200), nullable=False)
    category = db.Column(db.String(100), nullable=False)
|
|
|
|
|
class Playlist(db.Model):
    """Named playlist owned by a user; public by default."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    is_public = db.Column(db.Boolean, default=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # Column default is naive UTC.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
|
|
|
class PlaylistVideo(db.Model):
    """Association row linking one video to one playlist."""

    id = db.Column(db.Integer, primary_key=True)
    playlist_id = db.Column(db.Integer, db.ForeignKey('playlist.id'), nullable=False)
    video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=False)
    # Column default is naive UTC.
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
@app.route('/post/<int:post_id>')
@login_required
def view_post(post_id):
    """Render the dashboard with one post opened in the post modal.

    Login is required because the surrounding feed is filtered on
    ``current_user.profession`` and ``current_user.email``; without it an
    anonymous visitor triggered an AttributeError (HTTP 500) instead of
    being sent to the login page.

    Args:
        post_id: Primary key of the post to show.

    Returns:
        The rendered ``dashboard.html``; responds 404 when the post or its
        author does not exist.
    """
    post = Post.query.get_or_404(post_id)
    user = User.query.get_or_404(post.user_id)

    # Same feed as the dashboard view, minus the post shown in the modal.
    posts = Post.query.filter(
        Post.user_profession == current_user.profession,
        Post.user_email != current_user.email,
        Post.id != post_id,
    ).order_by(Post.id.desc()).all()

    # Map each author e-mail to a profile photo, one lookup per distinct author.
    post_authors = {}
    for p in posts:
        if p.user_email not in post_authors:
            author = User.query.filter_by(email=p.user_email).first()
            post_authors[p.user_email] = author.profile_photo if author else 'uploads/default-avatar.jpg'

    # NOTE(review): the dashboard view also passes user_hash and unread_count
    # to this template - confirm the template tolerates their absence here.
    return render_template(
        'dashboard.html',
        posts=posts,
        post=post,
        user=user,
        show_post_modal=True,
        post_authors=post_authors,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/share/<int:post_id>', methods=['POST'])
def share_post(post_id):
    """Acknowledge a share request for an existing post.

    Responds 404 when the post does not exist; otherwise commits the
    session (nothing is modified here) and reports success.
    """
    # Only the existence check matters; the row itself is not used.
    Post.query.get_or_404(post_id)
    db.session.commit()
    payload = {"success": True}
    return jsonify(payload)
|
|
|
|
|
|
|
@app.route('/')
def home():
    """Send signed-in users to the dashboard and everyone else to login."""
    endpoint = 'dashboard' if current_user.is_authenticated else 'login'
    return redirect(url_for(endpoint))
|
|
|
def _password_matches(stored, candidate):
    """Return True when ``candidate`` is the password stored for an account.

    ``stored`` is either a Werkzeug hash (``pbkdf2:...`` / ``scrypt:...``)
    or, for rows written before passwords were hashed, the plaintext itself.
    """
    if not stored or not candidate:
        return False
    if stored.startswith(('pbkdf2:', 'scrypt:')):
        return check_password_hash(stored, candidate)
    return stored == candidate


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create an account (POST).

    The e-mail is derived from the username as ``<username>@moltka.eg``.
    The password is stored as a salted hash, never in plaintext.

    Returns:
        The rendered form on GET; on POST a redirect to the login page on
        success, or back to the form with a flashed error.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form.get('username')
    password = request.form.get('password')
    confirm_password = request.form.get('confirm_password')
    profession = request.form.get('profession')
    skills = request.form.get('skills')
    security_question = request.form.get('security_question')
    security_answer = request.form.get('security_answer')

    # Every one of these maps to a NOT NULL column; reject early instead of
    # failing at commit time with a 500 (or registering "None@moltka.eg").
    required = (username, password, profession, skills, security_question, security_answer)
    if not all(required):
        flash('جميع الحقول مطلوبة!', 'error')
        return redirect(url_for('register'))

    if password != confirm_password:
        flash('كلمات المرور غير متطابقة!', 'error')
        return redirect(url_for('register'))

    email = f"{username}@moltka.eg"
    if User.query.filter_by(email=email).first():
        flash('اسم المستخدم موجود بالفعل!', 'error')
        return redirect(url_for('register'))

    new_user = User(
        username=username,
        email=email,
        # pbkdf2:sha256 output is about 100 characters, which fits the
        # String(120) password column.
        password=generate_password_hash(password, method='pbkdf2:sha256'),
        profession=profession,
        skills=skills,
        security_question=security_question,
        security_answer=security_answer,
    )
    db.session.add(new_user)
    db.session.commit()

    flash('تم إنشاء الحساب بنجاح!', 'success')
    return redirect(url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    Accepts both hashed passwords (written by registration and by the
    password-reset view) and legacy plaintext rows. A legacy row is
    re-stored as a hash after its first successful login.

    Returns:
        A redirect to the dashboard on success, otherwise the login form
        (with a flashed error after a failed POST).
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        user = User.query.filter_by(email=f"{username}@moltka.eg").first()

        if user and _password_matches(user.password, password):
            if not user.password.startswith(('pbkdf2:', 'scrypt:')):
                # Legacy plaintext row: upgrade it now that the password is known.
                user.password = generate_password_hash(password, method='pbkdf2:sha256')
                db.session.commit()
            login_user(user)
            return redirect(url_for('dashboard'))

        flash('اسم المستخدم أو كلمة المرور غير صحيحة!', 'error')

    return render_template('login.html')
|
|
|
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the feed of posts by other users in the viewer's profession.

    Also supplies a short per-user hash, each author's profile photo keyed
    by e-mail, and the viewer's unread-message count.
    """
    viewer = current_user

    # Deterministic 6-character token derived from the username.
    short_hash = str(uuid.uuid5(uuid.NAMESPACE_DNS, viewer.username))[:6]

    feed = (
        Post.query
        .filter(
            Post.user_profession == viewer.profession,
            Post.user_email != viewer.email,
        )
        .order_by(Post.id.desc())
        .all()
    )

    # One lookup per distinct author e-mail.
    avatars = {}
    for entry in feed:
        email = entry.user_email
        if email in avatars:
            continue
        owner = User.query.filter_by(email=email).first()
        avatars[email] = owner.profile_photo if owner else 'uploads/default-avatar.jpg'

    pending = Message.query.filter_by(
        receiver_id=viewer.id,
        watched_by_receiver=False,
    ).count()

    return render_template(
        'dashboard.html',
        user_hash=short_hash,
        posts=feed,
        post_authors=avatars,
        unread_count=pending,
    )
|
@app.route('/logout')
@login_required
def logout():
    """End the current session, flash a confirmation and go to login."""
    logout_user()
    flash('تم تسجيل الخروج بنجاح', 'success')
    login_page = url_for('login')
    return redirect(login_page)
|
|
|
@app.route('/validate-username')
def validate_username():
    """Report whether a username is already taken.

    Query args:
        username: Name to check; the account e-mail is derived from it as
            ``<username>@moltka.eg``.

    Returns:
        JSON ``{"exists": bool}``. A missing or empty ``username`` yields
        ``False`` without touching the database (previously it looked up
        the literal address "None@moltka.eg").
    """
    username = request.args.get('username')
    if not username:
        return jsonify({'exists': False})

    email = f"{username}@moltka.eg"
    exists = User.query.filter_by(email=email).first() is not None
    return jsonify({'exists': exists})
|
|
|
@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """First step of password recovery: look the account up by e-mail.

    A POST with a known e-mail renders the reset form for that address;
    an unknown e-mail flashes an error and re-renders this form.
    """
    if request.method != 'POST':
        return render_template('forgot_password.html')

    email = request.form.get('email')
    account = User.query.filter_by(email=email).first()
    if account:
        return render_template('reset_password.html', email=email)

    flash('البريد الإلكتروني غير موجود!', 'error')
    return render_template('forgot_password.html')
|
|
|
@app.route('/reset_password', methods=['POST'])
def reset_password():
    """Set a new password after checking the security answer.

    Form fields: ``email``, ``security_answer``, ``new_password``,
    ``confirm_password``.

    Returns:
        A redirect to the login page on success, otherwise a redirect back
        to the forgot-password page with a flashed error.
    """
    email = request.form.get('email')
    security_answer = request.form.get('security_answer')
    new_password = request.form.get('new_password')
    confirm_password = request.form.get('confirm_password')

    user = User.query.filter_by(email=email).first()
    if not user or user.security_answer != security_answer:
        flash('الإجابة على سؤال الأمان غير صحيحة!', 'error')
        return redirect(url_for('forgot_password'))

    # An absent pair compares equal (None == None), which used to reach
    # generate_password_hash(None) and crash; require a real value.
    if not new_password or new_password != confirm_password:
        flash('كلمات المرور غير متطابقة!', 'error')
        return redirect(url_for('forgot_password'))

    # Pin pbkdf2:sha256: its output (about 100 characters) fits the
    # String(120) password column, whereas the library default may be a
    # longer scrypt hash on recent Werkzeug versions.
    user.password = generate_password_hash(new_password, method='pbkdf2:sha256')
    db.session.commit()
    flash('تم تغيير كلمة المرور بنجاح!', 'success')
    return redirect(url_for('login'))
|
|
|
|
|
@app.route('/validate_security_answer', methods=['POST'])
def validate_security_answer():
    """Check a security answer for an account (JSON in, JSON out).

    Expects a JSON body with ``email`` and ``security_answer``.

    Returns:
        JSON ``{"valid": bool}``. A missing, malformed or non-object body is
        treated as an invalid answer instead of raising.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'valid': False})

    email = data.get('email')
    security_answer = data.get('security_answer')

    user = User.query.filter_by(email=email).first()
    valid = bool(user) and user.security_answer == security_answer
    return jsonify({'valid': valid})
|
|
|
@app.route('/profile')
@login_required
def profile():
    """Render the signed-in user's own profile with their posts, newest first."""
    own_posts = Post.query.filter_by(user_email=current_user.email)
    newest_first = own_posts.order_by(Post.id.desc()).all()
    return render_template('profile.html', posts=newest_first)
|
|
|
|
|
|
|
@app.route('/profile/<user_email>')
def public_profile(user_email):
    """Render another user's public profile, or 404 for an unknown e-mail."""
    owner = User.query.filter_by(email=user_email).first()
    if owner is None:
        abort(404)

    owner_posts = Post.query.filter_by(user_email=user_email).all()
    return render_template('public_profile.html', user=owner, posts=owner_posts)
|
|
|
|
|
@app.route('/edit_post_content', methods=['POST'])
@login_required
def edit_post_content():
    """Replace the text of one of the current user's posts.

    Form fields: ``post_id`` and ``new_content``. Responds with JSON
    ``{"success": bool}``; the update is refused when the post does not
    exist or belongs to someone else.
    """
    target = Post.query.get(request.form.get('post_id'))
    replacement = request.form.get('new_content')

    # Guard clause: only the author (matched by e-mail) may edit.
    if target is None or target.user_email != current_user.email:
        return jsonify({'success': False})

    target.content = replacement
    db.session.commit()
    return jsonify({'success': True})
|
|
|
@app.route('/delete_post', methods=['POST'])
@login_required
def delete_post():
    """Delete one of the current user's posts.

    Form field: ``post_id``. Responds with JSON ``{"success": bool}``; the
    request is refused when the post does not exist or belongs to someone
    else.
    """
    target = Post.query.get(request.form.get('post_id'))

    # Guard clause: only the author (matched by e-mail) may delete.
    if target is None or target.user_email != current_user.email:
        return jsonify({'success': False})

    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True})
|
|
|
def _remove_custom_profile_photo(photo_path):
    """Delete a user's uploaded avatar from disk, never the shared default."""
    if not photo_path or photo_path == 'uploads/default-avatar.jpg':
        return
    disk_path = os.path.join('static', photo_path)
    if 'default-avatar' not in disk_path and os.path.exists(disk_path):
        os.remove(disk_path)


@app.route('/update_profile', methods=['POST'])
@login_required
def update_profile():
    """Update the current user's username, profession, skills and avatar.

    Form fields: ``username``, ``profession``, ``skills``, optional
    ``remove_photo`` flag; optional ``avatar`` file upload.

    Because the e-mail is derived from the username, a rename also changes
    the e-mail, and the user's posts (which carry a copy of the e-mail and
    profession) are updated to match.

    Returns:
        A redirect to the profile page, with a flashed success or error.
    """
    username = request.form.get('username')
    profession = request.form.get('profession')
    skills = request.form.get('skills')

    if not username or not profession:
        flash('جميع الحقول مطلوبة!', 'error')
        return redirect(url_for('profile'))

    if username != current_user.username:
        if User.query.filter_by(username=username).first():
            flash('اسم المستخدم مستخدم بالفعل!', 'error')
            return redirect(url_for('profile'))

    try:
        # Collect the user's posts BEFORE mutating the user. The lookup used
        # to run after ``current_user.username`` had been overwritten, so it
        # searched for the new e-mail, matched nothing, and left every post
        # pointing at the old address.
        own_posts = Post.query.filter_by(user_email=current_user.email).all()
        new_email = f"{username}@moltka.eg"

        current_user.username = username
        current_user.email = new_email
        current_user.profession = profession
        current_user.skills = skills

        avatar = request.files.get('avatar')
        remove_photo = request.form.get('remove_photo')

        if remove_photo:
            _remove_custom_profile_photo(current_user.profile_photo)
            current_user.profile_photo = 'uploads/default-avatar.jpg'
        elif avatar and avatar.filename:
            _remove_custom_profile_photo(current_user.profile_photo)
            # Timestamp prefix keeps successive uploads from colliding.
            # NOTE(review): the avatar's file type is not validated here.
            filename = f"{int(time.time())}_{secure_filename(avatar.filename)}"
            avatar.save(os.path.join('static/uploads', filename))
            current_user.profile_photo = 'uploads/' + filename

        for post in own_posts:
            post.user_email = new_email
            post.user_profession = profession

        db.session.commit()
        flash('تم تحديث الملف الشخصي بنجاح!', 'success')
    except Exception as e:
        # Best-effort boundary: undo partial DB changes and report failure.
        db.session.rollback()
        flash('حدث خطأ أثناء تحديث الملف الشخصي!', 'error')
        print(f"Error updating profile: {str(e)}")

    return redirect(url_for('profile'))
|
|
|
|
|
|
|
|
|
@app.route('/add_comment', methods=['POST'])
@login_required
def add_comment():
    """Create a comment, or a reply when ``parent_id`` is given.

    Form fields: ``post_id``, ``content``, optional ``parent_id``.

    Returns:
        JSON ``{"success": bool}``, with a ``message`` key on failure.
    """
    post_id = request.form.get('post_id')
    content = request.form.get('content')
    parent_id = request.form.get('parent_id')

    if not content or not post_id:
        return jsonify({'success': False, 'message': 'Content and post ID are required.'})

    try:
        comment = Comment(
            content=content,
            user_id=current_user.id,
            post_id=post_id,
            # An empty string from the form means "top-level comment".
            parent_id=parent_id if parent_id else None,
            created_at=datetime.now(),
        )
        db.session.add(comment)
        db.session.commit()
        return jsonify({'success': True})
    except Exception as e:
        # Roll back so the failed flush does not leave the session unusable
        # for the remainder of the request.
        db.session.rollback()
        print(f"Error adding comment: {str(e)}")
        return jsonify({'success': False, 'message': 'An error occurred while adding the comment.'})
|
|
|
|
|
def serialize_comment(comment):
    """Serialize a comment and, recursively, its replies (oldest first).

    Args:
        comment: ``Comment`` row to serialize.

    Returns:
        A dict with ``id``, ``content``, ``username``, ``created_at``
        (formatted by ``format_datetime``), ``profile_photo`` and
        ``replies``. When the author cannot be found, the placeholder name
        and default avatar are used instead of raising.
    """
    # Look the author up once; it was previously fetched twice per comment
    # and dereferenced without a None check.
    author = User.query.get(comment.user_id) if comment.user_id else None

    return {
        'id': comment.id,
        'content': comment.content,
        'username': author.username if author else 'Unknown User',
        'created_at': format_datetime(comment.created_at),
        'profile_photo': author.profile_photo if author else 'uploads/default-avatar.jpg',
        'replies': [
            serialize_comment(reply)
            for reply in comment.replies.order_by(Comment.created_at.asc())
        ],
    }
|
|
|
@app.route('/get_comments/<int:post_id>')
@login_required
def get_comments(post_id):
    """Return a post's top-level comments (newest first) with nested replies as JSON."""
    top_level = (
        Comment.query
        .filter_by(post_id=post_id, parent_id=None)
        .order_by(Comment.created_at.desc())
        .all()
    )
    payload = []
    for root in top_level:
        payload.append(serialize_comment(root))
    return jsonify(payload)
|
|
|
|
|
@app.route('/create_post', methods=['GET', 'POST'])
@login_required
def create_post():
    """Show the post form (GET) or create a post with optional media (POST).

    Form fields: ``content``, ``background_color``; optional ``media`` file.
    Image and video uploads are stored under ``static/uploads``; any other
    file type is ignored. On success a small script is returned that
    notifies and reloads the parent window.
    """
    if request.method == 'GET':
        return render_template('create_post.html')

    form = request.form
    upload = request.files.get('media')

    image_url = None
    video_url = None

    if upload and upload.filename:
        stored_name = f"{int(time.time())}_{secure_filename(upload.filename)}"
        lowered = upload.filename.lower()
        is_image = lowered.endswith(('.jpg', '.jpeg', '.png', '.gif'))
        is_video = (not is_image) and lowered.endswith(('.mp4', '.mov', '.avi'))

        if is_image:
            image_url = 'uploads/' + stored_name
        elif is_video:
            video_url = 'uploads/' + stored_name

        # Only recognised media types are written to disk.
        if is_image or is_video:
            upload.save(os.path.join('static/uploads', stored_name))

    new_post = Post(
        content=form.get('content'),
        image_url=image_url,
        video_url=video_url,
        background_color=form.get('background_color'),
        user_id=current_user.id,
        user_email=current_user.email,
        user_profession=current_user.profession,
        created_at=datetime.now(),
    )
    db.session.add(new_post)
    db.session.commit()

    return """
<script>
window.parent.postMessage('success', '*');
window.parent.location.reload();
</script>
"""
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: resolve the session's user id to a User.

    Args:
        user_id: Identifier stored in the session (a string).

    Returns:
        The matching ``User``, or ``None`` when the id is malformed or no
        such user exists, so that a bad session becomes anonymous rather
        than raising.
    """
    # NOTE(review): this callback is registered a second time here; the
    # later registration is the one Flask-Login keeps.
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
|
|
|
# Module-level set of user ids currently treated as "in chat". It is added to
# by messages() and join_chat(), discarded from by leave_chat(), and read by
# is_user_active(). Being a plain in-memory set, its contents are not
# persisted anywhere.
active_users = set()

# Debug output emitted once when the module is imported (the set is empty).
print(active_users)
|
|
|
@app.route('/messages/<user_email>')
@login_required
def messages(user_email):
    """Open the chat page with the user identified by ``user_email``.

    Responds 404 for an unknown e-mail. Marks every unseen message from
    that user to the viewer as seen before rendering.
    """
    peer = User.query.filter_by(email=user_email).first()
    if peer is None:
        abort(404)

    # NOTE(review): the peer is flagged active as well as the viewer, even
    # though only the viewer opened the page - confirm this is intended.
    active_users.update((current_user.id, peer.id))

    unseen_from_peer = Message.query.filter_by(
        receiver_id=current_user.id,
        sender_id=peer.id,
        watched_by_receiver=False,
    )
    unseen_from_peer.update({'watched_by_receiver': True})
    db.session.commit()

    return render_template('messages.html', user=peer, current_user=current_user)
|
|
|
|
|
@app.route('/get_messages/<int:user_id>', methods=['GET'])
@login_required
def get_messages(user_id):
    """Return the full conversation with ``user_id`` as JSON, oldest first.

    As a side effect, messages received from that user are flagged as seen
    by the receiver, and messages sent to that user are flagged as seen by
    the sender.
    """
    me = current_user.id

    incoming_unseen = Message.query.filter_by(
        receiver_id=me, sender_id=user_id, watched_by_receiver=False
    )
    incoming_unseen.update({'watched_by_receiver': True})
    db.session.commit()

    outgoing_unseen = Message.query.filter_by(
        sender_id=me, receiver_id=user_id, watched_by_sender=False
    )
    outgoing_unseen.update({'watched_by_sender': True})
    db.session.commit()

    sent_by_me = (Message.sender_id == me) & (Message.receiver_id == user_id)
    sent_by_peer = (Message.sender_id == user_id) & (Message.receiver_id == me)
    conversation = (
        Message.query
        .filter(sent_by_me | sent_by_peer)
        .order_by(Message.created_at.asc())
        .all()
    )

    payload = []
    for item in conversation:
        payload.append({
            'id': item.id,
            'sender_id': item.sender_id,
            'receiver_id': item.receiver_id,
            'content': item.content,
            'file_url': item.file_url,
            'file_type': item.file_type,
            'created_at': item.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'watched_by_receiver': item.watched_by_receiver,
            'watched_by_sender': item.watched_by_sender,
        })
    return jsonify(payload)
|
|
|
|
|
|
|
|
|
|
|
# Directory that upload_file() writes message attachments into.
app.config['UPLOAD_FOLDER'] = 'static/uploads'

# Lower-case file extensions (without the dot) accepted by allowed_file().
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi'}
|
|
|
|
|
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive and uses the text after the last dot.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[-1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
|
|
|
|
|
|
|
def upload_file(file):
    """Save an uploaded attachment and return its stored path.

    Args:
        file: Uploaded file object exposing ``filename`` and ``save``.

    Returns:
        The saved path with forward slashes (``<UPLOAD_FOLDER>/<name>``), or
        ``None`` when no file was given or its extension is not allowed.
    """
    if not file or not allowed_file(file.filename):
        return None

    # allowed_file() guarantees a dot and a whitelisted extension.
    extension = file.filename.rsplit('.', 1)[1].lower()
    safe_name = secure_filename(file.filename)

    # Sanitising can strip the stem and the dot (e.g. a non-ASCII name),
    # leaving a name without its extension; restore it so later
    # extension-based type detection keeps working.
    if not safe_name.lower().endswith(f".{extension}"):
        safe_name = f"{safe_name or 'file'}.{extension}"

    # A random prefix stops two uploads with the same name from silently
    # overwriting each other.
    unique_name = f"{uuid.uuid4().hex}_{safe_name}"
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_name)
    file.save(filepath)

    # Normalise Windows separators so the value is usable as a URL path.
    return filepath.replace("\\", "/")
|
|
|
|
|
@app.route('/send_message', methods=['POST'])
@login_required
def send_message():
    """Send a direct message with optional text and/or file attachment.

    Form fields: ``receiver_id`` (integer), optional ``content``; optional
    ``file`` upload.

    Returns:
        JSON ``{"success": True, "message_id": id}`` on success; a 400
        response when the receiver id is missing/non-numeric, when there is
        neither content nor file, or when the file type is not allowed; a
        500 response when storing the message fails.
    """
    # type=int yields None for missing or non-numeric values.
    receiver_id = request.form.get('receiver_id', type=int)
    content = request.form.get('content')
    file = request.files.get('file')

    if not receiver_id or (not content and not file):
        return jsonify({'success': False, 'error': 'Missing receiver_id or content/file'}), 400

    try:
        file_url = None
        file_type = None
        if file:
            file_url = upload_file(file)
            if file_url is None:
                # Previously a rejected file still produced a message row
                # with file_type set and no file_url (possibly empty).
                return jsonify({'success': False, 'error': 'Unsupported file type'}), 400
            file_type = file.content_type

        new_message = Message(
            sender_id=current_user.id,
            receiver_id=receiver_id,
            content=content,
            file_url=file_url,
            file_type=file_type,
            watched_by_receiver=False,
            watched_by_sender=True,
        )
        db.session.add(new_message)
        db.session.commit()

        return jsonify({
            'success': True,
            'message_id': new_message.id,
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
@app.route('/stream_messages/<int:user_id>')
@login_required
def stream_messages(user_id):
    """Server-sent-event stream of messages exchanged with ``user_id``.

    The first event carries the most recent message of the conversation;
    afterwards every newer message is emitted in id order. The database is
    polled once per second, backing off to five seconds after an error.
    """
    # Resolve the id once: rollback() below expires loaded ORM objects.
    me = current_user.id

    def serialize(message):
        """Turn a Message row into the JSON-ready event payload."""
        return {
            'message_id': message.id,
            'sender_id': message.sender_id,
            'receiver_id': message.receiver_id,
            'content': message.content,
            'file_url': message.file_url,
            'file_type': message.file_type,
            'created_at': message.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'watched_by_receiver': message.watched_by_receiver,
            'watched_by_sender': message.watched_by_sender,
        }

    def event_stream():
        last_message_id = None

        while True:
            try:
                conversation = Message.query.filter(
                    ((Message.sender_id == me) & (Message.receiver_id == user_id)) |
                    ((Message.sender_id == user_id) & (Message.receiver_id == me))
                )

                if last_message_id is None:
                    # Initial poll: only the latest message.
                    batch = conversation.order_by(Message.created_at.desc()).limit(1).all()
                else:
                    # Later polls: every new message, oldest first. The old
                    # "newest, limit 1" query dropped all but one message
                    # whenever several arrived between two polls.
                    batch = (
                        conversation
                        .filter(Message.id > last_message_id)
                        .order_by(Message.id.asc())
                        .all()
                    )

                # Build payloads before ending the transaction, since
                # rollback() expires the loaded rows.
                payloads = [serialize(message) for message in batch]

                # End the read transaction each poll. Under InnoDB's default
                # REPEATABLE READ isolation a transaction left open keeps
                # reading its first snapshot and never sees new rows.
                db.session.rollback()
            except Exception as e:
                # Reset the session so the next poll starts clean.
                db.session.rollback()
                print(f"Error in SSE stream: {e}")
                time.sleep(5)
                continue

            for payload in payloads:
                last_message_id = payload['message_id']
                yield f"data: {json.dumps(payload)}\n\n"

            time.sleep(1)

    return Response(stream_with_context(event_stream()), content_type='text/event-stream')
|
|
|
@app.route('/is_user_active/<int:user_id>')
@login_required
def is_user_active(user_id):
    """Report as JSON whether ``user_id`` is in the in-memory active set."""
    present = user_id in active_users
    print(present)
    return jsonify({'active': present})
|
|
|
|
|
|
|
|
|
|
|
def get_file_type(content, file_url):
    """Classify a message as ``'text'``, ``'image'`` or ``'video'``.

    Any message with text content counts as text. Otherwise the attachment's
    extension decides (case-insensitively); unknown or missing attachments
    fall back to ``'text'``.
    """
    if content or not file_url:
        return 'text'

    extension = os.path.splitext(file_url)[1].lower()
    kinds_by_extension = (
        ('image', ('.png', '.jpg', '.jpeg', '.gif', '.bmp')),
        ('video', ('.mp4', '.avi', '.mov', '.mkv')),
    )
    for kind, extensions in kinds_by_extension:
        if extension in extensions:
            return kind
    return 'text'
|
|
|
|
|
@app.route('/get_notifications')
@login_required
def get_notifications():
    """Return the user's message notifications as a JSON list.

    The list holds every message received from other users (newest first),
    followed by the user's own sent messages whose ``watched_by_sender``
    flag is still false (newest first). Each entry carries the other
    party's name, e-mail and photo URL.
    """
    me = current_user.id

    def shared_fields(message):
        """Fields common to received and sent notification entries."""
        return {
            'content': message.content,
            'file_url': message.file_url,
            'created_at': message.created_at.strftime('%Y-%m-%d %H:%M:%S'),
        }

    inbox = (
        Message.query
        .filter((Message.receiver_id == me) & (Message.sender_id != me))
        .order_by(Message.created_at.desc())
        .all()
    )
    outbox = (
        Message.query
        .filter((Message.sender_id == me) & (Message.watched_by_sender == False))
        .order_by(Message.created_at.desc())
        .all()
    )

    notifications = []

    for item in inbox:
        author = User.query.get(item.sender_id)
        notifications.append({
            'id': item.id,
            'sender_id': item.sender_id,
            'sender_name': author.username,
            'sender_email': author.email,
            'sender_photo': url_for('static', filename=author.profile_photo),
            **shared_fields(item),
            'watched_by_receiver': item.watched_by_receiver,
            'is_received': True,
            'file_type': get_file_type(item.content, item.file_url),
        })

    for item in outbox:
        recipient = User.query.get(item.receiver_id)
        notifications.append({
            'id': item.id,
            'receiver_id': item.receiver_id,
            'receiver_name': recipient.username,
            'receiver_email': recipient.email,
            'receiver_photo': url_for('static', filename=recipient.profile_photo),
            **shared_fields(item),
            'watched_by_sender': item.watched_by_sender,
            'is_received': False,
            'file_type': get_file_type(item.content, item.file_url),
        })

    return jsonify(notifications)
|
|
|
|
|
@app.route('/mark_all_as_read', methods=['POST'])
@login_required
def mark_all_as_read():
    """Mark every unseen message addressed to the current user as seen.

    Returns:
        JSON ``{"success": True}``.
    """
    # The update used to write False back into rows that were already
    # False, so the endpoint changed nothing.
    Message.query.filter_by(
        receiver_id=current_user.id,
        watched_by_receiver=False,
    ).update({'watched_by_receiver': True})
    db.session.commit()
    return jsonify({'success': True})
|
|
|
|
|
@app.route('/stream_unread_count')
@login_required
def stream_unread_count():
    """Server-sent-event stream of the user's unread-message count.

    Polls once per second and emits ``{"unread_count": n}`` whenever the
    count differs from the last value sent (including the first poll).
    """
    # Resolve the id once: rollback() below expires loaded ORM objects.
    me = current_user.id

    def event_stream():
        last_count = None

        while True:
            try:
                unread_count = Message.query.filter(
                    (Message.receiver_id == me) &
                    (Message.watched_by_receiver == False)
                ).count()
            finally:
                # End the read transaction after every poll. Under InnoDB's
                # default REPEATABLE READ isolation a transaction left open
                # keeps returning its first snapshot, so the count would
                # never change. On error this also resets the session
                # before the exception propagates.
                db.session.rollback()

            if unread_count != last_count:
                last_count = unread_count
                yield f"data: {json.dumps({'unread_count': unread_count})}\n\n"

            time.sleep(1)

    return Response(stream_with_context(event_stream()), content_type='text/event-stream')
|
|
|
@app.route('/mark_message_as_seen/<int:message_id>', methods=['POST'])
@login_required
def mark_message_as_seen(message_id):
    """Flag one received message as seen by the current user.

    Responds with JSON success, or with a 404 failure payload when the
    message does not exist or is not addressed to the current user.
    """
    target = Message.query.get(message_id)
    addressed_to_me = target is not None and target.receiver_id == current_user.id
    if not addressed_to_me:
        return jsonify({'success': False}), 404

    target.watched_by_receiver = True
    target.last_updated = datetime.utcnow()
    db.session.commit()
    return jsonify({'success': True})
|
|
|
|
|
|
|
@app.route('/get_unread_count')
@login_required
def get_unread_count():
    """Return the user's combined unseen-message count as JSON.

    The total adds messages received with ``watched_by_receiver`` false to
    messages sent with ``watched_by_sender`` false.
    """
    me = current_user.id
    as_receiver = Message.query.filter_by(receiver_id=me, watched_by_receiver=False).count()
    as_sender = Message.query.filter_by(sender_id=me, watched_by_sender=False).count()
    return jsonify({'unread_count': as_receiver + as_sender})
|
|
|
|
|
|
|
@app.route('/stream_message_updates/<int:user_id>')
@login_required
def stream_message_updates(user_id):
    """Server-sent-event stream of seen-flag updates on the user's messages.

    Emits one event per message the current user sent or received whose
    ``last_updated`` is later than the newest update already reported
    (initially: the time the stream was opened). Polls every half second.

    Args:
        user_id: Accepted from the URL but not used to filter the stream;
            updates for all of the current user's conversations are emitted.
    """
    # Resolve the id once: rollback() below expires loaded ORM objects.
    me = current_user.id

    def event_stream():
        last_update_time = datetime.utcnow()

        while True:
            try:
                messages = Message.query.filter(
                    (
                        (Message.sender_id == me) |
                        (Message.receiver_id == me)
                    ) &
                    (Message.last_updated > last_update_time)
                ).order_by(Message.last_updated.asc()).all()

                if messages:
                    last_update_time = messages[-1].last_updated

                # Build payloads before ending the transaction, since
                # rollback() expires the loaded rows.
                payloads = [
                    {
                        'message_id': message.id,
                        'sender_id': message.sender_id,
                        'receiver_id': message.receiver_id,
                        'watched_by_receiver': message.watched_by_receiver,
                        'watched_by_sender': message.watched_by_sender,
                        'last_updated': message.last_updated.strftime('%Y-%m-%d %H:%M:%S'),
                    }
                    for message in messages
                ]
            finally:
                # End the read transaction after every poll. Under InnoDB's
                # default REPEATABLE READ isolation a transaction left open
                # keeps returning its first snapshot and would never see
                # later updates.
                db.session.rollback()

            for payload in payloads:
                yield f"data: {json.dumps(payload)}\n\n"

            time.sleep(0.5)

    return Response(stream_with_context(event_stream()), content_type='text/event-stream')
|
|
|
@app.route('/leave_chat', methods=['POST'])
@login_required
def leave_chat():
    """Remove the current user from the in-memory active set.

    Leaving when not present is a no-op; always responds with JSON success.
    """
    uid = current_user.id
    print(f"Attempting to leave chat for user {uid}")

    # discard() does not raise when the id is absent.
    active_users.discard(uid)

    print(f"User {uid} left the chat. Active users: {active_users}")
    return jsonify({'success': True})
|
|
|
|
|
@app.route('/join_chat', methods=['POST'])
@login_required
def join_chat():
    """Add the current user to the in-memory active set and report success."""
    uid = current_user.id
    active_users.add(uid)
    print(active_users)
    return jsonify({'success': True})
|
|
|
|
|
|
|
@app.route('/mark_all_as_seen', methods=['POST'])
@login_required
def mark_all_as_seen():
    """Bulk-update seen flags for the conversation with ``receiver_id``.

    Form field: ``receiver_id`` (the other participant).

    Messages the current user sent to that participant get
    ``watched_by_receiver`` set; messages that participant sent to the
    current user get ``watched_by_sender`` set.
    """
    # NOTE(review): this is the reverse of get_messages(), which sets
    # watched_by_receiver on messages the current user RECEIVED - confirm
    # which direction is intended.
    peer_id = request.form.get('receiver_id')
    me = current_user.id

    sent_to_peer = Message.query.filter(
        Message.sender_id == me,
        Message.receiver_id == peer_id,
        Message.watched_by_receiver == False,
    )
    sent_to_peer.update({'watched_by_receiver': True})

    received_from_peer = Message.query.filter(
        Message.sender_id == peer_id,
        Message.receiver_id == me,
        Message.watched_by_sender == False,
    )
    received_from_peer.update({'watched_by_sender': True})

    db.session.commit()
    return jsonify({'success': True})
|
|
|
|
|
|
|
@app.route('/getchat')
@login_required
def get_chat():
    """Return the user's conversation list as JSON, most recent first.

    Each entry describes the latest message exchanged with one other user,
    plus that user's details and the number of messages from them that the
    current user has not seen.
    """
    me = current_user.id

    # Normalise each (sender, receiver) pair to (larger id, smaller id) so
    # both directions of a conversation group together, then keep the
    # highest message id per pair.
    latest_per_pair = db.session.query(
        db.case(
            (Message.sender_id > Message.receiver_id, Message.sender_id),
            else_=Message.receiver_id
        ).label('user1'),
        db.case(
            (Message.sender_id > Message.receiver_id, Message.receiver_id),
            else_=Message.sender_id
        ).label('user2'),
        db.func.max(Message.id).label('last_message_id')
    ).filter(
        (Message.receiver_id == me) | (Message.sender_id == me)
    ).group_by('user1', 'user2').subquery()

    newest_messages = (
        Message.query
        .join(latest_per_pair, Message.id == latest_per_pair.c.last_message_id)
        .order_by(Message.created_at.desc())
        .all()
    )

    image_suffixes = ('jpg', 'jpeg', 'png')
    video_suffixes = ('mp4', 'avi', 'mov')

    chats = []
    for entry in newest_messages:
        sent_by_me = entry.sender_id == me
        peer = User.query.get(entry.receiver_id if sent_by_me else entry.sender_id)

        unseen = Message.query.filter(
            (Message.receiver_id == me) &
            (Message.sender_id == peer.id) &
            (Message.watched_by_receiver == False)
        ).count()

        preview = entry.content
        attachment = entry.file_url
        kind = None

        if preview:
            # Text is reported as 'text' unless an image is attached, in
            # which case the type stays unset.
            if not attachment or not attachment.lower().endswith(image_suffixes):
                kind = 'text'
        elif attachment:
            lowered = attachment.lower()
            if lowered.endswith(video_suffixes):
                kind = 'video'
            elif lowered.endswith(image_suffixes):
                kind = 'image'

        chats.append({
            'id': entry.id,
            'user_id': peer.id,
            'user_name': peer.username,
            'user_email': peer.email,
            'user_photo': url_for('static', filename=peer.profile_photo),
            'last_message': preview,
            'message_type': kind,
            'unread_count': unseen,
            'last_updated': entry.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'is_sender': entry.sender_id == me,
        })

    return jsonify(chats)
|
|
|
@app.route('/search_users')
def search_users():
    """Search users by a substring of their username.

    Reads the search term from the ``q`` query-string parameter.

    Returns:
        A JSON list of ``{'username', 'profile_photo'}`` objects for every
        user whose username contains the term, or an empty JSON list when
        no term was supplied.
    """
    query = request.args.get('q', '')
    if not query:
        return jsonify([])

    # autoescape=True makes '%' and '_' typed by the user match literally
    # instead of acting as LIKE wildcards (q='%' used to return every user).
    users = User.query.filter(
        User.username.contains(query, autoescape=True)
    ).all()

    results = [
        {
            'username': user.username,
            'profile_photo': user.profile_photo,
        }
        for user in users
    ]
    return jsonify(results)
|
|
|
from flask_login import current_user |
|
|
|
@app.route('/artical', methods=['GET'])
def artical_page():
    """Render the articles page for the signed-in user.

    Anonymous visitors are redirected to the ``login`` endpoint. Otherwise the
    template ``artical.html`` receives, newest first, every article whose
    ``profession_user`` equals the current user's profession.
    """
    if current_user.is_authenticated:
        same_profession = Article.query.filter_by(
            profession_user=current_user.profession
        )
        articles = same_profession.order_by(Article.created_at.desc()).all()
        return render_template('artical.html', articles=articles)

    return redirect(url_for('login'))
|
|
|
@app.route('/create', methods=['POST'])
def create_article():
    """Generate an article for a requested title, store it and return it.

    Expects a JSON body with a non-empty ``title``. The text is requested from
    the chat-completion client as a stream, rendered from Markdown to HTML and
    saved as an ``Article`` attributed to the current user.

    Returns:
        401 with an ``error`` message when the caller is not logged in,
        400 with an ``error`` message when ``title`` is missing or blank,
        otherwise a JSON object describing the stored article.
    """
    # Reject anonymous callers before reading the payload or calling the model.
    if not current_user.is_authenticated:
        return jsonify({"error": "يجب تسجيل الدخول لإنشاء مقال"}), 401

    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so a malformed request becomes a clean 400 rather than a 500.
    data = request.get_json(silent=True)
    title = data.get('title') if isinstance(data, dict) else None
    if not title or not str(title).strip():
        return jsonify({"error": "عنوان المقال مطلوب"}), 400

    user = current_user.username
    id_user = current_user.id
    photo_user = current_user.profile_photo
    profession_user = current_user.profession

    client = Client()
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Write an article about {title}"}],
        stream=True,
        web_search=False,
    )

    # Collect the streamed fragments and join once instead of growing a
    # string with += on every chunk.
    fragments = []
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            fragments.append(piece)
    content_markdown = "".join(fragments)

    extensions = [
        'fenced_code',
        'tables',
        'toc',
        'nl2br',
    ]
    content_html = markdown.markdown(content_markdown, extensions=extensions)

    # Any ``` fences still present after the Markdown pass are rewritten into
    # <pre><code> tags: opening fences with a language first, then the rest.
    content_html = re.sub(r'```(\w+)', r'<pre><code class="\1">', content_html)
    content_html = content_html.replace('```', '</code></pre>')

    article = Article(
        title=title,
        content=content_html,
        user=user,
        id_user=id_user,
        photo_user=photo_user,
        profession_user=profession_user,
    )
    db.session.add(article)
    db.session.commit()

    return jsonify({
        "title": article.title,
        "content": article.content,
        "user": article.user,
        "created_at": article.created_at.strftime('%Y-%m-%d %H:%M:%S'),
        "photo_user": f"/static/{article.photo_user}",
        "id_user": id_user
    })
|
|
|
@app.route('/current_user_id', methods=['GET'])
def get_current_user_id():
    """Report the id of the logged-in user as JSON.

    Returns ``{'id': <id>}`` with status 200 for an authenticated session and
    ``{'error': 'User not authenticated'}`` with status 401 otherwise. A
    diagnostic line is printed in both cases.
    """
    if not current_user.is_authenticated:
        print("المستخدم غير مسجل الدخول")
        return jsonify({'error': 'User not authenticated'}), 401

    print(f"ID المستخدم الحالي: {current_user.id}")
    payload = {'id': current_user.id}
    return jsonify(payload), 200
|
|
|
|
|
|
|
|
|
|
|
def calculate_frame_contrast(frame):
    """Return a contrast score for a frame using Pillow.

    The frame array is wrapped in a Pillow image, converted to grayscale
    (mode ``'L'``), and the standard deviation of the resulting pixel values
    is returned as the contrast measure.
    """
    grayscale_pixels = np.array(Image.fromarray(frame).convert('L'))
    return np.std(grayscale_pixels)
|
|
|
def get_video_duration(video_path):
    """Return the number of frames in the video at ``video_path``.

    Despite the name, the value is a frame count, not a duration in seconds:
    it is the number of items yielded by ``iio.imiter``. ``generate_thumbnail``
    uses it to compute frame indices.
    """
    # Count lazily; building a list of every decoded frame just to take its
    # length keeps the whole video in memory at once.
    return sum(1 for _ in iio.imiter(video_path))
|
|
|
def generate_thumbnail(video_path):
    """Create a 1280x720 JPEG thumbnail from up to three sampled video frames.

    Three frame indices are drawn at roughly 20%, 50% and 80% of the video
    (each shifted forward by a random amount of up to 20% of the length). The
    sampled frame with the highest contrast is scaled to fit inside 1280x720
    while keeping its aspect ratio, centred on a canvas filled with the
    frame's mean colour, and saved in ``app.config['THUMBNAIL_FOLDER']`` as
    ``<video base name>_thumbnail.jpg``.

    Args:
        video_path: Path of the video file to sample.

    Returns:
        The thumbnail path. As in the original implementation the path is
        returned even when no frame could be decoded, in which case no file
        is written.
    """
    thumbnail_filename = os.path.splitext(os.path.basename(video_path))[0] + '_thumbnail.jpg'
    thumbnail_path = os.path.join(app.config['THUMBNAIL_FOLDER'], thumbnail_filename)

    # get_video_duration returns the number of frames in the video.
    video_length = get_video_duration(video_path)
    if video_length <= 0:
        return thumbnail_path

    # Clamp every sampled index to the last valid frame: 80% + up to 20% of
    # the length could equal the frame count and therefore never be reached.
    last_index = video_length - 1
    jitter = int(video_length * 0.2)
    frame_positions = {
        min(int(video_length * fraction) + random.randint(0, jitter), last_index)
        for fraction in (0.2, 0.5, 0.8)
    }
    final_position = max(frame_positions)

    # Start below any possible standard deviation so a perfectly uniform
    # frame (contrast 0) is still accepted as a candidate.
    max_contrast = -1.0
    best_frame = None

    for i, frame in enumerate(iio.imiter(video_path)):
        if i in frame_positions:
            contrast = calculate_frame_contrast(frame)
            if contrast > max_contrast:
                max_contrast = contrast
                best_frame = frame
        if i >= final_position:
            # Nothing left to sample; skip decoding the rest of the video.
            break

    if best_frame is not None:
        # Normalise to RGB so the mean colour is always a 3-tuple and the
        # paste onto the RGB canvas works for grayscale or RGBA frames.
        image = Image.fromarray(best_frame).convert('RGB')

        target_width, target_height = 1280, 720

        original_width, original_height = image.size
        aspect_ratio_original = original_width / original_height
        aspect_ratio_target = target_width / target_height

        # Fit inside the target box: wider frames are bound by width,
        # taller frames by height. max(1, ...) avoids a zero-sized resize
        # for extreme aspect ratios.
        if aspect_ratio_original > aspect_ratio_target:
            new_width = target_width
            new_height = max(1, int(target_width / aspect_ratio_original))
        else:
            new_height = target_height
            new_width = max(1, int(target_height * aspect_ratio_original))

        resized_image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # Mean colour of the frame, used to fill the letterbox bars.
        img_array = np.array(image)
        dominant_color = tuple(map(int, np.mean(img_array, axis=(0, 1))))

        thumbnail = Image.new('RGB', (target_width, target_height), dominant_color)

        x_offset = (target_width - new_width) // 2
        y_offset = (target_height - new_height) // 2

        thumbnail.paste(resized_image, (x_offset, y_offset))

        thumbnail.save(thumbnail_path, 'JPEG', quality=95)

    return thumbnail_path
|
|
|
|
|
|
|
def _unique_upload_name(original_name):
    """Return a sanitised file name prefixed with a random id to avoid collisions."""
    return f"{uuid.uuid4().hex}_{secure_filename(original_name)}"


@app.route('/videos', methods=['GET', 'POST'])
@login_required
def videos():
    """Show the videos page and handle video uploads.

    GET renders ``videos.html``. POST reads ``title`` from the form and the
    ``video`` / optional ``thumbnail`` files, stores them, creates a ``Video``
    row categorised by the uploader's profession, flashes a success message
    and redirects back to this page. When no thumbnail is uploaded one is
    generated from the video. A POST without a video falls through to the
    plain page render.
    """
    if request.method == 'POST':
        title = request.form.get('title')

        # Videos are categorised by the uploader's profession.
        category = current_user.profession

        video = request.files.get('video')
        thumbnail = request.files.get('thumbnail')

        if video:
            # Store under a unique name: saving under the client-supplied
            # name let two uploads with the same file name overwrite each
            # other (and deleting one video removed the other's file).
            video_path = os.path.join(
                app.config['UPLOAD_FOLDER2'], _unique_upload_name(video.filename)
            )
            video.save(video_path)

            if thumbnail:
                thumbnail_path = os.path.join(
                    app.config['THUMBNAIL_FOLDER'],
                    _unique_upload_name(thumbnail.filename),
                )
                thumbnail.save(thumbnail_path)
            else:
                thumbnail_path = generate_thumbnail(video_path)

            new_video = Video(
                video_path=video_path,
                thumbnail_path=thumbnail_path,
                uploader_name=current_user.username,
                user_id=current_user.id,
                uploader_image=current_user.profile_photo,
                title=title,
                category=category
            )
            db.session.add(new_video)
            db.session.commit()

            flash('تم رفع الفيديو بنجاح!', 'success')
            return redirect(url_for('videos'))

    return render_template('videos.html')
|
|
|
|
|
@app.route('/get_videos', methods=['GET'])
@login_required
def get_videos():
    """Return the videos in the viewer's category as a JSON list, newest first.

    The category filter is the current user's profession. Each entry carries
    the stored video fields, a static URL for the uploader image, and
    ``is_owner`` telling whether the viewer uploaded that video.
    """
    viewer_id = current_user.id
    same_category = Video.query.filter_by(category=current_user.profession)

    def serialize(video):
        """Build the JSON-ready dictionary for one video row."""
        return {
            'id': video.id,
            'title': video.title,
            'thumbnail': video.thumbnail_path,
            'uploader_name': video.uploader_name,
            'upload_time': video.upload_time.strftime('%Y-%m-%d %H:%M:%S'),
            'uploader_image': url_for('static', filename=video.uploader_image),
            'category': video.category,
            'video_path': video.video_path,
            'is_owner': video.user_id == viewer_id
        }

    newest_first = same_category.order_by(Video.upload_time.desc()).all()
    return jsonify([serialize(video) for video in newest_first])
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/edit_video/<int:video_id>', methods=['POST'])
@login_required
def edit_video(video_id):
    """Rename a video owned by the current user.

    Expects a JSON body with a non-empty ``title``.

    Returns:
        404 when the video does not exist, 403 when the caller is not the
        uploader, 400 when the title is missing or empty, otherwise
        ``{'success': True}``.
    """
    video = Video.query.get_or_404(video_id)

    # Only the uploader may rename a video, mirroring the rule delete_video
    # enforces; previously any client, even anonymous, could rename any video.
    if video.user_id != current_user.id:
        return jsonify({'success': False, 'message': 'غير مصرح لك بتعديل هذا الفيديو'}), 403

    # silent=True returns None for a missing/invalid JSON body instead of
    # raising, so such requests reach the 400 branch below.
    data = request.get_json(silent=True)
    new_title = data.get('title') if isinstance(data, dict) else None

    if new_title:
        video.title = new_title
        db.session.commit()
        return jsonify({'success': True})

    return jsonify({'success': False, 'message': 'العنوان غير صالح'}), 400
|
|
|
|
|
@app.route('/delete_video/<int:video_id>', methods=['DELETE'])
@login_required
def delete_video(video_id):
    """Delete one of the current user's videos together with its files.

    Responds 404 for an unknown id and 403 when the caller is not the
    uploader. Otherwise the video's playlist links are removed, the stored
    video and thumbnail files are deleted from their configured folders when
    present (the paths are printed for diagnostics), and the row is deleted.
    """
    video = Video.query.get_or_404(video_id)

    if video.user_id != current_user.id:
        return jsonify({'success': False, 'message': 'غير مصرح لك بحذف هذا الفيديو'}), 403

    # Drop playlist links that reference this video before removing it.
    PlaylistVideo.query.filter_by(video_id=video_id).delete()

    stored_video = video.video_path
    if stored_video:
        # Only the base name is trusted; the folder comes from configuration.
        video_file_path = os.path.join(
            app.config['UPLOAD_FOLDER2'], os.path.basename(stored_video)
        )
        print(f"Video file path: {video_file_path}")
        if not os.path.exists(video_file_path):
            print("Video file does not exist!")
        else:
            os.remove(video_file_path)

    stored_thumbnail = video.thumbnail_path
    if stored_thumbnail:
        thumbnail_file_path = os.path.join(
            app.config['THUMBNAIL_FOLDER'], os.path.basename(stored_thumbnail)
        )
        print(f"Thumbnail file path: {thumbnail_file_path}")
        if not os.path.exists(thumbnail_file_path):
            print("Thumbnail file does not exist!")
        else:
            os.remove(thumbnail_file_path)

    db.session.delete(video)
    db.session.commit()

    return jsonify({'success': True})
|
|
|
|
|
|
|
|
|
@app.route('/get_playlists_with_video_status/<int:video_id>', methods=['GET'])
@login_required
def get_playlists_with_video_status(video_id):
    """List the current user's playlists and whether each contains a video.

    Returns:
        A JSON list with ``id``, ``name``, ``is_public`` and ``video_exists``
        for every playlist owned by the current user.
    """
    playlists = Playlist.query.filter_by(user_id=current_user.id).all()

    # Fetch every matching link in one query instead of issuing one
    # PlaylistVideo lookup per playlist.
    containing_ids = set()
    if playlists:
        links = PlaylistVideo.query.filter(
            PlaylistVideo.video_id == video_id,
            PlaylistVideo.playlist_id.in_([playlist.id for playlist in playlists]),
        ).all()
        containing_ids = {link.playlist_id for link in links}

    playlists_data = [
        {
            'id': playlist.id,
            'name': playlist.name,
            'is_public': playlist.is_public,
            'video_exists': playlist.id in containing_ids,
        }
        for playlist in playlists
    ]
    return jsonify(playlists_data)
|
|
|
|
|
@app.route('/remove_video_from_playlist/<int:playlist_id>/<int:video_id>', methods=['DELETE'])
@login_required
def remove_video_from_playlist(playlist_id, video_id):
    """Remove a video from one of the current user's playlists.

    Returns:
        404 when the video is not in the playlist, 403 when the playlist
        belongs to another user, otherwise ``{'success': True}``.
    """
    playlist_video = PlaylistVideo.query.filter_by(
        playlist_id=playlist_id, video_id=video_id
    ).first()
    if playlist_video is None:
        return jsonify({'success': False, 'message': 'الفيديو غير موجود في القائمة'}), 404

    # Only the playlist owner may change its contents, matching the check in
    # delete_video_from_playlist; previously any logged-in user could remove
    # videos from anyone's playlist.
    playlist = Playlist.query.get(playlist_id)
    if playlist is not None and playlist.user_id != current_user.id:
        return jsonify({'success': False, 'message': 'غير مصرح لك بحذف الفيديو من هذه القائمة'}), 403

    db.session.delete(playlist_video)
    db.session.commit()
    return jsonify({'success': True})
|
|
|
|
|
@app.route('/create_playlist', methods=['POST'])
@login_required
def create_playlist():
    """Create a playlist owned by the current user.

    Expects a JSON body with ``name`` (required, non-blank) and ``is_public``
    (optional, treated as ``False`` when omitted).

    Returns:
        400 with a message when the name is missing or blank, otherwise
        ``{'success': True}``.
    """
    # silent=True avoids an exception on a missing/invalid JSON body so the
    # request is answered with a 400 instead of a 500.
    data = request.get_json(silent=True)
    name = data.get('name') if isinstance(data, dict) else None
    if not isinstance(name, str) or not name.strip():
        return jsonify({'success': False, 'message': 'اسم القائمة مطلوب'}), 400

    new_playlist = Playlist(
        name=name,
        is_public=data.get('is_public', False),
        user_id=current_user.id
    )
    db.session.add(new_playlist)
    db.session.commit()
    return jsonify({'success': True})
|
|
|
@app.route('/add_video_to_playlist/<int:playlist_id>/<int:video_id>', methods=['POST'])
@login_required
def add_video_to_playlist(playlist_id, video_id):
    """Add a video to one of the current user's playlists.

    Responds 404 when the video or the playlist does not exist, 403 when the
    playlist belongs to someone else, 400 when the video is already in the
    playlist, and ``{'success': True}`` once the link has been stored.
    """
    # Looked up only so an unknown video id yields a 404.
    Video.query.get_or_404(video_id)
    playlist = Playlist.query.get_or_404(playlist_id)

    if playlist.user_id != current_user.id:
        return jsonify({'success': False, 'message': 'غير مصرح لك بإضافة الفيديو إلى هذه القائمة'}), 403

    link_filter = PlaylistVideo.query.filter_by(playlist_id=playlist_id, video_id=video_id)
    already_linked = link_filter.first()
    if already_linked:
        return jsonify({'success': False, 'message': 'الفيديو موجود بالفعل في القائمة'}), 400

    db.session.add(PlaylistVideo(playlist_id=playlist_id, video_id=video_id))
    db.session.commit()
    return jsonify({'success': True})
|
|
|
|
|
|
|
@app.route('/delete_playlist/<int:playlist_id>', methods=['DELETE'])
@login_required
def delete_playlist(playlist_id):
    """Delete a playlist that belongs to the current user.

    Responds 404 for an unknown id, 403 when the playlist is owned by another
    user, and ``{'success': True}`` after the playlist has been deleted.
    """
    playlist = Playlist.query.get_or_404(playlist_id)

    if playlist.user_id == current_user.id:
        # NOTE(review): PlaylistVideo rows for this playlist are not removed
        # here, whereas delete_video removes them explicitly for videos.
        # Presumably the model cascades; confirm.
        db.session.delete(playlist)
        db.session.commit()
        return jsonify({'success': True})

    return jsonify({'success': False, 'message': 'غير مصرح لك بحذف هذه القائمة'}), 403
|
|
|
|
|
|
|
|
|
def get_current_user_id():
    """Return the logged-in user's id, or ``None`` for an anonymous session.

    NOTE: this plain helper rebinds the module-level name of the
    ``/current_user_id`` view function defined earlier in the file; later
    code calling ``get_current_user_id()`` gets this helper.
    """
    return current_user.id if current_user.is_authenticated else None
|
|
|
@app.route('/get_playlists')
def get_playlists():
    """Return the public playlists owned by other users as a JSON list.

    Each entry has ``id`` and ``name``. Anonymous callers receive
    ``{"error": "User not authenticated"}`` with status 401.
    """
    current_user_id = get_current_user_id()
    if current_user_id is None:
        return jsonify({"error": "User not authenticated"}), 401

    public_only = Playlist.query.filter(Playlist.is_public == True)  # noqa: E712 (SQL expression)
    from_other_users = public_only.filter(Playlist.user_id != current_user_id)

    playlists_data = []
    for playlist in from_other_users.all():
        playlists_data.append({'id': playlist.id, 'name': playlist.name})

    return jsonify(playlists_data)
|
|
|
|
|
@app.route('/get_user_playlists')
def get_user_playlists():
    """Return the current user's own playlists as a JSON list.

    Each entry has ``id`` and ``name``. Anonymous callers receive
    ``{"error": "User not authenticated"}`` with status 401.
    """
    owner_id = get_current_user_id()
    if owner_id is None:
        return jsonify({"error": "User not authenticated"}), 401

    own_playlists = Playlist.query.filter(Playlist.user_id == owner_id).all()

    playlists_data = []
    for playlist in own_playlists:
        playlists_data.append({'id': playlist.id, 'name': playlist.name})

    return jsonify(playlists_data)
|
|
|
|
|
@app.route('/get_playlist_videos/<int:playlist_id>')
@login_required
def get_playlist_videos(playlist_id):
    """Return the videos of a playlist as a JSON list.

    Returns:
        404 when the playlist does not exist, 403 when the playlist is private
        and owned by another user, otherwise a list of video dictionaries.
        ``is_owner`` in each entry says whether the caller owns the playlist
        (not the individual video).
    """
    playlist = Playlist.query.get_or_404(playlist_id)

    is_owner = playlist.user_id == current_user.id

    # A private playlist is visible to its owner only; previously any
    # logged-in user could read it by guessing its id.
    if not is_owner and not playlist.is_public:
        return jsonify({'success': False, 'message': 'غير مصرح لك بعرض هذه القائمة'}), 403

    videos = (
        db.session.query(Video)
        .join(PlaylistVideo)
        .filter(PlaylistVideo.playlist_id == playlist_id)
        .all()
    )

    videos_data = [
        {
            'id': video.id,
            'title': video.title,
            'thumbnail_path': video.thumbnail_path,
            'uploader_name': video.uploader_name,
            'uploader_image': video.uploader_image,
            'upload_time': video.upload_time.strftime('%Y-%m-%d %H:%M:%S'),
            'is_owner': is_owner
        }
        for video in videos
    ]

    return jsonify(videos_data)
|
|
|
|
|
@app.route('/get_video/<int:video_id>')
def get_video(video_id):
    """Return one video's details as JSON, or 404 when the id is unknown.

    The response contains ``id``, ``title``, ``video_path``,
    ``uploader_name``, ``uploader_image`` and a formatted ``upload_time``.
    """
    video = Video.query.get_or_404(video_id)

    copied_fields = ('id', 'title', 'video_path', 'uploader_name', 'uploader_image')
    video_data = {field: getattr(video, field) for field in copied_fields}
    video_data['upload_time'] = video.upload_time.strftime('%Y-%m-%d %H:%M:%S')

    return jsonify(video_data)
|
|
|
|
|
|
|
|
|
@app.route('/delete_video_from_playlist', methods=['POST'])
@login_required
def delete_video_from_playlist():
    """Remove a video from one of the current user's playlists.

    Expects a JSON body with ``video_id`` and ``playlist_id``.

    Returns:
        400 when the video is not in the playlist (including a missing or
        invalid body), 404 when the playlist itself no longer exists, 403
        when the playlist belongs to another user, otherwise 200 with a
        confirmation message.
    """
    # silent=True returns None for a missing/invalid JSON body; treating it
    # as an empty mapping avoids an AttributeError on .get().
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    video_id = payload.get('video_id')
    playlist_id = payload.get('playlist_id')

    playlist_video = PlaylistVideo.query.filter_by(
        video_id=video_id, playlist_id=playlist_id
    ).first()

    if not playlist_video:
        return jsonify({"message": "الفيديو ليس موجودًا في قائمة التشغيل"}), 400

    playlist = Playlist.query.get(playlist_id)
    # Guard against a link whose playlist row is gone; dereferencing None
    # here used to raise and produce a 500.
    if playlist is None:
        return jsonify({"message": "قائمة التشغيل غير موجودة"}), 404

    if playlist.user_id != current_user.id:
        return jsonify({"message": "ليس لديك صلاحية لحذف الفيديو من هذه القائمة"}), 403

    db.session.delete(playlist_video)
    db.session.commit()

    return jsonify({"message": "تم حذف الفيديو من قائمة التشغيل بنجاح"}), 200
|
|
|
|
|
if __name__ == '__main__':
    # Create any missing tables and build the scoped session factory while an
    # application context is active (db.engine is resolved inside it).
    with app.app_context():
        db.create_all()
        Session = scoped_session(sessionmaker(bind=db.engine))

    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code, and this server listens on all interfaces. Debug mode
    # is therefore opt-in via FLASK_DEBUG instead of always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host="0.0.0.0")
|
|
|
|