sig / _playwright /bilibili.py
Evilmass
fix tsdm cliwork
5cf3f91
# coding:utf-8
import sys
import asyncio
from datetime import datetime
from os.path import abspath, dirname, join
from playwright.async_api import async_playwright
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import push_msg
from __init__ import (
browser_headless,
browser_proxy,
bilibili_url,
bilibili_sign_url,
bilibili_live_url,
bilibili_cookie_file,
bilibili_title,
bilibili_logger,
bilibili_img,
)
from option_mysql import get_aiomysql_instance
async def get_bilibili_live_rooms_from_pi() -> dict:
sql = f"SELECT room_id, username FROM live_rooms"
pool = await get_aiomysql_instance()
sql_data = await pool.query(sql)
bilibili_live_rooms = {}
for sd in sql_data:
bilibili_live_rooms[sd["room_id"]] = sd["username"]
return bilibili_live_rooms
# 判断登录状态
async def is_login(page) -> bool:
await page.goto(bilibili_url)
if await page.get_by_text("每日奖励", exact=True).is_enabled():
login_res = bilibili_logger.info("Cookie 有效")
return True
else:
login_res = "Cookie 失效"
bilibili_logger.error(login_res)
return False
# 签到
async def sign(page) -> list:
message = [f"签到时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
if not await is_login(page):
return "Cookie 失效"
await page.goto(bilibili_sign_url)
if await page.get_by_text("今日已签到").is_enabled():
sign_res = f"签到状态:今日已签到"
bilibili_logger.info(sign_res)
elif await page.get_by_text('"code":0').is_enabled():
sign_res = f"签到状态:签到成功"
bilibili_logger.info(sign_res)
else:
sign_res = f"签到状态:签到失败"
bilibili_logger.error(sign_res)
message.append(sign_res)
return message
async def bilibili_sign():
async with async_playwright() as playwright:
browser = await playwright.firefox.launch(headless=browser_headless)
context = await browser.new_context(storage_state=bilibili_cookie_file)
# 签到
page = await context.new_page()
task = asyncio.create_task(sign(page))
await asyncio.gather(task)
await page.close()
# 退出
await browser.close()
# 推送
message = "\n".join(task.result())
push_msg(title=bilibili_title, message=message, img_url=bilibili_img)
# 直播间打卡时间是午夜,区分 0 点签到
async def live(page) -> list:
message = [f"打卡时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
if not await is_login(page):
return "Cookie 失效"
bilibili_live_rooms = await get_bilibili_live_rooms_from_pi()
for room_id in list(bilibili_live_rooms.keys()):
await page.goto(f"{bilibili_live_url}/{str(room_id)}")
# 输入框
input_box = page.get_by_role("textbox", name="发个弹幕呗~")
# 输入内容
await input_box.fill("1")
# 发送按钮
send_button = page.get_by_role("button", name="发送")
if await send_button.is_enabled():
await send_button.click()
room_res = f"直播间:【{bilibili_live_rooms[room_id]}】打卡成功"
bilibili_logger.info(room_res)
else:
room_res = f"直播间:【{bilibili_live_rooms[room_id]}】打卡失败】"
bilibili_logger.error(room_res)
message.append(room_res)
return message
async def bilibili_live():
async with async_playwright() as playwright:
browser = await playwright.firefox.launch(headless=browser_headless)
context = await browser.new_context(storage_state=bilibili_cookie_file)
# 直播间打卡
page = await context.new_page()
task = asyncio.create_task(live(page))
await asyncio.gather(task)
await page.close()
# 退出
await browser.close()
# 推送
message = "\n".join(task.result())
push_msg(title=bilibili_title, message=message, img_url=bilibili_img)
if __name__ == "__main__":
asyncio.run(bilibili_sign())
asyncio.run(bilibili_live())