xempire1 / bot /core /bot.py
yasir68's picture
Upload folder using huggingface_hub
8cdbae1 verified
raw
history blame
19.4 kB
import asyncio
import math
import random
import time
from collections.abc import Generator
from datetime import datetime
from enum import Enum
import aiohttp
from aiohttp_proxy import ProxyConnector
from aiohttp_socks import ProxyConnector as SocksProxyConnector
from pyrogram import Client
from pytz import UTC
from bot.config.headers import headers
from bot.config.logger import log
from bot.config.settings import Strategy, config
from bot.core.api_js_helpers.bet_counter import BetCounter
from .api import CryptoBotApi
from .errors import TapsError
from .models import DbSkill, DbSkills, Profile, ProfileData, SessionData, SkillLevel
from .utils import load_codes_from_files, num_prettier
class CryptoBot(CryptoBotApi):
def __init__(self, tg_client: Client, additional_data: dict) -> None:
super().__init__(tg_client)
self.temporary_stop_taps_time = 0
self.bet_calculator = BetCounter(self)
self.pvp_count = config.PVP_COUNT
self.authorized = False
self.settings_was_set = False
self.sleep_time = config.BOT_SLEEP_TIME
self.additional_data: SessionData = SessionData.model_validate(
{k: v for d in additional_data for k, v in d.items()}
)
async def claim_daily_reward(self) -> None:
for day, status in self.data_after.daily_rewards.items():
if status == "canTake":
await self.daily_reward(json_body={"data": str(day)})
self.logger.success("Daily reward claimed")
return
async def perform_taps(self, profile: Profile) -> None:
self.logger.info("Taps started")
energy = profile.energy
while True:
taps_per_second = random.randint(*config.TAPS_PER_SECOND)
seconds = random.randint(5, 8)
earned_money = profile.money_per_tap * taps_per_second * seconds
energy_spent = math.ceil(earned_money / 2)
energy -= energy_spent
if energy < 0:
self.logger.info("Taps stopped (not enough energy)")
break
await asyncio.sleep(delay=seconds)
try:
json_data = {
"data": {
"data": {"task": {"amount": earned_money, "currentEnergy": energy}},
"seconds": seconds,
}
}
energy = await self.api_perform_taps(json_body=json_data)
self.logger.success(
f"Earned money: <y>+{num_prettier(earned_money)}</y> | Energy left: <blue>{num_prettier(energy)}</blue>"
)
except TapsError as e:
self.logger.warning(f"Taps stopped (<red>{e.message}</red>)")
self.temporary_stop_taps_time = time.monotonic() + 60 * 60 * 3
break
async def execute_and_claim_daily_quest(self) -> None:
helper_data = await self.get_helper()
helper_data.youtube.update(load_codes_from_files())
all_daily_quests = await self.all_daily_quests()
for key, value in all_daily_quests.items():
desc = value["description"]
if (
value["type"] == "youtube"
and not value["isRewarded"]
and (code := helper_data.youtube.get(value["description"])) is not None
):
await self.daily_quest_reward(json_body={"data": {"quest": key, "code": str(code)}})
self.logger.info(f"Quest <g>{desc}</g> claimed")
elif desc:
self.logger.info(f"Quest not executed: \n<r>{desc}</r>")
if not value["isRewarded"] and value["isComplete"] and not value["url"]:
await self.daily_quest_reward(json_body={"data": {"quest": key, "code": None}})
self.logger.info(f"Quest <g>{key}</g> claimed")
async def claim_all_executed_quest(self) -> None:
for i in self.data_after.quests:
if not i["isRewarded"]:
if config.SKIP_IMPROVE_DISCIPLINE_BUG and i["key"] == "improve_discipline":
continue
await self.quest_reward_claim(json_body={"data": [i["key"], None]})
self.logger.info(f'Quest <g>{i["key"]}</g> claimed ')
def random_pvp_count(self) -> int:
return random.randint(config.PVP_COUNT, config.PVP_COUNT * 2)
async def _perform_pvp(self, league: dict, strategy: str) -> None:
self.pvp_count = self.random_pvp_count()
self.logger.info(
f"PvP negotiations started | League: <blue>{league['key']}</blue> | Strategy: <g>{strategy}</g>"
)
res = await self.get_pvp_info()
await self.sleeper()
if res.get("fight"):
await self.get_pvp_claim()
await self.sleeper()
current_strategy = strategy
money = 0
while self.pvp_count > 0:
if self.balance < int(league["maxContract"]):
money_str = (
f"Profit: <y>+{num_prettier(money)}</y>"
if money >= 0
else f"Loss: <red>-{num_prettier(money)}</red>"
)
self.logger.info(f"PvP negotiations stopped (<red>not enough money</red>). Pvp profit: {money_str}")
break
if strategy == "random":
current_strategy = random.choice(self.strategies)
self.logger.info("Searching opponent...")
current_strategy = current_strategy.value if isinstance(current_strategy, Enum) else current_strategy
json_data = {"data": {"league": league["key"], "strategy": current_strategy}}
response_json = await self.get_pvp_fight(json_body=json_data)
if response_json is None:
await self.sleeper(delay=10, additional_delay=5)
continue
fight = response_json.fight
opponent_strategy = (
fight.player2Strategy if fight.player1 == self.user_profile.user_id else fight.player1Strategy
)
if fight.winner == self.user_profile.user_id:
money += fight.moneyProfit
log_part = f"You <g>WIN</g> (<y>+{num_prettier(fight.moneyProfit)})</y>"
else:
money -= fight.moneyContract
log_part = f"You <red>LOSE</red> (<y>-{num_prettier(fight.moneyProfit)}</y>)"
self.logger.success(
f"Contract sum: <y>{num_prettier(fight.moneyContract)}</y> | "
f"Your strategy: <c>{current_strategy}</c> | "
f"Opponent strategy: <blue>{opponent_strategy}</blue> | "
f"{log_part}"
)
await self.sleeper(additional_delay=10)
await self.get_pvp_claim()
self.pvp_count -= 1
await self.sleeper()
self.logger.info(
"Total money after all pvp:"
+ (f"<i><g>+{num_prettier(money)}</g></i>" if money >= 0 else f"<i><red>{num_prettier(money)}</red></i>")
)
self.pvp_count = config.PVP_COUNT
async def get_friend_reward(self) -> None:
for friend in [friend for friend in self.data_after.friends if friend["bonusToTake"] > 0]:
await self.friend_reward(json_body={"data": friend["id"]})
self.logger.info(
f"Friend <g>{friend['name']}</g> claimed money <y>{num_prettier(friend['bonusToTake'])}</y>"
)
await self.sleeper()
async def solve_quiz_and_rebus(self) -> None:
for quest in self.dbs["dbQuests"]:
quest_key = quest["key"]
if quest["requiredLevel"] > self.user_profile.level:
continue
if "t.me" in (link := quest.get("actionUrl")) and not self._is_event_solved(quest_key):
if len(link.split("/")) > 4 or "muskempire" in link:
continue
if quest["checkType"] != "fakeCheck":
link = link if "/+" in link else link.split("/")[-1]
await self.join_and_archive_channel(link)
await self.quest_check(json_body={"data": [quest_key]})
self.logger.info(
f'Claimed <g>{quest["title"]}</g> Reward: <y>+{num_prettier(quest["rewardMoney"])}</y>quest'
)
if any(i in quest_key for i in ("riddle", "rebus", "tg_story")) and not self._is_event_solved(quest_key):
await self.quest_check(json_body={"data": [quest_key, quest["checkData"]]})
self.logger.info(f"Was solved <g>{quest['title']}</g>")
def _is_event_solved(self, quest_key: str) -> bool:
return self.data_after.quests and any(i["key"] == quest_key for i in self.data_after.quests)
async def set_funds(self) -> None:
helper_data = await self.get_helper()
if helper_data.funds:
current_invest = await self.get_funds_info()
already_funded = {i["fundKey"] for i in current_invest["funds"]}
for fund in list(helper_data.funds - already_funded)[: 3 - len(already_funded)]:
if self.balance > (amount := self.bet_calculator.calculate_bet()):
self.logger.info(f"Investing <y>{num_prettier(amount)}</y> to fund <blue>{fund}</blue>")
await self.invest(json_body={"data": {"fund": fund, "money": amount}})
else:
self.logger.info("Not enough money for invest")
async def starting_pvp(self) -> None:
if self.dbs:
league_data = None
for league in self.dbs["dbNegotiationsLeague"]:
if league["key"] == config.PVP_LEAGUE:
league_data = league
break
if league_data is not None:
if self.level >= int(league_data["requiredLevel"]):
self.strategies = [strategy["key"] for strategy in self.dbs["dbNegotiationsStrategy"]]
if Strategy.random == config.PVP_STRATEGY or config.PVP_STRATEGY in self.strategies:
await self._perform_pvp(
league=league_data,
strategy=config.PVP_STRATEGY.value,
)
else:
config.PVP_ENABLED = False
self.logger.warning("PVP_STRATEGY param is invalid. PvP negotiations disabled.")
else:
config.PVP_ENABLED = False
self.logger.warning(
f"Your level is too low for the {config.PVP_LEAGUE} league. PvP negotiations disabled."
)
else:
config.PVP_ENABLED = False
self.logger.warning("PVP_LEAGUE param is invalid. PvP negotiations disabled.")
else:
self.logger.warning("Database is missing. PvP negotiations will be skipped this time.")
async def upgrade_hero(self) -> None:
available_skill = list(self._get_available_skills())
if config.AUTO_UPGRADE_HERO:
await self._upgrade_hero_skill(available_skill)
if config.AUTO_UPGRADE_MINING:
await self._upgrade_mining_skill(available_skill)
async def get_box_rewards(self) -> None:
boxes = await self.get_box_list()
for key, box_count in boxes.items():
for _ in range(box_count):
res = await self.box_open(json_body={"data": key})
self.logger.info(f"Box <g>{key}</g> Was looted: <y>{res['loot']}</y>")
async def _upgrade_mining_skill(self, available_skill: list[DbSkill]) -> None:
for skill in [skill for skill in available_skill if skill.category == "mining"]:
if (
skill.key in config.MINING_ENERGY_SKILLS
and skill.next_level <= config.MAX_MINING_ENERGY_RECOVERY_UPGRADE_LEVEL
or (
skill.next_level <= config.MAX_MINING_UPGRADE_LEVEL
or skill.skill_price <= config.MAX_MINING_UPGRADE_COSTS
)
):
await self._upgrade_skill(skill)
def _is_enough_money_for_upgrade(self, skill: DbSkill) -> bool:
return (self.balance - skill.skill_price) >= config.MONEY_TO_SAVE
async def _upgrade_hero_skill(self, available_skill: list[DbSkill]) -> None:
for skill in sorted(
[skill for skill in available_skill if skill.weight],
key=lambda x: x.weight,
reverse=True,
):
if skill.title in config.SKIP_TO_UPGRADE_SKILLS:
continue
# if skill.weight >= config.SKILL_WEIGHT or skill.skill_price <= config.MAX_SKILL_UPGRADE_COSTS:
if skill.weight >= config.SKILL_WEIGHT:
await self._upgrade_skill(skill)
async def _upgrade_skill(self, skill: DbSkill) -> None:
if self._is_enough_money_for_upgrade(skill):
try:
await self.skills_improve(json_body={"data": skill.key})
self.logger.info(
f"Skill: <blue>{skill.title}</blue> upgraded to level: <c>{skill.next_level}</c> "
f"Profit: <y>{num_prettier(skill.skill_profit)}</y> "
f"Costs: <blue>{num_prettier(skill.skill_price)}</blue> "
f"Money stay: <y>{num_prettier(self.balance)}</y> "
f"Skill weight <magenta>{skill.weight:.5f}</magenta>"
)
await self.sleeper()
except ValueError:
self.logger.exception(f"Failed to upgrade skill: {skill}")
raise
def _get_available_skills(self) -> Generator[DbSkill, None, None]:
for skill in DbSkills(**self.dbs).dbSkills:
self._calkulate_skill_requirements(skill)
if self._is_available_to_upgrade_skills(skill):
yield skill
def _calkulate_skill_requirements(self, skill: DbSkill) -> None:
skill.next_level = (
self.data_after.skills[skill.key]["level"] + 1 if self.data_after.skills.get(skill.key) else 1
)
skill.skill_profit = skill.calculate_profit(skill.next_level)
skill.skill_price = skill.price_for_level(skill.next_level)
skill.weight = skill.skill_profit / skill.skill_price
skill.progress_time = skill.get_skill_time(self.data_after)
def _is_available_to_upgrade_skills(self, skill: DbSkill) -> bool:
# check the current skill is still in the process of improvement
if skill.progress_time and skill.progress_time.timestamp() + 60 > datetime.now(UTC).timestamp():
return False
if skill.next_level > skill.maxLevel:
return False
skill_requirements = skill.get_level_by_skill_level(skill.next_level)
if not skill_requirements:
return True
return (
len(self.data_after.friends) >= skill_requirements.requiredFriends
and self.user_profile.level >= skill_requirements.requiredHeroLevel
and self._is_can_learn_skill(skill_requirements)
)
def _is_can_learn_skill(self, level: SkillLevel) -> bool:
if not level.requiredSkills:
return True
for skill, level in level.requiredSkills.items():
if skill not in self.data_after.skills:
return False
if self.data_after.skills[skill]["level"] >= level:
return True
return False
async def login_to_app(self, proxy: str | None) -> bool:
if self.authorized:
return True
tg_web_data = await self.get_tg_web_data(proxy=proxy)
self.http_client.headers["Api-Key"] = tg_web_data.hash
if await self.login(json_body=tg_web_data.request_data):
self.authorized = True
return True
return False
async def run(self, proxy: str | None) -> None:
proxy = proxy or self.additional_data.proxy
if proxy and "socks" in proxy:
proxy_conn = SocksProxyConnector.from_url(proxy)
elif proxy:
proxy_conn = ProxyConnector.from_url(proxy)
else:
proxy_conn = None
async with aiohttp.ClientSession(
headers=headers,
connector=proxy_conn,
timeout=aiohttp.ClientTimeout(total=60),
) as http_client:
self.http_client = http_client
if proxy:
await self.check_proxy(proxy=proxy)
while True:
if self.errors >= config.ERRORS_BEFORE_STOP:
self.logger.error("Bot stopped (too many errors)")
break
try:
if await self.login_to_app(proxy):
# if not self.settings_was_set:
# await self.sent_eng_settings()
data = await self.get_profile_full()
self.dbs = data["dbData"]
await self.get_box_rewards()
self.user_profile: ProfileData = ProfileData(**data)
if self.user_profile.offline_bonus > 0:
await self.get_offline_bonus()
profile = await self.syn_hero_balance()
config.MONEY_TO_SAVE = self.bet_calculator.max_bet()
self.logger.info(f"Max bet for funds saved: <y>{num_prettier(config.MONEY_TO_SAVE)}</y>")
self.data_after = await self.user_data_after()
await self.claim_daily_reward()
await self.execute_and_claim_daily_quest()
await self.syn_hero_balance()
await self.get_friend_reward()
if config.TAPS_ENABLED and profile.energy and time.monotonic() > self.temporary_stop_taps_time:
await self.perform_taps(profile)
await self.set_funds()
await self.solve_quiz_and_rebus()
await self.claim_all_executed_quest()
await self.syn_hero_balance()
await self.upgrade_hero()
if config.PVP_ENABLED:
await self.starting_pvp()
await self.syn_hero_balance()
sleep_time = random.randint(*config.BOT_SLEEP_TIME)
self.logger.info(f"Sleep minutes {sleep_time // 60} minutes")
await asyncio.sleep(sleep_time)
except RuntimeError as error:
raise error from error
except Exception:
self.errors += 1
self.authorized = False
self.logger.exception("Unknown error")
await self.sleeper(additional_delay=self.errors * 8)
else:
self.errors = 0
self.authorized = False
async def run_bot(tg_client: Client, proxy: str | None, additional_data: dict) -> None:
try:
await CryptoBot(tg_client=tg_client, additional_data=additional_data).run(proxy=proxy)
except RuntimeError:
log.bind(session_name=tg_client.name).exception("Session error")