import asyncio
import subprocess
import json
import concurrent.futures
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from jinja2 import Template
from llama_cpp import Llama
from contextlib import asynccontextmanager
import logging
from pathlib import Path

# Set up logging
logging.basicConfig(level=logging.INFO)

# Set up the log file and empty it
log_path = Path("interaction_history.log")
log_path.touch(exist_ok=True)
with log_path.open("w") as f:
    pass  # Do nothing, just empty the file

# Global variable to keep track of the last read position in the log file
last_read_position = 0

# Define the models and their paths
models = {
    "production": {
        "file": "DeepSeek-R1-Distill-Llama-8B-Q4_K_L.gguf",
        "alias": "R1Llama8BQ4L",
        "template": "app/templates/Llama8bq4k.html"
    },
    "development": {
        "file": "DeepSeek-R1-Distill-Qwen-1.5B-Q2_K.gguf",
        "alias": "R1Qwen1.5BQ2",
        "template": "app/templates/Qwen5bq2k.html"
    },
}

model_in_use = models["production"]

with open(model_in_use["template"], "r") as jinja_template:
    CHAT_TEMPLATE = jinja_template.read()

#with open("app/templates/default.html", "r") as jinja_template:
#    CHAT_TEMPLATE = jinja_template.read()


# Define the shell execution tool
def execute_shell(arguments):
    """Execute a shell command passed as a JSON string like '{"command": "..."}'."""
    try:
        args = json.loads(arguments)
        command = args.get("command", "")
        if not command:
            return json.dumps({"error": "No command provided."})
        process = subprocess.run(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        return json.dumps({"stdout": process.stdout, "stderr": process.stderr})
    except Exception as e:
        return json.dumps({"error": str(e)})


# Define the tools available to the assistant
tools = {
    "shell": {
        "description": "Execute shell commands.",
        "example_input": '{"command": "..."}',
        "example_output": '{"stdout": "...", "stderr": "..."}',
        "function": execute_shell,
    },
}

tools_list = [
    {
        "type": "function",
        "function": {
            "name": "shell",
            "description": "Execute shell commands.",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The shell command to execute."
                    }
                },
                "required": ["command"]
            }
        }
    }
]

expected_tool_usage = """\n\nYou are expected to demarcate tool calls between markers like:
<|tool▁calls▁begin|> ... <|tool▁calls▁end|>
and each individual call between:
<|tool▁call▁begin|> ... <|tool▁sep|> ... "```json" ... "```"
\nBut you should not make more than one call at a time.\n
For example, to download the Google homepage, you could use the `wget` command:
<|tool▁calls▁begin|><|tool▁call▁begin|>function<|tool▁sep|>shell
```json
{"command": "wget google.com"}
```<|tool▁call▁end|><|tool▁calls▁end|>
"""

text_prompt = """You are an AI model expert in computational biology performing autonomous real research.
Your goal is to discover something new and relevant in the field of nerve regeneration.
You can execute shell commands in the Docker container with the following tools:\n\n"""

custom_prompt = """You are an AI model expert in computational biology performing autonomous real research.
Your goal is to discover something new and relevant in the field of nerve regeneration.
You can execute shell commands using a JSON block like:
```json
{
    "tool": "shell",
    "arguments": "{\"command\": \"\"}"
}
```
"""
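# Illustrative example (comments only, not executed at import time): the shell
# tool takes a JSON string and returns a JSON string. The exact output depends
# on the shell inside the container, so the result shown is only indicative.
#
#   execute_shell('{"command": "echo hello"}')
#   # -> '{"stdout": "hello\n", "stderr": ""}'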
# Dynamically generate the system prompt based on available tools.
def generate_system_prompt(tools):
    tool_descriptions = []
    for tool_name, tool_data in tools.items():
        description = tool_data.get("description", "No description available.")
        example_input = tool_data.get("example_input", "{}")
        example_output = tool_data.get("example_output", "{}")
        tool_descriptions.append(
            f"""- **{tool_name}**:
  - Description: {description}
  - Input: {example_input}
  - Output: {example_output}"""
        )
    return (
        text_prompt
        + "\n\n".join(tool_descriptions)
        + expected_tool_usage
    )


# Create the system prompt.
system_prompt = generate_system_prompt(tools)

with log_path.open("a") as f:
    #f.write("System prompt:\n\n" + system_prompt + "\n\n")
    f.write("System prompt:\n" + custom_prompt + "\n\n")


# Parse out any tool calls embedded in the model's output.
def extract_tool_calls(response_text):
    """
    Parse tool calls from model output.

    The model is expected to demarcate tool calls between markers like:
    <|tool▁calls▁begin|> ... <|tool▁calls▁end|>
    and each individual call between:
    <|tool▁call▁begin|> ... <|tool▁sep|> ... "```json" ... "```"
    """
    if "<|tool▁calls▁begin|>" not in response_text:
        return []

    tool_calls_part = response_text.split("<|tool▁calls▁begin|>")[1]
    tool_calls_part = tool_calls_part.split("<|tool▁calls▁end|>")[0]
    tool_calls = tool_calls_part.split("<|tool▁call▁begin|>")

    parsed_tool_calls = []
    for tool_call in tool_calls:
        tool_call = tool_call.strip()
        if tool_call:
            try:
                tool_type, tool_name_and_args = tool_call.split("<|tool▁sep|>")
                tool_name, tool_args = tool_name_and_args.split("\n```json\n", 1)
                tool_args = tool_args.split("\n```")[0]
                parsed_tool_calls.append({
                    "type": tool_type,
                    "name": tool_name.strip(),
                    "arguments": tool_args.strip()
                })
            except ValueError:
                logging.warning("Failed to parse tool call: %s", tool_call)
    return parsed_tool_calls


def process_tool_call(tool_call):
    """Execute the requested tool and return its output."""
    tool_name = tool_call["name"]
    tool_args = tool_call["arguments"]
    if tool_name in tools:
        tool_function = tools[tool_name]["function"]
        return tool_function(tool_args)
    else:
        return json.dumps({"error": f"Tool {tool_name} not found."})


#
# Helper: Wrap a synchronous generator as an asynchronous generator.
#
async def async_generator_from_sync(sync_gen_func, *args, **kwargs):
    """
    Runs a synchronous generator function in a thread and yields items asynchronously.
    """
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()

    def producer():
        try:
            for item in sync_gen_func(*args, **kwargs):
                loop.call_soon_threadsafe(q.put_nowait, item)
        except Exception as e:
            loop.call_soon_threadsafe(q.put_nowait, e)
        finally:
            # Signal the end of iteration with a sentinel (None)
            loop.call_soon_threadsafe(q.put_nowait, None)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        logging.info("Inside executor")
        executor.submit(producer)
        while True:
            item = await q.get()
            if item is None:
                break
            if isinstance(item, Exception):
                raise item
            yield item


#
# Background response generator without requiring a WebSocket.
#
async def generate_response_background(conversation):
    logging.info(f"Starting generation with conversation: {conversation}")
    async for token_chunk in async_generator_from_sync(
        llm.create_chat_completion,
        messages=conversation,
        stream=True,
        tools=tools_list,
        max_tokens=2048,
        repeat_penalty=1.1
    ):
        logging.debug(f"Raw token chunk: {json.dumps(token_chunk, indent=2)}")
        yield token_chunk
        await asyncio.sleep(0)
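# Note on the streamed chunk format: this code assumes llama-cpp-python mirrors
# the OpenAI streaming schema (worth verifying against the installed version).
# Each `token_chunk` yielded above is expected to look roughly like:
#
#   {
#       "choices": [
#           {"index": 0, "delta": {"content": "some text"}, "finish_reason": None}
#       ]
#   }
#
# with the final chunk carrying a non-None "finish_reason" such as "stop".
# run_research_forever() below relies on exactly the "delta" and "finish_reason" keys.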
#
# Main research loop running continuously in the background.
#
async def run_research_forever():
    global log_path
    logging.info("🚀 Autonomous computational biology research initiated!")
    with log_path.open("a") as f:
        f.write("🚀 Autonomous computational biology research initiated!\n")

    conversation = [{"role": "system", "content": custom_prompt}]
    while True:
        full_response = ""
        try:
            async for token in generate_response_background(conversation):
                # Safely extract delta
                delta = token["choices"][0].get("delta", {})

                # Handle text content (default to an empty string)
                token_text = delta.get("content", "")
                full_response += token_text

                # Handle tool calls (critical for container environment)
                if "tool_calls" in delta:
                    tool_calls = delta["tool_calls"]
                    # Record tool call deltas in the running response text
                    tool_call_str = json.dumps(tool_calls)
                    full_response += f"\n🔧 Tool Call: {tool_call_str}\n"

                # Append the streamed text to the interaction log
                with open(log_path, "a") as f:
                    f.write(token_text)
                    f.flush()

                # Check for finish reason
                if token["choices"][0].get("finish_reason"):
                    # A finish reason (like "stop") indicates that generation is complete.
                    # Append the assistant's response to the conversation log.
                    conversation.append({"role": "assistant", "content": full_response})
                    try:
                        tool_output = parse_tool_calls(full_response)
                        conversation.append({"role": "tool", "content": tool_output})
                    except Exception as e:
                        logging.error(f"🛠️ Tool execution failed: {e}; attempting tool execution fallback")
                        try:
                            tool_output = parse_tool_calls_fallback(full_response)
                            conversation.append({"role": "tool", "content": tool_output})
                        except Exception as fallback_error:
                            logging.error(f"🛠️ Tool execution fallback also failed: {fallback_error}")
                            conversation.append({
                                "role": "system",
                                "content": "Error in function calling. You must work towards "
                                           "your goal using the proper format.\n" + custom_prompt
                            })
                    continue
        except Exception as e:
            logging.error(f"Autonomous research error during response generation: {e}")
            continue

        # Delay before the next query iteration.
        await asyncio.sleep(1)


def parse_tool_calls_fallback(full_response):
    """Fallback parser: extract the first ```json ... ``` block from the response
    and execute it. The block is expected to follow the {"tool": ..., "arguments": ...}
    shape shown in custom_prompt."""
    tool_call = full_response.split("```json")[1]
    tool_call = tool_call.split("```")[0]
    tool_call = tool_call.strip()
    parsed = json.loads(tool_call)
    # Map the prompt's "tool" field to the "name" field expected by process_tool_call.
    tool_output = process_tool_call({
        "name": parsed.get("tool", ""),
        "arguments": parsed.get("arguments", "{}")
    })
    return tool_output


def parse_tool_calls(full_response):
    # Check for tool calls in the response and process them.
    logging.info(f"Full response: {full_response}")
    tool_calls = extract_tool_calls(full_response)
    logging.info(f"Tool calls: {tool_calls}")
    if not tool_calls:
        # No marker-style tool calls found; raise so the caller can try the
        # ```json fallback parser.
        raise ValueError("No tool calls found in response.")
    for tool_call in tool_calls:
        tool_output = process_tool_call(tool_call)
        logging.info(f"🔧 Tool Execution: {tool_output}")
        with log_path.open("a") as f:
            f.write(f"🔧 Tool Execution: {tool_output}\n")
    return tool_output


# Automatically start the research process when the app starts.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background task when FastAPI starts."""
    logging.info("Starting run_research_forever()...")
    await asyncio.sleep(5)  # Wait for the server to load
    asyncio.create_task(run_research_forever())  # Run in background
    yield
    logging.info("FastAPI shutdown: Cleaning up resources.")


# Initialize the FastAPI application
app = FastAPI(lifespan=lifespan)

# Load the Llama model (assumed to return a synchronous generator when stream=True)
llm = Llama(model_path=model_in_use["file"], n_ctx=2048)


@app.websocket("/stream")
async def stream(websocket: WebSocket):
    logging.info("WebSocket connection established.")
    global log_path, last_read_position
    await websocket.accept()
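    # Illustrative client usage (an assumption, not part of this app): /stream
    # simply tails interaction_history.log, so any WebSocket client can follow
    # along. For example, with the third-party `websockets` package:
    #
    #   import asyncio, websockets
    #
    #   async def tail_log():
    #       async with websockets.connect("ws://localhost:8000/stream") as ws:
    #           while True:
    #               print(await ws.recv(), end="")
    #
    #   asyncio.run(tail_log())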
    # Send existing interaction history to the client.
    try:
        with open(log_path, "r") as log_file:
            interaction_history = log_file.read()
            last_read_position = log_file.tell()
        if interaction_history:
            await websocket.send_text(interaction_history)
    except Exception as e:
        logging.error(f"Error reading interaction history: {e}")

    # Continuously send updates from the log file.
    while True:
        await asyncio.sleep(0.1)
        try:
            with open(log_path, "r") as log_file:
                log_file.seek(last_read_position)
                new_content = log_file.read()
                if new_content:
                    await websocket.send_text(new_content)
                    last_read_position = log_file.tell()
        except Exception as e:
            logging.error(f"Error reading interaction history: {e}")


# Endpoint to retrieve the interaction log.
@app.get("/log")
async def get_log():
    try:
        with open("interaction_history.log", "r") as f:
            log_content = f.read()
        # Return the log inside a <pre> block for readability.
        return HTMLResponse(content=f"<pre>{log_content}</pre>")
    except Exception as e:
        logging.error(f"Error reading log: {e}")
        return {"error": str(e)}


# A simple frontend page with a link to the log.
@app.get("/", response_class=HTMLResponse)
async def get():
    try:
        with open("app/templates/index.html", "r") as f:
            html_content = f.read()
    except Exception as e:
        logging.error(f"Error loading template: {e}")
        html_content = "<html><body><h1>Error loading template</h1></body></html>"
    return HTMLResponse(html_content)


# To run the app, execute this module directly:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)