# cyberosa
# new scripts and new live data
# 10cf834
from string import Template
from pathlib import Path
import sys
import json
import os
import time
from tqdm import tqdm
from typing import List, Any, Optional, Union
import numpy as np
import pandas as pd
import gc
import re
from dataclasses import dataclass
from enum import Enum
from json.decoder import JSONDecodeError
# --- Subgraph settings -------------------------------------------------------
# NOTE(review): looks like an on-chain account address (hex string) — confirm
# how the queries elsewhere in this file use it.
CREATOR = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec"
# NOTE(review): presumably the page size for batched requests — confirm at the
# call sites.
BATCH_SIZE = 1000
# URL template with a single placeholder, ``${subgraph_api_key}``, that must be
# substituted before the URL can be used.
OMEN_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}"
    "/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"
)
# "0x" followed only by "f" hex digits.
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
# Read once at import time; None when the environment variable is not set.
SUBGRAPH_API_KEY = os.getenv("SUBGRAPH_API_KEY")

# --- Filesystem layout -------------------------------------------------------
# Directory containing this file, its parent, and the "live_data" folder under
# that parent.
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "live_data"
def measure_execution_time(func):
    """Decorator that prints how long each call to *func* takes.

    The wrapped callable forwards all positional and keyword arguments to
    *func* and returns its result unchanged. After a call that returns
    normally, a line of the form ``Execution time: <seconds> seconds`` is
    printed to stdout; if *func* raises, the exception propagates and nothing
    is printed.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature and metadata
        (``__name__``, ``__doc__``, ``__wrapped__``) as *func*.
    """
    # Imported locally so this block does not depend on the file's import list.
    from functools import wraps

    @wraps(func)  # keep the decorated function's name/docstring for debugging
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, so the measured
        # duration cannot go negative or jump when the system clock changes.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper
def _to_content(q: str) -> dict[str, Any]:
    """Build the request payload dictionary for the query string *q*.

    The query is stored under the ``"query"`` key; ``"variables"`` is set to
    ``None`` and ``"extensions"`` to ``{"headers": None}``. The payload is
    returned as a plain dict — no serialization or byte encoding happens here.
    """
    return {
        "query": q,
        "variables": None,
        "extensions": {"headers": None},
    }