Spaces:
Runtime error
Runtime error
""" | |
Credit to Derek Thomas, [email protected] | |
""" | |
import os | |
import logging | |
from pathlib import Path | |
from time import perf_counter | |
import gradio as gr | |
from jinja2 import Environment, FileSystemLoader | |
from backend.query_llm import generate_hf, generate_openai, hf_models, openai_models | |
from backend.semantic_search import retrieve | |
import itertools | |
from gradio_client import Client | |
def run_llama(_, msg, *__):
    """Yield one reply from the hosted Llama 3 chatbot Space.

    The signature accepts the same positional arguments that ``bot`` passes
    to every generate function (model name, prompt, history, API key), but
    only the prompt is used.

    Args:
        _: Ignored (the model-name slot).
        msg: Prompt text sent to the remote ``/chat`` endpoint.
        *__: Ignored extra positional arguments.

    Yields:
        The value returned by ``Client.predict`` for ``msg``, exactly once.
    """
    llama_space = Client("Be-Bo/llama-3-chatbot_70b")
    reply = llama_space.predict(message=msg, api_name="/chat")
    yield reply
# Chat models offered in the UI: every Hugging Face model key followed by the
# OpenAI models.
inf_models = list(hf_models.keys()) + list(openai_models)

# Retrieval options exposed as radio buttons in the UI.
emb_models = ["bge", "minilm"]
splitters = ['ct', 'rct', 'nltk']
chunk_sizes = ["500", "2000"]
sub_vectors = ["8", "16", "32"]

# Create all combinations of the provided arrays.
# Materialised as a list on purpose: itertools.product returns a one-shot
# iterator, so a module-level iterator would be empty after the first full
# pass and every later pass would silently loop over nothing.
combinations = list(itertools.product(emb_models, splitters, chunk_sizes, sub_vectors))

# Default number of documents, overridable through the TOP_K environment variable.
TOP_K = int(os.getenv("TOP_K", 4))
# Setting up the logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory that contains this file; the templates folder lives next to it
proj_dir = Path(__file__).parent

# Set up the template environment with the templates directory
_template_loader = FileSystemLoader(proj_dir / 'templates')
env = Environment(loader=_template_loader)

# Load the templates directly from the environment:
# `template` renders the prompt text, `template_html` renders its HTML view.
template = env.get_template('template.j2')
template_html = env.get_template('template_html.j2')
def add_text(history, text):
    """Append the submitted message to the chat history and lock the input.

    Args:
        history: Current chat history, or ``None`` when there is none yet.
        text: The message the user just submitted.

    Returns:
        A pair ``(new_history, textbox)``: the previous entries followed by
        ``(text, None)`` (the reply slot is left empty), and a cleared,
        non-interactive ``gr.Textbox``.
    """
    if history is None:
        history = []
    extended = history + [(text, None)]
    locked_box = gr.Textbox(value="", interactive=False)
    return extended, locked_box
def has_balanced_backticks(markdown_str):
    """Tell whether every fenced code block in ``markdown_str`` is closed.

    A line is treated as a fence when, after stripping surrounding
    whitespace, it starts with three backticks. Each fence toggles between
    "inside" and "outside" a code block, so the text is balanced exactly
    when the number of fence lines is even.

    Args:
        markdown_str: Markdown text to inspect.

    Returns:
        ``True`` if no code block is left open at the end, else ``False``.
    """
    fence_count = sum(
        1
        for raw_line in markdown_str.split('\n')
        if raw_line.strip().startswith("```")
    )
    return fence_count % 2 == 0
def bot(history, model_name, oepnai_api_key,
        reranker_enabled, reranker_kind, num_prerank_docs,
        num_docs, model_kind, sub_vector_size, chunk_size, splitter_type,
        all_at_once=False):
    """Retrieve context for the latest user message and stream the reply.

    Args:
        history: Chat history; the last entry holds the pending user query
            at index 0 and receives the generated answer at index 1.
        model_name: Chat model to use; must be ``"llama 3"`` or a member of
            ``hf_models`` / ``openai_models``.
        oepnai_api_key: API key forwarded to the generate function.
        reranker_enabled: When true (and ``all_at_once`` is false), the
            reranker arguments are passed to ``retrieve``.
        reranker_kind: Reranker model forwarded to ``retrieve``.
        num_prerank_docs: Number of documents fetched before reranking.
        num_docs: Number of documents to retrieve (converted with ``int``).
        model_kind: Embedding model used for retrieval.
        sub_vector_size: Sub-vector size option forwarded to ``retrieve``.
        chunk_size: Chunk size option forwarded to ``retrieve``.
        splitter_type: Splitter option forwarded to ``retrieve``.
        all_at_once: When true, answer the query once for every combination
            of embedding model, splitter, chunk size and sub-vector size,
            concatenating the answers under per-combination headings.
            Defaults to ``False`` so a caller that does not supply it still
            gets the single-configuration behaviour.

    Yields:
        ``(history, prompt_html)`` after every chunk produced by the
        generate function.

    Raises:
        gr.Error: If the query is empty or ``model_name`` is not supported.
    """
    query = history[-1][0]
    if not query:
        # gr.Warning is a helper that only displays a message; it is not an
        # exception class, so raising its result fails with a TypeError.
        # gr.Error aborts the event and shows the message to the user.
        raise gr.Error("Please submit a non-empty string as a prompt")

    logger.info('Retrieving documents...')
    # Retrieve documents relevant to query
    document_start = perf_counter()
    if reranker_enabled and not all_at_once:
        documents = retrieve(query, int(num_docs), model_kind, sub_vector_size, chunk_size,
                             splitter_type, reranker_kind, num_prerank_docs)
    else:
        documents = retrieve(query, int(num_docs), model_kind, sub_vector_size, chunk_size,
                             splitter_type)
    document_time = perf_counter() - document_start
    logger.info(f'Finished Retrieving documents in {round(document_time, 2)} seconds...')

    # Create Prompt
    prompt = template.render(documents=documents, query=query)
    prompt_html = template_html.render(documents=documents, query=query)

    if model_name == "llama 3":
        generate_fn = run_llama
    elif model_name in hf_models:
        generate_fn = generate_hf
    elif model_name in openai_models:
        generate_fn = generate_openai
    else:
        raise gr.Error(f"Model {model_name} is not supported")

    history[-1][1] = ""

    if not all_at_once:
        for character in generate_fn(model_name, prompt, history[:-1], oepnai_api_key):
            # Each item replaces the reply shown so far.
            history[-1][1] = character
            yield history, prompt_html
        return

    # Build a fresh product for every request: an itertools.product iterator
    # can be consumed only once, so sharing one across requests would leave
    # every request after the first with nothing to iterate over.
    configurations = itertools.product(emb_models, splitters, chunk_sizes, sub_vectors)
    for emb_model, splitter, size, sub_vector in configurations:
        documents_i = retrieve(query, int(num_docs), emb_model, sub_vector, size, splitter)
        prompt_i = template.render(documents=documents_i, query=query)
        # Show the documents actually used for this configuration.
        prompt_html = template_html.render(documents=documents_i, query=query)

        prev_hist = history[-1][1]
        # Close a code fence left open by the previous answer so the next
        # heading is not swallowed by it.
        if not has_balanced_backticks(prev_hist):
            prev_hist += "\n```\n"
        prev_hist += f"\n\n## model {emb_model}, splitter {splitter}, size {size}, sub vector {sub_vector}\n\n"

        for character in generate_fn(model_name, prompt_i, history[:-1], oepnai_api_key):
            history[-1][1] = prev_hist + character
            yield history, prompt_html
# Build the chat UI: a chatbot pane, the prompt box, the retrieval/reranker
# options, the chat-model picker, and an HTML pane showing the rendered prompt.
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
                       'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
        bubble_full_width=False,
        show_copy_button=True,
        show_share_button=True,
    )

    with gr.Row():
        txt = gr.Textbox(
            scale=3,
            show_label=False,
            placeholder="Enter text and press enter",
            container=False,
        )
        txt_btn = gr.Button(value="Submit text", scale=1)

    with gr.Row():
        emb_model_kind = gr.Radio(choices=emb_models, value="bge", label="embedding model")
        sub_vector_size = gr.Radio(choices=sub_vectors, value="32", label="sub-vector size")
        chunk_size = gr.Radio(choices=chunk_sizes, value="2000", label="chunk size")
        splitter_type = gr.Radio(choices=splitters, value="nltk", label="splitter")
        all_at_once = gr.Checkbox(value=False, label="Run all at once (no reranker)")

    with gr.Row():
        reranker_enabled = gr.Checkbox(value=False, label="Reranker enabled")
        reranker_kind = gr.Radio(choices=emb_models, value="bge", label="Reranker model")
        num_prerank_docs = gr.Slider(5, 80, label="Number of docs before reranker", step=1, value=20)

    with gr.Row():
        num_docs = gr.Slider(1, 20, label="number of docs", step=1, value=4)
        model_name = gr.Radio(choices=inf_models, value=inf_models[0], label="Chat model")
        oepnai_api_key = gr.Textbox(
            show_label=False,
            placeholder="OpenAI API key",
            container=False,
        )

    prompt_html = gr.HTML()

    # One input list for both triggers, in the order of bot's parameters.
    # Sharing it keeps the button and the Enter key in sync; the Enter path
    # previously left out `all_at_once`, so bot received one argument too few.
    bot_inputs = [
        chatbot, model_name, oepnai_api_key,
        reranker_enabled, reranker_kind, num_prerank_docs,
        num_docs, emb_model_kind, sub_vector_size, chunk_size, splitter_type, all_at_once,
    ]
    bot_outputs = [chatbot, prompt_html]

    # Turn off interactivity while generating if you click
    txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
        bot, bot_inputs, bot_outputs)
    # Turn it back on
    txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)

    # Turn off interactivity while generating if you hit enter
    txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
        bot, bot_inputs, bot_outputs)
    # Turn it back on
    txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)

demo.queue()
demo.launch(debug=True)