code-chat-api / main.py
pvanand's picture
Update main.py
8e7b765 verified
import uuid
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from langchain_core.messages import BaseMessage, HumanMessage, trim_messages
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel
from typing import Optional
import json
from sse_starlette.sse import EventSourceResponse
import io
import sys
from contextlib import redirect_stdout, redirect_stderr
from langchain_core.runnables import RunnableConfig
import requests
import uvicorn
import re
from fastapi.staticfiles import StaticFiles
from langchain_core.runnables import RunnableConfig
from langchain_core.prompts import ChatPromptTemplate
from datetime import datetime
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/chatui", StaticFiles(directory="static/chatui", html=True), name="index")
class CodeExecutionResult:
def __init__(self, output: str, error: str = None):
self.output = output
self.error = error
API_URL = "https://pvanand-code-execution-files-v41.hf.space"
@tool
def execute_python(code: str, config: RunnableConfig):
"""Execute Python code in an IPython interactiveshell and return the output.
The returned artifacts (if present) are automatically rendered in the UI and visible to the user.
Available Libraries: plotly (default charting library),pandas,yfinance,numpy,geopandas,folium
Args:
code: Valid Python code with correct indentation and syntax including necessary imports.
"""
thread_config = config.get("configurable", {})
session_token = thread_config.get("thread_id", "test")
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
"session_token": session_token,
"code": code
}
response = requests.post(
f'{API_URL}/v0/execute',
headers=headers,
data=json.dumps(data)
)
if response.status_code != 200:
return f"Error: Request failed with status code {response.status_code}. Response: {response.text}"
else:
response_json = response.json()
return f"data: {json.dumps(response_json)} \ndata:"
memory = MemorySaver()
model = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", f"You are a Data Visualization assistant.You have access to a jupyter client with access to internet for python code execution.\
Your taks is to assist users with your data analysis and visualization expertise. Use Plotly for creating visualizations. Generated artifacts\
are automatically rendered in the UI. Today's date is \
{datetime.now().strftime('%Y-%m-%d')}. The current folder contains the following files: {{collection_files}}"),
("placeholder", "{messages}"),
])
def state_modifier(state) -> list[BaseMessage]:
collection_files = "None"
try:
formatted_prompt = prompt.invoke({
"collection_files": collection_files,
"messages": state["messages"]
})
print(state["messages"])
return trim_messages(
formatted_prompt,
token_counter=len,
max_tokens=16000,
strategy="last",
start_on="human",
include_system=True,
allow_partial=False,
)
except Exception as e:
print(f"Error in state modifier: {str(e)}")
return state["messages"]
# Create the agent with the Python execution tool
agent = create_react_agent(
model,
tools=[execute_python],
checkpointer=memory,
state_modifier=state_modifier,
)
class ChatInput(BaseModel):
message: str
thread_id: Optional[str] = None
@app.post("/chat")
async def chat(input_data: ChatInput):
thread_id = input_data.thread_id or str(uuid.uuid4())
config = {
"configurable": {
"thread_id": thread_id
}
}
input_message = HumanMessage(content=input_data.message)
async def generate():
async for event in agent.astream_events(
{"messages": [input_message]},
config,
version="v2"
):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
yield f"{json.dumps({'type': 'token', 'content': content})}\n"
elif kind == "on_tool_start":
tool_input = event['data'].get('input', '')
yield f"{json.dumps({'type': 'tool_start', 'tool': event['name'], 'input': tool_input})}\n"
elif kind == "on_tool_end":
tool_output = event['data'].get('output', '').content
print(type(tool_output))
#print(dir(tool_output))
#print the keys
pattern = r'data: (.*?)\ndata:'
match = re.search(pattern, tool_output)
print(tool_output)
if match:
tool_output_json = match.group(1).strip()
try:
tool_output = json.loads(tool_output_json)
if "artifacts" in tool_output:
for artifact in tool_output["artifacts"]:
artifact_content = requests.get(f"{API_URL}/artifact/{artifact['artifact_id']}").content
#print(artifact_content)
#tool_output["artifacts"][artifact["artifact_id"]] = artifact_content
except Exception as e:
print(e)
print("Error parsing tool output as json: ", tool_output)
else:
print("No match found in tool output")
yield f"{json.dumps({'type': 'tool_end', 'tool': event['name'], 'output': tool_output})}\n"
return EventSourceResponse(
generate(),
media_type="text/event-stream"
)
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)