Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, Depends, HTTPException, Request, Form, Query, status | |
from fastapi.responses import HTMLResponse, RedirectResponse | |
from sqlalchemy.orm import Session | |
from models import User, Device, SystemSetting, StatusRecord | |
from database import get_db | |
from datetime import datetime | |
from typing import Optional | |
from fastapi.templating import Jinja2Templates | |
admin_router = APIRouter(prefix="/admin", tags=["admin"]) | |
templates = Jinja2Templates(directory="templates") | |
def login_required(request: Request, db: Session = Depends(get_db)): | |
username = request.cookies.get("username") | |
if not username: | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
user = db.query(User).filter(User.username == username).first() | |
if not user: | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
return user | |
async def admin_page(request: Request, db: Session = Depends(get_db)): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
users = db.query(User).all() | |
devices = db.query(Device).all() | |
system_setting = db.query(SystemSetting).first() | |
return templates.TemplateResponse( | |
"admin.html", { | |
"request": request, | |
"users": users, | |
"devices": devices, | |
"system_setting": system_setting, | |
"current_user": current_user | |
} | |
) | |
async def edit_system_setting( | |
request: Request, | |
check_connect_period: int = Form(...), | |
data_sync_period: int = Form(...), | |
get_config_period: int = Form(...), | |
point_distance: int = Form(...), | |
db: Session = Depends(get_db), | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
system_setting = db.query(SystemSetting).first() | |
if not system_setting: | |
system_setting = SystemSetting() | |
db.add(system_setting) | |
system_setting.check_connect_period = check_connect_period | |
system_setting.data_sync_period = data_sync_period | |
system_setting.get_config_period = get_config_period | |
system_setting.point_distance = point_distance | |
db.commit() | |
return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
async def admin_data( | |
request: Request, | |
start_date: Optional[str] = Query(None), | |
end_date: Optional[str] = Query(None), | |
device_id: Optional[str] = Query(None), | |
page: int = Query(1, ge=1), | |
per_page: int = Query(10, ge=10, le=100), | |
db: Session = Depends(get_db) | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") | |
query = db.query(StatusRecord) | |
if start_date: | |
query = query.filter(StatusRecord.timestamp >= datetime.strptime(start_date, "%Y-%m-%d")) | |
if end_date: | |
query = query.filter(StatusRecord.timestamp <= datetime.strptime(end_date, "%Y-%m-%d")) | |
if device_id: | |
query = query.filter(StatusRecord.device_id == device_id) | |
total_records = query.count() | |
total_pages = (total_records + per_page - 1) // per_page | |
records = query.order_by(StatusRecord.timestamp.desc()).offset((page - 1) * per_page).limit(per_page).all() | |
devices = db.query(Device.device_id).distinct().all() | |
device_ids = [device.device_id for device in devices] | |
# Tính toán phạm vi trang để hiển thị | |
page_range = 5 | |
start_page = max(1, page - page_range // 2) | |
end_page = min(total_pages, start_page + page_range - 1) | |
start_page = max(1, end_page - page_range + 1) | |
return templates.TemplateResponse( | |
"admin_data.html", | |
{ | |
"request": request, | |
"records": records, | |
"current_page": page, | |
"total_pages": total_pages, | |
"start_page": start_page, | |
"end_page": end_page, | |
"start_date": start_date, | |
"end_date": end_date, | |
"device_id": device_id, | |
"device_ids": device_ids, | |
"current_user": current_user, | |
"per_page": per_page, | |
} | |
) | |
async def delete_user(request: Request, username: str, db: Session = Depends(get_db)): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
user = db.query(User).filter(User.username == username).first() | |
if user: | |
db.delete(user) | |
db.commit() | |
return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
async def edit_user( | |
request: Request, | |
username: str, | |
new_username: str = Form(...), | |
email: str = Form(...), | |
is_admin: bool = Form(False), | |
is_active: bool = Form(False), | |
old_password: str = Form(None), | |
new_password: str = Form(None), | |
db: Session = Depends(get_db), | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
user = db.query(User).filter(User.username == username).first() | |
if user: | |
if ( | |
new_username != username | |
and db.query(User).filter(User.username == new_username).first() | |
): | |
raise HTTPException(status_code=400, detail="Username already exists") | |
user.username = new_username | |
user.email = email | |
user.is_admin = is_admin | |
user.is_active = is_active | |
if old_password and new_password: | |
if user.password != old_password: | |
raise HTTPException(status_code=400, detail="Incorrect old password") | |
user.password = new_password | |
db.commit() | |
return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
async def add_device( | |
request: Request, | |
name: str = Form(...), | |
description: str = Form(...), | |
device_id: str = Form(...), | |
password: str = Form(...), | |
db: Session = Depends(get_db), | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
existing_device = db.query(Device).filter(Device.device_id == device_id).first() | |
if existing_device: | |
raise HTTPException(status_code=400, detail="Device ID already exists") | |
new_device = Device( | |
name=name, description=description, device_id=device_id, password=password | |
) | |
db.add(new_device) | |
db.commit() | |
return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
async def edit_device( | |
request: Request, | |
device_id: str, | |
name: str = Form(...), | |
description: str = Form(...), | |
new_device_id: str = Form(...), | |
password: str = Form(...), | |
db: Session = Depends(get_db), | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
device = db.query(Device).filter(Device.device_id == device_id).first() | |
if not device: | |
raise HTTPException(status_code=404, detail="Device not found") | |
if ( | |
new_device_id != device_id | |
and db.query(Device).filter(Device.device_id == new_device_id).first() | |
): | |
raise HTTPException(status_code=400, detail="New Device ID already exists") | |
device.name = name | |
device.description = description | |
device.device_id = new_device_id | |
device.password = password | |
db.commit() | |
return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
async def delete_device( | |
request: Request, device_id: str, db: Session = Depends(get_db) | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
if not current_user.is_admin: | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
) | |
device = db.query(Device).filter(Device.device_id == device_id).first() | |
if device: | |
db.delete(device) | |
db.commit() | |
return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |