# coding:utf-8 import os import sys import socket import asyncio import psutil import requests from os.path import join from tld import get_tld from loguru import logger from playwright.async_api import async_playwright, Page from playwright_stealth import stealth_async from option_mysql import OptionMysql from __init__ import ( gotify_host, gotify_port, gotify_token, gotify_img, mysql_option, DIR_PATH, DEBUG, browser_headless, browser_proxy, ignore_https_errors, user_agent, ) # webdriver async def init_page(storage_state=None, browser_type: str = "chromium") -> Page: if browser_type not in ("chromium", "firefox", "webkit"): raise TypeError("unspported browser") async with async_playwright() as p: if browser_type == "chromium": browser = await p.chromium.launch(headless=browser_headless) elif browser_type == "firefox": browser = await p.firefox.launch(headless=browser_headless) else: logger.error("初始化浏览器失败") exit(-1) context = await browser.new_context( proxy=browser_proxy, ignore_https_errors=ignore_https_errors, user_agent=user_agent, ) # 从文件读取 Cookie context = await browser.new_context(storage_state=storage_state) page = await context.new_page() # 隐藏 webdriver 特征 await stealth_async(page) # await page.pause() # async 跳出上下文就会 playwright._impl._api_types.Error: Connection closed await goto_github(page) await dump_cookie(page) # 创建函数:一个 browser 对应一个 context、page,在 with 内完成多个任务 await browser.close() async def goto_github(page): await page.goto("https://github.com") async def dump_cookie(page): # url = "https://www.52pojie.cn/home.php" # url = "https://v2ex.com" # url = "https://www.bilibili.com" # url = "https://www.tsdm39.com/forum.php?mobile=no" # url = "https://www.aliyundrive.com/drive" url = "https://huggingface.co/login" save_cookie = join(DIR_PATH, f"{get_domain(url)}.json") await page.goto(url) logger.debug(DIR_PATH) await page.screenshot( path=join(DIR_PATH, "screenshot.png"), full_page=True, ) # 截图 while input("input c to continue: ") != "c": # 阻塞等待填写登录信息 continue await page.context.storage_state(path=save_cookie) # 保存 Cookie 到文件 # 获取容器内 IP def get_local_ip(): net_card_info = [] info = psutil.net_if_addrs() for k, v in info.items(): for item in v: if item[0] == 2 and not item[1] == "127.0.0.1": net_card_info.append(item[1]) local_ip = net_card_info[0] return local_ip # 从 url 获取域名 def get_domain(url: str) -> str: result = get_tld(url, as_object=True) domain = result.domain return domain # 从域名获取 IP def get_domain_ip(host: str) -> str: domain_ip = socket.gethostbyname(host) return domain_ip # gotify 推送 def push_msg(title: str = "无标题", message: str = "无内容", img_url: str = gotify_img): gotify_ip = get_domain_ip(gotify_host) url = f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}" resp = requests.post( url, json={ "title": title, "message": message, "priority": 10, "extras": { "client::display": {"contentType": "text/markdown"}, "client::notification": {"bigImageUrl": img_url}, }, }, ) return resp.json() def get_bilibili_live_rooms_from_pi() -> dict: logger.info("getting bilibili_live_rooms from pi") bilibili_live_rooms = {} mysql = OptionMysql(mysql_option) sql = f"SELECT room_id, username FROM live_rooms" data = mysql.get_dict_data(sql) if data: for d in data: bilibili_live_rooms[d["room_id"]] = d["username"] return bilibili_live_rooms def get_cookie_from_pi() -> dict: logger.info("getting cookie from pi") mysql = OptionMysql(mysql_option) sql = f"SELECT cookie_name, cookie_value FROM cookie" data = mysql.get_dict_data(sql) for d in data: log_file = join(DIR_PATH, f"cookie/{d['cookie_name']}.json") with open(log_file, "w") as lf: lf.write(d["cookie_value"]) return data[0]["cookie_name"] def get_aliyundrive_refresh_token(): mysql = OptionMysql(mysql_option) sql = f"SELECT token FROM aliyundrive WHERE id=1" data = mysql.get_dict_data(sql) if data: logger.info("获取阿里云盘 refresh_token 成功") return data[0]["token"] return False def update_aliyundrive_access_token(access_token): mysql = OptionMysql(mysql_option) sql = f"UPDATE aliyundrive SET token=%s WHERE id=2" data = mysql.update_data(sql, [access_token]) if data: logger.info("update aliyundrive access token") return True return False def get_log(log_file: str) -> str: with open(log_file, "r", encoding="utf-8") as f: data = f.read() return data if __name__ == "__main__": logger.debug(f"DEBUG Mode: {DEBUG}") logger.debug(f"DIR_PATH: {DIR_PATH}") # asyncio.run(init_page()) # logger.debug(f"get_domain_ip: {get_domain_ip(gotify_host)}") # logger.debug(f'get_domain: {get_domain("https://github.com")}') # logger.debug(f"list_path: {list_path()}") # logger.debug(f'push_msg: {push_msg("utils.py", "hello world")}') # logger.debug(f"get_aliyun_token_from_pi(): {get_aliyun_token_from_pi()}") # logger.debug(f"get_bilibili_live_rooms_from_pi(): {get_bilibili_live_rooms_from_pi()}") # logger.debug(f"get_cookie_from_pi(): {get_cookie_from_pi()}") # logger.debug(f'get_cron_log(): {get_cron_log("log/tsdm.log")}') # logger.debug(f"get_aliyundrive_access_token: {get_aliyundrive_access_token()}") # logger.debug(f"update_aliyundrive_access_token: {update_aliyundrive_access_token('123')}")