from fastapi import FastAPI, HTTPException, Request, Form, Depends, status, APIRouter, Header from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse, StreamingResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBasic from datetime import datetime, timedelta import random import folium import uuid as uuid_module from folium.plugins import MarkerCluster from typing import Optional from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from models import Base, User, StatusRecord, SystemSetting, Device import io import csv from typing import Dict # Database setup SQLALCHEMY_DATABASE_URL = "sqlite:///./database.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base.metadata.create_all(bind=engine) # Create default admin user and system settings def create_default_data(): db = SessionLocal() try: # Create default admin user if not exists if not db.query(User).filter(User.username == "admin").first(): admin_user = User( username="admin", email="admin@email.com", password="admin", is_admin=True, is_active=True ) db.add(admin_user) # Create default system settings if not exists if not db.query(SystemSetting).first(): default_settings = SystemSetting() db.add(default_settings) db.commit() except Exception as e: db.rollback() print(f"Error creating default data: {str(e)}") finally: db.close() create_default_data() app = FastAPI() templates = Jinja2Templates(directory="templates") security = HTTPBasic() # Create APIRouters for grouping admin_router = APIRouter(prefix="/admin", tags=["admin"]) api_router = APIRouter(prefix="/api", tags=["api"]) # Dependency to get the database session def get_db(): db = SessionLocal() try: yield db finally: db.close() def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]: username = request.cookies.get("username") if username: return db.query(User).filter(User.username == username).first() return None def login_required(request: Request, db: Session = Depends(get_db)): username = request.cookies.get("username") if not username: return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) user = db.query(User).filter(User.username == username).first() if not user: return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) return user # Device authentication function def authenticate_device(device_id: str, device_password: str, db: Session = Depends(get_db)): device = db.query(Device).filter(Device.device_id == device_id).first() if not device or device.password != device_password: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid device credentials") return device @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") async def login(request: Request, username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)): user = db.query(User).filter(User.username == username, User.password == password, User.is_active == True).first() if user: user.last_login = datetime.now() db.commit() response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) response.set_cookie(key="username", value=username, httponly=True) return response return templates.TemplateResponse("login.html", {"request": request, "error": "Invalid credentials or inactive account"}) @app.get("/logout") async def logout(): response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) response.delete_cookie("username") return response @app.get("/register", response_class=HTMLResponse) async def register_page(request: Request): return templates.TemplateResponse("register.html", {"request": request}) @app.post("/register") async def register(username: str = Form(...), email: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)): existing_user = db.query(User).filter(User.username == username).first() if existing_user: return RedirectResponse(url="/register?error=1", status_code=status.HTTP_302_FOUND) new_user = User(username=username, email=email, password=password) db.add(new_user) db.commit() return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) # Admin routes @admin_router.get("", response_class=HTMLResponse) async def admin_page(request: Request, db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") users = db.query(User).all() devices = db.query(Device).all() return templates.TemplateResponse("admin.html", {"request": request, "users": users, "devices": devices}) @admin_router.post("/delete/{username}") async def delete_user(request: Request, username: str, db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") user = db.query(User).filter(User.username == username).first() if user: db.delete(user) db.commit() return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) @admin_router.post("/edit/{username}") async def edit_user(request: Request, username: str, new_username: str = Form(...), email: str = Form(...), is_admin: bool = Form(False), is_active: bool = Form(False), db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") user = db.query(User).filter(User.username == username).first() if user: if new_username != username and db.query(User).filter(User.username == new_username).first(): raise HTTPException(status_code=400, detail="Username already exists") user.username = new_username user.email = email user.is_admin = is_admin user.is_active = is_active db.commit() return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) @admin_router.post("/add_device") async def add_device( request: Request, name: str = Form(...), description: str = Form(...), device_id: str = Form(...), password: str = Form(...), db: Session = Depends(get_db) ): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") existing_device = db.query(Device).filter(Device.device_id == device_id).first() if existing_device: raise HTTPException(status_code=400, detail="Device ID already exists") new_device = Device(name=name, description=description, device_id=device_id, password=password) db.add(new_device) db.commit() return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) @admin_router.post("/edit_device/{device_id}") async def edit_device( request: Request, device_id: str, name: str = Form(...), description: str = Form(...), new_device_id: str = Form(...), password: str = Form(...), db: Session = Depends(get_db) ): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") device = db.query(Device).filter(Device.device_id == device_id).first() if not device: raise HTTPException(status_code=404, detail="Device not found") if new_device_id != device_id and db.query(Device).filter(Device.device_id == new_device_id).first(): raise HTTPException(status_code=400, detail="New Device ID already exists") device.name = name device.description = description device.device_id = new_device_id device.password = password db.commit() return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) @admin_router.post("/delete_device/{device_id}") async def delete_device(request: Request, device_id: str, db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") device = db.query(Device).filter(Device.device_id == device_id).first() if device: db.delete(device) db.commit() return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) # API routes @api_router.post("/generate-data") def generate_data( device_id: str = Header(...), device_password: str = Header(...), db: Session = Depends(get_db) ): authenticate_device(device_id, device_password, db) base_latitude = 35.6837 base_longitude = 139.6805 start_date = datetime(2024, 8, 1) end_date = datetime(2024, 8, 7) delta = end_date - start_date for _ in range(100): random_days = random.randint(0, delta.days) random_seconds = random.randint(0, 86400) random_time = start_date + timedelta(days=random_days, seconds=random_seconds) random_latitude = base_latitude + random.uniform(-0.01, 0.01) random_longitude = base_longitude + random.uniform(-0.01, 0.01) random_connect_status = random.choice([0, 1]) status_record = StatusRecord( device_id=device_id, latitude=random_latitude, longitude=random_longitude, timestamp=random_time, connect_status=random_connect_status ) db.add(status_record) db.commit() return {"message": "Demo data generated successfully"} @api_router.delete("/delete-data", summary="Delete all status records") def delete_all_data( device_id: str = Header(...), device_password: str = Header(...), db: Session = Depends(get_db) ): """ Delete all status records from the database. Requires device authentication. """ authenticate_device(device_id, device_password, db) try: db.query(StatusRecord).delete() db.commit() return {"message": "All data deleted successfully"} except Exception as e: db.rollback() raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") @api_router.delete("/delete-data/{device_id}", summary="Delete status records for a specific device") def delete_device_data( device_id: str, auth_device_id: str = Header(...), device_password: str = Header(...), db: Session = Depends(get_db) ): """ Delete status records for a specific device ID. Requires device authentication. """ authenticate_device(auth_device_id, device_password, db) try: deleted_count = db.query(StatusRecord).filter(StatusRecord.device_id == device_id).delete() db.commit() if deleted_count == 0: return {"message": f"No data found for device ID: {device_id}"} return {"message": f"Data for device ID {device_id} deleted successfully"} except Exception as e: db.rollback() raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") @api_router.post("/upload") async def upload_data( device_id: str = Header(...), device_password: str = Header(...), uuid_str: str = Form(...), latitude: float = Form(...), longitude: float = Form(...), timestamp: str = Form(...), connect_status: int = Form(...), db: Session = Depends(get_db) ): """ Upload a new status record. Requires device authentication and a unique UUID. """ authenticate_device(device_id, device_password, db) # Validate UUID try: uuid_obj = uuid_module.UUID(uuid_str) except ValueError: raise HTTPException(status_code=400, detail="Invalid UUID format") try: timestamp_dt = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S') status_record = StatusRecord( uuid=str(uuid_obj), device_id=device_id, latitude=latitude, longitude=longitude, timestamp=timestamp_dt, connect_status=connect_status ) db.add(status_record) db.commit() return {"message": "Data uploaded successfully"} except IntegrityError: db.rollback() raise HTTPException(status_code=400, detail="UUID already exists") except ValueError: raise HTTPException(status_code=400, detail="Invalid timestamp format. Use 'YYYY-MM-DD HH:MM:SS'") except Exception as e: db.rollback() raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") @api_router.get("/health_check", summary="Check if the API is functioning correctly") def health_check( device_id: str = Header(...), device_password: str = Header(...), db: Session = Depends(get_db) ): """ Perform a health check on the API. Requires device authentication. Returns a 200 status code if successful. Returns a 401 Unauthorized error if authentication fails. """ try: authenticate_device(device_id, device_password, db) return JSONResponse(content={"status": "ok"}, status_code=status.HTTP_200_OK) except HTTPException as e: if e.status_code == status.HTTP_401_UNAUTHORIZED: return JSONResponse(content={"status": "error", "detail": "Unauthorized"}, status_code=status.HTTP_401_UNAUTHORIZED) raise e @api_router.get("/config", summary="Get system configuration", response_model=Dict[str, int]) def get_config( device_id: str = Header(...), device_password: str = Header(...), db: Session = Depends(get_db) ): """ Retrieve the system configuration from SystemSetting. Requires device authentication. """ authenticate_device(device_id, device_password, db) system_setting = db.query(SystemSetting).first() if not system_setting: raise HTTPException(status_code=404, detail="System settings not found") return { "check_connect_period": system_setting.check_connect_period, "data_sync_period": system_setting.data_sync_period, "get_config_period": system_setting.get_config_period, "point_distance": system_setting.point_distance } @app.get("/", response_class=HTMLResponse) def show_map(request: Request, start_date: str = None, end_date: str = None, db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user query = db.query(StatusRecord) if start_date and end_date: start_datetime = datetime.strptime(start_date, '%Y-%m-%d') end_datetime = datetime.strptime(end_date, '%Y-%m-%d') query = query.filter(StatusRecord.timestamp.between(start_datetime, end_datetime)) status_data = [(s.latitude, s.longitude, s.connect_status, s.device_id) for s in query.all()] m = folium.Map(location=[35.6837, 139.6805], zoom_start=12) marker_cluster = MarkerCluster().add_to(m) for lat, lon, connect_status, device_id in status_data: color = 'green' if connect_status == 1 else 'red' folium.CircleMarker( location=[lat, lon], radius=10, popup=f'{device_id}', color=color, fill=True, fill_opacity=0.6 ).add_to(marker_cluster) map_html = m._repr_html_() return templates.TemplateResponse("map.html", { "request": request, "map_html": map_html, "start_date": start_date, "end_date": end_date, "current_user": current_user }) @app.get("/download-csv") async def download_csv(request: Request, db: Session = Depends(get_db)): current_user = login_required(request, db) if isinstance(current_user, RedirectResponse): return current_user status_records = db.query(StatusRecord).all() output = io.StringIO() writer = csv.writer(output) writer.writerow(["UUID", "Device ID", "Latitude", "Longitude", "Timestamp", "Connect Status"]) for record in status_records: writer.writerow([record.uuid, record.device_id, record.latitude, record.longitude, record.timestamp, record.connect_status]) response = StreamingResponse(iter([output.getvalue()]), media_type="text/csv") response.headers["Content-Disposition"] = "attachment; filename=status_records.csv" return response @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): if exc.status_code == status.HTTP_401_UNAUTHORIZED: return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) return templates.TemplateResponse("error.html", {"request": request, "detail": exc.detail}, status_code=exc.status_code) # Include the routers app.include_router(admin_router) app.include_router(api_router) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)