|
|
|
import os |
|
import sys |
|
import socket |
|
import asyncio |
|
import psutil |
|
import requests |
|
|
|
from os.path import join |
|
from tld import get_tld |
|
from loguru import logger |
|
from playwright.async_api import async_playwright, Page |
|
from playwright_stealth import stealth_async |
|
|
|
from option_mysql import OptionMysql |
|
from __init__ import ( |
|
gotify_host, |
|
gotify_port, |
|
gotify_token, |
|
gotify_img, |
|
mysql_option, |
|
DIR_PATH, |
|
DEBUG, |
|
browser_headless, |
|
browser_proxy, |
|
ignore_https_errors, |
|
user_agent, |
|
) |
|
|
|
|
|
|
|
async def init_page(storage_state=None, browser_type: str = "chromium") -> None:
    """Launch a browser, open GitHub, run the cookie dump, then close it.

    Args:
        storage_state: Value passed through to
            ``browser.new_context(storage_state=...)``; ``None`` starts the
            context without any saved state.
        browser_type: Engine to launch: ``"chromium"``, ``"firefox"`` or
            ``"webkit"``.

    Raises:
        TypeError: If ``browser_type`` is not one of the supported engines.
    """
    if browser_type not in ("chromium", "firefox", "webkit"):
        raise TypeError("unspported browser")

    async with async_playwright() as p:
        # The validated name is also the launcher attribute on the Playwright
        # object, so every accepted engine (webkit included) can be started.
        launcher = getattr(p, browser_type)
        browser = await launcher.launch(headless=browser_headless)
        try:
            # A single context carries both the configured proxy/TLS/UA
            # options and the saved storage state; a second context would
            # drop the options and leave the first one unused.
            context = await browser.new_context(
                proxy=browser_proxy,
                ignore_https_errors=ignore_https_errors,
                user_agent=user_agent,
                storage_state=storage_state,
            )
            page = await context.new_page()

            await stealth_async(page)

            await goto_github(page)
            await dump_cookie(page)
        finally:
            # Close the browser even when navigation or the dump step fails.
            await browser.close()
|
|
|
|
|
async def goto_github(page):
    """Navigate ``page`` to the GitHub home page."""
    github_home = "https://github.com"
    await page.goto(github_home)
|
|
|
|
|
async def dump_cookie(page):
    """Open the login page, wait for operator confirmation, save the session.

    The page is navigated to the login URL and a full-page screenshot is
    written to ``DIR_PATH/screenshot.png``. The coroutine then waits until
    ``c`` is entered on stdin and finally writes the page context's storage
    state to ``DIR_PATH/<get_domain(url)>.json``.

    Args:
        page: Playwright page whose context's storage state is saved.
    """
    url = "https://huggingface.co/login"
    save_cookie = join(DIR_PATH, f"{get_domain(url)}.json")
    await page.goto(url)
    logger.debug(DIR_PATH)
    await page.screenshot(
        path=join(DIR_PATH, "screenshot.png"),
        full_page=True,
    )
    # input() blocks the calling thread; run it on a worker thread so the
    # event loop (and Playwright's traffic with the browser) keeps running
    # while waiting for the operator.
    loop = asyncio.get_running_loop()
    prompt = "input c to continue: "
    while await loop.run_in_executor(None, input, prompt) != "c":
        continue
    await page.context.storage_state(path=save_cookie)
|
|
|
|
|
|
|
def get_local_ip() -> str:
    """Return the first non-loopback IPv4 address reported by psutil.

    Interface addresses are scanned in the order ``psutil.net_if_addrs()``
    yields them; the first IPv4 address other than ``127.0.0.1`` wins.

    Returns:
        The address string of the first matching interface address.

    Raises:
        IndexError: If no interface has a non-loopback IPv4 address.
    """
    for addresses in psutil.net_if_addrs().values():
        for addr in addresses:
            # socket.AF_INET names the IPv4 family instead of a bare 2.
            if addr.family == socket.AF_INET and addr.address != "127.0.0.1":
                return addr.address
    # IndexError is kept so callers that already handle the "nothing found"
    # case keep working, but the message now states the actual problem.
    raise IndexError("no non-loopback IPv4 address found")
|
|
|
|
|
|
|
def get_domain(url: str) -> str:
    """Return the ``domain`` attribute of the ``tld`` result for ``url``."""
    return get_tld(url, as_object=True).domain
|
|
|
|
|
|
|
def get_domain_ip(host: str) -> str:
    """Resolve ``host`` with ``socket.gethostbyname`` and return the result."""
    return socket.gethostbyname(host)
|
|
|
|
|
|
|
def push_msg(
    title: str = "无标题",
    message: str = "无内容",
    img_url: str = gotify_img,
    timeout: float = 10,
):
    """Post a markdown notification to the configured Gotify endpoint.

    The host in ``gotify_host`` is resolved to an IP first, then the message
    is posted to ``/message`` with priority 10.

    Args:
        title: Notification title.
        message: Notification body, sent with content type ``text/markdown``.
        img_url: Image URL sent as the notification's ``bigImageUrl``.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Without a timeout the call can block indefinitely.

    Returns:
        The decoded JSON body of the server response.
    """
    gotify_ip = get_domain_ip(gotify_host)
    url = f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}"
    resp = requests.post(
        url,
        json={
            "title": title,
            "message": message,
            "priority": 10,
            "extras": {
                "client::display": {"contentType": "text/markdown"},
                "client::notification": {"bigImageUrl": img_url},
            },
        },
        timeout=timeout,
    )
    return resp.json()
|
|
|
|
|
def get_bilibili_live_rooms_from_pi() -> dict:
    """Load the ``live_rooms`` table as a ``room_id -> username`` mapping.

    Returns an empty dict when the query yields no rows.
    """
    logger.info("getting bilibili_live_rooms from pi")
    rows = OptionMysql(mysql_option).get_dict_data(
        "SELECT room_id, username FROM live_rooms"
    )
    if not rows:
        return {}
    return {row["room_id"]: row["username"] for row in rows}
|
|
|
|
|
def get_cookie_from_pi() -> str:
    """Export every row of the ``cookie`` table to its own JSON file.

    Each row's ``cookie_value`` is written to
    ``DIR_PATH/cookie/<cookie_name>.json``; the directory is created when it
    does not exist yet.

    Returns:
        The ``cookie_name`` of the first row (presumably a string — confirm
        against the table schema).

    Raises:
        IndexError: If the query returns no rows.
    """
    logger.info("getting cookie from pi")
    mysql = OptionMysql(mysql_option)
    sql = "SELECT cookie_name, cookie_value FROM cookie"
    data = mysql.get_dict_data(sql)
    if not data:
        # Fail with a clear message instead of an obscure error further down.
        raise IndexError("no rows returned from the cookie table")

    cookie_dir = join(DIR_PATH, "cookie")
    os.makedirs(cookie_dir, exist_ok=True)
    for d in data:
        cookie_file = join(cookie_dir, f"{d['cookie_name']}.json")
        # Explicit utf-8 keeps the files independent of the platform default.
        with open(cookie_file, "w", encoding="utf-8") as cf:
            cf.write(d["cookie_value"])
    return data[0]["cookie_name"]
|
|
|
|
|
def get_aliyundrive_refresh_token():
    """Fetch the token stored in the ``aliyundrive`` row with ``id=1``.

    Returns the ``token`` value of the first row, or ``False`` when the query
    yields nothing.
    """
    rows = OptionMysql(mysql_option).get_dict_data(
        "SELECT token FROM aliyundrive WHERE id=1"
    )
    if not rows:
        return False
    logger.info("获取阿里云盘 refresh_token 成功")
    return rows[0]["token"]
|
|
|
|
|
def update_aliyundrive_access_token(access_token):
    """Store ``access_token`` in the ``aliyundrive`` row with ``id=2``.

    Returns ``True`` when ``update_data`` reports a truthy result, otherwise
    ``False``.
    """
    updated = OptionMysql(mysql_option).update_data(
        "UPDATE aliyundrive SET token=%s WHERE id=2",
        [access_token],
    )
    if not updated:
        return False
    logger.info("update aliyundrive access token")
    return True
|
|
|
|
|
def get_log(log_file: str) -> str:
    """Return the full utf-8 text content of ``log_file``."""
    with open(log_file, "r", encoding="utf-8") as handle:
        return handle.read()
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: log the configuration, then push one notification.
    logger.debug(f"DEBUG Mode: {DEBUG}")
    logger.debug(f"DIR_PATH: {DIR_PATH}")

    push_result = push_msg("utils.py", "hello world")
    logger.debug(f"push_msg: {push_result}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|