|
|
|
import sys |
|
import asyncio |
|
|
|
from datetime import datetime |
|
from os.path import abspath, dirname, join |
|
from playwright.async_api import async_playwright |
|
|
|
sys.path.append(join(dirname(abspath(__file__)), "../")) |
|
from utils import push_msg |
|
from __init__ import ( |
|
browser_headless, |
|
browser_proxy, |
|
bilibili_url, |
|
bilibili_sign_url, |
|
bilibili_live_url, |
|
bilibili_cookie_file, |
|
bilibili_title, |
|
bilibili_logger, |
|
bilibili_img, |
|
) |
|
from option_mysql import get_aiomysql_instance |
|
|
|
|
|
async def get_bilibili_live_rooms_from_pi() -> dict: |
|
sql = f"SELECT room_id, username FROM live_rooms" |
|
pool = await get_aiomysql_instance() |
|
sql_data = await pool.query(sql) |
|
bilibili_live_rooms = {} |
|
for sd in sql_data: |
|
bilibili_live_rooms[sd["room_id"]] = sd["username"] |
|
return bilibili_live_rooms |
|
|
|
|
|
|
|
async def is_login(page) -> bool: |
|
await page.goto(bilibili_url) |
|
if await page.get_by_text("每日奖励", exact=True).is_enabled(): |
|
login_res = bilibili_logger.info("Cookie 有效") |
|
return True |
|
else: |
|
login_res = "Cookie 失效" |
|
bilibili_logger.error(login_res) |
|
return False |
|
|
|
|
|
|
|
async def sign(page) -> list: |
|
message = [f"签到时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"] |
|
|
|
if not await is_login(page): |
|
return "Cookie 失效" |
|
|
|
await page.goto(bilibili_sign_url) |
|
if await page.get_by_text("今日已签到").is_enabled(): |
|
sign_res = f"签到状态:今日已签到" |
|
bilibili_logger.info(sign_res) |
|
elif await page.get_by_text('"code":0').is_enabled(): |
|
sign_res = f"签到状态:签到成功" |
|
bilibili_logger.info(sign_res) |
|
else: |
|
sign_res = f"签到状态:签到失败" |
|
bilibili_logger.error(sign_res) |
|
message.append(sign_res) |
|
return message |
|
|
|
|
|
async def bilibili_sign(): |
|
async with async_playwright() as playwright: |
|
browser = await playwright.firefox.launch(headless=browser_headless) |
|
context = await browser.new_context(storage_state=bilibili_cookie_file) |
|
|
|
page = await context.new_page() |
|
task = asyncio.create_task(sign(page)) |
|
await asyncio.gather(task) |
|
await page.close() |
|
|
|
await browser.close() |
|
|
|
message = "\n".join(task.result()) |
|
push_msg(title=bilibili_title, message=message, img_url=bilibili_img) |
|
|
|
|
|
|
|
async def live(page) -> list: |
|
message = [f"打卡时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"] |
|
|
|
if not await is_login(page): |
|
return "Cookie 失效" |
|
|
|
bilibili_live_rooms = await get_bilibili_live_rooms_from_pi() |
|
for room_id in list(bilibili_live_rooms.keys()): |
|
await page.goto(f"{bilibili_live_url}/{str(room_id)}") |
|
|
|
input_box = page.get_by_role("textbox", name="发个弹幕呗~") |
|
|
|
await input_box.fill("1") |
|
|
|
send_button = page.get_by_role("button", name="发送") |
|
if await send_button.is_enabled(): |
|
await send_button.click() |
|
room_res = f"直播间:【{bilibili_live_rooms[room_id]}】打卡成功" |
|
bilibili_logger.info(room_res) |
|
else: |
|
room_res = f"直播间:【{bilibili_live_rooms[room_id]}】打卡失败】" |
|
bilibili_logger.error(room_res) |
|
message.append(room_res) |
|
|
|
return message |
|
|
|
|
|
async def bilibili_live(): |
|
async with async_playwright() as playwright: |
|
browser = await playwright.firefox.launch(headless=browser_headless) |
|
context = await browser.new_context(storage_state=bilibili_cookie_file) |
|
|
|
page = await context.new_page() |
|
task = asyncio.create_task(live(page)) |
|
await asyncio.gather(task) |
|
await page.close() |
|
|
|
await browser.close() |
|
|
|
message = "\n".join(task.result()) |
|
push_msg(title=bilibili_title, message=message, img_url=bilibili_img) |
|
|
|
|
|
if __name__ == "__main__": |
|
asyncio.run(bilibili_sign()) |
|
asyncio.run(bilibili_live()) |
|
|