import gradio as gr
import random
import os
import requests
import json
from dotenv import load_dotenv
import threading
from queue import Queue, Empty
import shutil

load_dotenv()

# Stage the bundled example questions where the UI's download link can serve them.
os.makedirs('static', exist_ok=True)
if os.path.exists('testquestions.txt'):
    shutil.copy2('testquestions.txt', 'static/testquestions.txt')

MAX_QUESTIONS = 10

MODELS = [
    {"display_name": "Claude 3 Opus", "model_id": "anthropic/claude-3-opus-20240229"},
    {"display_name": "Claude 3.5 Sonnet", "model_id": "anthropic/claude-3.5-sonnet"},
    {"display_name": "Gemini Flash 2.0", "model_id": "google/gemini-2.0-flash-exp:free"},
    {"display_name": "Mistral Large", "model_id": "mistralai/mistral-large-2411"},
    {"display_name": "GPT-4o", "model_id": "openai/gpt-4o-2024-11-20"},
    {"display_name": "Reasoner: O1-Mini", "model_id": "openai/o1-mini"},
    {"display_name": "Reasoner: O1 Preview", "model_id": "openai/o1-preview"},
    {"display_name": "Reasoner: DeepSeek R1", "model_id": "deepseek/deepseek-r1"},
    {"display_name": "Reasoner: Google Gemini 2.0 Flash Thinking", "model_id": "google/gemini-2.0-flash-thinking-exp:free"}
]
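
# Note: model_id values are OpenRouter model slugs. Availability changes over
# time, so check this list against the current OpenRouter model catalog.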

OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY')
OPENROUTER_BASE_URL = os.getenv('OPENROUTER_BASE_URL', 'https://openrouter.ai/api/v1/chat/completions')

if not OPENROUTER_API_KEY:
    raise ValueError("Missing OPENROUTER_API_KEY. Please set it in your environment variables or .env file.")
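
# Expected .env contents (illustrative values; only the key is required):
#   OPENROUTER_API_KEY=sk-or-...
#   OPENROUTER_BASE_URL=https://openrouter.ai/api/v1/chat/completions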

def get_response(question, model):
    """Stream a response from the OpenRouter API for the given question and model.

    Yields the accumulated response text after each streamed chunk, so the
    final yielded value is the complete response.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        # OpenRouter uses the Referer header for app attribution.
        "HTTP-Referer": f"https://{os.getenv('SPACE_ID')}.hf.space" if os.getenv('SPACE_ID') else "http://localhost:7860",
        "Content-Type": "application/json"
    }

    data = {
        "model": model,
        "messages": [
            {"role": "user", "content": question}
        ],
        "stream": True
    }

    try:
        response = requests.post(
            OPENROUTER_BASE_URL,
            headers=headers,
            json=data,
            timeout=30,
            stream=True
        )
        response.raise_for_status()

        full_response = ""
        # The streaming endpoint emits server-sent events: lines of the form
        # "data: {json}", terminated by "data: [DONE]".
        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8')
                if line.startswith('data: '):
                    json_str = line[6:]
                    if json_str.strip() == '[DONE]':
                        break
                    try:
                        chunk = json.loads(json_str)
                        if chunk['choices'][0]['delta'].get('content'):
                            content = chunk['choices'][0]['delta']['content']
                            full_response += content
                            yield full_response
                    except json.JSONDecodeError:
                        continue

    except requests.exceptions.RequestException as e:
        # This function is a generator, so failures must be yielded; a plain
        # `return value` inside a generator is silently discarded by consumers.
        yield f"Error: Failed to get response from {model}: {str(e)}"

def read_questions(file_obj):
    """Read questions from the uploaded file and return them as a list."""
    with open(file_obj.name, 'r', encoding='utf-8') as file:
        questions = [q.strip() for q in file.readlines() if q.strip()]
    if len(questions) > MAX_QUESTIONS:
        raise gr.Error(f"Maximum {MAX_QUESTIONS} questions allowed.")
    return questions
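
# The upload is plain text with one question per line, e.g.:
#   What is the capital of France?
#   Explain quicksort in one paragraph.
# Blank lines are ignored; more than MAX_QUESTIONS questions raises an error.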

with gr.Blocks(title="Vibesmark Test Suite") as demo:
    gr.Markdown("# Vibesmark Test Suite")

    state = gr.State({
        "questions": [],
        "current_index": 0,
        "preferences": {},
        "current_model_order": {},
        "test_started": False
    })
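
    # "preferences" maps question index -> "A" or "B"; "current_model_order"
    # maps question index -> {"A": model_id, "B": model_id}, recording which
    # model was anonymized into which slot for each question.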

    with gr.Row():
        with gr.Column():
            model1_selector = gr.Dropdown(
                # gr.Dropdown expects a list of (label, value) pairs, and the
                # default value must be a model_id that exists in MODELS.
                choices=[(model["display_name"], model["model_id"]) for model in MODELS],
                label="Select First Model",
                value="openai/gpt-4o-2024-11-20",
                type="value",
                allow_custom_value=False
            )
        with gr.Column():
            model2_selector = gr.Dropdown(
                choices=[(model["display_name"], model["model_id"]) for model in MODELS],
                label="Select Second Model",
                value="google/gemini-2.0-flash-exp:free",
                type="value",
                allow_custom_value=False
            )

    with gr.Row():
        with gr.Column():
            gr.Markdown("Upload a `.txt` file with **one question per line**.")
            file_input = gr.File(label="Upload your questions (.txt)")
        with gr.Column():
            gr.Markdown("Download example questions:")
            # Served through Gradio's /file= route; the static folder is
            # whitelisted via allowed_paths in demo.launch() at the bottom.
            gr.HTML('<a href="/file=static/testquestions.txt" download>Download testquestions.txt</a>')

    with gr.Row():
        start_btn = gr.Button("Start Test", variant="primary")
        finish_btn = gr.Button("Finish & Show Results", variant="secondary", interactive=False)
    results_display = gr.Markdown("Click 'Finish & Show Results' when you're done to see the summary", visible=True)

    with gr.Row(visible=False) as confirm_dialog:
        gr.Markdown("Are you sure you want to finish the test? This will reset all progress.")
        with gr.Row():
            confirm_btn = gr.Button("Yes, Finish Test", variant="primary")
            cancel_btn = gr.Button("Cancel", variant="secondary")
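
    # The hidden Row above stands in for a modal confirmation dialog; the
    # handlers below toggle its visibility.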

    with gr.Group(visible=False) as question_group:
        question_display = gr.Markdown("### Upload a file to begin")
        with gr.Row():
            with gr.Column():
                response1_display = gr.Textbox(label="Response A", interactive=False, lines=8)
            with gr.Column():
                response2_display = gr.Textbox(label="Response B", interactive=False, lines=8)

        with gr.Row():
            prefer_a_btn = gr.Button("Prefer Response A", interactive=False, variant="secondary")
            preference_display = gr.Markdown("Make your selection")
            prefer_b_btn = gr.Button("Prefer Response B", interactive=False, variant="secondary")

        gr.Row(height=30)

        with gr.Row():
            prev_btn = gr.Button("← Previous", interactive=False)
            question_counter = gr.Markdown("Question 0 / 0")
            next_btn = gr.Button("Next →", interactive=False)

    def start_test(state, model_1, model_2):
        """Start the test and lock model selection."""
        if not state["questions"]:
            raise gr.Error("Please upload a file first.")

        if model_1 == model_2:
            raise gr.Error("Please select different models for comparison.")

        new_state = state.copy()
        new_state["test_started"] = True
        current_index = state["current_index"]
        current_question = state["questions"][current_index]

        current_pref = state["preferences"].get(current_index, None)
        pref_display = "Make your selection"
        if current_pref is not None:
            pref_display = f"You preferred Response {current_pref}"

        # First yield: lock the model selectors and Start button, enable
        # Finish, clear both response boxes, and reveal the question group.
        yield [
            new_state,
            gr.update(interactive=False),   # model1_selector
            gr.update(interactive=False),   # model2_selector
            gr.update(interactive=False),   # start_btn
            gr.update(interactive=True),    # finish_btn
            "",                             # response1_display
            "",                             # response2_display
            gr.update(interactive=True),    # prefer_a_btn
            gr.update(interactive=True),    # prefer_b_btn
            pref_display,                   # preference_display
            gr.update(visible=True)         # question_group
        ]

        # Randomly assign the two models to anonymous slots A and B so the
        # rater cannot tell which model produced which response.
        if random.choice([True, False]):
            model_a, model_b = model_1, model_2
        else:
            model_a, model_b = model_2, model_1

        new_state["current_model_order"][current_index] = {
            "A": model_a,
            "B": model_b
        }

        # Stream both responses, updating the UI after every chunk.
        for partial1, partial2 in get_responses_in_parallel(current_question, model_a, model_b):
            current_pref = new_state["preferences"].get(current_index, None)
            pref_display = "Make your selection"
            if current_pref is not None:
                pref_display = f"You preferred Response {current_pref}"

            yield [
                new_state,
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(interactive=True),
                partial1,
                partial2,
                gr.update(interactive=True),
                gr.update(interactive=True),
                pref_display,
                gr.update(visible=True)
            ]

    def process_file(file, state):
        """Load questions from the uploaded file and reset the test state."""
        if file is None:
            raise gr.Error("Please upload a file first.")
        questions = read_questions(file)
        new_state = {
            "questions": questions,
            "current_index": 0,
            "preferences": {},
            "current_model_order": {},
            "test_started": False
        }

        return [
            f"### Question 1:\n{questions[0]}",          # question_display
            f"Question 1 / {len(questions)}",            # question_counter
            gr.update(interactive=False),                # prev_btn
            gr.update(interactive=len(questions) > 1),   # next_btn
            gr.update(value=""),                         # response1_display
            gr.update(value=""),                         # response2_display
            gr.update(interactive=False),                # prefer_a_btn
            gr.update(interactive=False),                # prefer_b_btn
            "Make your selection",                       # preference_display
            new_state,                                   # state
            gr.update(interactive=True),                 # start_btn
            gr.update(interactive=False),                # finish_btn
            gr.update(visible=False)                     # question_group
        ]

    def navigate_question(direction, state, model_1, model_2):
        """Move to the next/previous question and stream fresh responses."""
        if not state["test_started"]:
            raise gr.Error("Please start the test first")

        questions = state["questions"]
        current_index = state["current_index"]

        if direction == "next" and current_index < len(questions) - 1:
            current_index += 1
        elif direction == "prev" and current_index > 0:
            current_index -= 1
        else:
            raise gr.Error("No more questions in that direction")

        new_state = state.copy()
        new_state["current_index"] = current_index

        current_pref = state["preferences"].get(current_index, None)
        pref_display = "Make your selection"
        if current_pref is not None:
            pref_display = f"You preferred Response {current_pref}"

        # First yield: show the new question with cleared response boxes while
        # the models are queried.
        yield [
            f"### Question {current_index + 1}:\n{questions[current_index]}",
            f"Question {current_index + 1} / {len(questions)}",
            gr.update(interactive=current_index > 0),                   # prev_btn
            gr.update(interactive=current_index < len(questions) - 1),  # next_btn
            "",                                                         # response1_display
            "",                                                         # response2_display
            gr.update(interactive=True),                                # prefer_a_btn
            gr.update(interactive=True),                                # prefer_b_btn
            pref_display,                                               # preference_display
            new_state,                                                  # state
            gr.update(visible=True)                                     # question_group
        ]

        current_question = questions[current_index]

        # Re-randomize the A/B assignment for every question.
        if random.choice([True, False]):
            model_a, model_b = model_1, model_2
        else:
            model_a, model_b = model_2, model_1

        new_state["current_model_order"][current_index] = {
            "A": model_a,
            "B": model_b
        }

        for partial1, partial2 in get_responses_in_parallel(current_question, model_a, model_b):
            current_pref = new_state["preferences"].get(current_index, None)
            pref_display = "Make your selection"
            if current_pref is not None:
                pref_display = f"You preferred Response {current_pref}"

            yield [
                f"### Question {current_index + 1}:\n{questions[current_index]}",
                f"Question {current_index + 1} / {len(questions)}",
                gr.update(interactive=current_index > 0),
                gr.update(interactive=current_index < len(questions) - 1),
                partial1,
                partial2,
                gr.update(interactive=True),
                gr.update(interactive=True),
                pref_display,
                new_state,
                gr.update(visible=True)
            ]

    def record_preference(choice, state):
        """Record the user's preference for the current question."""
        current_index = state["current_index"]
        new_state = state.copy()
        new_state["preferences"][current_index] = choice

        model_order = state["current_model_order"].get(current_index, {})
        model_a = model_order.get("A", "Unknown")
        model_b = model_order.get("B", "Unknown")

        if choice == "A":
            preferred_model, other_model = model_a, model_b
        else:
            preferred_model, other_model = model_b, model_a

        # Show human-readable names rather than raw model IDs.
        preferred_name = next((m["display_name"] for m in MODELS if m["model_id"] == preferred_model), preferred_model)
        other_name = next((m["display_name"] for m in MODELS if m["model_id"] == other_model), other_model)
        message = f"You preferred {preferred_name} over {other_name}"

        return [
            new_state,
            message
        ]

    def get_responses_in_parallel(question, model1, model2):
        """
        Spawn two threads to run get_response for each model in parallel,
        queuing partial responses as they arrive. Yields tuples of
        (partial_response_model1, partial_response_model2).
        """
        queue1 = Queue()
        queue2 = Queue()

        def fill_queue(q, question, model):
            for partial_response in get_response(question, model):
                q.put(partial_response)
            q.put(None)

        t1 = threading.Thread(target=fill_queue, args=(queue1, question, model1))
        t2 = threading.Thread(target=fill_queue, args=(queue2, question, model2))
        t1.start()
        t2.start()

        partial1 = ""
        partial2 = ""
        done1 = False
        done2 = False

        while not (done1 and done2):
            try:
                item1 = queue1.get(timeout=0.1)
                if item1 is None:
                    done1 = True
                else:
                    partial1 = item1
            except Empty:
                pass

            try:
                item2 = queue2.get(timeout=0.1)
                if item2 is None:
                    done2 = True
                else:
                    partial2 = item2
            except Empty:
                pass

            yield partial1, partial2

        t1.join()
        t2.join()
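
    # Design note: a `None` sentinel marks the end of each model's stream, and
    # the 0.1 s queue timeouts make the loop re-yield several times per second
    # even while one model is still streaming, so both textboxes keep updating
    # without blocking on the slower model.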

    def reset_interface():
        """Reset all interface elements to their initial state.

        Returns 15 values matching the tail of the confirm_btn.click outputs
        below (everything after results_display, which the caller prepends).
        """
        return [
            gr.update(interactive=True),    # model1_selector
            gr.update(interactive=True),    # model2_selector
            gr.update(interactive=True),    # start_btn
            gr.update(interactive=False),   # finish_btn
            gr.update(value=""),            # response1_display
            gr.update(value=""),            # response2_display
            gr.update(interactive=False),   # prefer_a_btn
            gr.update(interactive=False),   # prefer_b_btn
            "Make your selection",          # preference_display
            gr.update(value="### Upload a file to begin"),  # question_display
            gr.update(value="Question 0 / 0"),              # question_counter
            gr.update(interactive=False),   # prev_btn
            gr.update(interactive=False),   # next_btn
            {                               # state
                "questions": [],
                "current_index": 0,
                "preferences": {},
                "current_model_order": {},
                "test_started": False
            },
            gr.update(visible=False)        # question_group
        ]

    def generate_results_summary(state):
        """Generate a summary of which model was preferred for which questions."""
        if not state["preferences"]:
            return ["No preferences recorded yet."] + reset_interface()

        model_preferences = {}

        for q_idx, choice in state["preferences"].items():
            model_order = state["current_model_order"].get(q_idx, {})
            if not model_order:
                continue

            preferred_model = model_order["A"] if choice == "A" else model_order["B"]

            display_name = next((m["display_name"] for m in MODELS if m["model_id"] == preferred_model), preferred_model)

            if display_name not in model_preferences:
                model_preferences[display_name] = []
            model_preferences[display_name].append(str(q_idx + 1))

        summary_parts = []
        for model, questions in model_preferences.items():
            summary_parts.append(f"**{model}** won questions {', '.join(questions)}")

        summary = "### Results Summary\n" + "\n\n".join(summary_parts)

        # reset_interface() already ends with the question_group update, so the
        # combined list lines up exactly with the confirm_btn.click outputs.
        return [summary] + reset_interface()

    def show_confirm_dialog(state):
        """Show the confirmation dialog, but only if a test is in progress."""
        if not state["test_started"] or not state["questions"]:
            # Nothing to finish: keep the dialog hidden and explain why.
            return [
                gr.update(visible=False),
                "No test in progress to finish."
            ]
        return [
            gr.update(visible=True),
            gr.update()  # leave results_display unchanged
        ]

    def hide_confirm_dialog():
        """Hide the confirmation dialog."""
        return gr.update(visible=False)

    file_input.change(
        fn=process_file,
        inputs=[file_input, state],
        outputs=[
            question_display,
            question_counter,
            prev_btn,
            next_btn,
            response1_display,
            response2_display,
            prefer_a_btn,
            prefer_b_btn,
            preference_display,
            state,
            start_btn,
            finish_btn,
            question_group
        ]
    )
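
    # Gradio pairs outputs with return values positionally, so each outputs
    # list below must match the order of values its handler returns or yields.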

    prev_btn.click(
        fn=navigate_question,
        inputs=[
            gr.State("prev"),
            state,
            model1_selector,
            model2_selector
        ],
        outputs=[
            question_display,
            question_counter,
            prev_btn,
            next_btn,
            response1_display,
            response2_display,
            prefer_a_btn,
            prefer_b_btn,
            preference_display,
            state,
            question_group
        ]
    )

    next_btn.click(
        fn=navigate_question,
        inputs=[
            gr.State("next"),
            state,
            model1_selector,
            model2_selector
        ],
        outputs=[
            question_display,
            question_counter,
            prev_btn,
            next_btn,
            response1_display,
            response2_display,
            prefer_a_btn,
            prefer_b_btn,
            preference_display,
            state,
            question_group
        ]
    )

    start_btn.click(
        fn=start_test,
        inputs=[state, model1_selector, model2_selector],
        outputs=[
            state,
            model1_selector,
            model2_selector,
            start_btn,
            finish_btn,
            response1_display,
            response2_display,
            prefer_a_btn,
            prefer_b_btn,
            preference_display,
            question_group
        ]
    )

    prefer_a_btn.click(
        fn=lambda state: record_preference("A", state),
        inputs=[state],
        outputs=[state, preference_display]
    )

    prefer_b_btn.click(
        fn=lambda state: record_preference("B", state),
        inputs=[state],
        outputs=[state, preference_display]
    )

    finish_btn.click(
        fn=show_confirm_dialog,
        inputs=[state],
        outputs=[
            confirm_dialog,
            results_display
        ]
    )

    cancel_btn.click(
        fn=hide_confirm_dialog,
        outputs=[confirm_dialog]
    )

    confirm_btn.click(
        fn=generate_results_summary,
        inputs=[state],
        outputs=[
            results_display,
            model1_selector,
            model2_selector,
            start_btn,
            finish_btn,
            response1_display,
            response2_display,
            prefer_a_btn,
            prefer_b_btn,
            preference_display,
            question_display,
            question_counter,
            prev_btn,
            next_btn,
            state,
            question_group  # reset_interface() ends with this component's update
        ]
    ).then(
        fn=hide_confirm_dialog,
        outputs=[confirm_dialog]
    )

    gr.Markdown("<p style='color: #666; font-size: 0.8em; text-align: center; margin-top: 2em;'>Homegrown software from the Chateau</p>")

demo.queue(default_concurrency_limit=5)

if __name__ == "__main__":
    # allowed_paths lets Gradio serve the copied example questions at
    # /file=static/testquestions.txt (see the download link above).
    demo.launch(allowed_paths=["static"])