# run-duckdb-jobs / start_app.py
# Author: lhoestq (HF staff)
# Last commit: "minor" (287c8b4)
import json
import os
import re
import subprocess
import time
import yaml
import gradio as gr
import pandas as pd
import requests
from huggingface_hub import HfApi, get_token
# Command line of the job script; it is sent as the job "command" and also
# run locally (with extra flags) for the help text and the dry-run.
CMD = ["python", "run_job.py"]

# Human-readable argument placeholders, only shown in the usage example.
ARG_NAMES = [
    "<src>",
    "<dst>",
    "<query>",
    "[-c config]",
    "[-s split]",
    "[-p private]",
]

# Space that hosts the job code; falls back to the default Space when the
# SPACE_ID environment variable is missing or empty.
SPACE_ID = os.getenv("SPACE_ID") or "lhoestq/run-duckdb-jobs"
# Markdown usage section, rendered with
# CONTENT.format(SPACE_ID=..., CMD=..., ARG_NAMES=...); literal braces are
# therefore doubled.
# Raw string: in a regular string the trailing "\" + newline pairs are line
# continuations, which would silently collapse the curl example onto one line.
# NOTE(review): the example targets huggingface.co rather than huggingface.co —
# confirm this host is intended.
CONTENT = r"""
## Usage:
```bash
curl -L 'https://huggingface.co/api/jobs/<username>' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer <hf_token>' \
-d '{{
"spaceId": "{SPACE_ID}",
"command": {CMD},
"arguments": {ARG_NAMES},
"environment": {{"HF_TOKEN": <hf_token>}},
"flavor": "cpu-basic"
}}'
```
## Example:
"""
# Read the app title, description and emoji from the YAML front matter of
# README.md (the text between the first two "---" lines).
# The encoding is explicit because the front matter contains an emoji and the
# platform default encoding is not guaranteed to be UTF-8.
with open("README.md", encoding="utf-8") as f:
    METADATA = yaml.safe_load(f.read().split("---\n")[1])
TITLE = METADATA["title"]
SHORT_DESCRIPTION = METADATA.get("short_description")  # optional field
EMOJI = METADATA["emoji"]
# HELP is the decoded output of `<CMD> --help` (stdout, or stderr when stdout
# is empty), or False when the command fails or cannot be run at all.
try:
    process = subprocess.run(CMD + ["--help"], capture_output=True)
    if process.returncode:
        HELP = False
    else:
        HELP = (process.stdout or process.stderr).decode()
except Exception:
    HELP = False

# DRY_RUN is the exact dry-run flag advertised in the help text
# ("--dry-run" or "--dry_run"), or False when there is none.
DRY_RUN = False
if HELP:
    _dry_run_match = re.search("--dry(-|_)run", HELP)
    if _dry_run_match:
        DRY_RUN = _dry_run_match.group(0)
def parse_log(line: str, pbars: dict[str, float] = None):
if line.startswith("data: {"):
data = json.loads(line[len("data: "):])
data, timestamp = data["data"], data["timestamp"]
if pbars is not None and data.startswith("===== Job started at"):
pbars.pop("Starting βš™οΈ", None)
pbars["Running πŸƒ"] = 0.0
return f"[{timestamp}] {data}\n\n"
elif pbars is not None and (percent_match := re.search("\\d+(?:\\.\\d+)?%", data)) and any(c in data.split("%")[1][:10] for c in "|β–ˆβ–Œ"):
pbars.pop("Running πŸƒ", None)
[pbars.pop(desc) for desc, percent in pbars.items() if percent == 1.]
percent = float(percent_match.group(0)[:-1]) / 100
desc = data[:percent_match.start()].strip() or "Progress"
pbars[desc] = percent
else:
return f"[{timestamp}] {data}\n\n"
return ""
def dry_run(src, config, split, dst, query):
    """Run the job script locally with the dry-run flag and stream its output.

    Generator used as a Gradio event handler: each yielded dict maps output
    components to their new value.

    Args:
        src: Source dataset (required).
        config: Optional subset, passed as ``--config`` when set.
        split: Optional split, passed as ``--split`` when set.
        dst: Destination dataset (required).
        query: SQL query (required).

    Raises:
        gr.Error: If source, destination or query is empty.
    """
    import shlex  # only needed to render the command for display

    if not all([src, dst, query]):
        raise gr.Error("Please fill source, destination and query.")
    args = ["--src", src]
    if config:
        args += ["--config", config]
    if split:
        args += ["--split", split]
    args += ["--dst", dst, "--query", query, DRY_RUN]
    cmd = CMD + args
    # shlex.join quotes every argument that needs it (spaces, quotes, `;`, `*`...),
    # so the displayed command can be pasted into a shell as is.
    logs = "Job:\n\n```bash\n" + shlex.join(cmd) + "\n```\nOutput:\n\n"
    yield {output_markdown: logs, progress_labels: gr.Label(visible=False), details_accordion: gr.Accordion(open=True)}
    # The context manager closes the pipe and waits for the child process, so
    # no zombie process or open file descriptor is left behind.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as process:
        for line in iter(process.stdout.readline, b""):
            logs += line.decode(errors="replace")
            yield {output_markdown: logs}
def run(src, config, split, dst, query, oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None):
    """Submit the query as a job, then stream its logs and progress to the UI.

    Generator used as a Gradio event handler: each yielded dict maps output
    components to their new value.

    Args:
        src: Source dataset (required).
        config: Optional subset, passed as ``--config`` when set.
        split: Optional split, passed as ``--split`` when set.
        dst: Destination dataset (required).
        query: SQL query (required).
        oauth_token: Token of the logged-in user, or ``None``. Not part of the
            click ``inputs``; it is supplied through its type annotation.
        profile: Profile of the logged-in user, or ``None`` (same mechanism).

    Raises:
        gr.Error: If a required field is empty, or if no token is available
            (neither from the login nor from the local environment).
    """
    import shlex  # only needed to render the command for display

    if not all([src, dst, query]):
        raise gr.Error("Please fill source, destination and query.")
    if oauth_token and profile:
        token = oauth_token.token
        username = profile.username
    elif (token := get_token()):
        username = HfApi().whoami(token=token)["name"]
    else:
        raise gr.Error("Please log in to run the job.")

    args = ["--src", src]
    if config:
        args += ["--config", config]
    if split:
        args += ["--split", split]
    args += ["--dst", dst, "--query", query]
    cmd = CMD + args
    # shlex.join quotes every argument that needs it (spaces, quotes, `;`, `*`...),
    # so the displayed command can be pasted into a shell as is.
    logs = "Job:\n\n```bash\n" + shlex.join(cmd) + "\n```\nOutput:\n\n"
    pbars = {}
    yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}

    # NOTE(review): these calls send the user's token to huggingface.co rather
    # than huggingface.co — confirm this host is intended.
    auth_headers = {"Authorization": f"Bearer {token}"}
    resp = requests.post(
        f"https://huggingface.co/api/jobs/{username}",
        json={
            "spaceId": SPACE_ID,
            "arguments": args,
            "command": CMD,
            "environment": {"HF_TOKEN": token},
            "flavor": "cpu-basic"
        },
        headers=auth_headers
    )
    if resp.status_code != 200:
        # The job was never created: report the API error and stop here, since
        # there is no job id to follow.
        logs += resp.text
        pbars = {"Finished with an error ❌": 1.0}
        yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}
        return

    job_id = resp.json()["metadata"]["job_id"]
    # NOTE(review): this label must stay byte-identical to the key removed by
    # `parse_log`, so it is kept exactly as found even though it looks
    # mis-encoded.
    pbars = {"Starting βš™οΈ": 0.0}
    yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}

    # Stream the logs; the context manager releases the connection even if the
    # generator is closed before the stream ends.
    with requests.get(
        f"https://huggingface.co/api/jobs/{username}/{job_id}/logs-stream",
        headers=auth_headers,
        stream=True
    ) as log_stream:
        for line in log_stream.iter_lines():
            logs += parse_log(line.decode("utf-8"), pbars=pbars)
            yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}

    # The log stream can end before the job status is final: poll once per
    # second until the job leaves the RUNNING stage.
    while True:
        job_status = requests.get(
            f"https://huggingface.co/api/jobs/{username}/{job_id}",
            headers=auth_headers
        ).json()
        if job_status["status"]["stage"] != "RUNNING":
            break
        time.sleep(1)

    status = job_status["status"]
    if status["stage"] == "COMPLETED":
        pbars = {"Finished ✅": 1.0}
    else:
        # .get(): a missing message/error field must not hide the failure
        # behind a KeyError.
        logs += f'{status.get("message")} ({status.get("error")})'
        pbars = {"Finished with an error ❌": 1.0}
    yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}
# Library functions whose loading codes are used to list the subsets and
# splits of a dataset.
READ_FUNCTIONS = (
    "pl.read_parquet",
    "pl.read_csv",
    "pl.read_json",
)

# Number of trending datasets suggested in the source dropdown.
NUM_TRENDING_DATASETS = 10
# Main page: header, usage text, the source -> destination form, and the
# event handlers that keep the subset/split dropdowns in sync with the dataset.
with gr.Blocks() as demo:
    # Header: title and description on the left, login button on the right.
    with gr.Row():
        with gr.Column(scale=10):
            gr.Markdown(f"# {TITLE} {EMOJI}")
            if SHORT_DESCRIPTION:
                gr.Markdown(SHORT_DESCRIPTION)
        with gr.Column():
            gr.LoginButton()
    gr.Markdown(CONTENT.format(SPACE_ID=SPACE_ID, CMD=json.dumps(CMD), ARG_NAMES=json.dumps(ARG_NAMES)))

    # Form: source dataset (+ subset/split) -> destination dataset.
    with gr.Row():
        with gr.Column(scale=10):
            with gr.Row():
                # Hidden state: loading codes of the selected dataset, reused
                # to compute the split choices when the subset changes.
                loading_codes_json = gr.JSON([], visible=False)
                dataset_dropdown = gr.Dropdown(label="Source Dataset", allow_custom_value=True, scale=10)
                subset_dropdown = gr.Dropdown(info="Subset", allow_custom_value=True, show_label=False, visible=False)
                split_dropdown = gr.Dropdown(info="Split", allow_custom_value=True, show_label=False, visible=False)
        with gr.Column(min_width=60):
            gr.HTML("<div style='font-size: 4em;'>→</div>")
        with gr.Column(scale=10):
            dst_dropdown = gr.Dropdown(label="Destination Dataset", allow_custom_value=True)
    query_textarea = gr.Textbox(label="SQL Query", lines=2, max_lines=300, placeholder="SELECT * FROM src;", value="SELECT * FROM src;")
    with gr.Row():
        run_button = gr.Button("Run", scale=10, variant="primary")
        if DRY_RUN:
            dry_run_button = gr.Button("Dry-Run")
    progress_labels = gr.Label(visible=False, label="Progress")
    with gr.Accordion("Details", open=False) as details_accordion:
        output_markdown = gr.Markdown(label="Output logs")

    # Both buttons read the same form fields and write to the same outputs.
    run_button.click(run, inputs=[dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea], outputs=[details_accordion, progress_labels, output_markdown])
    if DRY_RUN:
        dry_run_button.click(dry_run, inputs=[dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea], outputs=[details_accordion, progress_labels, output_markdown])

    def show_subset_dropdown(dataset: str):
        """Return ``(subset dropdown kwargs, loading codes)`` for ``dataset``.

        The API is only queried for ``namespace/name`` ids. For anything else
        the same two-element result is returned with no choices, so callers
        can always unpack it.
        """
        loading_codes = []
        if dataset and "/" in dataset.strip().strip("/"):
            # `params` URL-encodes the dataset id.
            resp = requests.get(
                "https://datasets-server.huggingface.co/compatible-libraries",
                params={"dataset": dataset},
                timeout=3,
            ).json()
            # Loading codes of the first compatible library that uses one of
            # READ_FUNCTIONS, or an empty list when there is none.
            loading_codes = next(
                (lib["loading_codes"] for lib in resp.get("libraries", []) if lib["function"] in READ_FUNCTIONS),
                None,
            ) or []
        subsets = [loading_code["config_name"] for loading_code in loading_codes]
        subset = (subsets or [""])[0]
        # The dropdown is only shown when there is an actual choice to make.
        return dict(choices=subsets, value=subset, visible=len(subsets) > 1, key=hash(str(loading_codes))), loading_codes

    def show_split_dropdown(subset: str, loading_codes: list[dict]):
        """Return the split dropdown kwargs for ``subset``, based on ``loading_codes``."""
        splits = next(
            (list(loading_code["arguments"]["splits"]) for loading_code in loading_codes if loading_code["config_name"] == subset),
            [],
        )
        split = (splits or [""])[0]
        return dict(choices=splits, value=split, visible=len(splits) > 1, key=hash(str(loading_codes) + subset))

    @demo.load(outputs=[dataset_dropdown, loading_codes_json, subset_dropdown, split_dropdown])
    def _fetch_datasets(request: gr.Request):
        """On page load: suggest the default and trending datasets, and fill the subset/split dropdowns."""
        dataset = "CohereForAI/Global-MMLU"
        datasets = [dataset] + [ds.id for ds in HfApi().list_datasets(limit=NUM_TRENDING_DATASETS, sort="trendingScore", direction=-1) if ds.id != dataset]
        subsets, loading_codes = show_subset_dropdown(dataset)
        splits = show_split_dropdown(subsets["value"], loading_codes)
        return {
            dataset_dropdown: gr.Dropdown(choices=datasets, value=dataset),
            loading_codes_json: loading_codes,
            subset_dropdown: gr.Dropdown(**subsets),
            split_dropdown: gr.Dropdown(**splits),
        }

    @dataset_dropdown.select(inputs=[dataset_dropdown], outputs=[loading_codes_json, subset_dropdown, split_dropdown])
    def _show_subset_dropdown(dataset: str):
        """On dataset selection: refresh the loading codes and the subset/split dropdowns."""
        subsets, loading_codes = show_subset_dropdown(dataset)
        splits = show_split_dropdown(subsets["value"], loading_codes)
        return {
            # Keep the hidden state in sync; otherwise a later subset change
            # would compute its splits from the previous dataset.
            loading_codes_json: loading_codes,
            subset_dropdown: gr.Dropdown(**subsets),
            split_dropdown: gr.Dropdown(**splits),
        }

    @subset_dropdown.select(inputs=[dataset_dropdown, subset_dropdown, loading_codes_json], outputs=[split_dropdown])
    def _show_split_dropdown(dataset: str, subset: str, loading_codes: list[dict]):
        """On subset selection: refresh the split dropdown (``dataset`` is unused)."""
        splits = show_split_dropdown(subset, loading_codes)
        return {
            split_dropdown: gr.Dropdown(**splits),
        }
# Extra "Help" page showing the `--help` output of the job script; it is only
# added when that output could be collected.
if HELP:
    help_page_markdown = "# Help\n\n```\n" + HELP + "\n```"
    with demo.route("Help", "/help"):
        gr.Markdown(help_page_markdown)
# "Jobs" page: table of the user's jobs that were started from this Space.
with demo.route("Jobs", "/jobs") as page:
    gr.Markdown("# Jobs")
    jobs_dataframe = gr.DataFrame(datatype="markdown")

    @page.load(outputs=[jobs_dataframe])
    def list_jobs(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None):
        """Return the user's jobs for this Space as a DataFrame.

        Args:
            oauth_token: Token of the logged-in user, or ``None``.
            profile: Profile of the logged-in user, or ``None``.

        Returns:
            One row per job whose ``spaceId`` equals ``SPACE_ID``. When no
            token is available, or when the API call fails, an empty or
            single-cell DataFrame whose column name carries the message.
        """
        if oauth_token and profile:
            token = oauth_token.token
            username = profile.username
        elif (token := get_token()):
            username = HfApi().whoami(token=token)["name"]
        else:
            return pd.DataFrame({"Log in to see jobs": []})
        # NOTE(review): this call sends the user's token to huggingface.co
        # rather than huggingface.co — confirm this host is intended.
        resp = requests.get(
            f"https://huggingface.co/api/jobs/{username}",
            headers={"Authorization": f"Bearer {token}"}
        )
        if resp.status_code != 200:
            # An error payload is not a list of jobs: iterating it below would
            # raise a TypeError, so report the error in the table instead.
            return pd.DataFrame({"Failed to list jobs": [resp.text]})
        # NOTE(review): the id is read from metadata["id"] here, while `run`
        # reads metadata["job_id"] from the creation response — confirm both
        # keys against the API.
        return pd.DataFrame([
            {
                "id": job["metadata"]["id"],
                "created_at": job["metadata"]["created_at"],
                "stage": job["compute"]["status"]["stage"],
                "output": f'[logs](https://huggingface.co/api/jobs/{username}/{job["metadata"]["id"]}/logs-stream)',
                "command": str(job["compute"]["spec"]["extra"]["command"]),
                "args": str(job["compute"]["spec"]["extra"]["args"]),
            }
            for job in resp.json()
            if job["compute"]["spec"]["extra"]["input"]["spaceId"] == SPACE_ID
        ])
# Script entry point: start the Gradio server, listening on all network
# interfaces (0.0.0.0).
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0")