File size: 4,205 Bytes
65d3d39
 
 
 
 
 
 
 
 
5b6bac4
65d3d39
 
 
 
 
 
 
 
 
6ed2c3e
65d3d39
5b6bac4
 
 
 
 
 
 
 
 
 
 
65d3d39
 
 
5b6bac4
65d3d39
5b6bac4
 
65d3d39
 
5b6bac4
 
65d3d39
 
 
 
5b6bac4
 
65d3d39
 
5b6bac4
65d3d39
 
5b6bac4
 
65d3d39
5b6bac4
 
65d3d39
 
5b6bac4
65d3d39
5b6bac4
 
65d3d39
5b6bac4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65d3d39
 
5cf3f91
5b6bac4
 
 
 
 
65d3d39
8786c0f
65d3d39
 
5b6bac4
 
65d3d39
 
 
5b6bac4
 
65d3d39
5b6bac4
 
65d3d39
5b6bac4
 
 
65d3d39
5b6bac4
65d3d39
 
5b6bac4
65d3d39
 
 
 
5b6bac4
 
 
 
65d3d39
 
 
5b6bac4
 
65d3d39
 
 
 
5b6bac4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# coding:utf-8
import sys
import asyncio

from datetime import datetime
from os.path import abspath, dirname, join
from playwright.async_api import async_playwright

sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import push_msg
from __init__ import (
    browser_headless,
    browser_proxy,
    bilibili_url,
    bilibili_sign_url,
    bilibili_live_url,
    bilibili_cookie_file,
    bilibili_title,
    bilibili_logger,
    bilibili_img,
)
from option_mysql import get_aiomysql_instance


async def get_bilibili_live_rooms_from_pi() -> dict:
    """Read the ``live_rooms`` table and return it as a lookup mapping.

    Returns:
        A dict whose keys are the ``room_id`` values and whose values are the
        matching ``username`` values of every row returned by the query.
    """
    db = await get_aiomysql_instance()
    rows = await db.query("SELECT room_id, username FROM live_rooms")
    return {row["room_id"]: row["username"] for row in rows}


# Check the login state
async def is_login(page) -> bool:
    """Return whether the session loaded into ``page`` is still logged in.

    Navigates to ``bilibili_url`` and looks for the exact text "每日奖励";
    finding it enabled is treated as proof that the cookie is still valid.
    The outcome is written to ``bilibili_logger`` either way.

    Args:
        page: Playwright page used to open the site.

    Returns:
        ``True`` when the marker text is found enabled, ``False`` otherwise.
    """
    # Local import keeps this function self-contained; the file already
    # depends on playwright.async_api.
    from playwright.async_api import TimeoutError as PlaywrightTimeoutError

    await page.goto(bilibili_url)
    try:
        logged_in = await page.get_by_text("每日奖励", exact=True).is_enabled()
    except PlaywrightTimeoutError:
        # is_enabled() waits for the element to show up and raises when it
        # never does; a missing marker means "not logged in", not a crash.
        logged_in = False

    if logged_in:
        bilibili_logger.info("Cookie 有效")
        return True

    bilibili_logger.error("Cookie 失效")
    return False


# Daily sign-in
async def sign(page) -> list:
    """Perform the daily sign-in and report the outcome as message lines.

    Args:
        page: Playwright page used to open the site.

    Returns:
        A list of strings. When the login check fails it is the single line
        ``"Cookie 失效"``; otherwise it is the timestamp line followed by the
        sign-in status line. A list is always returned so callers can safely
        ``"\\n".join(...)`` the result.
    """
    # Local import keeps this function self-contained; the file already
    # depends on playwright.async_api.
    from playwright.async_api import TimeoutError as PlaywrightTimeoutError

    async def text_enabled(text: str) -> bool:
        # is_enabled() raises on timeout when the text never appears; treat
        # that as "not present" so the next branch can still be evaluated.
        try:
            return await page.get_by_text(text).is_enabled()
        except PlaywrightTimeoutError:
            return False

    message = [f"签到时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]

    if not await is_login(page):
        # Must be a list: a bare string would be joined character by character.
        return ["Cookie 失效"]

    await page.goto(bilibili_sign_url)
    if await text_enabled("今日已签到"):
        sign_res = "签到状态:今日已签到"
        bilibili_logger.info(sign_res)
    elif await text_enabled('"code":0'):
        sign_res = "签到状态:签到成功"
        bilibili_logger.info(sign_res)
    else:
        sign_res = "签到状态:签到失败"
        bilibili_logger.error(sign_res)
    message.append(sign_res)
    return message


async def bilibili_sign():
    """Run the sign-in flow in a Firefox session and push the result.

    Launches Firefox with the stored cookie state, runs ``sign`` on a fresh
    page, closes the browser, then sends the collected lines via ``push_msg``.
    """
    async with async_playwright() as playwright:
        browser = await playwright.firefox.launch(headless=browser_headless)
        try:
            context = await browser.new_context(storage_state=bilibili_cookie_file)
            # Sign in
            page = await context.new_page()
            result = await sign(page)
            await page.close()
        finally:
            # Shut the browser down even when the sign-in flow raises
            await browser.close()
        # Push the notification; tolerate a bare string so it is not split
        # into one character per line by join()
        message = result if isinstance(result, str) else "\n".join(result)
        push_msg(title=bilibili_title, message=message, img_url=bilibili_img)


# Live-room check-in happens at midnight; kept separate from the 0 o'clock sign-in
async def live(page) -> list:
    """Send a check-in danmaku to every configured live room.

    For each room returned by ``get_bilibili_live_rooms_from_pi`` the room
    page is opened, "1" is typed into the danmaku box and the send button is
    clicked when it is enabled.

    Args:
        page: Playwright page used to open the rooms.

    Returns:
        A list of strings. When the login check fails it is the single line
        ``"Cookie 失效"``; otherwise it is the timestamp line followed by one
        status line per room. A list is always returned so callers can safely
        ``"\\n".join(...)`` the result.
    """
    # Local import keeps this function self-contained; the file already
    # depends on playwright.async_api.
    from playwright.async_api import Error as PlaywrightError

    message = [f"打卡时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]

    if not await is_login(page):
        # Must be a list: a bare string would be joined character by character.
        return ["Cookie 失效"]

    bilibili_live_rooms = await get_bilibili_live_rooms_from_pi()
    for room_id, username in bilibili_live_rooms.items():
        try:
            await page.goto(f"{bilibili_live_url}/{room_id}")
            # Danmaku input box: type the check-in text
            await page.get_by_role("textbox", name="发个弹幕呗~").fill("1")
            # Send button
            send_button = page.get_by_role("button", name="发送")
            sent = await send_button.is_enabled()
            if sent:
                await send_button.click()
        except PlaywrightError as exc:
            # One broken room (timeout, navigation error, ...) must not stop
            # the remaining rooms from being checked in.
            bilibili_logger.error(f"直播间:【{username}】打卡异常:{exc}")
            sent = False

        if sent:
            room_res = f"直播间:【{username}】打卡成功"
            bilibili_logger.info(room_res)
        else:
            room_res = f"直播间:【{username}】打卡失败"
            bilibili_logger.error(room_res)
        message.append(room_res)

    return message


async def bilibili_live():
    """Run the live-room check-in flow in a Firefox session and push the result.

    Launches Firefox with the stored cookie state, runs ``live`` on a fresh
    page, closes the browser, then sends the collected lines via ``push_msg``.
    """
    async with async_playwright() as playwright:
        browser = await playwright.firefox.launch(headless=browser_headless)
        try:
            context = await browser.new_context(storage_state=bilibili_cookie_file)
            # Live-room check-in
            page = await context.new_page()
            result = await live(page)
            await page.close()
        finally:
            # Shut the browser down even when the check-in flow raises
            await browser.close()
        # Push the notification; tolerate a bare string so it is not split
        # into one character per line by join()
        message = result if isinstance(result, str) else "\n".join(result)
        push_msg(title=bilibili_title, message=message, img_url=bilibili_img)


if __name__ == "__main__":
    # Run the sign-in first, then the live-room check-in, each in its own
    # event loop.
    for job in (bilibili_sign, bilibili_live):
        asyncio.run(job())