|
import asyncio
import os
import queue
import threading
from typing import Any, AsyncGenerator, Dict, Generator, List

import gradio as gr
from crewai import Agent, Crew, Task
from langchain_openai import ChatOpenAI
|
|
|
class AgentMessageQueue:
    """FIFO buffer for chat messages, backed by a ``queue.Queue``.

    A producer publishes messages with :meth:`add_message`; a consumer
    collects everything that has accumulated with :meth:`get_messages`.
    """

    def __init__(self):
        # Pending messages, oldest first.
        self.message_queue = queue.Queue()
        # Not read or written by this class; kept for callers that use it.
        self.last_agent = None

    def add_message(self, message: Dict):
        """Append ``message`` to the end of the queue."""
        self.message_queue.put(message)

    def get_messages(self) -> List[Dict]:
        """Remove and return every queued message, oldest first.

        Never blocks. Items are pulled with ``get_nowait()`` rather than an
        ``empty()`` check followed by a blocking ``get()``, so a second
        consumer draining the queue in between cannot make this call hang.

        Returns:
            The drained messages, or an empty list if nothing was queued.
        """
        messages = []
        while True:
            try:
                messages.append(self.message_queue.get_nowait())
            except queue.Empty:
                return messages
|
|
|
class SpaceSystemCrew:
    """Drive a three-agent CrewAI crew that designs a named space system.

    The crew consists of a System Architect, a Mechanical Engineer and an
    Aerospace Mission Manager. :meth:`process_design` runs the crew on a
    worker thread and streams chat-style message dicts back to the caller.
    """

    def __init__(self, api_key: str = None):
        """Store the OpenAI key; agents are built later by ``initialize_agents``.

        Args:
            api_key: OpenAI API key. May be omitted here, but
                ``initialize_agents`` raises if it is still empty.
        """
        self.api_key = api_key
        self.message_queue = AgentMessageQueue()
        self.system_architect = None
        self.mechanical_engineer = None
        self.aerospace_manager = None
        # Set to "System Architect" when a run starts; not updated afterwards.
        self.current_agent = None
        # Text of the crew's result from the most recent successful run.
        self.final_design = None

    @staticmethod
    def _error_message(error: Exception) -> Dict:
        """Build the assistant chat message used to report ``error``."""
        return {
            "role": "assistant",
            "content": f"An error occurred: {str(error)}",
            "metadata": {"title": "β Error"},
        }

    def initialize_agents(self, topic: str):
        """Create the three agents, all sharing one GPT-4 chat model.

        Args:
            topic: Name of the space system; interpolated into each agent's goal.

        Raises:
            ValueError: If no API key was provided.
        """
        if not self.api_key:
            raise ValueError("OpenAI API key is required")

        # NOTE: this sets the key for the whole process, not just this instance.
        os.environ["OPENAI_API_KEY"] = self.api_key
        llm = ChatOpenAI(temperature=0.7, model="gpt-4")

        self.system_architect = Agent(
            role="System Architect",
            goal=f"Design the architecture of the {topic} space system",
            backstory="Expert in space system design, integrating mechanical and propulsion components into a cohesive system",
            allow_delegation=False,
            verbose=True,
            llm=llm,
        )

        self.mechanical_engineer = Agent(
            role="Mechanical Engineer",
            goal=f"Develop structural and propulsion mechanisms for {topic}",
            backstory="Specialist in high-velocity launch systems and orbital stabilization",
            allow_delegation=False,
            verbose=True,
            llm=llm,
        )

        self.aerospace_manager = Agent(
            role="Aerospace Mission Manager",
            goal=f"Ensure mission feasibility, safety, and execution for {topic}",
            backstory="Oversees project feasibility, mission planning, and ensures integration with existing aerospace infrastructure",
            allow_delegation=False,
            verbose=True,
            llm=llm,
        )

    def create_tasks(self, topic: str) -> List["Task"]:
        """Build one task per agent, in execution order.

        Requires ``initialize_agents`` to have run, since each task is bound
        to one of the agent attributes.

        Args:
            topic: Name of the space system; used in the architect's task text.

        Returns:
            ``[architect_task, mechanical_task, aerospace_task]``.
        """
        architect_task = Task(
            description=f"""Develop the complete architecture for the {topic} space system, including:
                1. High-level design of the five-stage system
                2. Identification of key components and subsystems
                3. Integration with existing space infrastructure
                4. Energy efficiency and sustainability considerations""",
            expected_output="A comprehensive space system architecture plan",
            agent=self.system_architect,
        )

        mechanical_task = Task(
            description="""Design the structural and propulsion aspects of the space system, including:
                1. Engineering specifications for the spin-launcher mechanism
                2. Payload stabilization and orbital insertion strategies
                3. Material selection and thermal considerations
                4. Fuel efficiency and mechanical reliability""",
            expected_output="Detailed propulsion and structural design specifications",
            agent=self.mechanical_engineer,
        )

        aerospace_task = Task(
            description="""Assess the feasibility and mission execution strategies, including:
                1. Launch site selection and environmental impact assessment
                2. Safety protocols and risk mitigation strategies
                3. Crew and payload rendezvous procedures
                4. Operational procedures for space missions post-docking""",
            expected_output="A validated mission feasibility and execution plan",
            agent=self.aerospace_manager,
        )

        return [architect_task, mechanical_task, aerospace_task]

    async def process_design(self, topic: str) -> AsyncGenerator[List[Dict], None]:
        """Run the crew for ``topic`` and stream chat messages as they appear.

        The crew runs on a worker thread so this coroutine can keep polling
        the message queue without blocking the event loop.

        Args:
            topic: Name of the space system to design.

        Yields:
            Batches (lists) of chat message dicts with ``role``/``content``
            and, for status and error messages, a ``metadata`` title. Any
            exception is reported as an error message instead of being raised.
        """
        try:
            self.initialize_agents(topic)
            self.current_agent = "System Architect"

            yield [{
                "role": "assistant",
                "content": "Starting work on your space system design...",
                "metadata": {"title": "π Process Started"},
            }]

            crew = Crew(
                agents=[self.system_architect, self.mechanical_engineer, self.aerospace_manager],
                tasks=self.create_tasks(topic),
                verbose=True,
            )

            def run_crew():
                try:
                    result = crew.kickoff()
                except Exception as e:
                    self.message_queue.add_message(self._error_message(e))
                else:
                    # Keep the result and publish it; otherwise the finished
                    # design would never reach the caller.
                    self.final_design = str(result)
                    self.message_queue.add_message({
                        "role": "assistant",
                        "content": self.final_design,
                    })

            thread = threading.Thread(target=run_crew)
            thread.start()

            # Keep polling until the worker has finished AND the queue is
            # drained, so messages posted just before exit are not lost.
            while thread.is_alive() or not self.message_queue.message_queue.empty():
                messages = self.message_queue.get_messages()
                if messages:
                    yield messages
                await asyncio.sleep(0.1)
        except Exception as e:
            yield [self._error_message(e)]
|
|
|
|
|
def create_demo():
    """Build the Gradio UI for the space system design crew.

    The page first shows only an API-key textbox. Submitting a non-blank key
    hides that textbox and reveals the chatbot, the system-name textbox and
    the "Design System" button. Clicking the button streams the crew's
    messages into the chatbot.

    Returns:
        The constructed ``gr.Blocks`` app (not launched).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# π AI Space System Design Crew")

        openai_api_key = gr.Textbox(
            label='OpenAI API Key',
            type='password',
            placeholder='Enter your OpenAI API key...',
            interactive=True,
        )

        chatbot = gr.Chatbot(
            label="Design Process",
            height=700,
            type="messages",
            show_label=True,
            visible=False,
            avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
            render_markdown=True,
        )

        with gr.Row(equal_height=True):
            topic = gr.Textbox(
                label="Space System Name",
                placeholder="Enter system name...",
                scale=4,
                visible=False,
            )
            btn = gr.Button("Design System", variant="primary", scale=1, visible=False)

        def has_key(api_key):
            """Return True when ``api_key`` contains non-whitespace text."""
            return bool(api_key and api_key.strip())

        def error_entry(text):
            """Build an assistant chat message flagged as an error."""
            return {
                "role": "assistant",
                "content": text,
                "metadata": {"title": "β Error"},
            }

        async def process_input(topic, history, api_key):
            """Stream the design conversation for ``topic`` into the chat history."""
            history = history or []
            if not has_key(api_key):
                history.append(error_entry("Please provide an OpenAI API key."))
                yield history
                return

            # Build a fresh crew for every request. A single cached instance
            # would keep using the first caller's API key and share one
            # message queue between unrelated runs.
            space_crew = SpaceSystemCrew(api_key=api_key)

            history.append({"role": "user", "content": f"Design a space system for: {topic}"})
            yield history

            try:
                async for messages in space_crew.process_design(topic):
                    history.extend(messages)
                    yield history
            except Exception as e:
                history.append(error_entry(f"An error occurred: {str(e)}"))
                yield history

        def reveal_workspace(api_key):
            """Swap the key prompt for the design workspace once a key is given.

            With a blank key every component keeps its initial visibility, so
            the user is not left without a field to enter the key in.
            """
            ready = has_key(api_key)
            return {
                openai_api_key: gr.Textbox(visible=not ready),
                chatbot: gr.Chatbot(visible=ready),
                topic: gr.Textbox(visible=ready),
                btn: gr.Button(visible=ready),
            }

        openai_api_key.submit(
            reveal_workspace,
            [openai_api_key],
            [openai_api_key, chatbot, topic, btn],
        )
        btn.click(process_input, [topic, chatbot, openai_api_key], [chatbot])

    return demo
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, enable Gradio's event queue, then
    # start the server with debug output turned on.
    demo = create_demo()
    demo.queue()
    demo.launch(debug=True)
|
|