Spaces:
Runtime error
Runtime error
import gradio as gr | |
import random | |
import time | |
from rank_bm25 import BM25Okapi, BM25Plus | |
import re | |
import numpy as np | |
from underthesea import text_normalize | |
import pandas as pd | |
from pyvi import ViTokenizer | |
import heapq | |
import torch | |
from transformers import AutoModel, AutoTokenizer | |
from pyvi.ViTokenizer import tokenize | |
from sklearn.metrics.pairwise import cosine_similarity | |
from sentence_transformers import CrossEncoder | |
import heapq | |
from sklearn.metrics.pairwise import cosine_similarity | |
from sentence_transformers import SentenceTransformer, CrossEncoder | |
from sentence_transformers import SentenceTransformer | |
from pyvi.ViTokenizer import tokenize | |
from Levenshtein import ratio as lev | |
from Levenshtein import ratio as lev | |
from openai import OpenAI | |
import re | |
import numpy as np | |
from underthesea import text_normalize | |
def chuan_hoa_unicode_go_dau(text):
    """Return ``text`` after passing it through underthesea's ``text_normalize``.

    NOTE(review): the name suggests Unicode / tone-mark normalisation of
    Vietnamese text; the exact rules are those of ``text_normalize``.
    """
    normalized = text_normalize(text)
    return normalized
def viet_thuong(text):
    """Return a lower-cased copy of ``text``."""
    lowered = text.lower()
    return lowered
def chuan_hoa_dau_cau(text):
    """Strip punctuation from ``text`` and tidy its whitespace.

    Every character that is not whitespace, a word character, or one of the
    explicitly listed lower-case Vietnamese letters is replaced by a space.
    Runs of whitespace are then collapsed to a single space and the result
    is trimmed at both ends.
    """
    without_punctuation = re.sub(
        r'[^\s\wáàảãạăắằẳẵặâấầẩẫậéèẻẽẹêếềểễệóòỏõọôốồổỗộơớờởỡợíìỉĩịúùủũụưứừửữựýỳỷỹỵđ_]',
        ' ',
        text,
    )
    return re.sub(r'\s+', ' ', without_punctuation).strip()
def chuan_hoa_cau(doc):
    """Put a space between word characters and adjacent punctuation.

    A space is inserted after a word character that is directly followed by
    a non-word, non-space character, and before a word character that is
    directly preceded by one. Runs of whitespace are then collapsed to a
    single space (the result is not trimmed).
    """
    spaced = re.sub(r'(\w)([^\s\w])', r'\1 \2', doc)
    spaced = re.sub(r'([^\s\w])(\w)', r'\1 \2', spaced)
    # Remove redundant whitespace.
    return re.sub(r'\s+', ' ', spaced)
def my_pre_processing(doc):
    """Run the full text-cleaning pipeline on ``doc`` and return the result.

    The steps are applied in this order: ``chuan_hoa_unicode_go_dau``,
    ``chuan_hoa_dau_cau``, ``chuan_hoa_cau``, ``viet_thuong``.
    """
    pipeline = (
        chuan_hoa_unicode_go_dau,
        chuan_hoa_dau_cau,
        chuan_hoa_cau,
        viet_thuong,
    )
    for step in pipeline:
        doc = step(doc)
    return doc
def levenshtein_similarity(sentence1, sentence2):
    """Return the ``Levenshtein.ratio`` similarity of the two sentences."""
    similarity = lev(sentence1, sentence2)
    return similarity
def jaccard_similarity(sentence1, sentence2):
    """Return the Jaccard similarity of the two sentences' word sets.

    Both sentences are lower-cased and split on whitespace; the score is
    ``|A & B| / |A | B|`` and therefore lies in ``[0.0, 1.0]``.

    Returns 0.0 when neither sentence contains a token (the union is empty),
    instead of raising ``ZeroDivisionError``.
    """
    words1 = set(sentence1.lower().split())
    words2 = set(sentence2.lower().split())

    union_size = len(words1 | words2)
    if union_size == 0:
        # Two empty sentences share no evidence of similarity.
        return 0.0

    # The ratio is already in [0, 1]; no further rescaling is needed.
    return len(words1 & words2) / union_size
def filter_similarity(sentence1, sentence2, debug=False):
    """Return the mean of the Levenshtein and Jaccard similarities.

    When ``debug`` is true, ``sentence2`` and both component scores are
    printed before the result is returned.
    """
    edit_score = levenshtein_similarity(sentence1, sentence2)
    token_score = jaccard_similarity(sentence1, sentence2)

    if debug:
        print(sentence2)
        report = (
            ("Levenshtein similarity", edit_score),
            ("Jaccard similarity", token_score),
        )
        for label, value in report:
            print(label, value)

    return (edit_score + token_score) / 2
def top_n_indexes(lst, n):
    """Return the positions of the ``n`` largest values of ``lst``.

    Positions are ordered from the largest value to the smallest; fewer than
    ``n`` positions are returned when ``lst`` is shorter than ``n``.
    """
    ranked = heapq.nlargest(n, enumerate(lst), key=lambda pair: pair[1])
    return [position for position, _ in ranked]
def BM25_retrieval(query, seg_question_corpus, top_BM25):
    """Return the indexes of the ``top_BM25`` best BM25Plus matches for ``query``.

    The query is cleaned with ``my_pre_processing`` and word-segmented with
    ``ViTokenizer``. Each entry of ``seg_question_corpus`` is split on single
    spaces, so the corpus is presumably already word-segmented (confirm
    against the code that builds it). The BM25Plus index is rebuilt from the
    corpus on every call.
    """
    cleaned_query = my_pre_processing(query)
    query_tokens = ViTokenizer.tokenize(cleaned_query).split(" ")

    # Word-level processing over the question corpus.
    corpus_tokens = [question.split(" ") for question in seg_question_corpus]
    bm25_index = BM25Plus(corpus_tokens)
    question_scores = bm25_index.get_scores(query_tokens)

    return top_n_indexes(question_scores, n=top_BM25)
def SimCSE_retrieval(query, SimCSE_set, top_Sim):
    """Return the indexes of the ``top_Sim`` questions closest to ``query``.

    Args:
        query: Raw user query; cleaned with ``my_pre_processing`` and
            word-segmented with ``ViTokenizer`` before encoding.
        SimCSE_set: Indexable pair ``(model, embeddings)``. ``model`` must
            expose ``encode``; ``embeddings`` holds one precomputed vector
            per corpus question (assumed 2-D, one row per question —
            confirm against the file the embeddings are loaded from).
        top_Sim: Number of indexes to return.

    Returns:
        Corpus indexes ordered by decreasing cosine similarity.
    """
    query = my_pre_processing(query)
    sim_model = SimCSE_set[0]
    question_embeddings = SimCSE_set[1]

    seg_query = ViTokenizer.tokenize(query)
    query_vector = sim_model.encode(seg_query)

    # cosine_similarity expects 2-D input, hence the single-row wrapper.
    similarity_row = cosine_similarity([query_vector], question_embeddings)[0]
    return top_n_indexes(list(similarity_row), n=top_Sim)
def Para_retriveval(query, para_set, top_para):
    """Return the indexes of the ``top_para`` questions closest to ``query``.

    Args:
        query: Raw user query; cleaned with ``my_pre_processing`` (it is not
            word-segmented for this model).
        para_set: Indexable pair ``(model, embeddings)``. ``model`` must
            expose ``encode``; ``embeddings`` holds one precomputed vector
            per corpus question (assumed 2-D — confirm against the file the
            embeddings are loaded from).
        top_para: Number of indexes to return.

    Returns:
        Corpus indexes ordered by decreasing cosine similarity.
    """
    query = my_pre_processing(query)
    retri_model = para_set[0]
    para_question_embeddings = para_set[1]

    # `device` is the module-level global selected at start-up.
    query_embed = retri_model.encode([query], device=device)
    para_score = cosine_similarity(query_embed, para_question_embeddings)[0]
    return top_n_indexes(para_score, n=top_para)
def Rerank(query, retrieval_result, question_corpus, reranker, top_n):
    """Score candidates against ``query`` with a cross-encoder and keep the best.

    Args:
        query: Raw user query; cleaned with ``my_pre_processing`` before scoring.
        retrieval_result: Corpus indexes of the candidates to score.
        question_corpus: Sequence of question texts indexed by corpus index.
        reranker: Object exposing ``predict(list_of_pairs)`` that returns one
            score per ``(query, question)`` pair.
        top_n: Maximum number of results to return.

    Returns:
        A list of ``(corpus_index, score)`` tuples sorted by decreasing score,
        at most ``top_n`` long. An empty list is returned for an empty
        candidate list, without calling the reranker.
    """
    # Nothing to score: avoid handing an empty batch to the model.
    if len(retrieval_result) == 0:
        return []

    query = my_pre_processing(query)
    pairs = [(query, question_corpus[i]) for i in retrieval_result]
    scores = reranker.predict(pairs)

    ranked = sorted(
        zip(retrieval_result, scores),
        key=lambda pair: pair[1],
        reverse=True,
    )
    # Slicing already clamps to the list length.
    return ranked[:top_n]
def retrieval(query, question_corpus, seg_question_corpus, models, top_n=15, thread_hold=0.2, rerank=True):
    """Retrieve and rank the corpus questions most relevant to ``query``.

    Pipeline:
      1. Collect the union of the ``top_n`` hits from BM25, SimCSE and the
         paraphrase model.
      2. Keep candidates whose lexical similarity to the cleaned query
         (``filter_similarity``) reaches ``thread_hold``; while nothing
         passes, lower the threshold in steps of 0.1 down to 0.
      3. Rerank the survivors with the cross-encoder and min-max normalise
         the scores.
      4. Score every candidate by the mean of its pairwise lexical
         similarities (to the other candidates and to the query), each pair
         weighted by the normalised rerank score of the higher-ranked member.

    Args:
        query: Raw user query.
        question_corpus: Question texts indexed by corpus index.
        seg_question_corpus: Texts used for BM25, same indexing.
        models: Dict with keys ``'Sim_CSE'``, ``'para'`` and ``'rerank'``.
        top_n: Hits taken from each retriever and kept after reranking.
        thread_hold: Initial lexical-similarity threshold.
        rerank: When false, return the raw candidate index list from step 1.

    Returns:
        With ``rerank`` true: a list of ``{'corpus_id': ..., 'score': ...}``
        dicts sorted by decreasing score (empty when nothing was retrieved).
        With ``rerank`` false: the unordered list of candidate indexes.
    """
    BM25_result = BM25_retrieval(query, seg_question_corpus, top_n)
    SimCSE_result = SimCSE_retrieval(query, models['Sim_CSE'], top_n)
    Para_result = Para_retriveval(query, models['para'], top_n)
    retrieval_result = list(set(BM25_result + SimCSE_result + Para_result))

    # The filtering below only feeds the reranker, so skip it when unused.
    if not rerank:
        return retrieval_result

    # Clean the query once; it is reused for every comparison below.
    clean_query = my_pre_processing(query)

    # Lexical similarity does not depend on the threshold: compute it once.
    lexical_scores = [
        (filter_similarity(clean_query, question_corpus[corpus_id]), corpus_id)
        for corpus_id in retrieval_result
    ]
    threshold = thread_hold
    scores_filter = []
    while not scores_filter and threshold >= 0:
        scores_filter = [pair for pair in lexical_scores if pair[0] >= threshold]
        # Rounding keeps repeated subtraction from drifting just below 0,
        # which would skip the final "accept everything" threshold.
        threshold = round(threshold - 0.1, 10)
    scores_filter.sort(key=lambda pair: pair[0], reverse=True)
    sent_filter = [corpus_id for _, corpus_id in scores_filter]

    if not sent_filter:
        return []

    rerank_result = Rerank(query, sent_filter, question_corpus, models['rerank'], top_n)
    candidate_ids = [corpus_id for corpus_id, _ in rerank_result]
    raw_scores = [score for _, score in rerank_result]
    if not candidate_ids:
        return []

    # Min-max normalise the rerank scores. With a single candidate, or when
    # all scores are equal, the spread is 0: weight every candidate equally
    # rather than dividing by zero.
    lowest = min(raw_scores)
    spread = max(raw_scores) - lowest
    if spread > 0:
        weights = [(score - lowest) / spread for score in raw_scores]
    else:
        weights = [1.0] * len(raw_scores)

    # The cleaned query is appended as a pseudo-candidate so each real
    # candidate is also compared against it.
    texts = [question_corpus[corpus_id] for corpus_id in candidate_ids]
    texts.append(clean_query)
    candidate_count = len(candidate_ids)

    pair_scores = {corpus_id: [] for corpus_id in candidate_ids}
    for first in range(candidate_count):
        for second in range(first + 1, candidate_count + 1):
            score = filter_similarity(texts[first], texts[second]) * weights[first]
            pair_scores[candidate_ids[first]].append(score)
            if second < candidate_count:  # the query itself is not ranked
                pair_scores[candidate_ids[second]].append(score)

    # Every candidate took part in exactly `candidate_count` pairs (>= 1).
    mean_scores = {
        corpus_id: sum(values) / len(values)
        for corpus_id, values in pair_scores.items()
    }
    ordered = sorted(mean_scores.items(), key=lambda item: item[1], reverse=True)
    return [{'corpus_id': corpus_id, 'score': score} for corpus_id, score in ordered]
# The API key must never be committed to source control. With no `api_key`
# argument the client reads it from the OPENAI_API_KEY environment variable
# (set it as a secret in the deployment environment).
# NOTE(review): the key that was previously hard-coded here should be treated
# as leaked and revoked.
client = OpenAI()
def chat_gpt(prompt):
    """Send ``prompt`` as a single user message to ``gpt-3.5-turbo``.

    Returns the text of the first choice with surrounding whitespace
    removed, or an empty string when the response carries no text content.
    """
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
    )
    content = response.choices[0].message.content
    # `content` is optional in the API response; avoid calling .strip() on None.
    return (content or "").strip()
# Run query encoding on the GPU when one is available; Para_retriveval reads
# this module-level name.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Data files use forward-slash relative paths so they resolve on every
# platform. The previous '.\source\...' literals were Windows-only and also
# contained invalid escape sequences such as '\s' and '\c'.
df = pd.read_csv('./source/corpus.csv')
question_corpus = list(df['question_corpus'])
seg_question_corpus = list(df['seg_question_corpus'])

# Precomputed question embeddings are consumed by scikit-learn's
# cosine_similarity, which works on CPU data, so map them to the CPU
# regardless of the device they were saved from.
Sim_CSE_model = SentenceTransformer('VoVanPhuc/sup-SimCSE-VietNamese-phobert-base')
Sim_CSE_word_ques_embeddings = torch.load('./source/word_ques_embeddings.pth', map_location='cpu')
para_model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-mpnet-base-v2')
para_question_embeddings = torch.load('./source/para_embeddings.pth', map_location='cpu')
rerank_model = CrossEncoder('unicamp-dl/mMiniLM-L6-v2-mmarco-v2')

# Bundle of models handed to retrieval(): each retriever entry is a
# [model, embeddings] pair.
models = {
    'rerank': rerank_model,
    'para': [para_model, para_question_embeddings],
    'Sim_CSE': [Sim_CSE_model, Sim_CSE_word_ques_embeddings],
}

# Table the final answer is built from; RAG() looks rows up by corpus id.
source_corpus = pd.read_csv("./source/new_tthc.csv")
def RAG(query):
    """Answer ``query`` from the best-matching administrative procedure.

    Returns a dict that always holds ``'query'`` and ``'answer'``. When a
    procedure is found it also holds ``'tthc'`` (the procedure name) and
    ``'reference'`` (a link to the procedure page); when nothing is
    retrieved, ``'answer'`` is a fixed "not found" message and those two
    keys are absent.
    """
    answer = {'query': query}

    hits = retrieval(query, question_corpus, seg_question_corpus, models, top_n=25, rerank=True)
    if not hits:
        answer['answer'] = "Không tìm thấy thủ tục hành chính phù hợp"
        return answer

    # The top-ranked hit selects the row used as context for the LLM.
    best_id = hits[0]['corpus_id']
    info = source_corpus.loc[best_id]
    answer['tthc'] = info['PROCEDURE_NAME']

    prompt = f"Chỉ dựa vào thông tin ngữ cảnh tôi cung cấp để trả lời câu hỏi. Chú ý giản cách dòng hợp lý: \n Câu hỏi: {answer['query']} \n Ngữ cảnh: {info['IMPL_ORDER']}"
    answer['answer'] = chat_gpt(prompt)
    answer['reference'] = f"https://dichvucong.gov.vn/p/home/dvc-tthc-thu-tuc-hanh-chinh-chi-tiet.html?ma_thu_tuc={info['ID']}"
    return answer
#print(RAG("tôi muốn biết cách làm biển sổ xe lần lần đầu")) | |
# Chat UI: a chatbot pane, a text box for the question, and a clear button.
with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox()
    clear = gr.ClearButton([msg, chatbot])

    def respond(message, chat_history):
        """Answer ``message`` through RAG and append the exchange to the history.

        Returns an empty string (which clears the text box) and the updated
        chat history.
        """
        answer = RAG(message)
        if 'tthc' in answer:
            bot_message = f"Tên thủ tục hành chính: {answer['tthc']}\nCâu trả lời:\n{answer['answer']}\nNguồn: {answer['reference']}"
        else:
            # No procedure matched: RAG only supplies the fallback answer,
            # so there is no procedure name or reference link to show.
            bot_message = answer['answer']
        chat_history.append((message, bot_message))
        return "", chat_history

    msg.submit(respond, [msg, chatbot], [msg, chatbot])
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(inline=False)