DefaultAgent / main.py
VirtualLab's picture
Update main.py
32953b6 verified
import asyncio
import subprocess
import json
import concurrent.futures
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from jinja2 import Template
from llama_cpp import Llama
from contextlib import asynccontextmanager
import logging
from pathlib import Path
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)

# Interaction log file: created when missing and truncated on every start-up.
log_path = Path("interaction_history.log")
log_path.touch(exist_ok=True)
log_path.open("w").close()  # opening in "w" mode empties the file

# Global variable holding the last read position in the log file.
last_read_position = 0
# Known model builds: weights file, short alias, and chat-template path.
models = dict(
    production=dict(
        file="DeepSeek-R1-Distill-Llama-8B-Q4_K_L.gguf",
        alias="R1Llama8BQ4L",
        template="app/templates/Llama8bq4k.html",
    ),
    development=dict(
        file="DeepSeek-R1-Distill-Qwen-1.5B-Q2_K.gguf",
        alias="R1Qwen1.5BQ2",
        template="app/templates/Qwen5bq2k.html",
    ),
)

# The production entry is the one currently selected.
model_in_use = models["production"]

# Load the chat template that belongs to the selected model.
# (An earlier, disabled variant read app/templates/default.html instead.)
with open(model_in_use["template"], "r") as jinja_template:
    CHAT_TEMPLATE = jinja_template.read()
# Define the shell execution tool
def execute_shell(arguments, timeout=None):
    """Run the shell command described by ``arguments`` and report its output.

    Args:
        arguments: Either a JSON string encoding an object with a
            ``"command"`` key, or an already-decoded dict of that shape.
        timeout: Optional number of seconds after which the command is
            aborted. ``None`` (the default) waits indefinitely.

    Returns:
        A JSON string. ``{"stdout": ..., "stderr": ...}`` when the command
        ran, or ``{"error": ...}`` when the arguments are invalid, the
        command timed out, or it could not be started.
    """
    # Accept both the raw JSON text and a dict that was decoded upstream.
    if isinstance(arguments, (str, bytes, bytearray)):
        try:
            args = json.loads(arguments)
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            return json.dumps({"error": str(e)})
    else:
        args = arguments

    if not isinstance(args, dict):
        return json.dumps({"error": "Tool arguments must be a JSON object."})

    command = args.get("command", "")
    if not command:
        return json.dumps({"error": "No command provided."})
    if not isinstance(command, str):
        return json.dumps({"error": "The command must be a string."})

    try:
        # NOTE: shell=True executes model-generated text verbatim. That is
        # the purpose of this tool, so it must only run inside a sandbox.
        process = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return json.dumps(
            {"error": f"Command timed out after {timeout} seconds."}
        )
    except (OSError, ValueError) as e:
        return json.dumps({"error": str(e)})

    return json.dumps({"stdout": process.stdout, "stderr": process.stderr})
# Define the tools available to the assistant.
# Each entry maps a tool name to its description, example payloads (used
# when rendering the system prompt) and the callable that implements it.
tools = {
    "shell": {
        "description": "Execute shell commands.",
        # Valid JSON so the example can be copied as-is.
        "example_input": '{"command": "..."}',
        "example_output": '{"stdout": "...", "stderr": "..."}',
        "function": execute_shell,
    },
}

# Function-calling schema for the same tools, passed as ``tools=`` to the
# chat-completion call.
tools_list = [
    {
        "type": "function",
        "function": {
            "name": "shell",
            "description": "Execute shell commands.",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The shell command to execute.",
                    },
                },
                "required": ["command"],
            },
        },
    },
]
# Instructions appended to the generated system prompt. The worked example
# uses exactly the markers and layout that extract_tool_calls() recognises.
expected_tool_usage = """\n\nYou are expected to demarcate tool calls between markers like:
<|tool▁calls▁begin|> ... <|tool▁calls▁end|>
and each individual call between:
<|tool▁call▁begin|> ... <|tool▁sep|> ... "```json" ... "```"
\nBut you should not make more than one call at a time.\n
For example, to download the Google homepage, you could use the `wget` command.
<|tool▁calls▁begin|>
<|tool▁call▁begin|>function<|tool▁sep|>shell
```json
{"command": "wget google.com"}
```
<|tool▁call▁end|>
<|tool▁calls▁end|>
"""

# Preamble of the generated system prompt; the tool list is appended to it.
text_prompt = """You are an AI model expert in computational biology performing autonomous real research.
Your goal is to discover something new and relevant in the field of nerve regeneration.
You can execute shell commands in the Docker container with the following tools:\n\n"""

# Stand-alone prompt with a JSON-only calling convention. The example is
# valid JSON: "arguments" is a JSON-encoded string, which is what
# execute_shell() decodes.
custom_prompt = """You are an AI model expert in computational biology performing autonomous real research.
Your goal is to discover something new and relevant in the field of nerve regeneration.
You can execute shell commands using a Json:
```json
{
"tool": "shell",
"arguments": "{\\"command\\": \\"<INSERT-YOUR-COMMAND>\\"}"
}
```
"""
# Dynamically generate the system prompt based on available tools.
def generate_system_prompt(tools):
    """Assemble the system prompt for the given tool registry.

    Args:
        tools: Mapping of tool name to a dict that may provide
            ``description``, ``example_input`` and ``example_output``.

    Returns:
        ``text_prompt``, followed by one bullet section per tool (sections
        separated by a blank line), followed by ``expected_tool_usage``.
    """

    def render(name, spec):
        # Missing fields fall back to neutral placeholder text.
        summary = spec.get("description", "No description available.")
        sample_in = spec.get("example_input", "{}")
        sample_out = spec.get("example_output", "{}")
        return "\n".join(
            (
                f"- **{name}**:",
                f"- Description: {summary}",
                f"- Input: {sample_in}",
                f"- Output: {sample_out}",
            )
        )

    sections = [render(name, spec) for name, spec in tools.items()]
    return text_prompt + "\n\n".join(sections) + expected_tool_usage
# Create the system prompt.
system_prompt = generate_system_prompt(tools)

# Record the prompt in the interaction log. The custom prompt is the one
# written here; the generated system_prompt is not logged.
with log_path.open("a") as prompt_log:
    prompt_log.write(f"System prompt:\n{custom_prompt}\n\n")
# Parse out any tool calls embedded in the model's output.
def extract_tool_calls(response_text):
    """
    Return the tool calls found in ``response_text`` as a list of dicts.

    Only the section between the first <|tool▁calls▁begin|> marker and the
    following <|tool▁calls▁end|> marker is examined. Inside it, each call
    starts at <|tool▁call▁begin|> and has the layout
        <type><|tool▁sep|><name>, then a "```json" ... "```" fenced block.
    Every parsed call is a dict with the keys "type", "name" and
    "arguments" (the raw JSON text). Calls that do not match the layout are
    logged with a warning and skipped.
    """
    _, opener, remainder = response_text.partition("<|tool▁calls▁begin|>")
    if not opener:
        return []

    # Keep what lies before a second opening marker or the closing marker.
    section = remainder.partition("<|tool▁calls▁begin|>")[0]
    section = section.partition("<|tool▁calls▁end|>")[0]

    found = []
    for raw_call in section.split("<|tool▁call▁begin|>"):
        chunk = raw_call.strip()
        if not chunk:
            continue

        # Exactly one separator is expected between the type and the rest.
        pieces = chunk.split("<|tool▁sep|>")
        if len(pieces) != 2:
            logging.warning("Failed to parse tool call: %s", chunk)
            continue
        call_type, name_and_body = pieces

        name, fence, body = name_and_body.partition("\n```json\n")
        if not fence:
            logging.warning("Failed to parse tool call: %s", chunk)
            continue

        found.append(
            {
                "type": call_type,
                "name": name.strip(),
                "arguments": body.partition("\n```")[0].strip(),
            }
        )
    return found
def process_tool_call(tool_call):
    """Run the tool named in ``tool_call`` and hand back what it returns.

    ``tool_call`` must provide the keys ``"name"`` and ``"arguments"``. The
    arguments are passed unchanged to the function registered under that
    name in ``tools``. An unknown name produces a JSON error string.
    """
    requested = tool_call["name"]
    payload = tool_call["arguments"]

    if requested not in tools:
        return json.dumps({"error": f"Tool {requested} not found."})

    handler = tools[requested]["function"]
    return handler(payload)
#
# Helper: Wrap a synchronous generator as an asynchronous generator.
#
async def async_generator_from_sync(sync_gen_func, *args, **kwargs):
    """
    Run a synchronous generator function in a thread and yield its items
    asynchronously.

    Args:
        sync_gen_func: Callable returning an iterable/generator.
        *args, **kwargs: Forwarded to ``sync_gen_func``.

    Yields:
        Every item produced by the generator, in order. ``None`` and
        exception instances are passed through like any other value.

    Raises:
        Any exception raised by ``sync_gen_func`` or while iterating it is
        re-raised in the consuming coroutine.
    """
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()
    # A private sentinel marks the end of the stream, so a legitimate
    # ``None`` item is never mistaken for termination.
    end_of_stream = object()

    def producer():
        # Runs in the worker thread; every hand-off goes through
        # call_soon_threadsafe because the queue belongs to the event loop.
        try:
            for item in sync_gen_func(*args, **kwargs):
                loop.call_soon_threadsafe(q.put_nowait, (False, item))
        except Exception as e:
            # Tag errors explicitly instead of relying on isinstance checks.
            loop.call_soon_threadsafe(q.put_nowait, (True, e))
        finally:
            loop.call_soon_threadsafe(q.put_nowait, end_of_stream)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        logging.info("Inside executor")
        executor.submit(producer)
        while True:
            entry = await q.get()
            if entry is end_of_stream:
                break
            is_error, payload = entry
            if is_error:
                raise payload
            yield payload
#
# Background response generator without requiring a WebSocket.
#
async def generate_response_background(conversation):
    """Stream a chat completion for ``conversation`` chunk by chunk.

    Args:
        conversation: List of chat messages passed as ``messages`` to
            ``llm.create_chat_completion``.

    Yields:
        Each streamed chunk exactly as produced by the model call.
    """
    logging.info(f"Starting generation with conversation: {conversation}")
    root_logger = logging.getLogger()
    async for token_chunk in async_generator_from_sync(
        llm.create_chat_completion,
        messages=conversation,
        stream=True,
        tools=tools_list,
        max_tokens=2048,
        repeat_penalty=1.1,
    ):
        # Serialising every chunk is wasted work unless DEBUG output is
        # actually enabled, so check before building the message.
        if root_logger.isEnabledFor(logging.DEBUG):
            logging.debug(f"Raw token chunk: {json.dumps(token_chunk, indent=2)}")
        yield token_chunk
        # Give other tasks a chance to run between chunks.
        await asyncio.sleep(0)
#
# Main research loop running continuously in the background.
#
async def run_research_forever():
    """Drive the autonomous research loop until the task is cancelled.

    Each iteration streams one model response, appends its text to the
    interaction log as it arrives, and, once the model reports a finish
    reason, records the response in the conversation and runs the tool call
    it contains. The tool result, or a corrective system message when no
    tool call could be executed, is appended for the next iteration.
    """
    start_message = "🚀 Autonomous computational biology research initiated!"
    logging.info(start_message)
    with log_path.open("a") as f:
        f.write(start_message + "\n")

    loop = asyncio.get_running_loop()
    # NOTE(review): the conversation only ever grows; confirm it cannot
    # exceed the model's context window during long runs.
    conversation = [{"role": "system", "content": custom_prompt}]

    while True:
        full_response = ""
        try:
            # One handle per response instead of reopening the file for
            # every token; each write is flushed so readers see it at once.
            with log_path.open("a") as log_file:
                async for token in generate_response_background(conversation):
                    choice = token["choices"][0]
                    delta = choice.get("delta", {})

                    # "content" can be present but None (for example on
                    # role-only or tool-call chunks), so coerce to text.
                    token_text = delta.get("content") or ""
                    full_response += token_text

                    tool_calls = delta.get("tool_calls")
                    if tool_calls:
                        full_response += f"\n🔧 Tool Call: {json.dumps(tool_calls)}\n"

                    log_file.write(token_text)
                    log_file.flush()

                    # A finish reason (like "stop") marks the end of the
                    # response.
                    if choice.get("finish_reason"):
                        conversation.append(
                            {"role": "assistant", "content": full_response}
                        )
                        # Tool execution is blocking, so run it in a worker
                        # thread to keep the event loop responsive.
                        follow_up = await loop.run_in_executor(
                            None, _run_tools_for_response, full_response
                        )
                        conversation.append(follow_up)
        except Exception as e:
            logging.error(f"Autonomous research error during response generation: {e}")
            # Back off before retrying so a persistent failure does not
            # turn into a tight error loop.
            await asyncio.sleep(1)
            continue
        # Delay before the next query iteration.
        await asyncio.sleep(1)


def _run_tools_for_response(full_response):
    """Execute the tool call in ``full_response`` and build the next message.

    Tries the marker-based parser first and the fenced-JSON fallback second.
    Returns a ``tool`` message carrying the tool output, or a ``system``
    message restating the expected format when both attempts fail.
    """
    try:
        return {"role": "tool", "content": parse_tool_calls(full_response)}
    except Exception as primary_error:
        logging.error(
            f"🛠️ Tool execution failed: {primary_error} attempting tool execution fallback"
        )
    try:
        return {"role": "tool", "content": parse_tool_calls_fallback(full_response)}
    except Exception as fallback_error:
        logging.error(f"🛠️ Tool execution fallback also failed: {fallback_error}")
    return {
        "role": "system",
        "content": "Error in function calling.\n"
        "You must work towards your goal using the proper format" + custom_prompt,
    }
def parse_tool_calls_fallback(full_response):
    """Run a tool call given as a fenced JSON block and return its output.

    The first "```json" ... "```" block in ``full_response`` must decode to
    an object of the form ``{"tool": <name>, "arguments": <args>}``.
    ``arguments`` may be a JSON-encoded string or a nested object; a nested
    object is re-encoded because tools receive their arguments as JSON text.

    Raises:
        ValueError: If there is no fenced block, it is not valid JSON, or
            it lacks the "tool" key.
    """
    _, fence, after_fence = full_response.partition("```json")
    if not fence:
        raise ValueError("No ```json block found in the response.")

    payload = after_fence.partition("```")[0].strip()
    try:
        request = json.loads(payload)
    except json.JSONDecodeError as e:
        raise ValueError(f"Tool call is not valid JSON: {e}") from e

    if not isinstance(request, dict) or "tool" not in request:
        raise ValueError('Tool call must be a JSON object with a "tool" key.')

    arguments = request.get("arguments", "{}")
    if not isinstance(arguments, str):
        arguments = json.dumps(arguments)

    # process_tool_call() expects the keys "name" and "arguments".
    return process_tool_call({"name": request["tool"], "arguments": arguments})
def parse_tool_calls(full_response):
    """Execute every marker-delimited tool call in ``full_response``.

    Each tool output is logged and appended to the interaction log.

    Returns:
        The tool output; when the response holds several calls, their
        outputs joined by newlines in call order.

    Raises:
        ValueError: If the response contains no parsable tool call, so the
            caller can switch to its fallback parser.
    """
    # Check for tool calls in the response and process them.
    logging.info(f"Full response: {full_response}")
    tool_calls = extract_tool_calls(full_response)
    logging.info(f"Tool calls: {tool_calls}")
    if not tool_calls:
        raise ValueError("No tool calls found in the response.")

    outputs = []
    for tool_call in tool_calls:
        tool_output = process_tool_call(tool_call)
        logging.info(f"🔧 Tool Execution: {tool_output}")
        # Written per call so the log shows each result as soon as it exists.
        with log_path.open("a") as f:
            f.write(f"🔧 Tool Execution: {tool_output}\n")
        outputs.append(str(tool_output))
    return "\n".join(outputs)
# Automatically start the research process when the app starts.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the research loop as a background task for the app's lifetime.

    The task is scheduled at startup and begins the research loop five
    seconds later, so the delay no longer holds up application startup. On
    shutdown the task is cancelled and awaited.
    """

    async def _start_after_delay():
        await asyncio.sleep(5)  # Wait for the server to load
        await run_research_forever()

    logging.info("Starting run_research_forever()...")
    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task can be garbage-collected while running.
    research_task = asyncio.create_task(_start_after_delay())
    try:
        yield
    finally:
        logging.info("FastAPI shutdown: Cleaning up resources.")
        research_task.cancel()
        try:
            await research_task
        except asyncio.CancelledError:
            pass
        except Exception:
            logging.exception("Research task ended with an error.")
# Create the FastAPI application; `lifespan` is registered as its
# startup/shutdown handler.
app = FastAPI(lifespan=lifespan)

# Load the selected model's weights with a 2048-token context window.
# (The model call is assumed to return a synchronous generator when
# stream=True.)
llm = Llama(
    model_path=model_in_use["file"],
    n_ctx=2048,
)
@app.websocket("/stream")
async def stream(websocket: WebSocket):
    """Stream the interaction log to one WebSocket client.

    The existing log content is sent first; afterwards the file is polled
    every 0.1 seconds and anything appended since the last send is
    forwarded. The read offset is kept per connection so several clients
    do not interfere with each other, and the handler returns as soon as
    sending fails.
    """
    logging.info("WebSocket connection established.")
    await websocket.accept()

    # Per-connection offset; starting at 0 sends the existing history on
    # the first pass.
    position = 0
    while True:
        new_content = ""
        new_position = position
        try:
            with open(log_path, "r") as log_file:
                log_file.seek(position)
                new_content = log_file.read()
                new_position = log_file.tell()
        except (OSError, ValueError) as e:
            # ValueError covers decoding errors; the read is retried on
            # the next poll.
            logging.error(f"Error reading interaction history: {e}")

        if new_content:
            try:
                await websocket.send_text(new_content)
            except Exception as e:
                # Sending fails once the client is gone; stop polling
                # instead of logging an error every tick forever.
                logging.info(f"WebSocket connection closed: {e}")
                return
            # Advance only after the content was delivered.
            position = new_position

        await asyncio.sleep(0.1)
# Endpoint to retrieve the interaction log.
@app.get("/log")
async def get_log():
    """Return the interaction log as an HTML page.

    The log text is HTML-escaped before being wrapped in a ``<pre>`` block,
    so model and tool output (including ``<|...|>`` markers) is displayed
    literally instead of being interpreted as markup. On failure the error
    is logged and returned as ``{"error": ...}``.
    """
    import html  # local import keeps this block self-contained

    try:
        with open(log_path, "r") as f:
            log_content = f.read()
        # Return the log inside a <pre> block for readability.
        return HTMLResponse(content=f"<pre>{html.escape(log_content)}</pre>")
    except Exception as e:
        logging.error(f"Error reading log: {e}")
        return {"error": str(e)}
# A simple frontend page with a link to the log.
@app.get("/", response_class=HTMLResponse)
async def get():
    """Serve app/templates/index.html, or a minimal error page.

    Any failure while reading the template is logged and replaced by a
    fixed error document.
    """
    index_template = Path("app/templates/index.html")
    try:
        html_content = index_template.read_text()
    except Exception as e:
        logging.error(f"Error loading template: {e}")
        html_content = "<html><body><h1>Error loading template</h1></body></html>"
    return HTMLResponse(html_content)
# Start a uvicorn server on all interfaces, port 8000, when this file is
# executed directly as a script.
if __name__ == "__main__":
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)