|
from string import Template |
|
from pathlib import Path |
|
import sys |
|
import json |
|
import os |
|
import time |
|
from tqdm import tqdm |
|
from typing import List, Any, Optional, Union |
|
import numpy as np |
|
import pandas as pd |
|
import gc |
|
import re |
|
from dataclasses import dataclass |
|
from enum import Enum |
|
from json.decoder import JSONDecodeError |
|
|
|
|
|
CREATOR = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec" |
|
BATCH_SIZE = 1000 |
|
OMEN_SUBGRAPH_URL = Template( |
|
"""https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz""" |
|
) |
|
SCRIPTS_DIR = Path(__file__).parent |
|
ROOT_DIR = SCRIPTS_DIR.parent |
|
DATA_DIR = ROOT_DIR / "live_data" |
|
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" |
|
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY", None) |
|
|
|
|
|
def measure_execution_time(func): |
|
def wrapper(*args, **kwargs): |
|
start_time = time.time() |
|
result = func(*args, **kwargs) |
|
end_time = time.time() |
|
execution_time = end_time - start_time |
|
print(f"Execution time: {execution_time:.6f} seconds") |
|
return result |
|
|
|
return wrapper |
|
|
|
|
|
def _to_content(q: str) -> dict[str, Any]: |
|
"""Convert the given query string to payload content, i.e., add it under a `queries` key and convert it to bytes.""" |
|
finalized_query = { |
|
"query": q, |
|
"variables": None, |
|
"extensions": {"headers": None}, |
|
} |
|
return finalized_query |
|
|