|
|
|
|
|
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from datetime import datetime
|
|
|
|
from App_Function_Libraries.DB_Manager import get_conversation_name, save_chat_history_to_database
|
|
from App_Function_Libraries.LLM_API_Calls import chat_with_openai, chat_with_anthropic, chat_with_cohere, \
|
|
chat_with_groq, chat_with_openrouter, chat_with_deepseek, chat_with_mistral, chat_with_huggingface, chat_with_vllm
|
|
from App_Function_Libraries.LLM_API_Calls_Local import chat_with_aphrodite, chat_with_local_llm, chat_with_ollama, \
|
|
chat_with_kobold, chat_with_llama, chat_with_oobabooga, chat_with_tabbyapi
|
|
from App_Function_Libraries.SQLite_DB import load_media_content
|
|
from App_Function_Libraries.Utils import generate_unique_filename
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chat(message, history, media_content, selected_parts, api_endpoint, api_key, prompt, temperature,
         system_message=None):
    """Send one chat turn to the selected LLM backend and return its reply.

    Args:
        message: The user's message for this turn.
        history: Prior conversation turns. Only its truthiness is used here:
            the selected media text is prepended to the input when history
            is empty and omitted otherwise.
        media_content: Mapping of part name to text. ``None`` is treated as
            an empty mapping.
        selected_parts: A part name, or a list/tuple of part names, to pull
            from ``media_content``. A falsy scalar means "no parts".
        api_endpoint: Backend name, matched case-insensitively.
        api_key: Key forwarded to the backends that accept one; may be ``None``.
        prompt: Prompt forwarded to the backend.
        temperature: Sampling temperature. ``None`` or ``""`` falls back to
            0.7; an explicit 0 is honoured.
        system_message: Optional system message forwarded to the backends
            that accept one.

    Returns:
        The backend's response, or the string ``"An error occurred: <reason>"``
        when anything fails (the exception is logged, never raised).
    """
    try:
        logging.info(f"Debug - Chat Function - Message: {message}")
        logging.info(f"Debug - Chat Function - Media Content: {media_content}")
        logging.info(f"Debug - Chat Function - Selected Parts: {selected_parts}")
        logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")

        # Normalise selected_parts to a sequence of part names.
        if not isinstance(selected_parts, (list, tuple)):
            selected_parts = [selected_parts] if selected_parts else []

        # No media loaded is not an error: just contribute no context.
        if media_content is None:
            media_content = {}

        combined_content = "\n\n".join(
            f"{part.capitalize()}: {media_content.get(part, '')}"
            for part in selected_parts
            if part in media_content
        )

        # The media context is only sent with the first message of a conversation.
        if not history:
            input_data = f"{combined_content}\n\nUser: {message}\n"
        else:
            input_data = f"User: {message}\n"

        if system_message:
            print(f"System message: {system_message}")
            logging.debug(f"Debug - Chat Function - System Message: {system_message}")

        # Only a missing value falls back to the default; 0 is a legitimate
        # temperature and must not be replaced by 0.7.
        if temperature is None or temperature == "":
            temperature = 0.7
        else:
            temperature = float(temperature)

        logging.debug(f"Debug - Chat Function - Temperature: {temperature}")
        # Local backends are called without a key, so api_key may be None.
        # NOTE(review): this still writes the first characters of the key to
        # the debug log - consider masking it entirely.
        key_preview = api_key[:10] if isinstance(api_key, str) else api_key
        logging.debug(f"Debug - Chat Function - API Key: {key_preview}")
        logging.debug(f"Debug - Chat Function - Prompt: {prompt}")

        logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
        endpoint = api_endpoint.lower()

        # The backends do not share one signature (argument order differs and
        # some take no key, temperature or system message), so each call is
        # spelled out explicitly.
        if endpoint == 'openai':
            response = chat_with_openai(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "anthropic":
            response = chat_with_anthropic(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "cohere":
            response = chat_with_cohere(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "groq":
            response = chat_with_groq(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "openrouter":
            response = chat_with_openrouter(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "deepseek":
            response = chat_with_deepseek(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "mistral":
            response = chat_with_mistral(api_key, input_data, prompt, temperature, system_message)
        elif endpoint == "llama.cpp":
            response = chat_with_llama(input_data, prompt, temperature, system_message)
        elif endpoint == "kobold":
            response = chat_with_kobold(input_data, api_key, prompt, temperature, system_message)
        elif endpoint == "ooba":
            response = chat_with_oobabooga(input_data, api_key, prompt, temperature, system_message)
        elif endpoint == "tabbyapi":
            response = chat_with_tabbyapi(input_data, prompt, temperature, system_message)
        elif endpoint == "vllm":
            # NOTE(review): no temperature is forwarded to vLLM - confirm this
            # matches chat_with_vllm's signature.
            response = chat_with_vllm(input_data, prompt, system_message)
        elif endpoint == "local-llm":
            response = chat_with_local_llm(input_data, prompt, temperature, system_message)
        elif endpoint == "huggingface":
            # NOTE(review): no system message is forwarded to HuggingFace -
            # confirm this matches chat_with_huggingface's signature.
            response = chat_with_huggingface(api_key, input_data, prompt, temperature)
        elif endpoint == "ollama":
            response = chat_with_ollama(input_data, prompt, temperature, system_message)
        elif endpoint == "aphrodite":
            response = chat_with_aphrodite(input_data, prompt, temperature, system_message)
        else:
            raise ValueError(f"Unsupported API endpoint: {api_endpoint}")

        return response

    except Exception as e:
        # Callers get the failure back as text rather than an exception.
        logging.error(f"Error in chat function: {str(e)}")
        return f"An error occurred: {str(e)}"
|
|
|
|
|
|
def _extract_media_identity(media_content):
    """Return ``(media_id, media_name)`` read from ``media_content['content']``.

    The id is taken from the ``webpage_url`` field and the name from the
    ``title`` field. Either element is ``None`` when it cannot be determined;
    every problem is logged instead of being raised.
    """
    media_id = None
    media_name = None

    if not isinstance(media_content, dict):
        logging.warning(f"media_content is not a dictionary. Type: {type(media_content)}")
        return media_id, media_name

    logging.debug(f"Media content keys: {media_content.keys()}")
    if 'content' not in media_content:
        logging.warning("'content' key not found in media_content")
        return media_id, media_name

    try:
        raw = media_content['content']
        # The payload may arrive as a JSON string or as an already-parsed dict.
        if isinstance(raw, str):
            metadata = json.loads(raw)
        elif isinstance(raw, dict):
            metadata = raw
        else:
            raise ValueError(f"Unexpected content type: {type(raw)}")

        media_id = metadata.get('webpage_url')
        media_name = metadata.get('title')
        logging.info(f"Extracted media_id: {media_id}, media_name: {media_name}")
    except json.JSONDecodeError:
        logging.error("Failed to decode JSON from media_content['content']")
    except Exception as e:
        logging.error(f"Error processing media_content: {str(e)}")

    return media_id, media_name


def save_chat_history_to_db_wrapper(chatbot, conversation_id, media_content):
    """Persist a chat history to the database under a generated conversation name.

    Args:
        chatbot: The chat history, passed through unchanged to
            ``save_chat_history_to_database``.
        conversation_id: Id of the conversation being saved; returned
            unchanged if saving fails.
        media_content: Expected to be a dict whose ``'content'`` entry is a
            dict or JSON string carrying ``webpage_url`` and ``title``.
            Anything else results in placeholder values being used.

    Returns:
        A ``(conversation_id, status_message)`` tuple. On success the id is
        whatever ``save_chat_history_to_database`` returned; on failure it is
        the ``conversation_id`` that was passed in.
    """
    logging.info(f"Attempting to save chat history. Media content type: {type(media_content)}")
    try:
        media_id, media_name = _extract_media_identity(media_content)

        # Fall back to placeholders so the save can still go ahead.
        if media_id is None:
            media_id = "unknown_media"
            logging.warning(f"Unable to extract media_id from media_content. Using placeholder: {media_id}")
        if media_name is None:
            media_name = "Unnamed Media"
            logging.warning(f"Unable to extract media_name from media_content. Using placeholder: {media_name}")

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        conversation_name = f"Chat_{media_id}_{stamp}"

        saved_id = save_chat_history_to_database(
            chatbot, conversation_id, media_id, media_name, conversation_name
        )
        return saved_id, f"Chat history saved successfully as {conversation_name}!"
    except Exception as e:
        failure_text = f"Failed to save chat history: {str(e)}"
        logging.error(failure_text, exc_info=True)
        return conversation_id, failure_text
|
|
|
|
|
|
def save_chat_history(history, conversation_id, media_content):
    """Write the chat history to a JSON file and return the file's path.

    The JSON text comes from ``generate_chat_history_content``. It is first
    written to a temporary file and then renamed, inside that same directory,
    to ``<sanitised conversation name>_<timestamp>.json`` (as adjusted by
    ``generate_unique_filename``).

    Args:
        history: Chat history to serialise.
        conversation_id: Id of the conversation, used to look up its name.
        media_content: Media metadata used as a fallback source for the name.

    Returns:
        The path of the written file, or ``None`` if anything failed (the
        error is logged, not raised).
    """
    temp_file_path = None
    try:
        content, conversation_name = generate_chat_history_content(history, conversation_id, media_content)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Keep only characters that are safe in a file name.
        safe_conversation_name = re.sub(r'[^a-zA-Z0-9_-]', '_', conversation_name)
        base_filename = f"{safe_conversation_name}_{timestamp}.json"

        # delete=False: the file must outlive the context manager so that it
        # can be renamed and handed back to the caller.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json',
                                         encoding='utf-8') as temp_file:
            # Record the path first so a failed write can still be cleaned up.
            temp_file_path = temp_file.name
            temp_file.write(content)

        target_dir = os.path.dirname(temp_file_path)
        # presumably returns a name not yet used in target_dir - verify
        # against generate_unique_filename.
        unique_filename = generate_unique_filename(target_dir, base_filename)
        final_path = os.path.join(target_dir, unique_filename)

        os.rename(temp_file_path, final_path)

        return final_path
    except Exception as e:
        logging.error(f"Error saving chat history: {str(e)}", exc_info=True)
        # Don't leave an orphaned temp file behind when a later step failed.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.remove(temp_file_path)
            except OSError as cleanup_error:
                logging.warning(f"Could not remove temporary file {temp_file_path}: {cleanup_error}")
        return None
|
|
|
|
|
|
def _history_to_messages(history):
    """Flatten a chat history into a list of ``{"role", "content"}`` dicts."""
    messages = []
    for index, entry in enumerate(history or []):
        if isinstance(entry, (tuple, list)) and len(entry) == 2:
            # assumes 2-item entries are (user, bot) exchanges as produced by
            # a pair-style chatbot history - TODO confirm with callers.
            # Emit both sides so the bot reply is not lost; a None side
            # (e.g. a reply that has not arrived yet) is skipped.
            user_text, bot_text = entry
            if user_text is not None:
                messages.append({"role": "user", "content": user_text})
            if bot_text is not None:
                messages.append({"role": "bot", "content": bot_text})
        else:
            # Flat history: roles alternate by position, user first.
            messages.append({
                "role": "user" if index % 2 == 0 else "bot",
                "content": entry[0] if isinstance(entry, tuple) and entry else entry,
            })
    return messages


def generate_chat_history_content(history, conversation_id, media_content):
    """Serialise a chat history to JSON and work out the conversation's name.

    The name is looked up with ``get_conversation_name``. If that yields
    nothing, it falls back to ``"<media name>-chat"`` (media name from
    ``extract_media_name``) and finally to ``"chat-<timestamp>"``.

    Args:
        history: Sequence of chat entries; ``None`` is treated as empty.
            Two-item tuple/list entries are written as a user message followed
            by a bot message; any other entry gets its role from its position.
        conversation_id: Id stored in the output and used for the name lookup.
        media_content: Media metadata used for the fallback name.

    Returns:
        A ``(json_text, conversation_name)`` tuple, where ``json_text`` is an
        indented JSON object with ``conversation_id``, ``conversation_name``,
        ``timestamp`` and ``history`` keys.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    conversation_name = get_conversation_name(conversation_id)

    if not conversation_name:
        media_name = extract_media_name(media_content)
        if media_name:
            conversation_name = f"{media_name}-chat"
        else:
            conversation_name = f"chat-{timestamp}"

    chat_data = {
        "conversation_id": conversation_id,
        "conversation_name": conversation_name,
        "timestamp": timestamp,
        "history": _history_to_messages(history),
    }

    return json.dumps(chat_data, indent=2), conversation_name
|
|
|
|
|
|
def extract_media_name(media_content):
    """Return the media's display name, or ``None`` if it cannot be found.

    ``media_content`` is expected to be a dict whose ``'content'`` entry is a
    dict or a JSON string. The name is that payload's ``title`` if truthy,
    otherwise its ``name``. Unparseable JSON and unexpected shapes are logged
    as warnings and yield ``None``.
    """
    unexpected_format = f"Unexpected media_content format: {type(media_content)}"

    if not isinstance(media_content, dict):
        logging.warning(unexpected_format)
        return None

    payload = media_content.get('content', {})

    # A string payload is treated as serialised JSON.
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            logging.warning("Failed to parse media_content JSON string")
            return None

    if not isinstance(payload, dict):
        logging.warning(unexpected_format)
        return None

    title = payload.get('title')
    return title if title else payload.get('name')
|
|
|
|
|
|
def update_chat_content(selected_item, use_content, use_summary, use_prompt, item_mapping):
    """Load the media for the selected item and pick which parts to use in chat.

    Args:
        selected_item: Key of the chosen item in ``item_mapping``.
        use_content: Include the ``"content"`` part if the media has it.
        use_summary: Include the ``"summary"`` part if the media has it.
        use_prompt: Include the ``"prompt"`` part if the media has it.
        item_mapping: Mapping from item key to media id; ``None`` is treated
            as empty.

    Returns:
        ``(content, selected_parts)`` where ``content`` is whatever
        ``load_media_content`` returned and ``selected_parts`` lists the
        requested part names present in it. Returns ``({}, [])`` when nothing
        is selected or the item is not in the mapping.
    """
    logging.debug(f"Debug - Update Chat Content - Selected Item: {selected_item}\n")
    logging.debug(f"Debug - Update Chat Content - Use Content: {use_content}\n\n\n\n")
    logging.debug(f"Debug - Update Chat Content - Use Summary: {use_summary}\n\n")
    logging.debug(f"Debug - Update Chat Content - Use Prompt: {use_prompt}\n\n")
    logging.debug(f"Debug - Update Chat Content - Item Mapping: {item_mapping}\n\n")

    # Guard item_mapping as well, so a missing mapping is handled like a
    # missing selection instead of raising TypeError.
    if not (selected_item and item_mapping and selected_item in item_mapping):
        print("Debug - Update Chat Content - No item selected or item not in mapping")
        return {}, []

    media_id = item_mapping[selected_item]
    content = load_media_content(media_id)

    selected_parts = []
    if isinstance(content, dict):
        # Part names are only meaningful as dict keys. Probing a non-dict
        # would do substring matching on a str and raise on None.
        requested = (
            ("content", use_content),
            ("summary", use_summary),
            ("prompt", use_prompt),
        )
        selected_parts = [part for part, enabled in requested if enabled and part in content]

        print(f"Debug - Update Chat Content - Content keys: {list(content.keys())}")
        for key, value in content.items():
            print(f"Debug - Update Chat Content - {key} (first 500 char): {str(value)[:500]}\n\n\n\n")
    else:
        print(f"Debug - Update Chat Content - Content(first 500 char): {str(content)[:500]}\n\n\n\n")

    print(f"Debug - Update Chat Content - Selected Parts: {selected_parts}")
    return content, selected_parts
|
|
|
|
|
|
|
|
|
|
|