# sig / api.py
# Evilmass
# change gotify priority = 0
# 365ad3d
"""
https://github.com/agronholm/apscheduler/blob/3.x/examples/schedulers/asyncio_.py
"""
import asyncio
import threading
import time
import pytz
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
from _playwright.bilibili import bilibili_sign, bilibili_live
from _playwright.v2ex import v2ex_sign
from _playwright.vits import vits_sign
from _playwright.tsdm import tsdm_sign
from _playwright.wuaipojie import wuaipojie_sign
from _requests.aliyundrive import aliyundrive_sign, validate_access_token
def tick():
    """Emit a debug log line carrying the current local wall-clock time."""
    now = datetime.now()
    logger.debug(f"Tick! The time is: {now:%Y-%m-%d %H:%M:%S}")
def run_scheduler(loop):
    """Register all cron jobs on an AsyncIOScheduler and run ``loop`` forever.

    Intended to be the target of a dedicated thread: ``loop`` is installed as
    the current thread's event loop, the scheduler is started on it, and the
    call blocks in ``loop.run_forever()`` until the loop is stopped.

    Every job is registered with ``next_run_time`` set to "now" so that it
    fires once right after start-up in addition to its cron schedule.

    Args:
        loop: The asyncio event loop that the scheduler should run on.
    """
    asyncio.set_event_loop(loop)

    shanghai = pytz.timezone("Asia/Shanghai")
    scheduler = AsyncIOScheduler()
    scheduler.configure(timezone=shanghai)

    # Heartbeat job; it deliberately keeps the scheduler's default
    # misfire_grace_time, exactly as before.
    # NOTE: next_run_time must be timezone-aware. APScheduler localizes a
    # naive datetime to the scheduler timezone, so a naive datetime.now()
    # taken on a host outside Asia/Shanghai would be shifted by the offset
    # difference (and this job would then be dropped as misfired).
    scheduler.add_job(
        tick,
        "cron",
        hour="*/2",
        next_run_time=datetime.now(shanghai),
        coalesce=True,
    )

    # (callable, cron fields) for every job that shares the same options.
    # Order matters: jobs are registered, and get their start-up run time
    # stamped, in this sequence.
    grace_seconds = 60 * 60 * 12
    cron_jobs = (
        (bilibili_sign, {"hour": 0, "minute": 1}),
        (bilibili_live, {"hour": 5, "minute": 1}),
        (v2ex_sign, {"hour": 0, "minute": 2}),
        (tsdm_sign, {"hour": 12, "minute": 3}),
        (wuaipojie_sign, {"hour": 12, "minute": 4}),
        (vits_sign, {"hour": "0, 12", "minute": 6}),
        (aliyundrive_sign, {"hour": 12, "minute": 7}),
        (validate_access_token, {"hour": "*/2"}),
    )
    for job_func, cron_fields in cron_jobs:
        scheduler.add_job(
            job_func,
            "cron",
            next_run_time=datetime.now(shanghai),
            coalesce=True,
            misfire_grace_time=grace_seconds,
            **cron_fields,
        )

    scheduler.start()
    try:
        # `loop` is the loop installed above; run it directly rather than
        # looking it up again through asyncio.get_event_loop().
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
def start_daemon_scheduler(loop):
    """Launch ``run_scheduler(loop)`` on a daemon thread named "schedule".

    Args:
        loop: The asyncio event loop handed to ``run_scheduler``.
    """
    worker = threading.Thread(
        name="schedule",
        target=run_scheduler,
        args=(loop,),
        daemon=True,
    )
    worker.start()
async def kill_daemon_scheduler():
    """Stop the event loop that is currently executing this coroutine.

    Meant to be submitted to the scheduler's loop (for example with
    ``asyncio.run_coroutine_threadsafe``) so that its ``run_forever()`` call
    returns.
    """
    # Use the public accessor; the private asyncio._get_running_loop() is an
    # implementation detail and returns None when no loop is running.
    running_loop = asyncio.get_running_loop()
    running_loop.stop()
if __name__ == "__main__":
    # Manual smoke test: start the scheduler thread on a fresh loop, let it
    # run for a few seconds, then ask that loop to stop itself.
    schedule_loop = asyncio.new_event_loop()
    start_daemon_scheduler(loop=schedule_loop)
    time.sleep(3)
    stop_request = kill_daemon_scheduler()
    asyncio.run_coroutine_threadsafe(stop_request, schedule_loop)