from playwright.sync_api import sync_playwright from logger import log as logger from flask import Flask, request from tls_client import Session from queue import Queue, Empty from re import findall import threading import requests import logging import asyncio import time import jwt app = Flask(__name__) log = logging.getLogger('werkzeug') log.setLevel(logging.CRITICAL) session = Session(client_identifier='chrome_118', random_tls_extension_order=True) # asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) session.headers = { "host": 'hcaptcha.com', "connection": 'keep-alive', "accept": 'application/json', "user-agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36', "sec-ch-ua": '"Chromium";v="118", "Not A(Brand";v="24", "Google Chrome";v="118"', "sec-ch-ua-mobile": '?0', "sec-ch-ua-platform": '"Windows"', "origin": 'https://newassets.hcaptcha.com', "referer": 'https://newassets.hcaptcha.com/', "sec-fetch-site": 'same-site', "sec-fetch-mode": 'cors', "sec-fetch-dest": 'empty', "accept-encoding": 'gzip, deflate, br', "accept-language": 'en-US,en-SE;q=0.9,sv-SE;q=0.6' } class inst: instance = None class HSW: def __init__(self) -> None: self.lock = threading.Lock() self.queue = Queue() self.playwright_thread = threading.Thread(target=self.task) self.playwright_thread.start() def inject_hsw(self) -> str: js = session.get('https://hcaptcha.com/1/api.js?render=explicit&onload=hcaptchaOnLoad').text version = findall(r'v1\/([A-Za-z0-9]+)\/static', js)[1] req_token = session.post('https://api.hcaptcha.com/checksiteconfig', params={ 'v': version, 'host': 'free.vps.vc', 'sitekey': 'b5a4ebdc-6a9f-4126-bb25-016be94d54d2', 'sc': '1', 'swa': '1', 'spst': '1', }).json()["c"]["req"] url = jwt.decode(req_token, options={"verify_signature": False})['l'] version = url.split("/c/")[1] logger.info(f"Pulled hsw.js -> {version}") hsw_js = requests.get(f"{url}/hsw.js").text self.page.add_script_tag(content="Object.defineProperty(navigator, \"webdriver\", {\"get\": () => false})") self.page.add_script_tag(content=hsw_js) def process_queue(self): while True: try: req, callback = self.queue.get(timeout=1) with self.lock: hsw = self.page.evaluate(f"hsw('{req}')") callback(hsw) except Empty: continue def run(self): inst.instance = self self.page.goto("https://free.vps.vc/") self.page.wait_for_load_state('domcontentloaded') self.inject_hsw() def task(self) -> None: self.playwright = sync_playwright().start() self.browser = self.playwright.chromium.launch() self.page = self.browser.new_page() self.page.route("https://free.vps.vc/", lambda r: r.fulfill(status=200, content_type="text/html",)) self.run() self.process_queue() @app.route('/hsw', methods=["POST"]) def get_hsw(): s = time.time() req = request.get_json()["req"] result = Queue() def handle(hsw): result.put(hsw) inst.instance.queue.put((req, handle)) hsw = result.get() logger.info(f"Successfully Got HSW {hsw[:80]} / Length: {len(hsw)} / Time: {str(time.time() - s)[:5]}", "Success") return hsw threading.Thread(target=lambda: app.run(host='0.0.0.0', port="5000", debug=False, use_reloader=False)).start() HSW()