import os import glob import asyncio import argparse from itertools import cycle from pyrogram import Client from better_proxy import Proxy from bot.config import settings from bot.utils import logger from bot.core.tapper import run_tapper from bot.core.registrator import register_sessions start_text = """ Select an action: 1. Run clicker 2. Create session """ global tg_clients def get_session_names() -> list[str]: session_names = glob.glob("sessions/*.session") session_names = [ os.path.splitext(os.path.basename(file))[0] for file in session_names ] return session_names def get_proxies() -> list[Proxy]: if settings.USE_PROXY_FROM_FILE: with open(file="bot/config/proxies.txt", encoding="utf-8-sig") as file: proxies = [Proxy.from_str(proxy=row.strip()).as_url for row in file] else: proxies = [] return proxies async def get_tg_clients() -> list[Client]: global tg_clients session_names = get_session_names() if not session_names: raise FileNotFoundError("Not found session files") if not settings.API_ID or not settings.API_HASH: raise ValueError("API_ID and API_HASH not found in the .env file.") tg_clients = [ Client( name=session_name, api_id=settings.API_ID, api_hash=settings.API_HASH, workdir="sessions/", plugins=dict(root="bot/plugins"), ) for session_name in session_names ] return tg_clients async def process() -> None: parser = argparse.ArgumentParser() parser.add_argument("-a", "--action", type=int, help="Action to perform") logger.info(f"Detected {len(get_session_names())} sessions | {len(get_proxies())} proxies") action = parser.parse_args().action if not action: print(start_text) while True: action = input("> ") if not action.isdigit(): logger.warning("Action must be number") elif action not in ["1", "2"]: logger.warning("Action must be 1 or 2") else: action = int(action) break if action == 2: await register_sessions() elif action == 1: tg_clients = await get_tg_clients() await run_tasks(tg_clients=tg_clients) async def run_tasks(tg_clients: list[Client]): proxies = get_proxies() proxies_cycle = cycle(proxies) if proxies else None tasks = [ asyncio.create_task( run_tapper( tg_client=tg_client, proxy=next(proxies_cycle) if proxies_cycle else None, ) ) for tg_client in tg_clients ] await asyncio.gather(*tasks)