File size: 4,205 Bytes
65d3d39 5b6bac4 65d3d39 6ed2c3e 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5cf3f91 5b6bac4 65d3d39 8786c0f 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 65d3d39 5b6bac4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# coding:utf-8
import sys
import asyncio
from datetime import datetime
from os.path import abspath, dirname, join
from playwright.async_api import async_playwright
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import push_msg
from __init__ import (
browser_headless,
browser_proxy,
bilibili_url,
bilibili_sign_url,
bilibili_live_url,
bilibili_cookie_file,
bilibili_title,
bilibili_logger,
bilibili_img,
)
from option_mysql import get_aiomysql_instance
async def get_bilibili_live_rooms_from_pi() -> dict:
sql = f"SELECT room_id, username FROM live_rooms"
pool = await get_aiomysql_instance()
sql_data = await pool.query(sql)
bilibili_live_rooms = {}
for sd in sql_data:
bilibili_live_rooms[sd["room_id"]] = sd["username"]
return bilibili_live_rooms
# 判断登录状态
async def is_login(page) -> bool:
await page.goto(bilibili_url)
if await page.get_by_text("每日奖励", exact=True).is_enabled():
login_res = bilibili_logger.info("Cookie 有效")
return True
else:
login_res = "Cookie 失效"
bilibili_logger.error(login_res)
return False
# 签到
async def sign(page) -> list:
message = [f"签到时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
if not await is_login(page):
return "Cookie 失效"
await page.goto(bilibili_sign_url)
if await page.get_by_text("今日已签到").is_enabled():
sign_res = f"签到状态:今日已签到"
bilibili_logger.info(sign_res)
elif await page.get_by_text('"code":0').is_enabled():
sign_res = f"签到状态:签到成功"
bilibili_logger.info(sign_res)
else:
sign_res = f"签到状态:签到失败"
bilibili_logger.error(sign_res)
message.append(sign_res)
return message
async def bilibili_sign():
async with async_playwright() as playwright:
browser = await playwright.firefox.launch(headless=browser_headless)
context = await browser.new_context(storage_state=bilibili_cookie_file)
# 签到
page = await context.new_page()
task = asyncio.create_task(sign(page))
await asyncio.gather(task)
await page.close()
# 退出
await browser.close()
# 推送
message = "\n".join(task.result())
push_msg(title=bilibili_title, message=message, img_url=bilibili_img)
# 直播间打卡时间是午夜,区分 0 点签到
async def live(page) -> list:
message = [f"打卡时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
if not await is_login(page):
return "Cookie 失效"
bilibili_live_rooms = await get_bilibili_live_rooms_from_pi()
for room_id in list(bilibili_live_rooms.keys()):
await page.goto(f"{bilibili_live_url}/{str(room_id)}")
# 输入框
input_box = page.get_by_role("textbox", name="发个弹幕呗~")
# 输入内容
await input_box.fill("1")
# 发送按钮
send_button = page.get_by_role("button", name="发送")
if await send_button.is_enabled():
await send_button.click()
room_res = f"直播间:【{bilibili_live_rooms[room_id]}】打卡成功"
bilibili_logger.info(room_res)
else:
room_res = f"直播间:【{bilibili_live_rooms[room_id]}】打卡失败】"
bilibili_logger.error(room_res)
message.append(room_res)
return message
async def bilibili_live():
async with async_playwright() as playwright:
browser = await playwright.firefox.launch(headless=browser_headless)
context = await browser.new_context(storage_state=bilibili_cookie_file)
# 直播间打卡
page = await context.new_page()
task = asyncio.create_task(live(page))
await asyncio.gather(task)
await page.close()
# 退出
await browser.close()
# 推送
message = "\n".join(task.result())
push_msg(title=bilibili_title, message=message, img_url=bilibili_img)
if __name__ == "__main__":
asyncio.run(bilibili_sign())
asyncio.run(bilibili_live())
|