import gradio as gr
import torch
import itertools
import pandas as pd
import spaces
import random
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel
from sklearn.metrics import pairwise_distances
from collections import Counter
from itertools import chain
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import math
model_name = 'philipp-zettl/t5-small-long-qa'
qa_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model_name = 'philipp-zettl/t5-small-qg'
qg_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained('google/flan-t5-small')
embedding_model = AutoModel.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2')
embedding_tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2')
# Move only the student model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
qa_model = qa_model.to(device)
qg_model = qg_model.to(device)
embedding_model = embedding_model.to(device)
max_questions = 1
max_answers = 1
max_elem_value = 100
def ngrams(sequence, n):
return [tuple(sequence[i:i+n]) for i in range(len(sequence)-n+1)]
def count_ngrams(sequence, max_n):
counts = Counter()
for n in range(1, max_n + 1):
counts.update(ngrams(sequence, n))
return counts
def self_bleu(outputs):
smoothing_function = SmoothingFunction().method1
scores = []
for i in range(len(outputs)):
references = outputs[:i] + outputs[i+1:]
# Avoid calculating BLEU score for empty references
if references:
scores.append(sentence_bleu(references, outputs[i], smoothing_function=smoothing_function))
# If all references are empty, return a default value
if not scores:
return 0
return sum(scores) / len(scores)
def dist_n(outputs, n):
all_ngrams = list(chain(*[ngrams(output, n) for output in outputs]))
unique_ngrams = set(all_ngrams)
return len(unique_ngrams) / len(all_ngrams) if all_ngrams else 0
def perplexity(model, tokenizer, texts):
encodings = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
max_length = model.config.n_positions
stride = 512
lls = []
for i in range(0, encodings.input_ids.size(1), stride):
begin_loc = max(i + stride - max_length, 0)
end_loc = i + stride
trg_len = end_loc - i
input_ids = encodings.input_ids[:, begin_loc:end_loc].to(model.device)
target_ids = input_ids.clone()
target_ids[:, :-trg_len] = -100
with torch.no_grad():
outputs = model(input_ids, labels=target_ids)
log_likelihood = outputs.loss * trg_len
lls.append(log_likelihood)
ppl = torch.exp(torch.stack(lls).sum() / end_loc)
return ppl.item()
def embedding_similarity(inputs, outputs):
global embedding_model, embedding_tokenizer, device
def embed(texts):
inputs = embedding_tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(device)
with torch.no_grad():
outputs = embedding_model(**inputs)
return outputs.last_hidden_state.mean(dim=1).cpu().numpy()
input_embeddings = embed(inputs)
output_embeddings = embed(outputs)
similarities = pairwise_distances(input_embeddings, output_embeddings, metric='cosine')
return sum(similarities) / len(similarities)
def js_divergence(p, q):
def kl_divergence(p, q):
return sum(p[i] * math.log(p[i] / q[i]) for i in range(len(p)) if p[i] != 0 and q[i] != 0)
p_norm = [float(i)/sum(p) for i in p]
q_norm = [float(i)/sum(q) for i in q]
m = [(p_norm[i] + q_norm[i]) / 2 for i in range(len(p_norm))]
return (kl_divergence(p_norm, m) + kl_divergence(q_norm, m)) / 2
def evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=85):
generated_outputs = []
for input_text in eval_data:
input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
outputs = model.generate(
input_ids,
num_beams=num_beams,
num_beam_groups=num_beam_groups,
diversity_penalty=1.0,
max_new_tokens=max_length,
)
decoded_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
generated_outputs.append(decoded_text.split())
# Self-BLEU for diversity
diversity_score = self_bleu(generated_outputs)
# Dist-1 and Dist-2 for diversity
dist1 = dist_n(generated_outputs, 1)
dist2 = dist_n(generated_outputs, 2)
# Perplexity for fluency and relevance
fluency_score = perplexity(model, tokenizer, [" ".join(output) for output in generated_outputs])
# Embedding similarity for contextual relevance
contextual_score = embedding_similarity(eval_data, [" ".join(output) for output in generated_outputs])
# Jensen-Shannon Divergence for distribution similarity
generated_ngrams = count_ngrams(list(chain(*generated_outputs)), 4)
reference_ngrams = count_ngrams(list(chain(*[tokenizer.tokenize(text) for text in eval_data])), 4)
all_ngrams = set(generated_ngrams.keys()).union(set(reference_ngrams.keys()))
p = [generated_ngrams[ngram] for ngram in all_ngrams]
q = [reference_ngrams[ngram] for ngram in all_ngrams]
jsd_score = js_divergence(p, q)
return {
"diversity_score": diversity_score,
"dist1": dist1,
"dist2": dist2,
"fluency_score": fluency_score,
"contextual_score": contextual_score,
"jsd_score": jsd_score
}
def find_best_parameters(eval_data, model, tokenizer, max_length=85):
# Parameter ranges
parameter_map = {
2: [2],
4: [2],
6: [2], # 6x3 == 4x2
8: [2], # 8x4 == 6x3 == 4x2
9: [3],
10: [2], # 10x5 == 8x4 == 6x3 == 4x2
}
# Find the best parameters
best_score = -float('inf')
best_params = None
for num_beams in parameter_map.keys():
for num_beam_groups in parameter_map[num_beams]:
if num_beam_groups > num_beams:
continue # num_beam_groups should not be greater than num_beams
scores = evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=max_length)
# Combine scores to determine the best parameters
combined_score = (scores['dist1'] + scores['dist2'] - scores['fluency_score'] + scores['contextual_score'] - scores['jsd_score']).mean()
print(f"num_beams={num_beams}, num_beam_groups={num_beam_groups}, avg combined score={combined_score}")
if combined_score > best_score:
best_score = combined_score
best_params = (num_beams, num_beam_groups)
print(f"Best parameters: num_beams={best_params[0]}, num_beam_groups={best_params[1]} with combined score={best_score}")
return best_params
def run_model(inputs, tokenizer, model, num_beams=2, num_beam_groups=2, temperature=0.5, num_return_sequences=1, max_length=85, seed=42069):
all_outputs = []
torch.manual_seed(seed)
for input_text in inputs:
model_inputs = tokenizer([input_text], max_length=512, padding=True, truncation=True)
input_ids = torch.tensor(model_inputs['input_ids']).to(device)
for sample in input_ids:
sample_outputs = []
with torch.no_grad():
sample_output = model.generate(
input_ids[:1],
max_length=max_length,
#temperature=temperature,
#do_sample=True,
num_return_sequences=num_return_sequences,
low_memory=True,
#top_p=temperature,
#num_beams=max(2, num_return_sequences),
use_cache=True,
# Contrastive search
#penalty_alpha=0.6,
#top_k=4,
# Multi-nomial sampling
#do_sample=True,
#num_beams=1,
# Beam search
#num_beams=5,
# Beam search multinomial sampling
#num_beams=5,
#do_sample=True,
# Diverse Beam search decoding
num_beams=max(2, num_return_sequences),
num_beam_groups=max(2, num_return_sequences),
diversity_penalty=temperature,
#do_sample=True,
)
for i, sample_output in enumerate(sample_output):
sample_output = sample_output.unsqueeze(0)
sample_output = tokenizer.decode(sample_output[0], skip_special_tokens=True)
sample_outputs.append(sample_output)
all_outputs.append(sample_outputs)
return all_outputs
@spaces.GPU
def gen(content, temperature_qg=0.5, temperature_qa=0.75, num_return_sequences_qg=1, num_return_sequences_qa=1, max_length=85, seed=42069, optimize_questions=False):
inputs = [
f'context: {content}'
]
question = run_model(
inputs,
tokenizer,
qg_model,
num_beams=num_return_sequences_qg,
num_beam_groups=num_return_sequences_qg,
temperature=temperature_qg,
num_return_sequences=num_return_sequences_qg,
max_length=max_length,
seed=seed
)
if optimize_questions:
q_params = find_best_parameters(
list(chain.from_iterable(question)), qg_model, tokenizer, max_length=max_length
)
question = run_model(
inputs,
tokenizer,
qg_model,
num_beams=q_params[0],
num_beam_groups=q_params[1],
temperature=temperature_qg,
num_return_sequences=num_return_sequences_qg,
max_length=max_length,
seed=seed
)
inputs = list(chain.from_iterable([
[f'question: {q} context: {content}' for q in q_set] for q_set in question
]))
answer = run_model(
inputs,
tokenizer,
qa_model,
num_beams=num_return_sequences_qa,
num_beam_groups=num_return_sequences_qa,
temperature=temperature_qa,
num_return_sequences=num_return_sequences_qa,
max_length=max_length,
seed=seed
)
questions = list(chain.from_iterable(question))
answers = list(chain.from_iterable(answer))
results = []
for idx, ans in enumerate(answers):
results.append({'question': questions[idx % num_return_sequences_qg], 'answer': ans})
return results
def variable_outputs(k, max_elems=10):
global max_elem_value
k = int(k)
return [gr.Text(visible=True)] * k + [gr.Text(visible=False)] * (max(max_elems, max_elem_value)- k)
def set_outputs(content, max_elems=10):
c = eval(content)
print('received content: ', c)
return [gr.Text(value=t, visible=True) for t in c] + [gr.Text(visible=False)] * (max(max_elems, 10) - len(c))
def create_file_download(qnas):
with open('qnas.tsv', 'w') as f:
for idx, qna in qnas.iterrows():
f.write(qna['Question'] + '\t' + qna['Answer'])
if idx < len(qnas) - 1:
f.write('\n')
return 'qnas.tsv'
with gr.Blocks() as demo:
with gr.Tab(label='Description'):
with gr.Row(equal_height=True):
with gr.Column():
gr.Markdown(
"""
# QA-Generator
A combination of fine-tuned flan-T5(-small) models chained into sequence
to generate:
a) a versatile set of questions
b) an accurate set of matching answers
according to a given piece of text content.""")
with gr.Column():
gr.Markdown(
"""
The idea is simple:
1. Add your content
2. Select the amount of questions you want to generate
3. (optional) Select the amount of answers you want to generate per goven question
4. Press generate
5. ???
6. Profit
""")
with gr.Row(equal_height=True):
gr.Markdown("""
If you're satisfied with the generated data set, you can export it as TSV
to edit or import it into your favourite tool.
""")
with gr.Row(equal_height=True):
with gr.Accordion(label='Optimization', open=False):
gr.Markdown("""
For optimization of the question generation we apply the following combined score:
$$\\text{combined} = \\text{dist1} + \\text{dist2} - \\text{fluency} + \\text{contextual} - \\text{jsd}$$
Here's a brief explanation of each component:
1. **dist1 and dist2**: These represent the diversity of the generated outputs. dist1 measures the ratio of unique unigrams to total unigrams, and dist2 measures the ratio of unique bigrams to total bigrams. **Higher values indicate more diverse outputs.**
2. **fluency**: This is the perplexity of the generated outputs, which measures how well the outputs match the language model's expectations. **Lower values indicate better fluency.**
3. **contextual**: This measures the similarity between the input and generated outputs using embedding similarity. **Higher values indicate better contextual relevance.**
4. **jsd**: This is the Jensen-Shannon Divergence between the n-gram distributions of the generated outputs and the reference data. **Lower values indicate greater similarity between distributions.**
""", latex_delimiters=[{'display': False, 'left': '$$', 'right': '$$'}])
with gr.Tab(label='QA Generator'):
with gr.Row(equal_height=True):
with gr.Group("Content"):
content = gr.Textbox(label='Content', lines=15, placeholder='Enter text here', max_lines=10_000)
with gr.Group("Settings"):
temperature_qg = gr.Slider(label='Diversity Penalty QG', value=0.2, minimum=0, maximum=1, step=0.01)
temperature_qa = gr.Slider(label='Diversity Penalty QA', value=0.5, minimum=0, maximum=1, step=0.01)
max_length = gr.Number(label='Max Length', value=85, minimum=1, step=1, maximum=512)
num_return_sequences_qg = gr.Number(label='Number Questions', value=max_questions, minimum=1, step=1, maximum=max(max_questions, max_elem_value))
num_return_sequences_qa = gr.Number(label="Number Answers", value=max_answers, minimum=1, step=1, maximum=max(max_questions, max_elem_value))
seed = gr.Number(label="seed", value=42069)
optimize_questions = gr.Checkbox(label="Optimize questions?", value=False)
with gr.Row():
gen_btn = gr.Button("Generate")
@gr.render(
inputs=[
content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa,
max_length, seed, optimize_questions
],
triggers=[gen_btn.click]
)
def render_results(content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions):
if not content.strip():
raise gr.Error('Please enter some content to generate questions and answers.')
qnas = gen(
content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa,
max_length, seed, optimize_questions
)
df = gr.Dataframe(
value=[u.values() for u in qnas],
headers=['Question', 'Answer'],
col_count=2,
wrap=True
)
pd_df = pd.DataFrame([u.values() for u in qnas], columns=['Question', 'Answer'])
download = gr.DownloadButton(label='Download (without headers)', value=create_file_download(pd_df))
content.change(lambda x: x.strip(), content)
demo.queue()
demo.launch()