# coding:utf-8
"""Bilibili daily sign-in and live-room check-in driven by Playwright.

Two entry points are provided, ``bilibili_sign`` and ``bilibili_live``.
Each one opens a Firefox session using the stored cookie file, performs
its job, and pushes the resulting report through ``push_msg``.
"""
import sys
import asyncio
from datetime import datetime
from os.path import abspath, dirname, join

from playwright.async_api import async_playwright

# Make the parent directory importable so the project-local modules resolve.
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import push_msg
from __init__ import (
    browser_headless,
    browser_proxy,
    bilibili_url,
    bilibili_sign_url,
    bilibili_live_url,
    bilibili_cookie_file,
    bilibili_title,
    bilibili_logger,
    bilibili_img,
)
from option_mysql import get_aiomysql_instance


async def get_bilibili_live_rooms_from_pi() -> dict:
    """Load the live rooms to check in to from the ``live_rooms`` table.

    Returns:
        A mapping of ``room_id`` to ``username``, one entry per row
        returned by the query.
    """
    sql = "SELECT room_id, username FROM live_rooms"
    pool = await get_aiomysql_instance()
    rows = await pool.query(sql)
    return {row["room_id"]: row["username"] for row in rows}


# Check the login state
async def is_login(page) -> bool:
    """Return whether the stored cookie still yields a logged-in session.

    Navigates ``page`` to ``bilibili_url`` and checks whether the element
    with the exact text "每日奖励" is enabled.

    NOTE(review): ``Locator.is_enabled()`` may raise a timeout error rather
    than return ``False`` when the element never appears -- confirm against
    the Playwright version in use.
    """
    await page.goto(bilibili_url)
    if await page.get_by_text("每日奖励", exact=True).is_enabled():
        bilibili_logger.info("Cookie 有效")
        return True
    bilibili_logger.error("Cookie 失效")
    return False


# Sign in
async def sign(page) -> list:
    """Perform the daily sign-in and return the report lines.

    Returns:
        A list of strings. When the cookie is no longer valid the list holds
        only the "Cookie 失效" notice; otherwise it holds the timestamp line
        followed by the sign-in status line.
    """
    message = [f"签到时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
    if not await is_login(page):
        # Must be a list: the caller joins the result with "\n", and a bare
        # string would be split into one character per line.
        return ["Cookie 失效"]

    await page.goto(bilibili_sign_url)
    if await page.get_by_text("今日已签到").is_enabled():
        sign_res = "签到状态:今日已签到"
        bilibili_logger.info(sign_res)
    elif await page.get_by_text('"code":0').is_enabled():
        sign_res = "签到状态:签到成功"
        bilibili_logger.info(sign_res)
    else:
        sign_res = "签到状态:签到失败"
        bilibili_logger.error(sign_res)
    message.append(sign_res)
    return message


async def _run_and_push(job) -> None:
    """Run ``job(page)`` in a fresh Firefox session and push its report.

    ``job`` is a coroutine function that takes a Playwright page and returns
    a list of report lines. The browser is closed even if ``job`` raises.
    """
    async with async_playwright() as playwright:
        browser = await playwright.firefox.launch(headless=browser_headless)
        try:
            context = await browser.new_context(
                storage_state=bilibili_cookie_file
            )
            page = await context.new_page()
            lines = await job(page)
            await page.close()
        finally:
            # Quit the browser
            await browser.close()
        # Push the report
        push_msg(
            title=bilibili_title,
            message="\n".join(lines),
            img_url=bilibili_img,
        )


async def bilibili_sign():
    """Run the daily sign-in and push the result."""
    await _run_and_push(sign)


# The live-room check-in runs at midnight; kept separate from the 0:00 sign-in
async def live(page) -> list:
    """Send a danmaku in every configured live room and return the report.

    Returns:
        A list of strings. When the cookie is no longer valid the list holds
        only the "Cookie 失效" notice; otherwise it holds the timestamp line
        followed by one status line per live room.
    """
    message = [f"打卡时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
    if not await is_login(page):
        # Must be a list: the caller joins the result with "\n".
        return ["Cookie 失效"]

    bilibili_live_rooms = await get_bilibili_live_rooms_from_pi()
    for room_id, username in bilibili_live_rooms.items():
        await page.goto(f"{bilibili_live_url}/{room_id}")
        # Danmaku input box
        input_box = page.get_by_role("textbox", name="发个弹幕呗~")
        # Type the message content
        await input_box.fill("1")
        # Send button
        send_button = page.get_by_role("button", name="发送")
        if await send_button.is_enabled():
            await send_button.click()
            room_res = f"直播间:【{username}】打卡成功"
            bilibili_logger.info(room_res)
        else:
            # The original text had a stray trailing "】" here.
            room_res = f"直播间:【{username}】打卡失败"
            bilibili_logger.error(room_res)
        message.append(room_res)
    return message


async def bilibili_live():
    """Run the live-room check-in and push the result."""
    await _run_and_push(live)


if __name__ == "__main__":
    asyncio.run(bilibili_sign())
    asyncio.run(bilibili_live())