""" This file has functions to update the meilisearch index with new comments. Payload from HF webhooklooks like this: { "event": { "action": "update", "scope": "discussion.comment" }, "repo": { "type": "dataset", "name": "allenai/objaverse", "id": "63977bb96bdef8095268ded0", "private": false, "url": { "web": "https://huggingface.co/datasets/allenai/objaverse", "api": "https://huggingface.co/api/datasets/allenai/objaverse" }, "owner": { "id": "5e70f3648ce3c604d78fe132" } }, "discussion": { "id": "66f1a1092eb1ea2422555d24", "title": "PullRequest", "url": { "web": "https://huggingface.co/datasets/allenai/objaverse/discussions/63", "api": "https://huggingface.co/api/datasets/allenai/objaverse/discussions/63" }, "status": "draft", "author": { "id": "6673e848436907f83a815ab0" }, "num": 63, "isPullRequest": true, "changes": { "base": "refs/heads/main" } }, "comment": { "id": "66f1a1092eb1ea2422555d25", "author": { "id": "6673e848436907f83a815ab0" }, "hidden": true, "url": { "web": "https://huggingface.co/datasets/allenai/objaverse/discussions/63#66f1a1092eb1ea2422555d25" } }, "webhook": { "id": "66d7991f9b7da501cd100d95", "version": 3 } } """ import time import json import os from datetime import datetime, timezone import requests from dotenv import load_dotenv from huggingface_hub import HfApi from meilisearch import Client from huggingface_hub import HfApi from constants import MeilisearchIndexFields load_dotenv(".env", override=True) WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET") MEILISEARCH_URL = os.getenv("MS_URL") MEILISEARCH_KEY = os.getenv("MS_ADMIN_KEY") ms_client = Client(MEILISEARCH_URL, MEILISEARCH_KEY) api = HfApi(token=os.environ["HF_WEBHOOK_TOKEN"]) async def process_webhook(request): payload = await request.body() payload = payload.decode("utf-8") print(payload) payload = json.loads(payload) secret = request.headers.get("X-Webhook-Secret") if secret != WEBHOOK_SECRET: print("Invalid secret") return {"error": "Invalid secret"}, 400 if payload["repo"]["type"] == "model": if "discussion" not in payload or payload["discussion"]["isPullRequest"] or payload["repo"]["private"]: return {"status": "skipped"}, 200 changing_status = "comment" not in payload and payload["event"]["action"] == "update" if changing_status: update_discussion_status(payload) else: add_new_comment(payload) return {"status": "success"}, 200 def user_id_to_username(user_id): api_url = f"https://huggingface.co/api/users/{user_id}/overview" try: response = requests.get(api_url) return response.json()["user"] except Exception as e: print(f"Couldn't get username for id {user_id}: {e}") return user_id def add_new_comment(payload): comment = payload["comment"].get("content", "") comment_id = payload["comment"]["id"] repo_id = payload["repo"]["name"] title = payload["discussion"]["title"] author_id = payload["comment"]["author"]["id"] author = user_id_to_username(author_id) url = payload["discussion"]["url"]["web"] updatedAt = int(datetime.now(timezone.utc).timestamp()) status = payload["discussion"]["status"] melisearch_payload = { MeilisearchIndexFields.ID.value: comment_id, MeilisearchIndexFields.TITLE.value: title, MeilisearchIndexFields.STATUS.value: status, MeilisearchIndexFields.AUTHOR.value: author, MeilisearchIndexFields.URL.value: url, MeilisearchIndexFields.REPO_ID.value: repo_id, MeilisearchIndexFields.CONTENT.value: comment, MeilisearchIndexFields.UPDATED_AT.value: updatedAt, } ms_client.index(MeilisearchIndexFields.INDEX_NAME.value).add_documents([melisearch_payload]) def update_discussion_status(payload): # If closing and commenting at the same time, # the comment comes with status = open after the webhook that says the discussion is closed. # Adding the sleep ensures the update comes afterwards time.sleep(1) url = payload["discussion"]["url"]["web"] status = payload["discussion"]["status"] existing_results = ms_client.index(MeilisearchIndexFields.INDEX_NAME.value).search( query="", opt_params={"filter": f"url = '{url}'"} ) if len(existing_results["hits"]) > 0: docs2update = [ {MeilisearchIndexFields.ID.value: d[MeilisearchIndexFields.ID.value], MeilisearchIndexFields.STATUS.value: status} for d in existing_results["hits"] ] update_request = ms_client.index(MeilisearchIndexFields.INDEX_NAME.value).update_documents(docs2update) print("Update request:", update_request) def is_user(user_or_org): api_url = f"https://huggingface.co/api/users/{user_or_org}/overview" response = requests.get(api_url) return response.status_code == 200 def update_webhooks(): """ Update the old webhook every so often with trending models. """ print("Updating webhook") existing_webhooks = api.list_webhooks() webhook_url = os.environ["HF_WEBHOOK_URL"] webhook2update = [x for x in existing_webhooks if x.url == webhook_url] if len(webhook2update) > 1: print("More than one webhook found") print(webhook2update) print("updating the first one") id2update = webhook2update[0].id watch_dict = {} for ww in webhook2update[0].watched: watch_dict[ww.name] = ww.type # get trending models trending_models = api.list_models(sort="likes7d", direction=-1, limit=1000) to_add = [] for m in trending_models: org_or_user = m.id.split("/")[0] if org_or_user in watch_dict: continue if is_user(org_or_user): to_add.append({"name": m.id, "type": "user"}) else: to_add.append({"name": m.id, "type": "org"}) new_watched = webhook2update[0].watched + to_add print("There are now", len(new_watched), "items in the watched list") api.update_webhook( id=id2update, url=webhook_url, watched=new_watched, domains=["discussion"], secret=WEBHOOK_SECRET, )