|
|
|
import socket |
|
import asyncio |
|
import aiofiles |
|
import psutil |
|
import requests |
|
|
|
from os.path import join |
|
from tld import get_tld |
|
from loguru import logger |
|
from playwright.async_api import async_playwright, Page |
|
from playwright_stealth import stealth_async |
|
|
|
from option_mysql import get_aiomysql_instance |
|
from __init__ import ( |
|
gotify_host, |
|
gotify_port, |
|
gotify_token, |
|
gotify_img, |
|
priority, |
|
DIR_PATH, |
|
DEBUG, |
|
browser_headless, |
|
browser_proxy, |
|
ignore_https_errors, |
|
user_agent, |
|
) |
|
|
|
|
|
|
|
async def init_page(storage_state=None, browser_type: str = "chromium") -> None:
    """Launch a browser, open one stealth-patched page and run the cookie flow.

    The page is passed to ``goto_github`` and then ``dump_cookie``. The
    browser is closed before this coroutine finishes, so no page object is
    handed back to the caller.

    Args:
        storage_state: Forwarded to ``browser.new_context`` as
            ``storage_state``; ``None`` starts without a saved session.
        browser_type: Engine to launch: ``"chromium"``, ``"firefox"`` or
            ``"webkit"``.

    Raises:
        TypeError: If ``browser_type`` is not one of the supported names.
    """
    if browser_type not in ("chromium", "firefox", "webkit"):
        raise TypeError("unspported browser")

    async with async_playwright() as p:
        # Every validated name is also the attribute name of its launcher on
        # the Playwright object, so "webkit" is launched like the others
        # instead of falling into an error/exit branch.
        launcher = getattr(p, browser_type)
        browser = await launcher.launch(headless=browser_headless)
        try:
            # A single context carries both the configured network/UA options
            # and the saved storage state; creating two contexts would leak
            # the first and drop its settings.
            context = await browser.new_context(
                storage_state=storage_state,
                proxy=browser_proxy,
                ignore_https_errors=ignore_https_errors,
                user_agent=user_agent,
            )
            page = await context.new_page()

            await stealth_async(page)

            await goto_github(page)
            await dump_cookie(page)
        finally:
            # Close the browser even when one of the steps above raises.
            await browser.close()
|
|
|
|
|
async def goto_github(page):
    """Navigate ``page`` to the GitHub home page."""
    github_url = "https://github.com"
    await page.goto(github_url)
|
|
|
|
|
async def dump_cookie(page, url: str = "https://huggingface.co/login"):
    """Open a login page, wait for the operator, then save the storage state.

    The page navigates to ``url``, a full-page screenshot is written to
    ``<DIR_PATH>/screenshot.png``, and the coroutine then waits until the
    operator types ``c`` on stdin. Finally the browser context's storage state
    is written to ``<DIR_PATH>/<domain>.json``, where ``<domain>`` comes from
    ``get_domain(url)``.

    Args:
        page: Page object to drive; its ``context`` is used to export the
            storage state.
        url: Login page to open. Defaults to the previously hard-coded
            address, so existing callers behave the same.
    """
    save_cookie = join(DIR_PATH, f"{get_domain(url)}.json")
    await page.goto(url)
    logger.debug(DIR_PATH)
    await page.screenshot(path=join(DIR_PATH, "screenshot.png"), full_page=True)

    # input() blocks its thread; running it in the default executor keeps the
    # event loop free while the operator is busy in the browser.
    loop = asyncio.get_running_loop()
    while await loop.run_in_executor(None, input, "input c to continue: ") != "c":
        pass

    await page.context.storage_state(path=save_cookie)
|
|
|
|
|
|
|
def get_local_ip():
    """Return the first IPv4 address reported by psutil that is not 127.0.0.1.

    Interfaces are scanned in the order ``psutil.net_if_addrs()`` yields them
    and the first matching address wins.

    Returns:
        The address string of the first matching entry.

    Raises:
        IndexError: If no interface has an IPv4 address other than
            ``127.0.0.1``.
    """
    for addresses in psutil.net_if_addrs().values():
        for addr in addresses:
            # Named fields and socket.AF_INET replace the positional indexes
            # and the magic number 2 for the IPv4 family.
            if addr.family == socket.AF_INET and addr.address != "127.0.0.1":
                return addr.address
    # Same exception type as indexing an empty list, but with a useful message.
    raise IndexError("no non-loopback IPv4 address found on any interface")
|
|
|
|
|
|
|
def get_domain(url: str) -> str:
    """Return the ``domain`` attribute of the object ``get_tld`` builds for ``url``."""
    return get_tld(url, as_object=True).domain
|
|
|
|
|
|
|
def get_pi_ip(host: str) -> str:
    """Resolve ``host`` to an IPv4 address string via ``socket.gethostbyname``."""
    return socket.gethostbyname(host)
|
|
|
|
|
|
|
def push_msg(
    title: str = "无标题", message: str = "无内容", img_url: str = gotify_img
) -> dict:
    """POST a notification to the configured message endpoint.

    The host from ``gotify_host`` is resolved with ``get_pi_ip`` and the
    request goes to ``/message`` on ``gotify_port``, authenticated with
    ``gotify_token`` in the query string.

    Args:
        title: Notification title.
        message: Notification body.
        img_url: Image URL sent as ``bigImageUrl`` in the notification extras.

    Returns:
        A dict with ``"code"`` (HTTP status code) and ``"res"`` (raw response
        body bytes).
    """
    server_ip = get_pi_ip(gotify_host)
    endpoint = f"http://{server_ip}:{gotify_port}/message?token={gotify_token}"

    extras = {
        "client::display": {"contentType": "text/plaintext"},
        "client::notification": {"bigImageUrl": img_url},
    }
    payload = {
        "title": title,
        "message": message,
        "priority": priority,
        "extras": extras,
    }

    response = requests.post(url=endpoint, json=payload, timeout=20)
    return {"code": response.status_code, "res": response.content}
|
|
|
|
|
async def get_cookie_from_pi() -> str:
    """Dump every row of the ``cookie`` table to a file and return the first name.

    Each row's ``cookie_value`` is written to
    ``<DIR_PATH>/cookie/<cookie_name>.json``.

    Returns:
        The ``cookie_name`` of the first row returned by the query.
        NOTE(review): annotated as ``str`` because the value is used as a
        file name; confirm against the table schema.

    Raises:
        IndexError: Presumably, when the query returns an empty list or tuple;
            the exact type depends on what ``pool.query`` returns.
    """
    # Plain literal: the statement has no placeholders, so no f-string needed.
    sql = "SELECT cookie_name, cookie_value FROM cookie"
    pool = await get_aiomysql_instance()
    rows = await pool.query(sql)
    for row in rows:
        cookie_file = join(DIR_PATH, f"cookie/{row['cookie_name']}.json")
        # Explicit UTF-8 so the output does not depend on the platform's
        # default encoding; this matches how read_logfile opens files.
        async with aiofiles.open(cookie_file, mode="w", encoding="utf-8") as handle:
            await handle.write(row["cookie_value"])
    return rows[0]["cookie_name"]
|
|
|
|
|
async def read_logfile(log_file: str) -> str:
    """Read ``log_file`` as UTF-8 text through aiofiles and return its contents."""
    async with aiofiles.open(log_file, mode="r", encoding="utf-8") as reader:
        return await reader.read()
|
|
|
|
|
|
|
async def get_streevoice_today_song(page):
    """Fetch the text of the link inside ``#player-collapse h4`` on streetvoice.com.

    The page navigates to the site, the link text is read and logged, and the
    page then goes back one history entry.

    Args:
        page: Page object used for the navigation and lookup.

    Returns:
        The text content of the located link.
    """
    await page.goto("https://streetvoice.com/")

    heading = page.locator("#player-collapse h4")
    song_link = heading.get_by_role("link")
    song = await song_link.text_content()

    logger.info(f"街声今日歌曲:{song}")
    await page.go_back()
    return song
|
|
|
|
|
async def main():
    """Read ``log/bilibili.log`` and write its contents to the debug log."""
    log_text = await read_logfile("log/bilibili.log")
    logger.debug(f"utils test: {log_text}")
|
|
|
|
|
if __name__ == "__main__":
    # When run as a script, only log the effective configuration values.
    logger.debug(f"DEBUG Mode: {DEBUG}")
    logger.debug(f"DIR_PATH: {DIR_PATH}")
|
|
|
|