File size: 6,446 Bytes
e029e22 f51bb92 9d89b34 f51bb92 fc2cb23 f51bb92 f2daaee f51bb92 9d89b34 f51bb92 8f6647c f51bb92 8f6647c fc2cb23 8f6647c fc2cb23 8f6647c fc2cb23 8f6647c f51bb92 f2daaee f51bb92 fc2cb23 e029e22 aaaac46 fc2cb23 e029e22 aaaac46 e029e22 fc2cb23 e029e22 b409192 28ba961 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
from modules.config.prompts import prompts
import chainlit as cl
def _group_sources(documents):
    """Merge retrieved documents that share the same source URL and page.

    Args:
        documents: Iterable of objects exposing a ``metadata`` mapping and a
            ``page_content`` string.

    Returns:
        dict: Keyed by ``"{url}_{page}"`` in first-seen order. Each value holds
        the concatenated text plus the metadata of the first document seen for
        that key; missing metadata falls back to ``"N/A"`` (``1`` for the page).
    """
    grouped = {}
    for document in documents:
        metadata = document.metadata
        url = metadata.get("source", "N/A")
        page = metadata.get("page", 1)
        key = f"{url}_{page}"

        entry = grouped.get(key)
        if entry is None:
            grouped[key] = {
                "text": document.page_content,
                "url": url,
                "score": metadata.get("score", "N/A"),
                "page": page,
                "lecture_tldr": metadata.get("tldr", "N/A"),
                "lecture_recording": metadata.get("lecture_recording", "N/A"),
                "suggested_readings": metadata.get("suggested_readings", "N/A"),
                "date": metadata.get("date", "N/A"),
                "source_type": metadata.get("source_type", "N/A"),
            }
        else:
            # Same URL and page seen again: keep the first metadata, append text.
            entry["text"] += f"\n\n{document.page_content}"
    return grouped


def get_sources(res, answer, stream=True, view_sources=False):
    """Build the displayed answer text and its source elements.

    Args:
        res: Mapping whose ``"context"`` entry is the iterable of retrieved
            documents (objects with ``metadata`` and ``page_content``).
        answer: Answer text; only included in the output when ``stream`` is
            false, so a streamed answer is not repeated.
        stream: Whether the answer was already streamed to the user.
        view_sources: Whether to append the source and metadata listings and
            create the matching Chainlit elements.

    Returns:
        tuple: ``(full_answer, source_elements, source_dict)`` where
        ``full_answer`` is the assembled message text, ``source_elements`` is
        the list of ``cl.Text`` / ``cl.Pdf`` elements (empty unless
        ``view_sources`` is true and sources exist), and ``source_dict`` maps
        ``"{url}_{page}"`` to the grouped source data.

    Raises:
        KeyError: If ``res`` has no ``"context"`` entry.
    """
    source_elements = []
    source_dict = _group_sources(res["context"])

    # Collect the pieces and join once instead of repeated string "+=".
    parts = []
    if not stream:
        parts.append("**Answer:**\n")
        parts.append(answer)

    if view_sources:
        if not source_dict:
            parts.append("\n\n**No sources found.**")
        else:
            parts.append("\n\n**Sources:**\n")
            for number, source_data in enumerate(source_dict.values(), start=1):
                parts.append(
                    f"\nSource {number} (Score: {source_data['score']}): {source_data['url']}\n"
                )

                # The element name is also written into the message text.
                text_name = f"Source {number} Text\n"
                parts.append(text_name)
                source_elements.append(
                    cl.Text(name=text_name, content=source_data["text"], display="side")
                )

                # Add a PDF element if the source is a PDF file.
                if source_data["url"].lower().endswith(".pdf"):
                    pdf_name = f"Source {number} PDF\n"
                    parts.append(pdf_name)
                    # NOTE(review): "+ 1" presumably converts a zero-based page
                    # index to the viewer's one-based "#page=" anchor - confirm.
                    pdf_url = f"{source_data['url']}#page={source_data['page'] + 1}"
                    source_elements.append(
                        cl.Pdf(name=pdf_name, url=pdf_url, display="side")
                    )

            # Metadata elements come after all text/PDF elements.
            parts.append("\n**Metadata:**\n")
            for number, source_data in enumerate(source_dict.values(), start=1):
                parts.append(f"\nSource {number} Metadata:\n")
                source_elements.append(
                    cl.Text(
                        name=f"Source {number} Metadata",
                        content=f"Source: {source_data['url']}\n"
                        f"Page: {source_data['page']}\n"
                        f"Type: {source_data['source_type']}\n"
                        f"Date: {source_data['date']}\n"
                        f"TL;DR: {source_data['lecture_tldr']}\n"
                        f"Lecture Recording: {source_data['lecture_recording']}\n"
                        f"Suggested Readings: {source_data['suggested_readings']}\n",
                        display="side",
                    )
                )

    return "".join(parts), source_elements, source_dict
def get_prompt(config, prompt_type):
    """Select the prompt template matching the LLM configuration.

    Args:
        config: Mapping with an ``"llm_params"`` entry providing
            ``"llm_loader"`` and ``"use_history"``; ``"llm_style"`` is only
            required for the non-local loader with history enabled.
        prompt_type: ``"qa"`` for the question-answering prompt or
            ``"rephrase"`` for the rephrasing prompt.

    Returns:
        The entry looked up in the module-level ``prompts`` table, or ``None``
        when ``prompt_type`` is neither ``"qa"`` nor ``"rephrase"``.

    Raises:
        KeyError: If a required configuration or prompt key is missing.
    """
    llm_params = config["llm_params"]
    llm_loader = llm_params["llm_loader"]
    use_history = llm_params["use_history"]

    if prompt_type == "qa":
        if llm_loader == "local_llm":
            if use_history:
                return prompts["tiny_llama"]["prompt_with_history"]
            return prompts["tiny_llama"]["prompt_no_history"]

        if use_history:
            # The style is read here, in the only branch that uses it, so a
            # missing or None "llm_style" cannot break the other branches.
            llm_style = llm_params["llm_style"].lower()
            return prompts["openai"]["prompt_with_history"][llm_style]
        return prompts["openai"]["prompt_no_history"]

    if prompt_type == "rephrase":
        return prompts["openai"]["rephrase_prompt"]

    # Unknown prompt types yield None, as before (now explicit).
    return None
def get_history_chat_resume(steps, k, SYSTEM, LLM):
    """Rebuild the most recent conversation turns from stored chat steps.

    Steps are scanned from newest to oldest and the result is returned in
    chronological order.

    Args:
        steps: Sequence of mappings with ``"name"``, ``"type"`` and
            ``"output"`` keys.
        k: Number of turns to keep; up to ``2 * k`` steps are considered
            (one user and one assistant message per turn).
        SYSTEM: Step name to ignore entirely.
        LLM: Step name whose assistant messages are kept.

    Returns:
        list[dict]: Items of the form ``{"type": "user_message" | "ai_message",
        "content": ...}``, oldest first. Empty when ``k <= 0``.

    Raises:
        ValueError: If a non-system step is neither a ``"user_message"`` nor an
            ``"assistant_message"``.
    """
    # The window check below runs only after a step is processed, so without
    # this guard a zero or negative window would still return one message.
    if k <= 0:
        return []

    limit = 2 * k  # 2 * k to account for both user and assistant messages
    conversation_list = []
    count = 0

    for step in reversed(steps):
        if step["name"] == SYSTEM:
            continue

        step_type = step["type"]
        if step_type == "user_message":
            conversation_list.append(
                {"type": "user_message", "content": step["output"]}
            )
        elif step_type == "assistant_message":
            # Assistant messages from other authors are skipped, but they
            # still count toward the window below.
            if step["name"] == LLM:
                conversation_list.append(
                    {"type": "ai_message", "content": step["output"]}
                )
        else:
            raise ValueError("Invalid message type")

        count += 1
        if count >= limit:
            break

    # Collected newest-first; flip back to chronological order.
    conversation_list.reverse()
    return conversation_list
def get_history_setup_llm(memory_list):
    """Normalize stored memory messages into conversation dictionaries.

    Each message may be a plain dict, an object with a ``to_dict()`` method,
    or an object exposing ``type`` and ``content`` attributes.

    Args:
        memory_list: Iterable of messages, or ``None`` for no history.

    Returns:
        list[dict]: Items of the form ``{"type": "ai_message" | "user_message",
        "content": ...}`` in the input order.

    Raises:
        ValueError: If a message type is not one of ``"ai"``, ``"ai_message"``,
            ``"human"`` or ``"user_message"``.
    """
    if memory_list is None:
        return []

    conversation_list = []
    for message in memory_list:
        payload = message.to_dict() if hasattr(message, "to_dict") else message

        # Read type and content as keys when a dict is available; otherwise
        # fall back to attributes on the original message object.
        if isinstance(payload, dict):
            message_type = payload.get("type", None)
            message_content = payload.get("content", None)
        else:
            message_type = getattr(message, "type", None)
            message_content = getattr(message, "content", None)

        if message_type in ("ai", "ai_message"):
            normalized_type = "ai_message"
        elif message_type in ("human", "user_message"):
            normalized_type = "user_message"
        else:
            raise ValueError("Invalid message type")

        conversation_list.append(
            {"type": normalized_type, "content": message_content}
        )
    return conversation_list
def get_last_config(steps):
    """Placeholder for recovering the last configuration from chat steps.

    Args:
        steps: Currently unused.

    Returns:
        None: Always, until the function is implemented.
    """
    # TODO: Implement this function
    return None
|