cyberosa
initial files and scripts. App under construction
a134d9b
raw
history blame
1.53 kB
from string import Template
from pathlib import Path
import sys
import json
import os
import time
from tqdm import tqdm
from typing import List, Any, Optional, Union
import numpy as np
import pandas as pd
import gc
import re
from dataclasses import dataclass
from enum import Enum
from json.decoder import JSONDecodeError
# Hex address constant; presumably the market creator that queries filter on
# (TODO confirm against the callers).
CREATOR = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec"

# Presumably the page size for batched subgraph requests (TODO confirm).
BATCH_SIZE = 1000

# OMEN_SUBGRAPH = "https://api.thegraph.com/subgraphs/name/protofire/omen-xdai"
# URL template: `${subgraph_api_key}` has to be substituted before the URL is usable.
OMEN_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}"
    "/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"
)

# Filesystem layout, derived from the location of this file:
# the directory holding this script, its parent, and `live_data` under that parent.
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR.joinpath("live_data")

# All-`f` hex string; the name suggests the maximum unsigned integer value.
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"

# API key read from the environment; None when the variable is not set.
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY")
def measure_execution_time(func):
    """Decorate ``func`` so that every call prints how long it took.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints ``Execution time: <seconds> seconds`` once the call
        has returned, and hands back ``func``'s result unchanged. If ``func``
        raises, the exception propagates and nothing is printed.
    """
    # Local import keeps this block self-contained (no new top-of-file import).
    from functools import wraps

    @wraps(func)  # preserve func's __name__, __doc__ and expose __wrapped__
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper
def _to_content(q: str) -> dict[str, Any]:
    """Wrap a query string in the request payload dictionary.

    The query is stored under the ``query`` key, alongside a ``variables``
    entry set to ``None`` and an ``extensions`` entry whose ``headers`` value
    is ``None``. The payload is returned as a plain dict; nothing is
    serialized to bytes here.
    """
    return {
        "query": q,
        "variables": None,
        "extensions": {"headers": None},
    }