""" https://github.com/agronholm/apscheduler/blob/3.x/examples/schedulers/asyncio_.py """ import asyncio import threading import time import pytz from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler from loguru import logger from _playwright.bilibili import bilibili_sign from _playwright.v2ex import v2ex_sign from _playwright.vits import vits_sign from _playwright.tsdm import tsdm_sign from _playwright.wuaipojie import wuaipojie_sign from _requests.aliyundrive import aliyundrive_sign from utils import get_cookie_from_pi def tick(): logger.debug(f"Tick! The time is: {datetime.now():%Y-%m-%d %H:%M:%S}") def run_scheduler(loop): asyncio.set_event_loop(loop) # pi get_cookie_from_pi() # add task sign_scheduler = AsyncIOScheduler() sign_scheduler.configure(timezone=pytz.timezone("Asia/Shanghai")) # sign_scheduler.add_job(tick, "cron", second="*/59") sign_scheduler.add_job(bilibili_sign, "cron", hour=6, minute=1) sign_scheduler.add_job(v2ex_sign, "cron", hour=6, minute=2) sign_scheduler.add_job(tsdm_sign, "cron", hour="*/6", minute=3) sign_scheduler.add_job(wuaipojie_sign, "cron", hour=6, minute=4) sign_scheduler.add_job(vits_sign, "cron", hour="0, 12", minute=6) sign_scheduler.add_job(aliyundrive_sign, "cron", hour="*/2") sign_scheduler.start() try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass def start_daemon_scheduler(loop): t = threading.Thread(target=run_scheduler, args=(loop,), name="schedule", daemon=True) t.start() async def kill_daemon_scheduler(): loop = asyncio._get_running_loop() loop.stop() if __name__ == "__main__": schedule_loop = asyncio.new_event_loop() start_daemon_scheduler(schedule_loop) time.sleep(3) asyncio.run_coroutine_threadsafe(kill_daemon_scheduler(), loop=schedule_loop)