from fastapi import FastAPI, HTTPException, Request, Form, Depends, status, APIRouter from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse from fastapi.templating import Jinja2Templates from fastapi.security import HTTPBasic from datetime import datetime import folium from folium.plugins import MarkerCluster from typing import Optional from sqlalchemy.orm import Session from models import User, StatusRecord, SystemSetting, Device import io import csv from admin import admin_router from api import api_router from database import get_db, SessionLocal # Create default admin user and system settings def create_default_data(): db = SessionLocal() try: # Create default admin user if not exists if not db.query(User).filter(User.username == "admin").first(): admin_user = User( username="admin", email="admin@email.com", password="admin", is_admin=True, is_active=True, ) db.add(admin_user) # Create default system settings if not exists if not db.query(SystemSetting).first(): default_settings = SystemSetting() db.add(default_settings) db.commit() except Exception as e: db.rollback() print(f"Error creating default data: {str(e)}") finally: db.close() create_default_data() app = FastAPI() templates = Jinja2Templates(directory="templates") security = HTTPBasic() def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]: username = request.cookies.get("username") if username: return db.query(User).filter(User.username == username).first() return None def login_required(request: Request, db: Session = Depends(get_db)): username = request.cookies.get("username") if not username: return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) user = db.query(User).filter(User.username == username).first() if not user: return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) return user # Device authentication function def authenticate_device( device_id: str, device_password: str, db: Session = Depends(get_db) ): device = db.query(Device).filter(Device.device_id == device_id).first() if not device or device.password != device_password: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid device credentials", ) return device @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") async def login( request: Request, username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db), ): user = ( db.query(User) .filter( User.username == username, User.password == password, User.is_active == True ) .first() ) if user: user.last_login = datetime.now() db.commit() response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) response.set_cookie(key="username", value=username, httponly=True) return response return templates.TemplateResponse( "login.html", {"request": request, "error": "Invalid credentials or inactive account"}, ) @app.get("/logout") async def logout(): response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) response.delete_cookie("username") return response @app.get("/register", response_class=HTMLResponse) async def register_page(request: Request): return templates.TemplateResponse("register.html", {"request": request}) @app.post("/register") async def register( username: str = Form(...), email: str = Form(...), password: str = Form(...), db: Session = Depends(get_db), ): existing_user = db.query(User).filter(User.username == username).first() if existing_user: return RedirectResponse( url="/register?error=1", status_code=status.HTTP_302_FOUND ) new_user = User(username=username, email=email, password=password) db.add(new_user) db.commit() return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) @app.get("/", response_class=HTMLResponse) def show_map( request: Request, start_date: str = None, end_date: str = None, db: Session = Depends(get_db), ): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user query = db.query(StatusRecord) if start_date and end_date: start_datetime = datetime.strptime(start_date, "%Y-%m-%d") end_datetime = datetime.strptime(end_date, "%Y-%m-%d") query = query.filter( StatusRecord.timestamp.between(start_datetime, end_datetime) ) status_data = [ (s.latitude, s.longitude, s.connect_status, s.device_id) for s in query.all() ] m = folium.Map(location=[35.6837, 139.6805], zoom_start=12) marker_cluster = MarkerCluster().add_to(m) for lat, lon, connect_status, device_id in status_data: color = "green" if connect_status == 1 else "red" folium.CircleMarker( location=[lat, lon], radius=10, popup=f"{device_id}", color=color, fill=True, fill_opacity=0.6, ).add_to(marker_cluster) map_html = m._repr_html_() return templates.TemplateResponse( "map.html", { "request": request, "map_html": map_html, "start_date": start_date, "end_date": end_date, "current_user": current_user, }, ) @app.get("/download-csv") async def download_csv(request: Request, db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user status_records = db.query(StatusRecord).all() output = io.StringIO() writer = csv.writer(output) writer.writerow( ["UUID", "Device ID", "Latitude", "Longitude", "Timestamp", "Connect Status"] ) for record in status_records: writer.writerow( [ record.uuid, record.device_id, record.latitude, record.longitude, record.timestamp, record.connect_status, ] ) response = StreamingResponse(iter([output.getvalue()]), media_type="text/csv") response.headers["Content-Disposition"] = "attachment; filename=status_records.csv" return response @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): if exc.status_code == status.HTTP_401_UNAUTHORIZED: return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) return templates.TemplateResponse( "error.html", {"request": request, "detail": exc.detail}, status_code=exc.status_code, ) # Include the routers app.include_router(admin_router) app.include_router(api_router) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)