Spaces:
Sleeping
Sleeping
import gradio as gr
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.metrics.pairwise import cosine_similarity
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
def average_pool(last_hidden_states: Tensor,
                 attention_mask: Tensor) -> Tensor:
    """Mean-pool hidden states over dim 1, skipping masked-out positions.

    Positions where ``attention_mask`` is 0 are zeroed before summing, and
    the sum is divided by the number of non-zero mask entries per row.

    Args:
        last_hidden_states: Hidden states; assumed to be laid out as
            (batch, seq, hidden) -- TODO confirm against the caller.
        attention_mask: Mask with one entry per position of
            ``last_hidden_states`` (without the trailing hidden axis);
            non-zero marks a position to keep.

    Returns:
        ``last_hidden_states`` summed over dim 1 and divided by the per-row
        mask sum. A row whose mask is entirely zero divides by zero.
    """
    # Add a trailing axis so the mask broadcasts across the hidden dimension.
    keep = attention_mask.unsqueeze(-1).bool()
    zeroed = last_hidden_states.masked_fill(~keep, 0.0)
    token_counts = attention_mask.sum(dim=1).unsqueeze(-1)
    return zeroed.sum(dim=1) / token_counts
# Hub identifier shared by the tokenizer and the encoder so the two cannot
# drift apart.
_MODEL_ID = 'intfloat/multilingual-e5-large'

# Article table; the search code in this file reads its 'title' column.
df = pd.read_csv('wiki.csv')
# Precomputed embedding matrix. Assumed to be row-aligned with `df`, since
# top-k row indices from this array are used to look up rows of `df`
# -- TODO confirm against how the file was generated.
data_embeddings = np.load("wiki-embeddings.npy")

print("loading the model...")
tokenizer = AutoTokenizer.from_pretrained(_MODEL_ID)
model = AutoModel.from_pretrained(_MODEL_ID)
# Number of hits listed for each search query.
_TOP_K = 5
# Prefix of every bot message that carries a result listing. `_search` writes
# it and `_retrieve` looks for it, so both must use the same constant.
_RESULTS_HEADER = "results:\n"

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox(label="simple wikipedia semantic search query", placeholder="for example, \"medieval battles\"")
    clear = gr.ClearButton([msg, chatbot])

    def _search(message, chat_history):
        """Embed `message`, rank all rows by cosine similarity, list the top hits.

        Appends a ``(message, "results:\\n1. <title> // <row>\\n...")`` pair to
        `chat_history`, where ``<row>`` is the positional row index into `df`.

        Returns:
            ``("", chat_history)`` -- the empty string clears the textbox.
        """
        # The "query: " prefix is presumably the E5 input convention for
        # search queries -- confirm against the model card.
        batch_dict = tokenizer(["query: " + message], max_length=512, padding=True, truncation=True, return_tensors='pt')

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            outputs = model(**batch_dict)
            pooled = average_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
            # L2-normalize the query embedding.
            pooled = F.normalize(pooled, p=2, dim=1)

        # One row per query (a single query here), as float64 for scikit-learn.
        query_vec = pooled.double().numpy()
        cos_similarities = cosine_similarity(data_embeddings, query_vec).flatten()

        # argsort is ascending: take the last k and reverse for best-first.
        top_k_idx = cos_similarities.argsort()[-_TOP_K:][::-1]
        top_k_text = df['title'].iloc[top_k_idx].tolist()

        bot_message = "\n".join(
            f"{rank}. {title} // {row}"
            for rank, (title, row) in enumerate(zip(top_k_text, top_k_idx), start=1)
        )
        chat_history.append((message, _RESULTS_HEADER + bot_message))
        return "", chat_history

    def _retrieve(message, chat_history):
        """Show the row behind entry number `message` of the newest result listing.

        Args:
            message: Text that `int()` accepts; the 1-based entry number.
            chat_history: List of ``(user, bot)`` pairs; mutated in place when
                a matching entry is found.

        Returns:
            ``("", chat_history)`` when the entry was found and its contents
            were appended, or ``None`` when there is no result listing yet or
            the newest one has no entry with that number.
        """
        wanted = str(int(message))
        for _, bot_text in reversed(chat_history):
            if not (bot_text and bot_text.startswith(_RESULTS_HEADER)):
                continue
            # First element of the split is the "results:" header line.
            for line in bot_text.split("\n")[1:]:
                if line.split(".")[0] != wanted:
                    continue
                # The positional row index is stored after the last " // ".
                df_idx = int(line.split(" // ")[-1])
                # .iloc selects the row by position; plain df[...] would be a
                # column lookup and raise KeyError for an integer.
                row_text = df.iloc[df_idx].to_string()
                chat_history.append((message, f"contents of {line}:\n{row_text}"))
                return "", chat_history
            # Only the newest listing is consulted; older ones are stale.
            break
        return None

    def respond(message, chat_history):
        """Route a submitted message to entry retrieval or to a new search.

        A message that parses as an integer selects an entry from the newest
        result listing. Anything else, or a number that matches no listed
        entry (e.g. the query "1984"), is run as a search query.

        Returns:
            ``("", chat_history)`` for the ``[msg, chatbot]`` outputs.
        """
        if not message.strip():
            # Nothing to search for; leave the conversation untouched.
            return "", chat_history

        try:
            int(message)
        except ValueError:
            return _search(message, chat_history)

        retrieved = _retrieve(message, chat_history)
        if retrieved is None:
            return _search(message, chat_history)
        return retrieved

    msg.submit(respond, [msg, chatbot], [msg, chatbot])

demo.launch()