Spaces:
Restarting
on
CPU Upgrade
Restarting
on
CPU Upgrade
#!/usr/bin/env python | |
import os | |
import json | |
import socket | |
import random | |
from datetime import datetime | |
from src.backend.run_eval_suite import run_evaluation | |
from src.backend.manage_requests import check_completed_evals, get_eval_requests, set_eval_request | |
from src.backend.sort_queue import sort_models_by_priority | |
from src.backend.envs import Tasks, EVAL_REQUESTS_PATH_BACKEND, EVAL_RESULTS_PATH_BACKEND, DEVICE, LIMIT, Task | |
from src.backend.manage_requests import EvalRequest | |
from src.leaderboard.read_evals import EvalResult | |
from src.envs import QUEUE_REPO, RESULTS_REPO, API | |
from src.utils import my_snapshot_download | |
from src.leaderboard.read_evals import get_raw_eval_results | |
from typing import Optional | |
import time | |
import logging | |
import pprint | |
def my_set_eval_request(api, eval_request, set_to_status, hf_repo, local_dir):
    """Call ``set_eval_request`` with retries, logging every failure.

    The update is attempted up to 10 times with a 60 second pause between
    attempts. This is a best-effort operation: if every attempt fails the
    function still returns normally, but each failure (with its traceback)
    and the final give-up are logged instead of being silently discarded.

    Args:
        api: Forwarded unchanged to ``set_eval_request`` as ``api``.
        eval_request: Forwarded unchanged as ``eval_request``.
        set_to_status: Forwarded unchanged as ``set_to_status``.
        hf_repo: Forwarded unchanged as ``hf_repo``.
        local_dir: Forwarded unchanged as ``local_dir``.

    Returns:
        None, whether or not the update eventually succeeded.
    """
    max_attempts = 10
    retry_delay_seconds = 60

    for attempt in range(1, max_attempts + 1):
        try:
            set_eval_request(api=api, eval_request=eval_request, set_to_status=set_to_status,
                             hf_repo=hf_repo, local_dir=local_dir)
        except Exception:
            logging.exception("set_eval_request failed (attempt %d/%d) while setting status %s",
                              attempt, max_attempts, set_to_status)
            # Waiting is only useful when another attempt will follow.
            if attempt < max_attempts:
                time.sleep(retry_delay_seconds)
        else:
            return

    logging.error("Giving up on setting status %s after %d attempts", set_to_status, max_attempts)
# Logging: the "openai" logger is capped at WARNING, everything else at ERROR.
logging.getLogger("openai").setLevel(logging.WARNING)
logging.basicConfig(level=logging.ERROR)

# Shared pretty-printer used when dumping objects to stdout.
pp = pprint.PrettyPrinter(width=80)

# Status strings passed as ``job_status`` / ``set_to_status`` / ``*_status``
# arguments to the request-management helpers.
PENDING_STATUS = "PENDING"
RUNNING_STATUS = "RUNNING"
FINISHED_STATUS = "FINISHED"
FAILED_STATUS = "FAILED"

# The ``value`` of every member of ``Tasks``, in declaration order.
TASKS_HARNESS = [member.value for member in Tasks]

# Import-time side effect: fetch the results dataset first, then the queue
# dataset, into their local backend directories.
my_snapshot_download(
    repo_id=RESULTS_REPO,
    revision="main",
    local_dir=EVAL_RESULTS_PATH_BACKEND,
    repo_type="dataset",
    max_workers=60,
)
my_snapshot_download(
    repo_id=QUEUE_REPO,
    revision="main",
    local_dir=EVAL_REQUESTS_PATH_BACKEND,
    repo_type="dataset",
    max_workers=60,
)
def sanity_checks():
    """Print the device, refresh the local queue copy and check RUNNING requests.

    Steps, in order:
      1. Print the configured ``DEVICE``.
      2. Download the ``main`` revision of the queue dataset into
         ``EVAL_REQUESTS_PATH_BACKEND``.
      3. Call ``check_completed_evals`` for requests in ``RUNNING_STATUS``,
         passing ``FINISHED_STATUS`` as the completed status and
         ``FAILED_STATUS`` as the failed status.

    Returns:
        None.
    """
    print(f'Device: {DEVICE}')

    # Pull the eval request dataset from the hub so the local copy is current.
    my_snapshot_download(
        repo_id=QUEUE_REPO,
        revision="main",
        local_dir=EVAL_REQUESTS_PATH_BACKEND,
        repo_type="dataset",
        max_workers=60,
    )

    # Check completed evals and set them to finished (or failed).
    check_completed_evals(
        api=API,
        checked_status=RUNNING_STATUS,
        completed_status=FINISHED_STATUS,
        failed_status=FAILED_STATUS,
        hf_repo=QUEUE_REPO,
        local_dir=EVAL_REQUESTS_PATH_BACKEND,
        hf_repo_results=RESULTS_REPO,
        local_dir_results=EVAL_RESULTS_PATH_BACKEND,
    )
def request_to_result_name(request: EvalRequest) -> str:
    """Return the result name that corresponds to an evaluation request.

    The name is built from ``request.model`` and ``request.precision``:

    * ``"org/model"`` with precision ``p`` gives ``"org_model_p"``;
    * a model id without ``"/"`` gives ``"model_p"``.

    Only the first ``"/"`` separates the organisation from the model name;
    any later ``"/"`` stays in the model part.

    Example (from a real request/result pair): a request with
    ``model='meta-llama/Llama-2-13b-hf'`` and ``precision='float32'`` maps to
    the result whose ``eval_name`` is ``'meta-llama_Llama-2-13b-hf_float32'``.

    Args:
        request: The evaluation request; only ``model`` and ``precision``
            are read.

    Returns:
        The underscore-joined result name.
    """
    head, separator, tail = request.model.partition("/")
    if not separator:
        # No organisation prefix: ``head`` is the whole model id.
        return f"{head}_{request.precision}"
    return f"{head}_{tail}_{request.precision}"
def process_evaluation(task: Task, eval_request: EvalRequest) -> dict:
    """Run one task for one request, store the results locally and upload them.

    The evaluation is first run with ``batch_size="auto"``. If that raises a
    ``RuntimeError`` whose message contains ``"No executable batch size
    found"``, it is retried once with ``batch_size=1``; any other
    ``RuntimeError`` propagates.

    The results are printed, serialised to JSON (objects that cannot be
    serialised become the string ``'<not serializable>'``), written under
    ``EVAL_RESULTS_PATH_BACKEND/<model path>/`` and uploaded to
    ``RESULTS_REPO`` under ``<model>/``. The local file and the uploaded file
    share one timestamped name.

    Args:
        task: Task to run; ``benchmark`` and ``num_fewshot`` are read.
        eval_request: Request to evaluate; ``model`` determines the output
            location.

    Returns:
        Whatever ``run_evaluation`` returned.

    Raises:
        RuntimeError: If the evaluation fails for a reason other than the
            automatic batch size search, or if the ``batch_size=1`` retry
            fails as well.
    """

    def _evaluate(batch_size):
        # Single place for the run_evaluation call so both attempts stay in sync.
        return run_evaluation(eval_request=eval_request, task_names=[task.benchmark],
                              num_fewshot=task.num_fewshot, batch_size=batch_size,
                              device=DEVICE, use_cache=None, limit=LIMIT)

    try:
        results = _evaluate("auto")
    except RuntimeError as e:
        if "No executable batch size found" not in str(e):
            raise
        # Automatic batch size detection failed: fall back to the smallest batch.
        results = _evaluate(1)

    print('RESULTS', results)

    dumped = json.dumps(results, indent=2, default=lambda o: '<not serializable>')
    print(dumped)

    # Take the timestamp once so the local file and the uploaded file get the
    # same name; calling datetime.now() twice yields two different names.
    results_filename = f"results_{datetime.now()}.json"

    output_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, *eval_request.model.split("/"), results_filename)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w") as f:
        f.write(dumped)

    my_snapshot_download(repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND,
                         repo_type="dataset", max_workers=60)

    API.upload_file(path_or_fileobj=output_path, path_in_repo=f"{eval_request.model}/{results_filename}",
                    repo_id=RESULTS_REPO, repo_type="dataset")

    return results
def process_finished_requests(thr: int, hard_task_lst: Optional[list[str]] = None) -> bool:
    """Run at most one missing task for a FINISHED or FAILED request.

    Requests are visited in random order (the list is shuffled after being
    sorted). For the first request with ``likes >= thr`` that lacks a result
    for a selected task, that single task is evaluated, the request is marked
    RUNNING before and FINISHED after, and the function returns.

    Args:
        thr: Minimum number of likes a request needs to be considered.
        hard_task_lst: Optional substrings; when given, only tasks whose
            benchmark name contains one of them are selected. ``None``
            selects every task.

    Returns:
        True if an evaluation was run, False if nothing needed running.
    """
    sanity_checks()

    statuses = [FINISHED_STATUS, FAILED_STATUS]
    eval_requests: list[EvalRequest] = get_eval_requests(
        job_status=statuses, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)
    eval_requests = sort_models_by_priority(api=API, models=eval_requests)
    # The shuffle decides the final visiting order.
    random.shuffle(eval_requests)

    eval_results: list[EvalResult] = get_raw_eval_results(
        EVAL_RESULTS_PATH_BACKEND, EVAL_REQUESTS_PATH_BACKEND, True)

    request_by_name = {request_to_result_name(req): req for req in eval_requests}
    result_by_name = {res.eval_name: res for res in eval_results}

    # Lazy filter: requests are only inspected as the loop reaches them.
    popular_enough = (req for req in eval_requests if req.likes >= thr)

    for candidate in popular_enough:
        result_name: str = request_to_result_name(candidate)
        # None when no result exists yet for this request.
        eval_result: Optional[EvalResult] = result_by_name.get(result_name)

        shuffled_tasks = TASKS_HARNESS.copy()
        random.shuffle(shuffled_tasks)

        for task in shuffled_tasks:
            task_name = task.benchmark
            selected = hard_task_lst is None or any(ss in task_name for ss in hard_task_lst)
            missing = eval_result is None or task_name not in eval_result.results
            if not (missing and selected):
                continue

            # Several requests may share a result name; use the one stored in the map.
            target: EvalRequest = request_by_name[result_name]

            my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND,
                                 repo_type="dataset", max_workers=60)
            my_set_eval_request(api=API, eval_request=target, set_to_status=RUNNING_STATUS,
                                hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)

            process_evaluation(task, target)

            my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND,
                                 repo_type="dataset", max_workers=60)
            my_set_eval_request(api=API, eval_request=target, set_to_status=FINISHED_STATUS,
                                hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)

            return True

    return False
def maybe_refresh_results(thr: int, hard_task_lst: Optional[list[str]] = None) -> bool:
    """Run (or re-run) at most one task for a PENDING, FINISHED or FAILED request.

    Requests are visited in random order (the list is shuffled after being
    sorted). For the first request with ``likes >= thr``, tasks are tried in
    random order and the first task meeting any of these conditions is
    evaluated:

    * the request has no result at all;
    * the task is selected by ``hard_task_lst`` (always true when it is
      ``None``);
    * the existing result has no entry for the task;
    * the task's benchmark name contains one of the refresh substrings
      ``'nq'``, ``'trivia'``, ``'tqa'``, ``'self'``.

    The request is marked RUNNING before the evaluation and FINISHED after.

    NOTE(review): PENDING requests are included, so a PENDING request is
    marked FINISHED here after a single task — confirm this is intended.

    Args:
        thr: Minimum number of likes a request needs to be considered.
        hard_task_lst: Optional substrings; a task whose benchmark name
            contains one of them is always eligible. ``None`` makes every
            task eligible.

    Returns:
        True if an evaluation was run, False otherwise.
    """
    sanity_checks()

    current_finished_status = [PENDING_STATUS, FINISHED_STATUS, FAILED_STATUS]

    # Get all eval requests that are PENDING, FINISHED or FAILED.
    eval_requests: list[EvalRequest] = get_eval_requests(
        job_status=current_finished_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)
    eval_requests = sort_models_by_priority(api=API, models=eval_requests)
    # The shuffle decides the final visiting order.
    random.shuffle(eval_requests)

    eval_results: list[EvalResult] = get_raw_eval_results(
        EVAL_RESULTS_PATH_BACKEND, EVAL_REQUESTS_PATH_BACKEND, True)

    result_name_to_request = {request_to_result_name(r): r for r in eval_requests}
    result_name_to_result = {r.eval_name: r for r in eval_results}

    # Tasks whose benchmark name contains one of these are refreshed even when
    # a result already exists. Kept under its own name (not ``task_lst``) so
    # the list being iterated below is never rebound.
    refresh_substrings = ['nq', 'trivia', 'tqa', 'self']

    for eval_request in eval_requests:
        if eval_request.likes >= thr:
            result_name: str = request_to_result_name(eval_request)

            # None when no result exists yet for this request.
            eval_result: Optional[EvalResult] = result_name_to_result.get(result_name)

            task_lst = TASKS_HARNESS.copy()
            random.shuffle(task_lst)

            for task in task_lst:
                task_name = task.benchmark

                do_run_task = hard_task_lst is None or any(ss in task_name for ss in hard_task_lst)

                if (eval_result is None or do_run_task or task_name not in eval_result.results or
                        any(ss in task_name for ss in refresh_substrings)):
                    # Several requests may share a result name; use the one stored in the map.
                    request_to_run: EvalRequest = result_name_to_request[result_name]

                    my_snapshot_download(repo_id=QUEUE_REPO, revision="main",
                                         local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60)
                    my_set_eval_request(api=API, eval_request=request_to_run, set_to_status=RUNNING_STATUS,
                                        hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)

                    process_evaluation(task, request_to_run)

                    my_snapshot_download(repo_id=QUEUE_REPO, revision="main",
                                         local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60)
                    my_set_eval_request(api=API, eval_request=request_to_run, set_to_status=FINISHED_STATUS,
                                        hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)

                    return True

    return False
def process_pending_requests() -> bool:
    """Pick one PENDING request at random and run every task on it.

    After the sanity checks, the PENDING requests are fetched, sorted and
    then shuffled; the first one after the shuffle is processed. The request
    is marked RUNNING, every task in ``TASKS_HARNESS`` is evaluated in random
    order, and the request is finally marked FINISHED.

    Returns:
        False if there is no PENDING request, True once one has been
        processed.
    """
    sanity_checks()

    current_pending_status = [PENDING_STATUS]

    # Fetch the PENDING requests; the shuffle decides which one comes first.
    eval_requests = sort_models_by_priority(
        api=API,
        models=get_eval_requests(job_status=current_pending_status, hf_repo=QUEUE_REPO,
                                 local_dir=EVAL_REQUESTS_PATH_BACKEND),
    )
    random.shuffle(eval_requests)

    print(f"Found {len(eval_requests)} {','.join(current_pending_status)} eval requests")

    if not eval_requests:
        return False

    chosen = eval_requests[0]
    pp.pprint(chosen)

    my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND,
                         repo_type="dataset", max_workers=60)
    my_set_eval_request(api=API, eval_request=chosen, set_to_status=RUNNING_STATUS,
                        hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)

    shuffled_tasks = TASKS_HARNESS.copy()
    random.shuffle(shuffled_tasks)
    for task in shuffled_tasks:
        process_evaluation(task, chosen)

    # Only after all tasks have run is the request marked as finished.
    my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND,
                         repo_type="dataset", max_workers=60)
    my_set_eval_request(api=API, eval_request=chosen, set_to_status=FINISHED_STATUS,
                        hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND)

    return True
if __name__ == "__main__":
    # On the listed hosts (or when /home/pminervi exists) the start-up delay is
    # skipped and task selection is restricted to the given substrings.
    on_known_host = socket.gethostname() in {'hamburg', 'neuromancer'} or os.path.isdir("/home/pminervi")
    wait = not on_known_host
    hard_task_lst = ['nq', 'trivia', 'tqa'] if on_known_host else None

    if wait:
        # Random delay of 5 to 10 minutes before doing any work.
        time.sleep(60 * random.randint(5, 10))

    res = False

    # 1-in-11 chance: work on a PENDING request first.
    if random.randint(0, 10) == 0:
        res = process_pending_requests()
        time.sleep(60)

    # Nothing done yet: requests with at least 100 likes
    # (1-in-6 chance of a refresh instead of filling in missing results).
    if res is False:
        stage = maybe_refresh_results if random.randint(0, 5) == 0 else process_finished_requests
        res = stage(100, hard_task_lst=hard_task_lst)
        time.sleep(60)

    # Still nothing done: same again without a likes threshold.
    if res is False:
        stage = maybe_refresh_results if random.randint(0, 5) == 0 else process_finished_requests
        res = stage(0, hard_task_lst=hard_task_lst)