"""Run the imported sign-in jobs on an APScheduler ``AsyncIOScheduler``.

The scheduler is driven by a dedicated asyncio event loop that runs in a
daemon thread. Adapted from
https://github.com/agronholm/apscheduler/blob/3.x/examples/schedulers/asyncio_.py
"""
import asyncio
import threading
import time
from datetime import datetime

import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger

from _playwright.bilibili import bilibili_sign, bilibili_live
from _playwright.v2ex import v2ex_sign
from _playwright.vits import vits_sign
from _playwright.tsdm import tsdm_sign
from _playwright.wuaipojie import wuaipojie_sign
from _requests.aliyundrive import aliyundrive_sign, validate_access_token

# Timezone the scheduler is configured with; cron fields are evaluated in it.
_SCHEDULER_TZ = pytz.timezone("Asia/Shanghai")

# Grace period (12 hours, in seconds) passed as ``misfire_grace_time``.
_MISFIRE_GRACE_SECONDS = 60 * 60 * 12


def tick():
    """Log the current local time at debug level."""
    logger.debug(f"Tick! The time is: {datetime.now():%Y-%m-%d %H:%M:%S}")


# (callable, cron trigger fields, whether to apply the 12 h misfire grace).
# ``tick`` is the only job registered without an explicit grace period.
_CRON_JOBS = (
    (tick, {"hour": "*/2"}, False),
    (bilibili_sign, {"hour": 0, "minute": 1}, True),
    (bilibili_live, {"hour": 5, "minute": 1}, True),
    (v2ex_sign, {"hour": 0, "minute": 2}, True),
    (tsdm_sign, {"hour": 12, "minute": 3}, True),
    (wuaipojie_sign, {"hour": 12, "minute": 4}, True),
    (vits_sign, {"hour": "0, 12", "minute": 6}, True),
    (aliyundrive_sign, {"hour": 12, "minute": 7}, True),
    (validate_access_token, {"hour": "*/2"}, True),
)


def _register_jobs(scheduler):
    """Add every entry of ``_CRON_JOBS`` to *scheduler* as a cron job.

    Each job is also given ``next_run_time`` set to the current moment, so
    it is due immediately in addition to its cron schedule.
    """
    for func, cron_fields, with_grace in _CRON_JOBS:
        options = {
            # Timezone-aware "now": a naive datetime would be read by
            # APScheduler as scheduler-local (Asia/Shanghai) time, shifting
            # the first run on hosts whose local timezone differs.
            "next_run_time": datetime.now(_SCHEDULER_TZ),
            "coalesce": True,
        }
        if with_grace:
            # Only set the key when wanted: an explicit ``None`` would mean
            # "no misfire limit" rather than "use the scheduler default".
            options["misfire_grace_time"] = _MISFIRE_GRACE_SECONDS
        scheduler.add_job(func, "cron", **cron_fields, **options)


def run_scheduler(loop):
    """Start the scheduler on *loop* and run the loop until it is stopped.

    Intended as a thread target: installs *loop* as the calling thread's
    current event loop, registers all jobs, starts the scheduler and then
    blocks in ``loop.run_forever()``.

    Args:
        loop: The asyncio event loop that should drive the scheduler.
    """
    asyncio.set_event_loop(loop)

    task = AsyncIOScheduler()
    task.configure(timezone=_SCHEDULER_TZ)
    _register_jobs(task)
    task.start()

    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        # Deliberate: treat an interrupt/exit request as a normal shutdown.
        pass


def start_daemon_scheduler(loop):
    """Run :func:`run_scheduler` for *loop* in a daemon thread named "schedule".

    Args:
        loop: The asyncio event loop handed to :func:`run_scheduler`.
    """
    worker = threading.Thread(
        target=run_scheduler,
        args=(loop,),
        name="schedule",
        daemon=True,
    )
    worker.start()


async def kill_daemon_scheduler():
    """Stop the event loop that this coroutine is running on."""
    asyncio.get_running_loop().stop()


def main():
    """Start the scheduler thread, wait three seconds, then stop its loop."""
    schedule_loop = asyncio.new_event_loop()
    start_daemon_scheduler(schedule_loop)
    time.sleep(3)
    # The returned future is intentionally not awaited: the loop stops
    # before it would run the callback that resolves that future.
    asyncio.run_coroutine_threadsafe(kill_daemon_scheduler(), loop=schedule_loop)


if __name__ == "__main__":
    main()