Spaces:
Sleeping
Sleeping
import os | |
import logging | |
import gradio as gr | |
from huggingface_hub import hf_hub_download | |
from transformers import HuggingFaceHub | |
from llama_cpp_agent.providers import LlamaCppPythonProvider | |
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType | |
from llama_cpp_agent.chat_history import BasicChatHistory | |
from llama_cpp_agent.chat_history.messages import Roles | |
from llama_cpp_agent.llm_output_settings import ( | |
LlmStructuredOutputSettings, | |
LlmStructuredOutputType, | |
) | |
from llama_cpp_agent.tools import WebSearchTool | |
from llama_cpp_agent.prompt_templates import web_search_system_prompt, research_system_prompt | |
from pydantic import BaseModel, Field | |
from trafilatura import fetch_url, extract | |
import json | |
from datetime import datetime, timezone | |
from typing import List | |
llm = None | |
llm_model = None | |
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN") | |
examples = [ | |
["latest news about Yann LeCun"], | |
["Latest news site:github.blog"], | |
["Where I can find best hotel in Galapagos, Ecuador intitle:hotel"], | |
["filetype:pdf intitle:python"] | |
] | |
def get_context_by_model(model_name): | |
model_context_limits = { | |
"Mistral-7B-Instruct-v0.3": 32768, | |
} | |
return model_context_limits.get(model_name, None) | |
def get_messages_formatter_type(model_name): | |
model_name = model_name.lower() | |
if "mistral" in model_name: | |
return MessagesFormatterType.MISTRAL | |
else: | |
return MessagesFormatterType.CHATML | |
def get_model(temperature, top_p, repetition_penalty): | |
return HuggingFaceHub( | |
repo_id="mistralai/Mistral-7B-Instruct-v0.3", | |
model_kwargs={ | |
"temperature": temperature, | |
"top_p": top_p, | |
"repetition_penalty": repetition_penalty, | |
"max_length": 1000 | |
}, | |
huggingfacehub_api_token=huggingface_token | |
) | |
def get_server_time(): | |
utc_time = datetime.now(timezone.utc) | |
return utc_time.strftime("%Y-%m-%d %H:%M:%S") | |
def get_website_content_from_url(url: str) -> str: | |
try: | |
downloaded = fetch_url(url) | |
result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', url=url) | |
if result: | |
result = json.loads(result) | |
return f'=========== Website Title: {result["title"]} ===========\n\n=========== Website URL: {url} ===========\n\n=========== Website Content ===========\n\n{result["raw_text"]}\n\n=========== Website Content End ===========\n\n' | |
else: | |
return "" | |
except Exception as e: | |
return f"An error occurred: {str(e)}" | |
class CitingSources(BaseModel): | |
sources: List[str] = Field( | |
..., | |
description="List of sources to cite. Should be an URL of the source. E.g. GitHub URL, Blogpost URL or Newsletter URL." | |
) | |
def write_message_to_user(): | |
return "Please write the message to the user." | |
def respond( | |
message, | |
history: list[tuple[str, str]], | |
model, | |
system_message, | |
max_tokens, | |
temperature, | |
top_p, | |
top_k, | |
repeat_penalty, | |
): | |
global llm | |
global llm_model | |
chat_template = get_messages_formatter_type(model) | |
if llm is None or llm_model != model: | |
llm = get_model(temperature, top_p, repeat_penalty) | |
llm_model = model | |
provider = LlamaCppPythonProvider(llm) | |
logging.info(f"Loaded chat examples: {chat_template}") | |
search_tool = WebSearchTool( | |
llm_provider=provider, | |
message_formatter_type=chat_template, | |
max_tokens_search_results=12000, | |
max_tokens_per_summary=2048, | |
) | |
web_search_agent = LlamaCppAgent( | |
provider, | |
system_prompt=web_search_system_prompt, | |
predefined_messages_formatter_type=chat_template, | |
debug_output=True, | |
) | |
answer_agent = LlamaCppAgent( | |
provider, | |
system_prompt=research_system_prompt, | |
predefined_messages_formatter_type=chat_template, | |
debug_output=True, | |
) | |
settings = provider.get_provider_default_settings() | |
settings.stream = False | |
settings.temperature = temperature | |
settings.top_k = top_k | |
settings.top_p = top_p | |
settings.max_tokens = max_tokens | |
settings.repeat_penalty = repeat_penalty | |
output_settings = LlmStructuredOutputSettings.from_functions( | |
[search_tool.get_tool()] | |
) | |
messages = BasicChatHistory() | |
for msn in history: | |
user = {"role": Roles.user, "content": msn[0]} | |
assistant = {"role": Roles.assistant, "content": msn[1]} | |
messages.add_message(user) | |
messages.add_message(assistant) | |
result = web_search_agent.get_chat_response( | |
message, | |
llm_sampling_settings=settings, | |
structured_output_settings=output_settings, | |
add_message_to_chat_history=False, | |
add_response_to_chat_history=False, | |
print_output=False, | |
) | |
outputs = "" | |
settings.stream = True | |
response_text = answer_agent.get_chat_response( | |
f"Write a detailed and complete research document that fulfills the following user request: '{message}', based on the information from the web below.\n\n" + | |
result[0]["return_value"], | |
role=Roles.tool, | |
llm_sampling_settings=settings, | |
chat_history=messages, | |
returns_streaming_generator=True, | |
print_output=False, | |
) | |
for text in response_text: | |
outputs += text | |
yield outputs | |
output_settings = LlmStructuredOutputSettings.from_pydantic_models( | |
[CitingSources], LlmStructuredOutputType.object_instance | |
) | |
citing_sources = answer_agent.get_chat_response( | |
"Cite the sources you used in your response.", | |
role=Roles.tool, | |
llm_sampling_settings=settings, | |
chat_history=messages, | |
returns_streaming_generator=False, | |
structured_output_settings=output_settings, | |
print_output=False, | |
) | |
outputs += "\n\nSources:\n" | |
outputs += "\n".join(citing_sources.sources) | |
yield outputs | |
demo = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.Dropdown([ | |
'Mistral-7B-Instruct-v0.3' | |
], | |
value="Mistral-7B-Instruct-v0.3", | |
label="Model" | |
), | |
gr.Textbox(value=web_search_system_prompt, label="System message"), | |
gr.Slider(minimum=1, maximum=4096, value=2048, step=1, label="Max tokens"), | |
gr.Slider(minimum=0.1, maximum=1.0, value=0.45, step=0.1, label="Temperature"), | |
gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-p", | |
), | |
gr.Slider( | |
minimum=0, | |
maximum=100, | |
value=40, | |
step=1, | |
label="Top-k", | |
), | |
gr.Slider( | |
minimum=0.0, | |
maximum=2.0, | |
value=1.1, | |
step=0.1, | |
label="Repetition penalty", | |
), | |
], | |
theme=gr.themes.Soft( | |
primary_hue="orange", | |
secondary_hue="amber", | |
neutral_hue="gray", | |
font=[gr.themes.GoogleFont("Exo"), "ui-sans-serif", "system-ui", "sans-serif"]).set( | |
body_background_fill_dark="#0c0505", | |
block_background_fill_dark="#0c0505", | |
block_border_width="1px", | |
block_title_background_fill_dark="#1b0f0f", | |
input_background_fill_dark="#140b0b", | |
button_secondary_background_fill_dark="#140b0b", | |
border_color_accent_dark="#1b0f0f", | |
border_color_primary_dark="#1b0f0f", | |
slider_color="#ff911a", | |
button_primary_background_fill="#ff911a", | |
button_primary_background_fill_dark="#ff911a", | |
button_primary_text_color="#f9f9f9", | |
button_primary_text_color_dark="#f9f9f9" | |
), | |
examples=examples, | |
title="llama.cpp agent", | |
) | |
demo.queue().launch() |