sig / _playwright /tsdm.py
Evilmass
tsdm: add has_cliwork timeout
58c0fbc
# coding:utf-8
import sys
import random
import asyncio
from os.path import abspath, dirname, join
from playwright.async_api import async_playwright
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import push_msg, get_streevoice_today_song
from __init__ import (
browser_headless,
browser_proxy,
tsdm_url,
tsdm_sign_url,
tsdm_cliwork_url,
tsdm_plugin_id,
tsdm_minds,
tsdm_cookie_file,
tsdm_title,
tsdm_logger,
tsdm_img,
)
async def close_popup(popup):
    """Listener for the "popup" page event: print the popup's title, then close it.

    Args:
        popup: The page object passed to the listener for the popup.
    """
    popup_title = await popup.title()
    print(popup_title)
    await popup.close()
async def is_login(page):
    """Open the site home page and report whether the session is logged in.

    The check looks for the ``#pm_ntc`` element (the message button).

    Args:
        page: Playwright page used for the navigation.

    Returns:
        True when ``#pm_ntc`` exists and is enabled. Otherwise False, after
        the "Cookie 过期" error has been logged and pushed via push_msg().
    """
    await page.goto(tsdm_url)
    await page.wait_for_load_state()

    message_button = page.locator("#pm_ntc")
    # count() answers immediately. Calling is_enabled() on a locator that
    # matches nothing waits for the element and then raises a timeout error
    # instead of returning False, which would skip the expired-cookie
    # handling below, so existence is checked first.
    if await message_button.count() > 0 and await message_button.is_enabled():
        tsdm_logger.info("登录成功")
        return True

    stderr = "Cookie 过期"
    tsdm_logger.error(stderr)
    push_msg(tsdm_title, stderr, tsdm_img)
    return False
async def sign(page):
    """Perform the daily check-in and return a one-line status message.

    Args:
        page: Playwright page to drive.

    Returns:
        "tsdm_not_login" when is_login() reports False, "今天已签到" when the
        sign page already shows that text, otherwise "签到成功" once the
        check-in form has been filled in and its button clicked.
    """
    logged_in = await is_login(page)
    if not logged_in:
        return "tsdm_not_login"

    await page.goto(tsdm_sign_url)
    already_signed = await page.get_by_text("今天已签到").is_visible()
    if already_signed:
        outcome = "今天已签到"
    else:
        today_song = await get_streevoice_today_song(page)
        # Pick a random mood key; the mapped value is only used for the log.
        mood = random.choice(list(tsdm_minds.keys()))
        tsdm_logger.info(f"签到心情选择:{tsdm_minds[mood]}")
        await page.get_by_role("combobox").select_option(mood)
        # Tick the "自己填写" option, then type the text into #todaysay.
        await page.get_by_label("自己填写").check()
        await page.locator("#todaysay").type(today_song, delay=200)
        await page.get_by_text("点我签到!").click()
        outcome = "签到成功"

    tsdm_logger.info(outcome)
    return outcome
async def cliwork(page):
    """Run the "work" task and return a one-line status message.

    Args:
        page: Playwright page to drive.

    Returns:
        "tsdm_not_login" when is_login() reports False, "今天已打工" when the
        page shows one of the "already done" texts, otherwise "打工成功"
        after every ad link and then ``#workstart`` have been clicked.
    """
    if not await is_login(page):
        return "tsdm_not_login"

    await page.goto(tsdm_url)  # switch back to the desktop version
    await page.goto(tsdm_cliwork_url)

    # Per the original note, the site redirects to the home page when the
    # work has already been done. is_visible() returns immediately and
    # ignores its deprecated timeout argument, so none is passed.
    already_worked = (
        await page.get_by_text("需要等待").is_visible()
        or await page.get_by_text("欢迎新会员").is_visible()
    )
    if already_worked:
        outcome = "今天已打工"
    else:
        # The result is unused.
        # NOTE(review): this call appears to serve as a wait for #workstart
        # to be present before the ad links are clicked - confirm.
        await page.locator("#workstart").is_enabled()

        # Register the popup listener once, before the first click, so that
        # every ad popup is handled exactly once. page.on() is synchronous
        # and must not be awaited.
        page.on("popup", close_popup)
        for pid in tsdm_plugin_id:
            await page.locator(f"#np_advid{pid}").get_by_role("link").click()
            await page.bring_to_front()  # return to the first tab

        await page.locator("#workstart").click()  # claim the reward
        outcome = "打工成功"

    tsdm_logger.info(outcome)
    return outcome
async def tsdm_sign():
    """Run the check-in and the work task in one browser session, then push the results.

    A Chromium browser is launched with the ``browser_headless`` setting and a
    context is created from the ``tsdm_cookie_file`` storage state. sign() and
    cliwork() each run on their own page, one after the other. After the
    browser is closed, both result strings are pushed in a single message,
    separated by an empty line.
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=browser_headless)
        context = await browser.new_context(storage_state=tsdm_cookie_file)

        # Check-in first, then the work task; each gets a fresh page.
        results = []
        for job in (sign, cliwork):
            job_page = await context.new_page()
            results.append(await job(job_page))
            await job_page.close()

        await browser.close()

        # The "" entry puts a blank line between the two services' messages.
        sign_result, cliwork_result = results
        body = "\n".join([sign_result, "", cliwork_result])
        push_msg(tsdm_title, body, tsdm_img)
if __name__ == "__main__":
    # Script entry point: run the whole flow once on a new event loop.
    entry = tsdm_sign()
    asyncio.run(entry)