aiala's picture
Upload 5 files
0712e23 verified
raw
history blame
10.9 kB
import os
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
import pandas as pd
from datetime import datetime, timedelta, timezone
import torch
from config import hugging_face_token, init_google_sheets_client, models, quantized_models, default_model_name, user_names, google_sheets_name, MAX_INTERACTIONS
import spaces
# Hack for ZeroGPU: replace torch.jit.script with an identity function, so
# whatever is passed to it is returned unchanged instead of being scripted.
torch.jit.script = lambda f: f
# Initialize Google Sheets client and open the spreadsheet named in config
client = init_google_sheets_client()
sheet = client.open(google_sheets_name)
# Worksheets read at startup: the stories and the selectable system prompts
stories_sheet = sheet.worksheet("Stories")
system_prompts_sheet = sheet.worksheet("System Prompts")
# Load stories from Google Sheets
def load_stories():
    """Read the "Stories" worksheet and return its rows as story records.

    Returns:
        A list of ``{"title": ..., "story": ...}`` dicts built from the first
        two cells of each row. Rows whose first cell is exactly ``"Title"``
        (the header row) are left out.
    """
    records = []
    for row in stories_sheet.get_all_values():
        if row[0] == "Title":  # Skip header row
            continue
        records.append({"title": row[0], "story": row[1]})
    return records
# Load system prompts from Google Sheets
def load_system_prompts():
    """Return the prompts listed in the "System Prompts" worksheet.

    Returns:
        The first cell of every row after the first one (the header row),
        in worksheet order.
    """
    rows = system_prompts_sheet.get_all_values()
    prompts = []
    for row in rows[1:]:  # Skip header row
        prompts.append(row[0])
    return prompts
# Load available stories and system prompts once, when the module is imported
stories = load_stories()
system_prompts = load_system_prompts()
# Name of the selected model; load_model updates it after a successful load
selected_model = default_model_name
# Placeholders for the tokenizer/model globals that load_model fills in
tokenizer, model = None, None
# Rows saved by save_comment_score; send_selected_story resets this list
data = []
# Load a model and its tokenizer into the module-level globals
def load_model(model_name):
    """Load ``model_name`` and its tokenizer into the module-level globals.

    If the requested model is already the one that is loaded, the existing
    objects are returned without reloading.

    Args:
        model_name: Key of either ``models`` or ``quantized_models``.

    Returns:
        The ``(tokenizer, model)`` pair now stored in the globals.

    Raises:
        ValueError: If ``model_name`` is in neither dictionary.
        Exception: Any error raised while loading is printed and re-raised.
    """
    global tokenizer, model, selected_model
    # Avoid an expensive reload when the requested model is already in memory
    if model is not None and tokenizer is not None and model_name == selected_model:
        return tokenizer, model
    try:
        # Resolve the path first so that an unknown name does not throw away
        # the model that is currently loaded.
        if model_name in models:
            model_path = models[model_name]
        elif model_name in quantized_models:
            model_path = quantized_models[model_name]
        else:
            raise ValueError(f"Model {model_name} not found in either models or quantized_models.")
        # Release the memory of the previous model if it exists. The global is
        # rebound to None instead of deleted: `del` would leave the name
        # undefined if loading fails below, turning later `model is None`
        # checks into a NameError.
        if model is not None:
            model = None
            torch.cuda.empty_cache()
        new_tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            padding_side='left',
            token=hugging_face_token,
            trust_remote_code=True
        )
        # Ensure the padding token is set
        if new_tokenizer.pad_token is None:
            new_tokenizer.pad_token = new_tokenizer.eos_token
            new_tokenizer.add_special_tokens({'pad_token': new_tokenizer.eos_token})
        new_model = AutoModelForCausalLM.from_pretrained(
            model_path,
            token=hugging_face_token,
            trust_remote_code=True
        )
        # Only move to CUDA if it's not a quantized model
        if model_name not in quantized_models:
            new_model = new_model.to("cuda")
        # Publish the new state only after everything succeeded
        tokenizer, model, selected_model = new_tokenizer, new_model, model_name
    except Exception as e:
        print(f"Error loading model {model_name}: {e}")
        raise  # bare raise keeps the original traceback
    return tokenizer, model
# Ensure the initial model is loaded when the module is imported
tokenizer, model = load_model(selected_model)
# Module-level chat history; send_selected_story rebinds it for each new story
chat_history = []
# Function to handle interaction with model
@spaces.GPU
def interact(user_input, history, interaction_count, model_name):
    """Generate the assistant's reply to ``user_input`` and extend the history.

    Args:
        user_input: Text of the new user message.
        history: List of ``{"role": ..., "content": ...}`` message dicts. It
            is extended in place with the new user and assistant messages.
            ``None`` is treated as an empty history.
        interaction_count: Number of interactions so far. Once it reaches
            ``MAX_INTERACTIONS`` a farewell sentence is appended to the user
            message before it is sent to the model.
        model_name: Used to decide whether the model is quantized; quantized
            models are not moved between devices.

    Returns:
        A 4-tuple ``("", formatted_history, history, interaction_count + 1)``
        where ``formatted_history`` is a list of ``(user, None)`` /
        ``(None, assistant)`` tuples built from the user and assistant
        messages in ``history``.

    Raises:
        gr.Error: Wraps any exception raised while generating the reply.
    """
    global tokenizer, model
    try:
        if tokenizer is None or model is None:
            raise ValueError("Tokenizer or model is not initialized.")
        if history is None:
            history = []
        # Determine the device to use (either CUDA if available, or CPU)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Only move the model to the device if it's not a quantized model
        if model_name not in quantized_models:
            model = model.to(device)
        if interaction_count >= MAX_INTERACTIONS:
            user_input += ". Thank you for your questions. Our session is now over. Goodbye!"
        user_message = {"role": "user", "content": user_input}
        messages = history + [user_message]
        # Ensure roles alternate correctly
        for previous, current in zip(messages, messages[1:]):
            if previous.get("role") == current.get("role"):
                raise ValueError("Conversation roles must alternate user/assistant/user/assistant/...")
        prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        # The rendered chat template already contains the special tokens, so
        # they must not be added a second time when tokenizing the prompt.
        encoded = tokenizer(prompt, return_tensors='pt', add_special_tokens=False).to(device)
        input_ids = encoded["input_ids"]
        # NOTE(review): temperature only has an effect when sampling is
        # enabled; confirm the models' generation configs turn it on.
        output_ids = model.generate(
            input_ids,
            # Pass the mask explicitly: the pad token may be the eos token, in
            # which case it cannot be inferred from the ids alone.
            attention_mask=encoded.get("attention_mask"),
            max_new_tokens=100,
            pad_token_id=tokenizer.eos_token_id,
            temperature=0.1,
        )
        # Decode only the tokens generated after the prompt
        prompt_length = input_ids.shape[-1]
        response = tokenizer.decode(output_ids[0, prompt_length:], skip_special_tokens=True)
        # Update chat history with generated response
        history.append(user_message)
        history.append({"role": "assistant", "content": response})
        interaction_count += 1
        # Convert to (user, assistant) tuples; system messages are not shown
        formatted_history = []
        for entry in history:
            if entry["role"] == "user":
                formatted_history.append((entry["content"], None))
            elif entry["role"] == "assistant":
                formatted_history.append((None, entry["content"]))
        return "", formatted_history, history, interaction_count
    except Exception as e:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        print(f"Error during interaction: {e}")
        raise gr.Error(f"An error occurred during interaction: {str(e)}") from e
# Function to send selected story and initial message
def send_selected_story(title, model_name, system_prompt):
    """Start a new chat about the story called ``title``.

    Resets the saved evaluation rows, loads ``model_name``, builds a system
    message from ``system_prompt`` plus the story text, and asks the model
    for an opening question about the story.

    Args:
        title: Title of the story to use; must match an entry in ``stories``.
        model_name: Name of the model to load and chat with.
        system_prompt: Text placed before the story in the system message.

    Returns:
        A 4-tuple ``(formatted_history, chat_history, table_update,
        story_text)`` where ``table_update`` is a Gradio update that sets the
        value to an empty list.

    Raises:
        gr.Error: If no story has the given title.
    """
    global chat_history
    global selected_story
    global data  # Ensure data is reset
    # Validate the title before touching any state or loading a model
    story = next((candidate for candidate in stories if candidate["title"] == title), None)
    if story is None:
        print("Story title does not match.")
        raise gr.Error(f"Story {title!r} was not found.")
    data = []  # Reset data for new story
    load_model(model_name)  # Load the appropriate model (updates the globals)
    selected_story = title
    story_text = story["story"]
    combined_message = f"\n{system_prompt}\nHere is the story:\n---\n{story_text}\n---\n".strip()
    # Reset chat history so it starts with the system message only
    chat_history = [{"role": "system", "content": combined_message}]
    question_prompt = "Please ask a simple question about the story to encourage interaction."
    # The interaction count restarts at 1 for a new story; the updated count
    # returned by interact is not part of this function's return value.
    _, formatted_history, chat_history, _ = interact(question_prompt, chat_history, 1, model_name)
    return formatted_history, chat_history, gr.update(value=[]), story_text
# Function to save comment and score
def save_comment_score(chat_responses, score, comment, story_name, user_name, system_prompt):
    """Record a score and comment for the current chat.

    The row is appended to the module-level ``data`` list and to the
    worksheet named after ``user_name``, which is created if it does not
    exist yet.

    Args:
        chat_responses: Sequence of ``(user, assistant)`` pairs; either
            element of a pair may be empty. ``None`` is treated as empty.
        score: Score given to the chat.
        comment: Free-text comment.
        story_name: Title of the story that was discussed.
        user_name: Name of the evaluator; also the worksheet title.
        system_prompt: System prompt that was used.

    Returns:
        A 2-tuple of a DataFrame with the "Chat History", "Score" and
        "Comment" columns of all rows in ``data``, and a Gradio update that
        clears the comment box.

    Raises:
        gr.Error: If no user name was selected.
    """
    if not user_name:
        raise gr.Error("Please select a user name before saving.")
    # Create formatted chat history with roles
    transcript_lines = []
    for message in chat_responses or []:
        if message[0]:  # User message
            transcript_lines.append(f"User: {message[0]}\n")
        if message[1]:  # Assistant message
            transcript_lines.append(f"Assistant: {message[1]}\n")
    full_chat_history = "".join(transcript_lines)
    timestamp = datetime.now(timezone.utc) - timedelta(hours=3)  # Adjust to GMT-3
    timestamp_str = timestamp.strftime("%Y-%m-%d %H:%M:%S")
    model_name = selected_model
    row = [
        timestamp_str,
        user_name,
        model_name,
        system_prompt,
        story_name,
        full_chat_history,
        score,
        comment,
    ]
    # Append data to local data storage
    data.append(row)
    # Append data to Google Sheets. The user's worksheet is looked up by
    # title instead of catching the library's "not found" exception, whose
    # module is not imported in this file.
    spreadsheet = client.open(google_sheets_name)
    user_sheet = next(
        (worksheet for worksheet in spreadsheet.worksheets() if worksheet.title == user_name),
        None,
    )
    if user_sheet is None:
        user_sheet = spreadsheet.add_worksheet(title=user_name, rows="100", cols="20")
    user_sheet.append_row(row)
    df = pd.DataFrame(data, columns=["Timestamp", "User Name", "Model Name", "System Prompt", "Story Name", "Chat History", "Score", "Comment"])
    # Show only the required columns and clear the comment input box
    return df[["Chat History", "Score", "Comment"]], gr.update(value="")
# Function to load user guide from a file
def load_user_guide():
    """Return the contents of ``user_guide.txt`` from the working directory.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's default encoding.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open('user_guide.txt', 'r', encoding='utf-8') as file:
        return file.read()
# Merge both model dictionaries; entries from quantized_models win on key clashes
all_models = dict(models)
all_models.update(quantized_models)
# Create the chat interface using Gradio Blocks
with gr.Blocks() as demo:
    with gr.Tabs():
        with gr.TabItem("Chat"):
            gr.Markdown("# Demo Chatbot V3")
            gr.Markdown("## Context")
            with gr.Group():
                model_dropdown = gr.Dropdown(choices=list(all_models.keys()), label="Select Model", value=default_model_name)
                user_dropdown = gr.Dropdown(choices=user_names, label="Select User Name")
                # Fall back to no preselection when a worksheet returned no rows
                initial_story = stories[0]["title"] if stories else None
                initial_system_prompt = system_prompts[0] if system_prompts else None
                story_dropdown = gr.Dropdown(choices=[story["title"] for story in stories], label="Select Story", value=initial_story)
                system_prompt_dropdown = gr.Dropdown(choices=system_prompts, label="Select System Prompt", value=initial_system_prompt)
                send_story_button = gr.Button("Send Story")
            gr.Markdown("## Chat")
            with gr.Group():
                selected_story_textbox = gr.Textbox(label="Selected Story", lines=10, interactive=False)
                chatbot_output = gr.Chatbot(label="Chat History")
                chatbot_input = gr.Textbox(placeholder="Type your message here...", label="User Input")
                send_message_button = gr.Button("Send")
            gr.Markdown("## Evaluation")
            with gr.Group():
                score_input = gr.Slider(minimum=0, maximum=5, step=1, label="Score")
                comment_input = gr.Textbox(placeholder="Add a comment...", label="Comment")
                save_button = gr.Button("Save Score and Comment")
                data_table = gr.DataFrame(headers=["Chat History", "Score", "Comment"])
        with gr.TabItem("User Guide"):
            gr.Textbox(label="User Guide", value=load_user_guide(), lines=20)
    # Hidden components that carry the raw message list and the interaction
    # counter between callbacks
    chat_history_json = gr.JSON(value=[], visible=False)
    interaction_count = gr.Number(value=0, visible=False)
    # NOTE(review): interaction_count is not among the outputs of the
    # "Send Story" handler, so starting a new story does not reset the
    # counter held here - confirm whether that is intended.
    send_story_button.click(fn=send_selected_story, inputs=[story_dropdown, model_dropdown, system_prompt_dropdown], outputs=[chatbot_output, chat_history_json, data_table, selected_story_textbox])
    send_message_button.click(fn=interact, inputs=[chatbot_input, chat_history_json, interaction_count, model_dropdown], outputs=[chatbot_input, chatbot_output, chat_history_json, interaction_count])
    save_button.click(fn=save_comment_score, inputs=[chatbot_output, score_input, comment_input, story_dropdown, user_dropdown, system_prompt_dropdown], outputs=[data_table, comment_input])
demo.launch()