ai_school_hw5 / app.py
complynx's picture
Init client each time
b342136
raw
history blame
7.23 kB
"""
Credit to Derek Thomas, [email protected]
"""
import os
import logging
from pathlib import Path
from time import perf_counter
import gradio as gr
from jinja2 import Environment, FileSystemLoader
from backend.query_llm import generate_hf, generate_openai, hf_models, openai_models
from backend.semantic_search import retrieve
import itertools
from gradio_client import Client
def run_llama(_, msg, *__):
client = Client("Be-Bo/llama-3-chatbot_70b")
yield client.predict(
message=msg,
api_name="/chat"
)
inf_models = list(hf_models.keys()) + list(openai_models)
emb_models = ["bge", "minilm"]
splitters = ['ct', 'rct', 'nltk']
chunk_sizes = ["500", "2000"]
sub_vectors = ["8", "16", "32"]
# Create all combinations of the provided arrays
combinations = itertools.product(emb_models, splitters, chunk_sizes, sub_vectors)
TOP_K = int(os.getenv("TOP_K", 4))
proj_dir = Path(__file__).parent
# Setting up the logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Set up the template environment with the templates directory
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))
# Load the templates directly from the environment
template = env.get_template('template.j2')
template_html = env.get_template('template_html.j2')
def add_text(history, text):
history = [] if history is None else history
history = history + [(text, None)]
return history, gr.Textbox(value="", interactive=False)
def has_balanced_backticks(markdown_str):
in_code_block = False
lines = markdown_str.split('\n')
for line in lines:
stripped_line = line.strip()
# Check if the line contains triple backticks
if stripped_line.startswith("```"):
# Toggle the in_code_block flag
in_code_block = not in_code_block
# If in_code_block is False at the end, all backticks are balanced
return not in_code_block
def bot(history, model_name, oepnai_api_key,
reranker_enabled,reranker_kind,num_prerank_docs,
num_docs, model_kind, sub_vector_size, chunk_size, splitter_type, all_at_once):
query = history[-1][0]
if not query:
raise gr.Warning("Please submit a non-empty string as a prompt")
logger.info('Retrieving documents...')
# Retrieve documents relevant to query
document_start = perf_counter()
if reranker_enabled and not all_at_once:
documents = retrieve(query, int(num_docs), model_kind, sub_vector_size, chunk_size, splitter_type,reranker_kind,num_prerank_docs)
else:
documents = retrieve(query, int(num_docs), model_kind, sub_vector_size, chunk_size, splitter_type)
document_time = perf_counter() - document_start
logger.info(f'Finished Retrieving documents in {round(document_time, 2)} seconds...')
# Create Prompt
prompt = template.render(documents=documents, query=query)
prompt_html = template_html.render(documents=documents, query=query)
if model_name == "llama 3":
generate_fn = run_llama
elif model_name in hf_models:
generate_fn = generate_hf
elif model_name in openai_models:
generate_fn = generate_openai
else:
raise gr.Error(f"Model {model_name} is not supported")
history[-1][1] = ""
if all_at_once:
for emb_model, doc, size, sub_vector in combinations:
documents_i = retrieve(query, int(num_docs), emb_model, sub_vector, size, doc)
prompt_i = template.render(documents=documents_i, query=query)
prompt_html = template_html.render(documents=documents, query=query)
hist_chunk = ""
prev_hist = history[-1][1]
if not has_balanced_backticks(prev_hist):
prev_hist += "\n```\n"
prev_hist += f"\n\n## model {emb_model}, splitter {doc}, size {size}, sub vector {sub_vector}\n\n"
for character in generate_fn(model_name, prompt_i, history[:-1], oepnai_api_key):
hist_chunk = character
history[-1][1] = prev_hist + hist_chunk
yield history, prompt_html
else:
for character in generate_fn(model_name, prompt, history[:-1], oepnai_api_key):
history[-1][1] = character
yield history, prompt_html
with gr.Blocks() as demo:
chatbot = gr.Chatbot(
[],
elem_id="chatbot",
avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
bubble_full_width=False,
show_copy_button=True,
show_share_button=True,
)
with gr.Row():
txt = gr.Textbox(
scale=3,
show_label=False,
placeholder="Enter text and press enter",
container=False,
)
txt_btn = gr.Button(value="Submit text", scale=1)
with gr.Row():
emb_model_kind = gr.Radio(choices=emb_models, value="bge", label="embedding model")
sub_vector_size = gr.Radio(choices=sub_vectors, value="32", label="sub-vector size")
chunk_size = gr.Radio(choices=chunk_sizes, value="2000", label="chunk size")
splitter_type = gr.Radio(choices=splitters, value="nltk", label="splitter")
all_at_once = gr.Checkbox(value=False, label="Run all at once (no reranker)")
with gr.Row():
reranker_enabled = gr.Checkbox(value=False, label="Reranker enabled")
reranker_kind = gr.Radio(choices=emb_models, value="bge", label="Reranker model")
num_prerank_docs = gr.Slider(5, 80, label="Number of docs before reranker", step=1, value=20)
with gr.Row():
num_docs = gr.Slider(1, 20, label="number of docs", step=1, value=4)
model_name = gr.Radio(choices=inf_models, value=inf_models[0], label="Chat model")
oepnai_api_key = gr.Textbox(
show_label=False,
placeholder="OpenAI API key",
container=False,
)
prompt_html = gr.HTML()
# Turn off interactivity while generating if you click
txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
bot, [chatbot, model_name, oepnai_api_key,
reranker_enabled,reranker_kind,num_prerank_docs,
num_docs, emb_model_kind, sub_vector_size, chunk_size, splitter_type, all_at_once
], [chatbot, prompt_html])
# Turn it back on
txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)
# Turn off interactivity while generating if you hit enter
txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
bot, [chatbot, model_name, oepnai_api_key,
reranker_enabled,reranker_kind,num_prerank_docs,
num_docs, emb_model_kind, sub_vector_size, chunk_size, splitter_type
], [chatbot, prompt_html])
# Turn it back on
txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)
demo.queue()
demo.launch(debug=True)