eaglelandsonce's picture
Update app.py
e764a0a verified
raw
history blame
8.07 kB
from crewai import Agent, Task, Crew
import gradio as gr
import asyncio
from typing import List, Dict, Any, Generator
from langchain_openai import ChatOpenAI
import queue
import threading
import os
class AgentMessageQueue:
def __init__(self):
self.message_queue = queue.Queue()
self.last_agent = None
def add_message(self, message: Dict):
self.message_queue.put(message)
def get_messages(self) -> List[Dict]:
messages = []
while not self.message_queue.empty():
messages.append(self.message_queue.get())
return messages
class SpaceSystemCrew:
def __init__(self, api_key: str = None):
self.api_key = api_key
self.message_queue = AgentMessageQueue()
self.system_architect = None
self.mechanical_engineer = None
self.aerospace_manager = None
self.current_agent = None
self.final_design = None
def initialize_agents(self, topic: str):
if not self.api_key:
raise ValueError("OpenAI API key is required")
os.environ["OPENAI_API_KEY"] = self.api_key
llm = ChatOpenAI(temperature=0.7, model="gpt-4")
self.system_architect = Agent(
role="System Architect",
goal=f"Design the architecture of the {topic} space system",
backstory="Expert in space system design, integrating mechanical and propulsion components into a cohesive system",
allow_delegation=False,
verbose=True,
llm=llm
)
self.mechanical_engineer = Agent(
role="Mechanical Engineer",
goal=f"Develop structural and propulsion mechanisms for {topic}",
backstory="Specialist in high-velocity launch systems and orbital stabilization",
allow_delegation=False,
verbose=True,
llm=llm
)
self.aerospace_manager = Agent(
role="Aerospace Mission Manager",
goal=f"Ensure mission feasibility, safety, and execution for {topic}",
backstory="Oversees project feasibility, mission planning, and ensures integration with existing aerospace infrastructure",
allow_delegation=False,
verbose=True,
llm=llm
)
def create_tasks(self, topic: str) -> List[Task]:
architect_task = Task(
description=f"""Develop the complete architecture for the {topic} space system, including:
1. High-level design of the five-stage system
2. Identification of key components and subsystems
3. Integration with existing space infrastructure
4. Energy efficiency and sustainability considerations""",
expected_output="A comprehensive space system architecture plan",
agent=self.system_architect
)
mechanical_task = Task(
description="""Design the structural and propulsion aspects of the space system, including:
1. Engineering specifications for the spin-launcher mechanism
2. Payload stabilization and orbital insertion strategies
3. Material selection and thermal considerations
4. Fuel efficiency and mechanical reliability""",
expected_output="Detailed propulsion and structural design specifications",
agent=self.mechanical_engineer
)
aerospace_task = Task(
description="""Assess the feasibility and mission execution strategies, including:
1. Launch site selection and environmental impact assessment
2. Safety protocols and risk mitigation strategies
3. Crew and payload rendezvous procedures
4. Operational procedures for space missions post-docking""",
expected_output="A validated mission feasibility and execution plan",
agent=self.aerospace_manager
)
return [architect_task, mechanical_task, aerospace_task]
async def process_design(self, topic: str) -> Generator[List[Dict], None, None]:
try:
self.initialize_agents(topic)
self.current_agent = "System Architect"
yield [{
"role": "assistant",
"content": "Starting work on your space system design...",
"metadata": {"title": "πŸš€ Process Started"}
}]
crew = Crew(
agents=[self.system_architect, self.mechanical_engineer, self.aerospace_manager],
tasks=self.create_tasks(topic),
verbose=True
)
def run_crew():
try:
crew.kickoff()
except Exception as e:
self.message_queue.add_message({
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
})
thread = threading.Thread(target=run_crew)
thread.start()
while thread.is_alive() or not self.message_queue.message_queue.empty():
messages = self.message_queue.get_messages()
if messages:
yield messages
await asyncio.sleep(0.1)
except Exception as e:
yield [{
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
}]
def create_demo():
space_crew = None
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# πŸš€ AI Space System Design Crew")
openai_api_key = gr.Textbox(
label='OpenAI API Key',
type='password',
placeholder='Enter your OpenAI API key...',
interactive=True
)
chatbot = gr.Chatbot(
label="Design Process",
height=700,
type="messages",
show_label=True,
visible=False,
avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
render_markdown=True
)
with gr.Row(equal_height=True):
topic = gr.Textbox(
label="Space System Name",
placeholder="Enter system name...",
scale=4,
visible=False
)
btn = gr.Button("Design System", variant="primary", scale=1, visible=False)
async def process_input(topic, history, api_key):
nonlocal space_crew
if not api_key:
history = history or []
history.append({
"role": "assistant",
"content": "Please provide an OpenAI API key.",
"metadata": {"title": "❌ Error"}
})
yield history
return
if space_crew is None:
space_crew = SpaceSystemCrew(api_key=api_key)
history = history or []
history.append({"role": "user", "content": f"Design a space system for: {topic}"})
yield history
try:
async for messages in space_crew.process_design(topic):
history.extend(messages)
yield history
except Exception as e:
history.append({
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
})
yield history
openai_api_key.submit(lambda: {openai_api_key: gr.Textbox(visible=False), chatbot: gr.Chatbot(visible=True), topic: gr.Textbox(visible=True), btn: gr.Button(visible=True)}, None, [openai_api_key, chatbot, topic, btn])
btn.click(process_input, [topic, chatbot, openai_api_key], [chatbot])
return demo
if __name__ == "__main__":
demo = create_demo()
demo.queue()
demo.launch(debug=True)