import asyncio
import subprocess
import json
import concurrent.futures
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from jinja2 import Template
from llama_cpp import Llama
from contextlib import asynccontextmanager
import logging
from pathlib import Path

# Set up logging
logging.basicConfig(level=logging.INFO)

# Set up the log file and empty it
log_path = Path("interaction_history.log")
log_path.touch(exist_ok=True)
with log_path.open('w') as f:
    pass  # Do nothing, just empty the file

# Global variable to keep track of the last read position in the log file
last_read_position = 0
# Define the models and their paths
models = {
    "production": {
        "file": "DeepSeek-R1-Distill-Llama-8B-Q4_K_L.gguf",
        "alias": "R1Llama8BQ4L",
        "template": "app/templates/Llama8bq4k.html",
    },
    "development": {
        "file": "DeepSeek-R1-Distill-Qwen-1.5B-Q2_K.gguf",
        "alias": "R1Qwen1.5BQ2",
        "template": "app/templates/Qwen5bq2k.html",
    },
}

model_in_use = models["production"]

with open(model_in_use["template"], "r") as jinja_template:
    CHAT_TEMPLATE = jinja_template.read()

#with open("app/templates/default.html", "r") as jinja_template:
#    CHAT_TEMPLATE = jinja_template.read()
# Define the shell execution tool
def execute_shell(arguments):
    """Execute a shell command."""
    try:
        args = json.loads(arguments)
        command = args.get("command", "")
        if not command:
            return json.dumps({"error": "No command provided."})
        process = subprocess.run(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        return json.dumps({"stdout": process.stdout, "stderr": process.stderr})
    except Exception as e:
        return json.dumps({"error": str(e)})
# Define the tools available to the assistant
tools = {
    "shell": {
        "description": "Execute shell commands.",
        "example_input": '{"command": "..."}',
        "example_output": '{"stdout": "...", "stderr": "..."}',
        "function": execute_shell,
    },
}
tools_list = [
    {
        "type": "function",
        "function": {
            "name": "shell",
            "description": "Execute shell commands.",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The shell command to execute."
                    }
                },
                "required": ["command"]
            }
        }
    }
]
expected_tool_usage = """\n\nYou are expected to demarcate tool calls between markers like: | |
<|tool▁calls▁begin|> ... <|tool▁calls▁end|> | |
and each individual call between: | |
<|tool▁call▁begin|> ... <|tool▁sep|> ... "```json" ... "```" | |
\nBut you should not make more than one call at a time.\n | |
For example, to download the Google homepage, you could use the `wget` command. | |
<|tool_calls_begin|> | |
<|tool_call_begin|> | |
{ | |
"tool": "shell", | |
"arguments": "{\"command\": \"wget google.com\"}" | |
} | |
<|tool_sep|> | |
<|tool_call_end|> | |
<|tool_calls_end|> | |
""" | |
text_prompt = """You are an AI model expert in computational biology performing autonomous real research. | |
Your goal is to discover something new and relevant in the field of nerve regeneration. | |
You can execute shell commands in the Docker container with the following tools:\n\n""" | |
custom_prompt = """You are an AI model expert in computational biology performing autonomous real research. | |
Your goal is to discover something new and relevant in the field of nerve regeneration. | |
You can execute shell commands using a Json: | |
```json | |
{ | |
"tool": "shell", | |
"arguments": '{\"command\": "<INSERT-YOUR-COMMAND>"}' | |
} | |
``` | |
""" | |
# Dynamically generate the system prompt based on available tools.
def generate_system_prompt(tools):
    tool_descriptions = []
    for tool_name, tool_data in tools.items():
        description = tool_data.get("description", "No description available.")
        example_input = tool_data.get("example_input", "{}")
        example_output = tool_data.get("example_output", "{}")
        tool_descriptions.append(
            f"""- **{tool_name}**:
  - Description: {description}
  - Input: {example_input}
  - Output: {example_output}"""
        )
    return (
        text_prompt
        + "\n\n".join(tool_descriptions)
        + expected_tool_usage
    )
# Create the system prompt.
system_prompt = generate_system_prompt(tools)

with log_path.open("a") as f:
    #f.write("System prompt:\n\n" + system_prompt + "\n\n")
    f.write("System prompt:\n" + custom_prompt + "\n\n")
# Parse out any tool calls embedded in the model's output.
def extract_tool_calls(response_text):
    """
    Parse tool calls from model output.
    The model is expected to demarcate tool calls between markers like:
        <|tool▁calls▁begin|> ... <|tool▁calls▁end|>
    and each individual call between:
        <|tool▁call▁begin|> ... <|tool▁sep|> ... "```json" ... "```"
    """
    if "<|tool▁calls▁begin|>" not in response_text:
        return []
    tool_calls_part = response_text.split("<|tool▁calls▁begin|>")[1]
    tool_calls_part = tool_calls_part.split("<|tool▁calls▁end|>")[0]
    tool_calls = tool_calls_part.split("<|tool▁call▁begin|>")
    parsed_tool_calls = []
    for tool_call in tool_calls:
        tool_call = tool_call.strip()
        if tool_call:
            try:
                tool_type, tool_name_and_args = tool_call.split("<|tool▁sep|>")
                tool_name, tool_args = tool_name_and_args.split("\n```json\n", 1)
                tool_args = tool_args.split("\n```")[0]
                parsed_tool_calls.append({
                    "type": tool_type,
                    "name": tool_name.strip(),
                    "arguments": tool_args.strip()
                })
            except ValueError:
                logging.warning("Failed to parse tool call: %s", tool_call)
    return parsed_tool_calls
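
# Worked example for extract_tool_calls, kept as an uncalled helper so it is
# import-safe. The sample string mirrors the marker format the parser splits on.
def _demo_extract_tool_calls():
    sample = (
        "<|tool▁calls▁begin|><|tool▁call▁begin|>function<|tool▁sep|>shell\n"
        "```json\n"
        '{"command": "echo hi"}\n'
        "```\n"
        "<|tool▁call▁end|><|tool▁calls▁end|>"
    )
    calls = extract_tool_calls(sample)
    # calls == [{"type": "function", "name": "shell",
    #            "arguments": '{"command": "echo hi"}'}]
    return calls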
def process_tool_call(tool_call):
    """Execute the requested tool and return its output."""
    tool_name = tool_call["name"]
    tool_args = tool_call["arguments"]
    if tool_name in tools:
        tool_function = tools[tool_name]["function"]
        return tool_function(tool_args)
    else:
        return json.dumps({"error": f"Tool {tool_name} not found."})
#
# Helper: Wrap a synchronous generator as an asynchronous generator.
#
async def async_generator_from_sync(sync_gen_func, *args, **kwargs):
    """
    Runs a synchronous generator function in a thread and yields items asynchronously.
    """
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()

    def producer():
        try:
            for item in sync_gen_func(*args, **kwargs):
                loop.call_soon_threadsafe(q.put_nowait, item)
        except Exception as e:
            loop.call_soon_threadsafe(q.put_nowait, e)
        finally:
            # Signal the end of iteration with a sentinel (None)
            loop.call_soon_threadsafe(q.put_nowait, None)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        logging.info("Inside executor")
        executor.submit(producer)
        while True:
            item = await q.get()
            if item is None:
                break
            if isinstance(item, Exception):
                raise item
            yield item
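
# Usage sketch: any blocking iterable/generator can be consumed from async code
# this way without stalling the event loop. Hypothetical values, shown as a
# comment so nothing runs at import time:
#
#   async for chunk in async_generator_from_sync(range, 3):
#       print(chunk)  # 0, 1, 2 — each yielded from the worker thread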
#
# Background response generator without requiring a WebSocket.
#
async def generate_response_background(conversation):
    logging.info(f"Starting generation with conversation: {conversation}")
    async for token_chunk in async_generator_from_sync(
        llm.create_chat_completion,
        messages=conversation,
        stream=True,
        tools=tools_list,
        max_tokens=2048,
        repeat_penalty=1.1
    ):
        logging.debug(f"Raw token chunk: {json.dumps(token_chunk, indent=2)}")
        yield token_chunk
        await asyncio.sleep(0)
#
# Main research loop running continuously in the background.
#
async def run_research_forever():
    global log_path
    logging.info("🚀 Autonomous computational biology research initiated!")
    with log_path.open("a") as f:
        f.write("🚀 Autonomous computational biology research initiated!\n")
    conversation = [{"role": "system", "content": custom_prompt}]
    while True:
        full_response = ""
        try:
            async for token in generate_response_background(conversation):
                # Safely extract delta
                delta = token["choices"][0].get("delta", {})
                # Handle text content (the field may be absent or None)
                token_text = delta.get("content") or ""
                full_response += token_text
                # Handle tool calls (critical for container environment)
                if "tool_calls" in delta:
                    tool_calls = delta["tool_calls"]
                    # Process tool call deltas here (append to full_response or log)
                    tool_call_str = json.dumps(tool_calls)
                    full_response += f"\n🔧 Tool Call: {tool_call_str}\n"
                # Logging remains the same
                with open(log_path, "a") as f:
                    f.write(token_text)
                    f.flush()
                # Check for finish reason
                if token["choices"][0].get("finish_reason"):
                    # The presence of a finish reason (like "stop") indicates that
                    # generation is complete. Append the assistant's response to
                    # the conversation log.
                    conversation.append({"role": "assistant", "content": full_response})
                    try:
                        tool_output = parse_tool_calls(full_response)
                        conversation.append({"role": "tool", "content": tool_output})
                    except Exception as e:
                        logging.error(f"🛠️ Tool execution failed: {e}; attempting tool execution fallback")
                        try:
                            tool_output = parse_tool_calls_fallback(full_response)
                            conversation.append({"role": "tool", "content": tool_output})
                        except Exception as fallback_error:
                            logging.error(f"🛠️ Tool execution fallback also failed: {fallback_error}")
                            conversation.append({"role": "system", "content": """Error in function calling.
You must work towards your goal using the proper format:
""" + custom_prompt})
                            continue
        except Exception as e:
            logging.error(f"Autonomous research error during response generation: {e}")
            continue
        # Delay before the next query iteration.
        await asyncio.sleep(1)
def parse_tool_calls_fallback(full_response):
    """Fallback parser: extract the first fenced ```json block (the format
    requested by custom_prompt) and execute it as a tool call."""
    tool_call = full_response.split("```json")[1]
    tool_call = tool_call.split("```")[0]
    call = json.loads(tool_call.strip())
    tool_output = process_tool_call({
        "type": "function",
        "name": call["tool"],
        "arguments": call["arguments"],
    })
    return tool_output
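
# Import-safe sketch of what the fallback accepts: a reply containing a fenced
# JSON object in the shape custom_prompt asks for. The surrounding prose and
# the command are illustrative only.
def _demo_parse_tool_calls_fallback():
    reply = (
        "Let me list the working directory first.\n"
        "```json\n"
        '{"tool": "shell", "arguments": "{\\"command\\": \\"ls\\"}"}\n'
        "```"
    )
    # Returns execute_shell's JSON output, e.g. '{"stdout": "...", "stderr": ""}'
    return parse_tool_calls_fallback(reply)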
def parse_tool_calls(full_response):
    # Check for tool calls in the response and process them.
    logging.info(f"Full response: {full_response}")
    tool_calls = extract_tool_calls(full_response)
    logging.info(f"Tool calls: {tool_calls}")
    if not tool_calls:
        # Raise so the caller can try parse_tool_calls_fallback.
        raise ValueError("No tool calls found in response.")
    tool_output = None
    for tool_call in tool_calls:
        tool_output = process_tool_call(tool_call)
        logging.info(f"🔧 Tool Execution: {tool_output}")
        with log_path.open("a") as f:
            f.write(f"🔧 Tool Execution: {tool_output}\n")
    # Returns the output of the last tool call.
    return tool_output
# Automatically start the research process when the app starts.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background task when FastAPI starts."""
    logging.info("Starting run_research_forever()...")
    await asyncio.sleep(5)  # Wait for the server to load
    asyncio.create_task(run_research_forever())  # Run in background
    yield
    logging.info("FastAPI shutdown: Cleaning up resources.")
# Initialize the FastAPI application
app = FastAPI(lifespan=lifespan)

# Load the Llama model (assumed to return a synchronous generator when stream=True)
llm = Llama(model_path=model_in_use["file"], n_ctx=2048)
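
# Import-safe sketch of the llama-cpp-python chat API used above: a one-shot,
# non-streaming completion (the streaming path is wrapped by
# async_generator_from_sync in generate_response_background).
def _demo_chat_completion():
    result = llm.create_chat_completion(
        messages=[{"role": "user", "content": "Say hello."}],
        max_tokens=16,
    )
    # Non-streaming responses carry the text under choices[0]["message"]["content"].
    return result["choices"][0]["message"]["content"]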
@app.websocket("/ws")  # Route path assumed; the decorator was missing in the original.
async def stream(websocket: WebSocket):
    logging.info("WebSocket connection established.")
    global log_path, last_read_position
    await websocket.accept()
    # Send existing interaction history to the client.
    try:
        with open(log_path, "r") as log_file:
            interaction_history = log_file.read()
            last_read_position = log_file.tell()
        if interaction_history:
            await websocket.send_text(interaction_history)
    except Exception as e:
        logging.error(f"Error reading interaction history: {e}")
    # Continuously send updates from the log file.
    while True:
        await asyncio.sleep(0.1)
        try:
            with open(log_path, "r") as log_file:
                log_file.seek(last_read_position)
                new_content = log_file.read()
                if new_content:
                    await websocket.send_text(new_content)
                    last_read_position = log_file.tell()
        except Exception as e:
            logging.error(f"Error reading interaction history: {e}")
# Endpoint to retrieve the interaction log.
@app.get("/log")  # Route path assumed; the decorator was missing in the original.
async def get_log():
    try:
        with open("interaction_history.log", "r") as f:
            log_content = f.read()
        # Return the log inside a <pre> block for readability.
        return HTMLResponse(content=f"<pre>{log_content}</pre>")
    except Exception as e:
        logging.error(f"Error reading log: {e}")
        return {"error": str(e)}
# A simple frontend page with a link to the log.
@app.get("/")  # Route path assumed; the decorator was missing in the original.
async def get():
    try:
        with open("app/templates/index.html", "r") as f:
            html_content = f.read()
    except Exception as e:
        logging.error(f"Error loading template: {e}")
        html_content = "<html><body><h1>Error loading template</h1></body></html>"
    return HTMLResponse(html_content)
# To run the app, use a command like:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)