import copy import json import re import requests from curl_cffi import requests as cffi_requests from tclogger import logger from constants.models import MODEL_MAP from constants.envs import PROXIES from constants.headers import HUGGINGCHAT_POST_HEADERS, HUGGINGCHAT_SETTINGS_POST_DATA from messagers.message_outputer import OpenaiStreamOutputer from messagers.message_composer import MessageComposer from messagers.token_checker import TokenChecker class HuggingchatRequester: def __init__(self, model: str): if model in MODEL_MAP.keys(): self.model = model else: self.model = "nous-mixtral-8x7b" self.model_fullname = MODEL_MAP[self.model] def get_hf_chat_id(self): request_url = "https://huggingface.co/chat/settings" request_body = copy.deepcopy(HUGGINGCHAT_SETTINGS_POST_DATA) extra_body = { "activeModel": self.model_fullname, } request_body.update(extra_body) logger.note(f"> hf-chat ID:", end=" ") res = cffi_requests.post( request_url, headers=HUGGINGCHAT_POST_HEADERS, json=request_body, proxies=PROXIES, timeout=10, impersonate="chrome", ) self.hf_chat_id = res.cookies.get("hf-chat") if self.hf_chat_id: logger.success(f"[{self.hf_chat_id}]") else: logger.warn(f"[{res.status_code}]") logger.warn(res.text) raise ValueError(f"Failed to get hf-chat ID: {res.text}") def get_conversation_id(self, system_prompt: str = ""): request_url = "https://huggingface.co/chat/conversation" request_headers = HUGGINGCHAT_POST_HEADERS extra_headers = { "Cookie": f"hf-chat={self.hf_chat_id}", } request_headers.update(extra_headers) request_body = { "model": self.model_fullname, "preprompt": system_prompt, } logger.note(f"> Conversation ID:", end=" ") res = requests.post( request_url, headers=request_headers, json=request_body, proxies=PROXIES, timeout=10, ) if res.status_code == 200: conversation_id = res.json()["conversationId"] logger.success(f"[{conversation_id}]") else: logger.warn(f"[{res.status_code}]") raise ValueError("Failed to get conversation ID!") self.conversation_id = conversation_id return conversation_id def get_last_message_id(self): request_url = f"https://huggingface.co/chat/conversation/{self.conversation_id}/__data.json?x-sveltekit-invalidated=11" request_headers = HUGGINGCHAT_POST_HEADERS extra_headers = { "Cookie": f"hf-chat={self.hf_chat_id}", } request_headers.update(extra_headers) logger.note(f"> Message ID:", end=" ") message_id = None res = requests.post( request_url, headers=request_headers, proxies=PROXIES, timeout=10, ) if res.status_code == 200: data = res.json()["nodes"][1]["data"] # find the last element which matches the format of uuid4 uuid_pattern = re.compile( r"^[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}$" ) for item in data: if type(item) == str and uuid_pattern.match(item): message_id = item logger.success(f"[{message_id}]") else: logger.warn(f"[{res.status_code}]") raise ValueError("Failed to get message ID!") return message_id def log_request(self, url, method="GET"): logger.note(f"> {method}:", end=" ") logger.mesg(f"{url}", end=" ") def log_response( self, res: requests.Response, stream=False, iter_lines=False, verbose=False ): status_code = res.status_code status_code_str = f"[{status_code}]" if status_code == 200: logger_func = logger.success else: logger_func = logger.warn logger.enter_quiet(not verbose) logger_func(status_code_str) if status_code != 200: logger_func(res.text) if stream: if not iter_lines: return for line in res.iter_lines(): line = line.decode("utf-8") line = re.sub(r"^data:\s*", "", line) line = line.strip() if line: try: data = json.loads(line, strict=False) msg_type = data.get("type") if msg_type == "status": msg_status = data.get("status") elif msg_type == "stream": content = data.get("token", "") logger_func(content, end="") elif msg_type == "finalAnswer": full_content = data.get("text") logger.success("\n[Finished]") break else: pass except Exception as e: logger.warn(e) else: logger_func(res.json()) logger.exit_quiet(not verbose) def chat_completions(self, messages: list[dict], iter_lines=False, verbose=False): composer = MessageComposer(model=self.model) system_prompt, input_prompt = composer.decompose_to_system_and_input_prompt( messages ) checker = TokenChecker(input_str=system_prompt + input_prompt, model=self.model) checker.check_token_limit() self.get_hf_chat_id() self.get_conversation_id(system_prompt=system_prompt) message_id = self.get_last_message_id() request_url = f"https://huggingface.co/chat/conversation/{self.conversation_id}" request_headers = copy.deepcopy(HUGGINGCHAT_POST_HEADERS) extra_headers = { "Content-Type": "text/event-stream", "Referer": request_url, "Cookie": f"hf-chat={self.hf_chat_id}", } request_headers.update(extra_headers) request_body = { "files": [], "id": message_id, "inputs": input_prompt, "is_continue": False, "is_retry": False, "web_search": False, } self.log_request(request_url, method="POST") res = requests.post( request_url, headers=request_headers, json=request_body, proxies=PROXIES, stream=True, ) self.log_response(res, stream=True, iter_lines=iter_lines, verbose=verbose) return res class HuggingchatStreamer: def __init__(self, model: str): if model in MODEL_MAP.keys(): self.model = model else: self.model = "nous-mixtral-8x7b" self.model_fullname = MODEL_MAP[self.model] self.message_outputer = OpenaiStreamOutputer(model=self.model) def chat_response(self, messages: list[dict], verbose=False): requester = HuggingchatRequester(model=self.model) return requester.chat_completions( messages=messages, iter_lines=False, verbose=verbose ) def chat_return_generator(self, stream_response: requests.Response, verbose=False): is_finished = False for line in stream_response.iter_lines(): line = line.decode("utf-8") line = re.sub(r"^data:\s*", "", line) line = line.strip() if not line: continue content = "" content_type = "Completions" try: data = json.loads(line, strict=False) msg_type = data.get("type") if msg_type == "status": msg_status = data.get("status") continue elif msg_type == "stream": content_type = "Completions" content = data.get("token", "") if verbose: logger.success(content, end="") elif msg_type == "finalAnswer": content_type = "Finished" content = "" full_content = data.get("text") if verbose: logger.success("\n[Finished]") is_finished = True break else: continue except Exception as e: logger.warn(e) output = self.message_outputer.output( content=content, content_type=content_type ) yield output if not is_finished: yield self.message_outputer.output(content="", content_type="Finished") def chat_return_dict(self, stream_response: requests.Response): final_output = self.message_outputer.default_data.copy() final_output["choices"] = [ { "index": 0, "finish_reason": "stop", "message": {"role": "assistant", "content": ""}, } ] final_content = "" for item in self.chat_return_generator(stream_response): try: data = json.loads(item) delta = data["choices"][0]["delta"] delta_content = delta.get("content", "") if delta_content: final_content += delta_content except Exception as e: logger.warn(e) final_output["choices"][0]["message"]["content"] = final_content.strip() return final_output if __name__ == "__main__": # model = "command-r-plus" model = "llama3-70b" # model = "zephyr-141b" streamer = HuggingchatStreamer(model=model) messages = [ { "role": "system", "content": "You are an LLM developed by CloseAI.\nYour name is Niansuh-Copilot.", }, {"role": "user", "content": "Hello, what is your role?"}, {"role": "assistant", "content": "I am an LLM."}, {"role": "user", "content": "What is your name?"}, ] streamer.chat_response(messages=messages) # HF_ENDPOINT=https://hf-mirror.com python -m networks.huggingchat_streamer