sig / app.py
Evilmass
only one fastapi instance
522e726
# coding: utf-8
import asyncio
import os
import requests
from os.path import join
# from uvicorn import run
from fastapi import FastAPI
from fastapi.responses import Response
from loguru import logger
from __init__ import DIR_PATH, api_token
from utils import push_msg, read_logfile, get_cookie_from_pi
from api import start_daemon_scheduler, kill_daemon_scheduler
# Separate event loop for the scheduler: /run hands it to
# start_daemon_scheduler() and /kill submits kill_daemon_scheduler() to it.
schedule_loop = asyncio.new_event_loop()
# The FastAPI application object on which every route below is registered.
app = FastAPI()
@app.get("/")
def home():
    """Return the contents of ``index.html`` with HTTP status 200.

    The path is relative, so the file is looked up in the process's current
    working directory.

    Raises:
        OSError: if ``index.html`` cannot be opened or read.
    """
    # A context manager closes the handle deterministically; the previous
    # ``open(...).read()`` left it open until garbage collection.
    # Decode explicitly as UTF-8 rather than relying on the platform default.
    with open("index.html", encoding="utf-8") as page:
        html_content = page.read()
    return Response(content=html_content, status_code=200)
# Start the scheduled tasks
@app.get("/run")
async def do_run(token: str):
    """Handle ``GET /run``: start the daemon scheduler.

    Args:
        token: value compared against ``api_token``.

    Returns:
        ``{"code": 422}`` when the token does not match. Otherwise
        ``get_cookie_from_pi()`` is awaited, ``start_daemon_scheduler`` is
        called with ``schedule_loop``, and a ``code: 0`` payload is returned.
    """
    if token == api_token:
        await get_cookie_from_pi()
        start_daemon_scheduler(schedule_loop)
        return {"code": 0, "data": "start scheduler"}
    return {"code": 422}
# Stop the scheduled tasks
@app.get("/kill")
def do_kill(token: str):
    """Handle ``GET /kill``: submit the scheduler shutdown coroutine.

    Args:
        token: value compared against ``api_token``.

    Returns:
        ``{"code": 422}`` when the token does not match, otherwise a
        ``code: 0`` payload. The future returned by
        ``run_coroutine_threadsafe`` is discarded, so this handler does not
        wait for the shutdown coroutine to finish.
    """
    if token == api_token:
        shutdown = kill_daemon_scheduler()
        asyncio.run_coroutine_threadsafe(shutdown, loop=schedule_loop)
        return {"code": 0, "data": "kill scheduler"}
    return {"code": 422}
# Query a log file
@app.get("/log")
async def do_log(token: str, name: str):
    """Handle ``GET /log``: return the contents of ``log/<name>.log``.

    Args:
        token: value compared against ``api_token``.
        name: log name; the file read is ``<DIR_PATH>/log/<name>.log``.

    Returns:
        ``{"code": 422}`` when the token does not match or when ``name``
        resolves to a path outside ``<DIR_PATH>/log``; otherwise
        ``{"code": 200, "data": ...}`` with whatever ``read_logfile`` returns.
    """
    if token != api_token:
        return {"code": 422}

    log_file = join(DIR_PATH, f"log/{name}.log")

    # ``name`` is caller-controlled: refuse values such as "../../x" that
    # would escape the log directory once the path is resolved.
    log_root = os.path.realpath(join(DIR_PATH, "log"))
    try:
        resolved = os.path.realpath(log_file)
        inside_log_dir = os.path.commonpath([log_root, resolved]) == log_root
    except ValueError:
        # Raised for inputs the path helpers reject; treat as invalid.
        inside_log_dir = False
    if not inside_log_dir:
        return {"code": 422}

    data = await read_logfile(log_file)
    # Success code 200 (not 0 as in the other routes) is kept as-is for
    # existing clients.
    return {"code": 200, "data": data}
# List every file found under DIR_PATH
@app.get("/path")
def do_path(token: str):
    """Handle ``GET /path``: list all files below ``DIR_PATH``.

    Args:
        token: value compared against ``api_token``.

    Returns:
        ``{"code": 422}`` when the token does not match, otherwise
        ``{"code": 0, "data": [...]}`` where each entry is the walked
        directory joined with a file name, in ``os.walk`` order.
    """
    if token != api_token:
        return {"code": 422}
    listing = [
        join(folder, filename)
        for folder, _, filenames in os.walk(DIR_PATH)
        for filename in filenames
    ]
    return {"code": 0, "data": listing}
# Test outbound connectivity
@app.get("/ip")
def do_ip(token: str):
    """Handle ``GET /ip``: fetch and return the text served by checkip.

    Args:
        token: value compared against ``api_token``.

    Returns:
        ``{"code": 422}`` when the token does not match, otherwise
        ``{"code": 0, "ip": ...}`` with the stripped response body of
        ``https://checkip.amazonaws.com``.

    Raises:
        requests.RequestException: if the request fails or times out.
    """
    if token != api_token:
        return {"code": 422}
    # Without a timeout, requests waits indefinitely on a stalled connection
    # and ties up the worker handling this request.
    ip = requests.get("https://checkip.amazonaws.com", timeout=10).text.strip()
    logger.debug(f"checking ip: {ip}")
    return {"code": 0, "ip": ip}
# Test sending a push message
@app.get("/push")
def do_push(token: str):
    """Handle ``GET /push``: send a fixed test message via ``push_msg``.

    Args:
        token: value compared against ``api_token``.

    Returns:
        ``{"code": 422}`` when the token does not match, otherwise
        ``{"code": 0, "data": ...}`` carrying ``push_msg``'s return value.
    """
    if token == api_token:
        push_result = push_msg("send from huggingface docker", "hello world")
        return {"code": 0, "data": push_result}
    return {"code": 422}
## Do not start two instances
# if __name__ == "__main__":
# run("app:app", host="0.0.0.0", port=8000, reload=False)