# coding:utf-8 import sys import random import asyncio from os.path import abspath, dirname, join from playwright.async_api import async_playwright sys.path.append(join(dirname(abspath(__file__)), "../")) from utils import push_msg, get_streevoice_today_song from __init__ import ( browser_headless, browser_proxy, tsdm_url, tsdm_sign_url, tsdm_cliwork_url, tsdm_plugin_id, tsdm_minds, tsdm_cookie_file, tsdm_title, tsdm_logger, tsdm_img, ) # on 监听弹窗事件 async def close_popup(popup): # await popup.wait_for_load_state() print(await popup.title()) await popup.close() # 判断登录状态 async def is_login(page): await page.goto(tsdm_url) await page.wait_for_load_state() if await page.locator("#pm_ntc").is_enabled(): # 消息按钮 tsdm_logger.info("登录成功") return True else: stderr = "Cookie 过期" tsdm_logger.error(stderr) push_msg(tsdm_title, stderr, tsdm_img) return False # 签到 async def sign(page): msg = [] if not await is_login(page): return "tsdm_not_login" await page.goto(tsdm_sign_url) if await page.get_by_text("今天已签到").is_visible(): sign_res = "今天已签到" tsdm_logger.info(sign_res) msg.append(sign_res) else: song = await get_streevoice_today_song(page) mind = random.choice(list(tsdm_minds.keys())) tsdm_logger.info(f"签到心情选择:{tsdm_minds[mind]}") await page.get_by_role("combobox").select_option(mind) await page.get_by_label("自己填写").check() # 不想填写 await page.locator("#todaysay").type(song, delay=200) await page.get_by_text("点我签到!").click() sign_res = "签到成功" tsdm_logger.info(sign_res) msg.append(sign_res) return "\n".join(msg) # 打工 async def cliwork(page): msg = [] if not await is_login(page): return "tsdm_not_login" await page.goto(tsdm_url) # 切换回电脑版 await page.goto(tsdm_cliwork_url) # 已经打工会跳转回首页 if await page.get_by_text("需要等待").is_visible(timeout=3) or await page.get_by_text("欢迎新会员").is_visible(timeout=3): has_work = "今天已打工" tsdm_logger.info(has_work) msg.append(has_work) else: await page.locator("#workstart").is_enabled() for pid in tsdm_plugin_id: await page.locator(f"#np_advid{pid}").get_by_role("link").click() page.on("popup", close_popup) # handle popup 不能 await await page.pause() await page.bring_to_front() # 回到第一个标签页 await page.locator("#workstart").click() # 领取奖励 work_res = "打工成功" tsdm_logger.info(work_res) msg.append(work_res) return "\n".join(msg) async def tsdm_sign(): async with async_playwright() as playwright: browser = await playwright.chromium.launch(headless=browser_headless) context = await browser.new_context(storage_state=tsdm_cookie_file) # 签到 sign_page = await context.new_page() sign_task = asyncio.create_task(sign(sign_page)) await asyncio.gather(sign_task) await sign_page.close() # # 打工 cliwork_page = await context.new_page() cliwork_task = asyncio.create_task(cliwork(cliwork_page)) await asyncio.gather(cliwork_task) await cliwork_page.close() # 退出 await browser.close() # 推送 sign_result = sign_task.result() cliwork_result = cliwork_task.result() # ""分割两个服务的消息 push_msg(tsdm_title, "\n".join([sign_result, "", cliwork_result]), tsdm_img) if __name__ == "__main__": asyncio.run(tsdm_sign())