import nest_asyncio
import gradio as gr
import tiktoken
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.postprocessor import LLMRerank
import logging
import sys
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
from llama_index.llms.huggingface import HuggingFaceLLM
import torch
from transformers import BitsAndBytesConfig
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI
import os
import pandas as pd
from llama_index.core import Document
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core import QueryBundle
import time
from huggingface_hub import login
from gradio import ChatMessage
nest_asyncio.apply()

# The Hugging Face API token is read from the 'hf_token' environment variable
# (e.g. a Space secret) and used to authenticate model downloads.
hf_token = os.getenv('hf_token')
login(token=hf_token)
# quantize to 4-bit to save GPU memory
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)
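# NF4 4-bit weights with double quantization and fp16 compute shrink the loaded
# checkpoint so it fits on a single GPU; this only affects how the model is loaded,
# not the prompt format or the generation code below.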
llm = HuggingFaceLLM(
    model_name="kheopss/kheops_hermes-202k-e3-v0.14-bnb-16bit",
    tokenizer_name="kheopss/kheops_hermes-202k-e3-v0.14-bnb-16bit",
    context_window=3900,
    max_new_tokens=2560,
    model_kwargs={"quantization_config": quantization_config},
    # top_k must be an integer; the original value 0.95 looks like a copy of top_p,
    # so the transformers default of 50 is assumed here.
    generate_kwargs={"temperature": 0.01, "top_k": 50, "top_p": 0.95},
    device_map="cuda:0",
)
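# The prompt assembled in process_final() below uses ChatML-style <|im_start|>/<|im_end|>
# markers, which this Hermes-based fine-tune is assumed to expect; context_window caps the
# full prompt while the history loop budgets roughly 3000 estimated tokens for past turns.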
embed_model = HuggingFaceEmbedding(
    model_name="jinaai/jina-embeddings-v3",
)
Settings.llm = llm
Settings.embed_model = embed_model
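# Settings is LlamaIndex's global configuration: VectorStoreIndex.from_documents() and
# the LLMRerank post-processor below pick up this embed_model/llm pair without having
# them passed explicitly.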
# Path to the JSON file containing the pre-cleaned documents
file_path = 'response_metropo_cleaned.json'
data = pd.read_json(file_path)
documents = [
    Document(
        text=row['values'],
        metadata={"filename": row['file_name'], "description": row['file_description']},
    )
    for _, row in data.iterrows()
]
index = VectorStoreIndex.from_documents(documents, show_progress=True)
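# The JSON file is expected to hold one record per document with at least the
# 'values', 'file_name' and 'file_description' fields, e.g. (illustrative record only):
#   {"values": "Horaires d'ouverture de la mairie ...", "file_name": "mairie.json",
#    "file_description": "Description du service"}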
def get_retrieved_nodes(
    query_str, vector_top_k=10, reranker_top_n=3, with_reranker=False
):
    query_bundle = QueryBundle(query_str)
    # configure retriever
    phase_01_start = time.time()
    retriever = VectorIndexRetriever(
        index=index,
        similarity_top_k=vector_top_k,
    )
    retrieved_nodes = retriever.retrieve(query_bundle)
    phase_01_end = time.time()
    print(f"Phase 01 <RETRIEVING> took : {phase_01_end-phase_01_start}")
    phase_02_start = time.time()
    if with_reranker:
        # configure reranker
        reranker = LLMRerank(
            choice_batch_size=5,
            top_n=reranker_top_n,
        )
        retrieved_nodes = reranker.postprocess_nodes(
            retrieved_nodes, query_bundle
        )
    phase_02_end = time.time()
    print(f"Phase 02 <RERANKING> took : {phase_02_end-phase_02_start}")
    return retrieved_nodes
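# Illustrative usage (query invented for the example):
#   nodes = get_retrieved_nodes("Comment obtenir une carte d'identité ?",
#                               vector_top_k=5, reranker_top_n=3, with_reranker=True)
# Each returned node keeps the filename/description metadata attached above, and its
# text is what get_all_text() concatenates into the prompt.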
def get_all_text(new_nodes):
    texts = []
    for i, node in enumerate(new_nodes, 1):
        texts.append(f"\nDocument {i} : {node.get_text()}")
    return ' '.join(texts)
# Load the cl100k_base tokenizer
encoding = tiktoken.get_encoding("cl100k_base")

def estimate_tokens(text):
    # Encode the text to count its tokens
    tokens = encoding.encode(text)
    return len(tokens)
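# cl100k_base is an OpenAI tiktoken encoding, not the tokenizer of the Hermes checkpoint,
# so these counts are only an approximation used to budget the conversation history.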
def process_final(user_prom, history):
    all_process_start = time.time()
    system_p = '''
You are a conversational AI assistant tasked with helping public agents in Nice guide residents and citizens to appropriate services. Your role is to respond to user queries using only the information provided in the documents. You are not allowed to invent or infer information beyond what is given in the documents.
Always respond in French, ensuring that your answers are clear, concise, and grounded in the document content. Make sure to provide helpful and accurate responses based solely on the documents provided, while engaging in conversation to assist based on user questions.'''
    new_nodes = get_retrieved_nodes(
        user_prom,
        vector_top_k=5,
        reranker_top_n=3,
        with_reranker=True,
    )
    get_texts = get_all_text(new_nodes)
    print("PHASE 03 passing to LLM\n")
    sys_p = f"<|im_start|>system \n{system_p}\n DOCUMENTS {get_texts}\n<|im_end|>"
    prompt_f = ""
    total_tokens = estimate_tokens(prompt_f)
    for val in reversed(history):
        # Guard against incomplete turns so the token estimate below never
        # references an undefined variable.
        user_p, assistant_p = "", ""
        if val[0]:
            user_p = f" <|im_start|>user \n {val[0]}\n<|im_end|>"
        if val[1]:
            assistant_p = f" <|im_start|>assistant \n {val[1]}\n<|im_end|>"
        current_tokens = estimate_tokens(user_p + assistant_p)
        # Check whether adding this turn would exceed the history budget
        if total_tokens + current_tokens > 3000:
            break  # Stop adding history once the limit is reached
        else:
            # Prepend to `prompt_f` and update the running token count
            prompt_f = user_p + assistant_p + prompt_f
            total_tokens += current_tokens
    prompt_f = f"{sys_p} {prompt_f} <|im_start|>user \n{user_prom} \n<|im_end|><|im_start|>assistant \n"
    phase_03_start = time.time()
    gen = llm.stream_complete(formatted=True, prompt=prompt_f)
    print(f"TOTAL number of history tokens: {total_tokens}\n")
    print("_" * 100)
    print(prompt_f)
    print("o" * 100)
    for response in gen:
        yield response.text
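# process_final is a generator, so gr.ChatInterface streams the partial completions
# yielded from stream_complete() into the chat window as they are produced.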
description = """ | |
<p> | |
<center> | |
<img src="https://www.nicecotedazur.org/wp-content/themes/mnca/images/logo-metropole-nca.png" alt="rick" width="250"/> | |
</center> | |
</p> | |
<p style="text-align:right"> Made by KHEOPS AI</p> | |
""" | |
demo = gr.ChatInterface(
    fn=process_final,
    title="METROPOLE CHATBOT",
    description=description,
)
demo.launch(share=True, debug=True)
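# share=True exposes the app on a temporary public gradio.live URL in addition to the
# local server; debug=True keeps the launch call blocking and prints errors to the console.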