import os
import asyncio
import subprocess
import json
import concurrent.futures
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from jinja2 import Template
from llama_cpp import Llama
from contextlib import asynccontextmanager
import logging
from pathlib import Path
# Set up logging
logging.basicConfig(level=logging.INFO)
# Set up the log file and ensure it exists
log_path = Path("interaction_history.log")
log_path.touch(exist_ok=True)
# Global variable to keep track of the last read position in the log file
last_read_position = 0
# Define the models and their paths
models = {
"production": {
"file": "DeepSeek-R1-Distill-Llama-8B-Q4_K_L.gguf",
"alias": "R1Llama8BQ4L",
"template": "/templates/Llama8bq4k.html"
},
"development": {
"file": "/home/ali/Projects/VirtualLabDev/Local/DeepSeek-R1-Distill-Qwen-1.5B-Q2_K.gguf",
"alias": "R1Qwen1.5BQ2",
"template": "./templates/Qwen5bq2k.html"
},
}
model_in_use = models["development"]
# Load the chat template for the selected model, falling back to the default template.
try:
    with open(model_in_use["template"], "r") as jinja_template:
        CHAT_TEMPLATE = jinja_template.read()
except OSError:
    with open("templates/default.html", "r") as jinja_template:
        CHAT_TEMPLATE = jinja_template.read()
# Define the shell execution tool
def execute_shell(arguments):
"""Execute a shell command."""
try:
args = json.loads(arguments)
command = args.get("command", "")
if not command:
return json.dumps({"error": "No command provided."})
process = subprocess.run(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return json.dumps({"stdout": process.stdout, "stderr": process.stderr})
except Exception as e:
return json.dumps({"error": str(e)})
# Define the tools available to the assistant
tools = {
"shell": {
"description": "Execute shell commands.",
"example_input": '{"command": "ls -l"}',
"example_output": '{"stdout": "...", "stderr": "..."}',
"function": execute_shell,
},
}
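# Further tools can be registered with the same shape. A hypothetical example
# (commented out; "disk_usage" is not part of the original tool set):
# tools["disk_usage"] = {
#     "description": "Report free disk space.",
#     "example_input": "{}",
#     "example_output": '{"stdout": "...", "stderr": "..."}',
#     "function": lambda arguments: execute_shell('{"command": "df -h"}'),
# }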
# Dynamically generate the system prompt based on available tools.
def generate_system_prompt(tools):
tool_descriptions = []
for tool_name, tool_data in tools.items():
description = tool_data.get("description", "No description available.")
example_input = tool_data.get("example_input", "{}")
example_output = tool_data.get("example_output", "{}")
tool_descriptions.append(
f"""- **{tool_name}**:
- Description: {description}
- Input: {example_input}
- Output: {example_output}"""
)
return (
"You are an autonomous computational biology researcher with access to the following tools:\n\n"
+ "\n\n".join(tool_descriptions)
)
# Create the system prompt.
system_prompt = generate_system_prompt(tools)
# Parse out any tool calls embedded in the model's output.
def extract_tool_calls(response_text):
"""
Parse tool calls from model output.
The model is expected to demarcate tool calls between markers like:
<|tool▁calls▁begin|> ... <|tool▁calls▁end|>
and each individual call between:
<|tool▁call▁begin|> ... <|tool▁sep|> ... "```json" ... "```"
"""
if "<|tool▁calls▁begin|>" not in response_text:
return []
tool_calls_part = response_text.split("<|tool▁calls▁begin|>")[1]
tool_calls_part = tool_calls_part.split("<|tool▁calls▁end|>")[0]
tool_calls = tool_calls_part.split("<|tool▁call▁begin|>")
parsed_tool_calls = []
for tool_call in tool_calls:
tool_call = tool_call.strip()
if tool_call:
try:
tool_type, tool_name_and_args = tool_call.split("<|tool▁sep|>")
tool_name, tool_args = tool_name_and_args.split("\n```json\n", 1)
tool_args = tool_args.split("\n```")[0]
parsed_tool_calls.append({
"type": tool_type,
"name": tool_name.strip(),
"arguments": tool_args.strip()
})
except ValueError:
logging.warning("Failed to parse tool call: %s", tool_call)
return parsed_tool_calls
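# Illustrative example of the expected marker format and the parsed result
# (a sketch based on the splitting logic above, not taken from real model output):
#   raw = ("<|tool▁calls▁begin|><|tool▁call▁begin|>function<|tool▁sep|>shell\n"
#          "```json\n{\"command\": \"ls -l\"}\n```<|tool▁calls▁end|>")
#   extract_tool_calls(raw)
#   -> [{"type": "function", "name": "shell", "arguments": '{"command": "ls -l"}'}]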
def process_tool_call(tool_call):
"""Execute the requested tool and return its output."""
tool_name = tool_call["name"]
tool_args = tool_call["arguments"]
if tool_name in tools:
tool_function = tools[tool_name]["function"]
return tool_function(tool_args)
else:
return json.dumps({"error": f"Tool {tool_name} not found."})
#
# Helper: Wrap a synchronous generator as an asynchronous generator.
#
async def async_generator_from_sync(sync_gen_func, *args, **kwargs):
"""
Runs a synchronous generator function in a thread and yields items asynchronously.
"""
loop = asyncio.get_running_loop()
q = asyncio.Queue()
def producer():
try:
for item in sync_gen_func(*args, **kwargs):
loop.call_soon_threadsafe(q.put_nowait, item)
except Exception as e:
loop.call_soon_threadsafe(q.put_nowait, e)
finally:
# Signal the end of iteration with a sentinel (None)
loop.call_soon_threadsafe(q.put_nowait, None)
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.submit(producer)
while True:
item = await q.get()
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
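# Usage sketch (illustrative only): any blocking iterable or generator can be consumed
# from async code this way without stalling the event loop, e.g.
#   async for item in async_generator_from_sync(range, 3):
#       print(item)   # prints 0, 1, 2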
#
# Background response generator without requiring a WebSocket.
#
async def generate_response_background(conversation):
"""Generate a model response asynchronously."""
#template = Template(CHAT_TEMPLATE)
#prompt = template.render(messages=conversation)
#logging.info(f"Prompt: {prompt}")
async for token_chunk in async_generator_from_sync(
llm.create_chat_completion,
messages=conversation,
stream=True,
max_tokens=2048
):
        # Yield the full OpenAI-compatible chunk; the consumer extracts the token
        # text from chunk["choices"][0]["delta"]["content"].
        yield token_chunk
await asyncio.sleep(0)
#
# Main research loop running continuously in the background.
#
async def run_research_forever():
global log_path
logging.info("🚀 Autonomous computational biology research initiated!")
with log_path.open("a") as f:
f.write("🚀 Autonomous computational biology research initiated!\n")
conversation = [{"role": "system", "content": system_prompt}]
while True:
full_response = ""
try:
# Generate the model response and accumulate the full text.
async for token in generate_response_background(conversation):
token_text = token["choices"][0]["delta"].get("content", "")
full_response += token_text
# Log each token individually
with open(log_path, "a") as f:
f.write(token_text)
f.flush()
                # Check whether the model has signalled that generation is complete.
                if token["choices"][0].get("finish_reason") is not None:
                    # A finish reason (e.g. "stop") indicates that generation is complete.
                    # Append the assistant's response to the conversation history.
                    conversation.append({"role": "assistant", "content": full_response})
                    try:
                        tool_output = parse_tool_calls(full_response)
                        if tool_output:
                            conversation.append({"role": "tool", "content": tool_output})
                    except Exception as e:
                        logging.error(f"🛠️ Tool execution failed: {e}")
                        continue
except Exception as e:
logging.error(f"Autonomous research error during response generation: {e}")
continue
# Delay before the next query iteration.
await asyncio.sleep(1)
def parse_tool_calls(full_response):
    """Extract and execute any tool calls in the response; return their combined output."""
    logging.info(f"Full response: {full_response}")
    tool_calls = extract_tool_calls(full_response)
    logging.info(f"Tool calls: {tool_calls}")
    outputs = []
    for tool_call in tool_calls:
        tool_output = process_tool_call(tool_call)
        logging.info(f"🔧 Tool Execution: {tool_output}")
        with log_path.open("a") as f:
            f.write(f"🔧 Tool Execution: {tool_output}\n")
        outputs.append(tool_output)
    # Return None when no tool calls were found so the caller can skip the tool message.
    return "\n".join(outputs) if outputs else None
# Automatically start the research process when the app starts.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background research task on startup and stop it on shutdown."""
    logging.info("Starting run_research_forever()...")
    await asyncio.sleep(5)  # Brief startup delay before launching the research loop
    # Keep a reference so the background task is not garbage-collected mid-run.
    research_task = asyncio.create_task(run_research_forever())
    yield
    research_task.cancel()
    logging.info("FastAPI shutdown: Cleaning up resources.")
# Initialize the FastAPI application
app = FastAPI(lifespan=lifespan)
# Load the Llama model (assumed to return a synchronous generator when stream=True)
llm = Llama(model_path=model_in_use["file"], n_ctx=2048)
@app.websocket("/stream")
async def stream(websocket: WebSocket):
logging.info("WebSocket connection established.")
global log_path, last_read_position
await websocket.accept()
# Send existing interaction history to the client.
try:
with open(log_path, "r") as log_file:
log_file.seek(last_read_position)
interaction_history = log_file.read()
last_read_position = log_file.tell()
if interaction_history:
await websocket.send_text(interaction_history)
except Exception as e:
logging.error(f"Error reading interaction history: {e}")
    # Continuously send updates from the log file until the client disconnects.
    while True:
        await asyncio.sleep(0.1)
        try:
            with open(log_path, "r") as log_file:
                log_file.seek(last_read_position)
                new_content = log_file.read()
                if new_content:
                    await websocket.send_text(new_content)
                    last_read_position = log_file.tell()
        except WebSocketDisconnect:
            logging.info("WebSocket client disconnected.")
            break
        except Exception as e:
            logging.error(f"Error streaming interaction history: {e}")
# Endpoint to retrieve the interaction log.
@app.get("/log")
async def get_log():
try:
with open("interaction_history.log", "r") as f:
log_content = f.read()
# Return the log inside a <pre> block for readability.
return HTMLResponse(content=f"<pre>{log_content}</pre>")
except Exception as e:
logging.error(f"Error reading log: {e}")
return {"error": str(e)}
# A simple frontend page with a link to the log.
@app.get("/", response_class=HTMLResponse)
async def get():
try:
with open("templates/index.html", "r") as f:
html_content = f.read()
except Exception as e:
logging.error(f"Error loading template: {e}")
html_content = "<html><body><h1>Error loading template</h1></body></html>"
return HTMLResponse(html_content)
# Run directly (python test.py) or with uvicorn, e.g. `uvicorn test:app --host 0.0.0.0 --port 8000`
# (adjust the module name to this file's name).
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)