signal-tracker / api.py
vumichien's picture
up
7e6b994
from fastapi import APIRouter, Depends, HTTPException, Header, status
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from models import StatusRecord, Device, StatusRecordBatch, SystemSetting
from database import get_db
from datetime import datetime, timedelta
import uuid as uuid_module
import random
from sqlalchemy.exc import IntegrityError
from typing import Dict
api_router = APIRouter(prefix="/api", tags=["api"])
def authenticate_device(device_id: str, device_password: str, db: Session = Depends(get_db)):
    """Look up a device by ID and verify the supplied password.

    Args:
        device_id: Value matched against ``Device.device_id``.
        device_password: Password supplied by the caller.
        db: SQLAlchemy session used for the lookup.

    Returns:
        The matching ``Device`` row.

    Raises:
        HTTPException: 401 when no device matches ``device_id`` or the
            password does not equal the stored one.
    """
    # Function-scope import so this block does not depend on a change to the
    # module's import section.
    import hmac

    device = db.query(Device).filter(Device.device_id == device_id).first()
    stored_password = device.password if device else None

    if isinstance(stored_password, str) and isinstance(device_password, str):
        # Constant-time comparison: a plain ``!=`` short-circuits on the first
        # differing character, which leaks timing information to the caller.
        # Encoding to bytes lets compare_digest accept non-ASCII passwords.
        password_ok = hmac.compare_digest(
            stored_password.encode("utf-8"), device_password.encode("utf-8")
        )
    else:
        # Unknown device, or a non-string value on either side: keep the
        # original equality semantics.
        password_ok = bool(device) and stored_password == device_password

    # NOTE(review): the stored value is compared directly with the supplied
    # password, so it appears to be kept unhashed -- confirm and consider
    # storing a password hash instead.
    if not password_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid device credentials",
        )
    return device
@api_router.post("/generate-data")
def generate_data(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Generate 100 random demo status records for the authenticated device.
    Requires device authentication.

    Each record gets a timestamp between 2024-08-01 00:00:00 and
    2024-08-07 23:59:59, a position within +/-0.01 degrees of a fixed base
    coordinate, and a random connect status of 0 or 1.

    Raises:
        HTTPException: 401 if authentication fails, 500 if the records
            cannot be committed.
    """
    authenticate_device(device_id, device_password, db)

    base_latitude = 35.6837
    base_longitude = 139.6805
    start_date = datetime(2024, 8, 1)
    end_date = datetime(2024, 8, 7)
    day_span = (end_date - start_date).days

    demo_records = []
    for _ in range(100):
        random_time = start_date + timedelta(
            days=random.randint(0, day_span),
            # randint is inclusive on both ends; 86399 is the last second of
            # a day, so the timestamp can no longer spill into the next day.
            seconds=random.randint(0, 86399),
        )
        demo_records.append(
            StatusRecord(
                device_id=device_id,
                latitude=base_latitude + random.uniform(-0.01, 0.01),
                longitude=base_longitude + random.uniform(-0.01, 0.01),
                timestamp=random_time,
                connect_status=random.choice([0, 1]),
            )
        )

    try:
        db.add_all(demo_records)
        db.commit()
    except Exception as e:
        # Same failure handling as the delete endpoints: leave the session
        # clean and report the error as a 500.
        db.rollback()
        raise HTTPException(
            status_code=500, detail=f"An error occurred: {str(e)}"
        ) from e
    return {"message": "Demo data generated successfully"}
@api_router.delete("/delete-data", summary="Delete all status records")
def delete_all_data(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Delete all status records from the database.
    Requires device authentication.
    """
    # The docstring above is kept as-is: FastAPI publishes it as the
    # endpoint description.
    #
    # Any device that authenticates successfully may wipe the whole
    # StatusRecord table; the delete is not filtered by device.
    authenticate_device(device_id, device_password, db)

    try:
        db.query(StatusRecord).delete()
        db.commit()
    except Exception as e:
        # Undo the partial work and surface the failure as a 500.
        db.rollback()
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
    else:
        return {"message": "All data deleted successfully"}
@api_router.delete(
    "/delete-data/{device_id}", summary="Delete status records for a specific device"
)
def delete_device_data(
    device_id: str,
    auth_device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Delete status records for a specific device ID.
    Requires device authentication.
    """
    # The docstring above is kept as-is: FastAPI publishes it as the
    # endpoint description.
    #
    # ``device_id`` (path) selects whose records are removed, while
    # ``auth_device_id`` / ``device_password`` (headers) identify the caller.
    # NOTE(review): the two IDs are never compared, so any authenticated
    # device can delete another device's records -- confirm this is intended.
    authenticate_device(auth_device_id, device_password, db)

    try:
        matching_records = db.query(StatusRecord).filter(
            StatusRecord.device_id == device_id
        )
        removed = matching_records.delete()
        db.commit()
    except Exception as e:
        # Undo the partial work and surface the failure as a 500.
        db.rollback()
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    if removed == 0:
        return {"message": f"No data found for device ID: {device_id}"}
    return {"message": f"Data for device ID {device_id} deleted successfully"}
@api_router.post("/upload_batch")
async def upload_data_batch(
    records: StatusRecordBatch,
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Upload multiple status records in a single request.
    Requires device authentication and unique UUIDs for each record.
    Uses the device_id from the header for all records.

    Records are processed independently: a record with an invalid UUID or
    timestamp, or one rejected by the database, is reported in
    ``failed_records`` / ``errors`` without affecting the other records.

    Returns a 200 response with the upload counts, or a 500 response if the
    final commit fails.
    """
    # NOTE(review): this handler is ``async def`` but only makes blocking
    # Session calls; confirm whether it should be a plain ``def``.
    authenticate_device(device_id, device_password, db)

    successful_uploads = 0
    error_messages = []
    failed_records = []

    for record in records.records:
        # Identifier used in failure reports. It is set before parsing so the
        # except handlers never touch an unbound name, or a UUID left over
        # from the previous record.
        record_id = str(record.uuid)
        try:
            # Validate UUID
            uuid_obj = uuid_module.UUID(record.uuid)
            record_id = str(uuid_obj)
            # Validate timestamp
            timestamp_dt = datetime.strptime(record.timestamp, "%Y-%m-%d %H:%M:%S")

            status_record = StatusRecord(
                uuid=record_id,
                device_id=device_id,
                latitude=record.latitude,
                longitude=record.longitude,
                timestamp=timestamp_dt,
                connect_status=record.connect_status,
            )
            # A SAVEPOINT per record: leaving the block flushes the INSERT,
            # so a constraint violation is raised here for this record only,
            # and only this record is rolled back. A bare ``db.add`` never
            # raises IntegrityError; it would surface at the final commit and
            # fail the whole batch.
            with db.begin_nested():
                db.add(status_record)
            successful_uploads += 1
        except ValueError as ve:
            error_messages.append(f"Invalid data format: {str(ve)}")
            failed_records.append(record_id)
        except IntegrityError:
            # The savepoint is already rolled back; earlier records in this
            # batch stay pending. Presumably the violated constraint is the
            # UUID uniqueness -- verify against the model.
            error_messages.append(f"Duplicate UUID: {record.uuid}")
            failed_records.append(record_id)
        except Exception as e:
            error_messages.append(f"Error processing record: {str(e)}")
            failed_records.append(record_id)

    failed_uploads = len(failed_records)

    try:
        db.commit()
    except Exception as e:
        db.rollback()
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"message": f"Error committing to database: {str(e)}"},
        )

    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={
            "status": "ok",
            "message": "Batch upload completed",
            "successful_uploads": successful_uploads,
            "failed_uploads": failed_uploads,
            "errors": error_messages,
            "failed_records": failed_records,
        },
    )
@api_router.get("/health_check", summary="Check if the API is functioning correctly")
def health_check(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Perform a health check on the API.
    Requires device authentication.
    Returns a 200 status code if successful.
    Returns a 401 Unauthorized error if authentication fails.
    """
    # The docstring above is kept as-is: FastAPI publishes it as the
    # endpoint description.
    try:
        authenticate_device(device_id, device_password, db)
    except HTTPException as auth_error:
        # Only the 401 case is translated into the custom error body; any
        # other HTTPException is passed through unchanged.
        if auth_error.status_code != status.HTTP_401_UNAUTHORIZED:
            raise auth_error
        return JSONResponse(
            content={"status": "error", "detail": "Unauthorized"},
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    return JSONResponse(content={"status": "ok"}, status_code=status.HTTP_200_OK)
@api_router.get(
    "/config", summary="Get system configuration", response_model=Dict[str, int]
)
def get_config(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Retrieve the system configuration from SystemSetting.
    Requires device authentication.
    """
    # The docstring above is kept as-is: FastAPI publishes it as the
    # endpoint description.
    authenticate_device(device_id, device_password, db)

    # Only the first SystemSetting row is consulted.
    system_setting = db.query(SystemSetting).first()
    if not system_setting:
        raise HTTPException(status_code=404, detail="System settings not found")

    # Attributes copied into the response, in response-key order.
    exposed_fields = (
        "check_connect_period",
        "data_sync_period",
        "get_config_period",
        "point_distance",
    )
    return {field: getattr(system_setting, field) for field in exposed_fields}