import json
import re
import uuid
from datetime import datetime
from typing import Optional

import requests
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from langchain_core.messages import BaseMessage, HumanMessage, trim_messages
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.mount("/chatui", StaticFiles(directory="static/chatui", html=True), name="index")
class CodeExecutionResult:
    def __init__(self, output: str, error: Optional[str] = None):
        self.output = output
        self.error = error


# Remote code-execution service (a Hugging Face Space).
API_URL = "https://pvanand-code-execution-files-v41.hf.space"
@tool
def execute_python(code: str, config: RunnableConfig):
    """Execute Python code in an IPython InteractiveShell and return the output.

    The returned artifacts (if present) are automatically rendered in the UI and visible to the user.
    Available libraries: plotly (default charting library), pandas, yfinance, numpy, geopandas, folium.

    Args:
        code: Valid Python code with correct indentation and syntax, including necessary imports.
    """
    # Reuse the chat thread id as the sandbox session token, so executed code
    # keeps its state across turns within the same conversation.
    thread_config = config.get("configurable", {})
    session_token = thread_config.get("thread_id", "test")

    headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json'
    }
    data = {
        "session_token": session_token,
        "code": code
    }

    response = requests.post(
        f'{API_URL}/v0/execute',
        headers=headers,
        data=json.dumps(data)
    )

    if response.status_code != 200:
        return f"Error: Request failed with status code {response.status_code}. Response: {response.text}"
    # Wrap the JSON payload in "data: ...\ndata:" markers; the SSE handler
    # below unwraps it again with a regex before streaming to the client.
    response_json = response.json()
    return f"data: {json.dumps(response_json)} \ndata:"
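# Illustrative smoke test for the tool (an assumption, not part of the app:
# it hits the live executor Space, and "demo-thread" is a made-up session id):
#
#   result = execute_python.invoke(
#       {"code": "print(1 + 1)"},
#       config=RunnableConfig(configurable={"thread_id": "demo-thread"}),
#   )
#   print(result)  # -> 'data: {...} \ndata:' string, unwrapped later by the SSE handler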
memory = MemorySaver()
model = ChatOpenAI(model="gpt-4o-mini", streaming=True)

prompt = ChatPromptTemplate.from_messages([
    ("system", f"You are a Data Visualization assistant. You have access to a Jupyter client "
               f"with internet access for Python code execution. Your task is to assist users "
               f"with your data analysis and visualization expertise. Use Plotly for creating "
               f"visualizations; generated artifacts are automatically rendered in the UI. "
               f"Today's date is {datetime.now().strftime('%Y-%m-%d')}. The current folder "
               f"contains the following files: {{collection_files}}"),
    ("placeholder", "{messages}"),
])
def state_modifier(state) -> list[BaseMessage]:
    # Placeholder until per-thread file listings are wired in.
    collection_files = "None"
    try:
        formatted_prompt = prompt.invoke({
            "collection_files": collection_files,
            "messages": state["messages"]
        })
        print(state["messages"])  # debug: log the raw message state each turn
        # trim_messages expects a message list, not a ChatPromptValue, hence
        # .to_messages(). With token_counter=len each message counts as one
        # unit, so max_tokens=16000 caps the history at 16000 messages.
        return trim_messages(
            formatted_prompt.to_messages(),
            token_counter=len,
            max_tokens=16000,
            strategy="last",
            start_on="human",
            include_system=True,
            allow_partial=False,
        )
    except Exception as e:
        print(f"Error in state modifier: {str(e)}")
        return state["messages"]
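# A sketch of real token-based trimming, in case the message-count trimming
# above proves too coarse (an assumption: langchain lets you pass the chat
# model itself as token_counter, so counts come from the model's tokenizer):
#
#   trimmed = trim_messages(
#       formatted_prompt.to_messages(),
#       token_counter=model,   # counts actual tokens, not messages
#       max_tokens=16000,
#       strategy="last",
#       start_on="human",
#       include_system=True,
#   )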
# Create the agent with the Python execution tool. state_modifier is the
# pre-model hook (renamed `prompt` in newer langgraph releases).
agent = create_react_agent(
    model,
    tools=[execute_python],
    checkpointer=memory,
    state_modifier=state_modifier,
)
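# Quick interactive check of the agent, bypassing the HTTP layer (illustrative;
# "smoke" is an arbitrary thread id):
#
#   out = agent.invoke(
#       {"messages": [HumanMessage(content="hi")]},
#       {"configurable": {"thread_id": "smoke"}},
#   )
#   print(out["messages"][-1].content)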
class ChatInput(BaseModel):
    message: str
    thread_id: Optional[str] = None
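# Example request body (thread_id is optional; a fresh UUID is minted
# server-side when it is omitted):
#
#   {"message": "Plot AAPL closing prices for 2024", "thread_id": "abc-123"}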
# NOTE: route path assumed; adjust if the UI posts elsewhere.
@app.post("/chat")
async def chat(input_data: ChatInput):
    thread_id = input_data.thread_id or str(uuid.uuid4())

    config = {
        "configurable": {
            "thread_id": thread_id
        }
    }

    input_message = HumanMessage(content=input_data.message)

    async def generate():
        async for event in agent.astream_events(
            {"messages": [input_message]},
            config,
            version="v2"
        ):
            kind = event["event"]
            if kind == "on_chat_model_stream":
                content = event["data"]["chunk"].content
                if content:
                    yield f"{json.dumps({'type': 'token', 'content': content})}\n"
            elif kind == "on_tool_start":
                tool_input = event['data'].get('input', '')
                yield f"{json.dumps({'type': 'tool_start', 'tool': event['name'], 'input': tool_input})}\n"
            elif kind == "on_tool_end":
                # The tool wraps its JSON in "data: ...\ndata:" markers (see
                # execute_python); unwrap it before forwarding to the client.
                raw_output = event['data'].get('output')
                tool_output = getattr(raw_output, 'content', '') or ''
                pattern = r'data: (.*?)\ndata:'
                match = re.search(pattern, tool_output)
                if match:
                    tool_output_json = match.group(1).strip()
                    try:
                        tool_output = json.loads(tool_output_json)
                        if "artifacts" in tool_output:
                            for artifact in tool_output["artifacts"]:
                                # Fetched but not yet attached to the streamed
                                # payload; the UI renders artifacts by id.
                                artifact_content = requests.get(
                                    f"{API_URL}/artifact/{artifact['artifact_id']}"
                                ).content
                    except Exception as e:
                        print(e)
                        print("Error parsing tool output as json: ", tool_output)
                else:
                    print("No match found in tool output")
                yield f"{json.dumps({'type': 'tool_end', 'tool': event['name'], 'output': tool_output})}\n"

    return EventSourceResponse(
        generate(),
        media_type="text/event-stream"
    )
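# Illustrative client call (the /chat path matches the assumed route above;
# -N keeps curl from buffering the SSE stream):
#
#   curl -N -X POST http://localhost:7860/chat \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Plot a sine wave"}'
#
# Each event's data field is a JSON line of type token, tool_start, or tool_end.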
# NOTE: route path assumed.
@app.get("/health")
async def health_check():
    return {"status": "healthy"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)