"""Gradio app that analyzes a Hugging Face Space and chats about its code via Gemini."""

import json
import os
import re
import threading
import time
import traceback
from typing import Any, Dict, Iterator, List, Tuple

import google.generativeai as genai
import gradio as gr
import requests
from gradio import ChatMessage
from huggingface_hub import HfApi

# Hugging Face API token (used for Space analysis).
HF_TOKEN = os.getenv("HF_TOKEN")
hf_api = HfApi(token=HF_TOKEN)

# Gemini 2.0 Flash Thinking API key and client (used as the LLM).
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-01-21")

# Upper bound (seconds) for a single raw-file download so a stalled
# connection cannot block a worker indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# Title attached (as metadata) to the message that carries the model's thoughts.
THINKING_TITLE = "⚙️ Thinking: *The thoughts produced by the model are experimental"


# --------------------------------------------------
# File and Space analysis helpers
# --------------------------------------------------
def get_headers():
    """Return the Authorization header for Hugging Face requests.

    Raises:
        ValueError: if the HF_TOKEN environment variable is not set.
    """
    if not HF_TOKEN:
        raise ValueError("Hugging Face token not found in environment variables")
    return {"Authorization": f"Bearer {HF_TOKEN}"}


def get_file_content(space_id: str, file_path: str) -> str:
    """Download one raw file of a Space and return its text.

    Args:
        space_id: Space identifier in ``owner/name`` form.
        file_path: Path of the file inside the Space repository.

    Returns:
        The file's text on HTTP 200; otherwise a human-readable message
        describing the failure (the function does not raise for HTTP or
        network errors).

    Raises:
        ValueError: propagated from ``get_headers`` when no token is set.
    """
    file_url = f"https://huggingface.co/spaces/{space_id}/raw/main/{file_path}"
    try:
        response = requests.get(
            file_url,
            headers=get_headers(),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException:
        return f"Error fetching content for file: {file_path}"
    if response.status_code == 200:
        return response.text
    return f"File not found or inaccessible: {file_path}"


def get_space_structure(space_id: str) -> Dict:
    """Build a nested directory tree from the Space's flat file listing.

    Returns:
        A dict node ``{"type", "path", "name", "children"}`` for the root, or
        ``{"error": message}`` if listing the repository failed.
    """
    try:
        files = hf_api.list_repo_files(repo_id=space_id, repo_type="space")
        tree = {"type": "directory", "path": "", "name": space_id, "children": []}
        for file in files:
            path_parts = file.split('/')
            current = tree
            last_index = len(path_parts) - 1
            for i, part in enumerate(path_parts):
                if i == last_index:
                    # Final component is the file itself.
                    current["children"].append({"type": "file", "path": file, "name": part})
                    continue
                # Intermediate component: descend into the matching directory,
                # creating it on first sight.
                existing = next(
                    (
                        child
                        for child in current["children"]
                        if child["type"] == "directory" and child["name"] == part
                    ),
                    None,
                )
                if existing is None:
                    existing = {
                        "type": "directory",
                        "path": '/'.join(path_parts[:i + 1]),
                        "name": part,
                        "children": [],
                    }
                    current["children"].append(existing)
                current = existing
        return tree
    except Exception as e:
        print(f"Error in get_space_structure: {str(e)}")
        return {"error": f"API request error: {str(e)}"}


def format_tree_structure(tree_data: Dict, indent: str = "") -> str:
    """Render a tree node (and its descendants) as indented text lines.

    Directories are listed before files; entries of the same kind are sorted
    by name. An ``{"error": ...}`` node renders as its error message.
    """
    if "error" in tree_data:
        return tree_data["error"]
    is_directory = tree_data.get("type") == "directory"
    icon = '📁' if is_directory else '📄'
    formatted = f"{indent}{icon} {tree_data.get('name', 'Unknown')}\n"
    if is_directory:
        children = sorted(
            tree_data.get("children", []),
            key=lambda x: (x.get("type", "") != "directory", x.get("name", "")),
        )
        for child in children:
            formatted += format_tree_structure(child, indent + " ")
    return formatted


def adjust_lines_for_code(code_content: str, min_lines: int = 10, max_lines: int = 100) -> int:
    """Return the line count of ``code_content`` clamped to [min_lines, max_lines]."""
    num_lines = len(code_content.split('\n'))
    return min(max(num_lines, min_lines), max_lines)


def _extract_space_id(url: str) -> str:
    """Extract ``owner/name`` from a Space URL or a bare Space id.

    Tolerates surrounding whitespace, a trailing slash, a query string or
    fragment, and extra path segments such as ``/tree/main``.
    """
    tail = url.strip().split('spaces/')[-1]
    tail = re.split(r'[?#]', tail, maxsplit=1)[0].strip('/')
    return '/'.join(tail.split('/')[:2])


def analyze_space(url: str, progress=gr.Progress()):
    """Analyze a Space: file tree, app.py content, and three LLM write-ups.

    Args:
        url: Space URL (or bare ``owner/name`` id).
        progress: Gradio progress tracker.

    Returns:
        An 8-tuple ``(app_content, tree_view, tree_structure, space_id,
        summary, analysis, usage, app_py_lines)``. On any failure the first
        element carries the error message and the rest are empty defaults.
    """
    try:
        space_id = _extract_space_id(url)
        if not re.match(r'^[\w.-]+/[\w.-]+$', space_id):
            raise ValueError(f"Invalid Space ID format: {space_id}")

        progress(0.1, desc="파일 구조 분석 중...")
        tree_structure = get_space_structure(space_id)
        if "error" in tree_structure:
            raise ValueError(tree_structure["error"])
        tree_view = format_tree_structure(tree_structure)

        progress(0.3, desc="app.py 내용 가져오는 중...")
        app_content = get_file_content(space_id, "app.py")

        progress(0.5, desc="코드 요약 중...")
        summary = summarize_code(app_content)

        progress(0.7, desc="코드 분석 중...")
        analysis = analyze_code(app_content)

        progress(0.9, desc="사용법 설명 생성 중...")
        usage = explain_usage(app_content)

        app_py_lines = adjust_lines_for_code(app_content)
        progress(1.0, desc="완료")
        return app_content, tree_view, tree_structure, space_id, summary, analysis, usage, app_py_lines
    except Exception as e:
        print(f"Error in analyze_space: {str(e)}")
        print(traceback.format_exc())
        return f"오류가 발생했습니다: {str(e)}", "", None, "", "", "", "", 10


# --------------------------------------------------
# Gemini 2.0 Flash Thinking helpers
# --------------------------------------------------
def format_chat_history(messages: List[ChatMessage]) -> List[Dict]:
    """Convert ChatMessage objects into the history format Gemini expects.

    Messages that carry metadata (the "thinking" messages) are skipped.
    Non-user messages are sent with the role ``"model"``, which is the role
    name the Gemini API uses for its own turns.
    """
    formatted_history = []
    for message in messages:
        if hasattr(message, "metadata") and message.metadata:
            continue  # thinking message: not part of the conversation proper
        formatted_history.append({
            "role": "user" if message.role == "user" else "model",
            "parts": [message.content or ""],
        })
    return formatted_history


def gemini_chat_completion(system_message: str, user_message: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
    """Run a one-shot Gemini request and return the answer text.

    The system instruction and the user prompt are sent as a single message
    so the (potentially large) prompt is transmitted only once.

    Args:
        system_message: Instruction text placed before the user prompt.
        user_message: The actual request.
        max_tokens: Accepted for interface compatibility.
        temperature: Accepted for interface compatibility.

    NOTE(review): ``max_tokens`` and ``temperature`` are not forwarded to the
    model (they were not forwarded before either) — confirm whether a
    generation config should be applied for this thinking model.

    Returns:
        The stripped answer text, or an error message string if the call fails.
    """
    prompt = f"{system_message}\n\n{user_message}"
    before_split: List[str] = []  # text seen before a two-part chunk (thoughts, if any)
    answer: List[str] = []
    answer_started = False
    try:
        chat = model.start_chat(history=[])
        for chunk in chat.send_message(prompt, stream=True):
            parts = chunk.candidates[0].content.parts
            if not parts:
                continue
            if answer_started:
                answer.extend(part.text for part in parts)
            elif len(parts) == 2:
                # A two-part chunk marks the hand-over: parts[0] closes the
                # thoughts and parts[1] opens the answer.
                answer_started = True
                answer.append(parts[1].text)
            else:
                before_split.append(parts[0].text)
        # If no hand-over chunk was seen, everything received is the answer.
        return "".join(answer if answer_started else before_split).strip()
    except Exception as e:
        return f"LLM 호출 중 오류 발생: {str(e)}"


def summarize_code(app_content: str) -> str:
    """Ask the LLM for a summary of the code in at most three lines."""
    system_message = "당신은 Python 코드를 분석하고 요약하는 AI 조수입니다. 주어진 코드를 3줄 이내로 간결하게 요약해주세요."
    user_message = f"다음 Python 코드를 3줄 이내로 요약해주세요:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_message, user_message, max_tokens=200, temperature=0.7)
    except Exception as e:
        return f"요약 생성 중 오류 발생: {str(e)}"


def analyze_code(app_content: str) -> str:
    """Ask the LLM for a structured (A-E) Markdown analysis of the code."""
    system_message = (
        "당신은 Python 코드를 분석하는 AI 조수입니다. 주어진 코드를 분석하여 다음 항목에 대해 설명해주세요:\n"
        "A. 배경 및 필요성\n"
        "B. 기능적 효용성 및 가치\n"
        "C. 특장점\n"
        "D. 적용 대상 및 타겟\n"
        "E. 기대효과\n"
        "기존 및 유사 프로젝트와 비교하여 분석해주세요. Markdown 형식으로 출력하세요."
    )
    user_message = f"다음 Python 코드를 분석해주세요:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_message, user_message, max_tokens=1000, temperature=0.7)
    except Exception as e:
        return f"분석 생성 중 오류 발생: {str(e)}"


def explain_usage(app_content: str) -> str:
    """Ask the LLM for a detailed Markdown usage walkthrough of the code."""
    system_message = "당신은 Python 코드를 분석하여 사용법을 설명하는 AI 조수입니다. 주어진 코드를 바탕으로 마치 화면을 보는 것처럼 사용법을 상세히 설명해주세요. Markdown 형식으로 출력하세요."
    user_message = f"다음 Python 코드의 사용법을 설명해주세요:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_message, user_message, max_tokens=800, temperature=0.7)
    except Exception as e:
        return f"사용법 설명 생성 중 오류 발생: {str(e)}"


def convert_chat_history(messages: List[Any]) -> List[Tuple[str, str]]:
    """Convert a message list into ``(user, assistant)`` tuples.

    Items that are already pairs (tuples, or two-item lists as Gradio passes
    them) are kept as tuples. A user ChatMessage is paired with the
    assistant ChatMessage that directly follows it, if any; an assistant
    message with no preceding user message becomes ``("", content)``.
    Items of any other kind are skipped.
    """
    conv = []
    i = 0
    total = len(messages)
    while i < total:
        item = messages[i]
        if isinstance(item, (tuple, list)):
            # Already a pair: keep it.
            conv.append(tuple(item))
            i += 1
        elif hasattr(item, "role"):
            if item.role == "user":
                bot_text = ""
                step = 1
                if i + 1 < total:
                    follower = messages[i + 1]
                    if hasattr(follower, "role") and follower.role == "assistant":
                        bot_text = follower.content
                        step = 2
                conv.append((item.content, bot_text))
                i += step
            else:
                conv.append(("", item.content))
                i += 1
        else:
            i += 1
    return conv


def convert_to_chatmessage(history: List[Tuple[str, str]]) -> List[ChatMessage]:
    """Convert ``(user, assistant)`` pairs into a flat list of ChatMessage objects.

    Empty (or None) sides of a pair are omitted.
    """
    new_history = []
    for tup in history:
        if tup[0]:
            new_history.append(ChatMessage(role="user", content=tup[0]))
        if tup[1]:
            new_history.append(ChatMessage(role="assistant", content=tup[1]))
    return new_history


def _thinking_message(text: str) -> ChatMessage:
    """Build the assistant message that displays the model's thoughts."""
    return ChatMessage(role="assistant", content=text, metadata={"title": THINKING_TITLE})


def stream_gemini_response(user_message: str, messages: List[ChatMessage]) -> Iterator[List[ChatMessage]]:
    """Stream Gemini's thoughts and answer for ``user_message``.

    ``messages`` is used as the prior conversation, is mutated in place (a
    thinking message and then an answer message are appended and updated),
    and is yielded after every update. The user message itself is not
    appended here. Errors are reported as an appended assistant message
    rather than raised.
    """
    if not user_message.strip():
        messages.append(ChatMessage(role="assistant", content="Please provide a non-empty text message. Empty input is not allowed."))
        yield messages
        return

    try:
        print("\n=== New Request (Text) ===")
        print(f"User message: {user_message}")

        chat_history = format_chat_history(messages)
        chat = model.start_chat(history=chat_history)
        response = chat.send_message(user_message, stream=True)

        thought_buffer = ""
        response_buffer = ""
        thinking_complete = False

        messages.append(_thinking_message(""))

        for chunk in response:
            parts = chunk.candidates[0].content.parts
            if not parts:
                # Nothing to display in this chunk; indexing parts[0] would fail.
                continue
            current_chunk = parts[0].text

            if len(parts) == 2 and not thinking_complete:
                # Hand-over chunk: parts[0] closes the thoughts, parts[1]
                # opens the answer.
                thought_buffer += current_chunk
                print(f"\n=== Complete Thought ===\n{thought_buffer}")
                messages[-1] = _thinking_message(thought_buffer)
                yield messages

                response_buffer = parts[1].text
                print(f"\n=== Starting Response ===\n{response_buffer}")
                messages.append(ChatMessage(role="assistant", content=response_buffer))
                thinking_complete = True
            elif thinking_complete:
                response_buffer += current_chunk
                print(f"\n=== Response Chunk ===\n{current_chunk}")
                messages[-1] = ChatMessage(role="assistant", content=response_buffer)
            else:
                thought_buffer += current_chunk
                print(f"\n=== Thinking Chunk ===\n{current_chunk}")
                messages[-1] = _thinking_message(thought_buffer)

            yield messages

        print(f"\n=== Final Response ===\n{response_buffer}")

    except Exception as e:
        print(f"\n=== Error ===\n{str(e)}")
        messages.append(
            ChatMessage(
                role="assistant",
                content=f"I apologize, but I encountered an error: {str(e)}"
            )
        )
        yield messages


def respond(message: str, history: List[ChatMessage]) -> Iterator[List[Tuple[str, str]]]:
    """Stream ``stream_gemini_response`` output converted to tuple pairs."""
    for updated_messages in stream_gemini_response(message, history):
        yield convert_chat_history(updated_messages)


def user_message(msg: str, history: List[ChatMessage]) -> Tuple[str, List[ChatMessage]]:
    """Append the user's message to ``history`` and clear the input box."""
    history.append(ChatMessage(role="user", content=msg))
    return "", history


def respond_wrapper(message, chat_history, max_tokens, temperature, top_p):
    """Chat submit handler: stream the reply and clear the textbox.

    Yields ``("", pairs)`` where ``pairs`` is the full chat as
    ``(user, assistant)`` tuples, including the message just submitted.
    ``max_tokens``, ``temperature`` and ``top_p`` are received from the UI
    but are not used by this handler.

    NOTE(review): thought text is shown as an ordinary assistant bubble in
    the tuple format, so it loses its metadata and is included in the
    history sent on later turns — confirm whether that is acceptable.
    """
    chat_history = chat_history or []
    # The tuple-format chatbot hands pairs back as tuples or two-item lists.
    if chat_history and isinstance(chat_history[0], (tuple, list)):
        chat_history = convert_to_chatmessage(chat_history)

    prior_len = len(chat_history)
    user_turn = ChatMessage(role="user", content=message)
    for updated in stream_gemini_response(message, chat_history):
        # stream_gemini_response only appends assistant messages; splice the
        # user's turn in front of them so it is displayed and kept as context.
        shown = updated[:prior_len] + [user_turn] + updated[prior_len:]
        yield "", convert_chat_history(shown)


# --------------------------------------------------
# Gradio UI
# --------------------------------------------------
def create_ui():
    """Build and return the Gradio Blocks app (analysis tab, code chat tab, links)."""
    try:
        css = """
        /* 전체 배경 및 기본 글꼴 설정 */
        body {
            background-color: #f9f9f9;
            font-family: 'Helvetica Neue', Arial, sans-serif;
            color: #333;
        }
        /* 하단 푸터 숨김 */
        footer { visibility: hidden; }
        /* 출력 그룹 스타일: 밝은 배경, 부드러운 테두리와 그림자 */
        .output-group {
            border: 1px solid #ccc;
            border-radius: 8px;
            padding: 15px;
            margin-bottom: 20px;
            background-color: #ffffff;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
        }
        /* 스크롤 영역 설정 */
        .scroll-lock {
            overflow-y: auto !important;
            max-height: 300px !important;
        }
        .tree-view-scroll {
            overflow-y: auto !important;
            max-height: 400px !important;
        }
        .full-height {
            height: 80vh !important;
            overflow-y: auto !important;
        }
        /* 코드 박스 스타일: 모노스페이스 폰트와 밝은 배경 */
        .code-box {
            overflow-x: auto !important;
            overflow-y: auto !important;
            white-space: pre !important;
            background-color: #f5f5f5;
            border-radius: 4px;
            padding: 10px;
            font-family: 'Courier New', Courier, monospace;
        }
        .code-box > div { min-width: 100% !important; }
        .code-box > div > textarea {
            word-break: normal !important;
            overflow-wrap: normal !important;
        }
        /* 탭 내비게이션 스타일: 단순하고 깔끔한 디자인 */
        .tab-nav {
            background-color: #ffffff;
            border-bottom: 1px solid #ccc;
            display: flex;
        }
        .tab-nav button {
            background: none;
            border: none;
            padding: 10px 20px;
            margin: 0;
            cursor: pointer;
            font-size: 16px;
            color: #555;
            transition: color 0.3s, border-bottom 0.3s;
        }
        .tab-nav button:hover,
        .tab-nav button.selected {
            color: #000;
            border-bottom: 2px solid #007BFF;
        }
        /* 입력창 및 텍스트 영역 스타일 */
        input[type="text"], textarea {
            color: #333;
            background-color: #fff;
            border: 1px solid #ccc;
            border-radius: 4px;
            padding: 8px;
        }
        """

        with gr.Blocks(theme="default", css=css) as demo:
            gr.Markdown("# MOUSE: Space Research Thinking", elem_classes="header-markdown")

            with gr.Tabs() as tabs:
                with gr.TabItem("분석"):
                    with gr.Row():
                        with gr.Column(scale=6):
                            url_input = gr.Textbox(label="HuggingFace Space URL", placeholder="예: https://huggingface.co/spaces/username/space_name")
                            analyze_button = gr.Button("분석", variant="primary")

                            with gr.Group(elem_classes="output-group scroll-lock"):
                                summary_output = gr.Markdown(label="요약 (3줄 이내)")

                            with gr.Group(elem_classes="output-group scroll-lock"):
                                analysis_output = gr.Markdown(label="분석")

                            with gr.Group(elem_classes="output-group scroll-lock"):
                                usage_output = gr.Markdown(label="사용법")

                            with gr.Group(elem_classes="output-group tree-view-scroll"):
                                tree_view_output = gr.Textbox(label="파일 구조 (Tree View)", lines=30)

                        with gr.Column(scale=4):
                            with gr.Group(elem_classes="output-group full-height"):
                                code_tabs = gr.Tabs()
                                with code_tabs:
                                    with gr.TabItem("app.py"):
                                        app_py_content = gr.Code(
                                            language="python",
                                            label="app.py",
                                            lines=200,
                                            elem_classes="full-height code-box"
                                        )
                                    with gr.TabItem("requirements.txt"):
                                        requirements_content = gr.Textbox(
                                            label="requirements.txt",
                                            lines=200,
                                            elem_classes="full-height code-box"
                                        )

                with gr.TabItem("AI 코드챗"):
                    gr.Markdown("## : 예제를 입력/선택하고, 이어서 복사한 app.py 소스코드를 붙여 넣으세요", elem_classes="header-markdown")
                    # Chat box height is fixed at 400px so it fits the screen.
                    chatbot = gr.Chatbot(
                        label="대화",
                        elem_classes="output-group",
                        height=400
                    )

                    msg = gr.Textbox(label="메시지", placeholder="메시지를 입력하세요...")

                    # Hidden parameters.
                    max_tokens = gr.Slider(minimum=1, maximum=8000, value=4000, label="Max Tokens", visible=False)
                    temperature = gr.Slider(minimum=0, maximum=1, value=0.7, label="Temperature", visible=False)
                    top_p = gr.Slider(minimum=0, maximum=1, value=0.9, label="Top P", visible=False)

                    examples = [
                        ["상세한 사용 방법을 마치 화면을 보면서 설명하듯이 4000 토큰 이상 자세히 설명하라"],
                        ["FAQ 20건을 상세하게 작성하라. 4000토큰 이상 사용하라."],
                        ["사용 방법과 차별점, 특징, 강점을 중심으로 4000 토큰 이상 유튜브 영상 스크립트 형태로 작성하라"],
                        ["본 서비스를 SEO 최적화하여 블로그 포스트로 4000 토큰 이상 작성하라"],
                        ["특허 출원에 활용할 혁신적인 창의 발명 내용을 중심으로 4000 토큰 이상 작성하라."],
                        ["혁신적이고 논리적인 전문 논문의 형식으로 4000 토큰 이상 작성하라."],
                        ["계속 이어서 답변하라"],
                    ]
                    gr.Examples(examples, inputs=msg)

                    msg.submit(respond_wrapper, [msg, chatbot, max_tokens, temperature, top_p], [msg, chatbot])

                with gr.TabItem("Recommended Best"):
                    gr.Markdown(
                        "Discover the best recommended HuggingFace Spaces [here](https://huggingface.co/spaces/openfree/Korean-Leaderboard).",
                        elem_id="recommended-best"
                    )

            # State holders shared between event handlers.
            space_id_state = gr.State()
            tree_structure_state = gr.State()
            app_py_content_lines = gr.State()

            # Run the analysis, then fetch requirements.txt for the same Space.
            analyze_button.click(
                analyze_space,
                inputs=[url_input],
                outputs=[app_py_content, tree_view_output, tree_structure_state, space_id_state, summary_output, analysis_output, usage_output, app_py_content_lines]
            ).then(
                lambda space_id: get_file_content(space_id, "requirements.txt"),
                inputs=[space_id_state],
                outputs=[requirements_content]
            )

            # Resize the code view to the line count computed by analyze_space.
            app_py_content.change(lambda lines: gr.update(lines=lines), inputs=[app_py_content_lines], outputs=[app_py_content])

        return demo

    except Exception as e:
        print(f"Error in create_ui: {str(e)}")
        print(traceback.format_exc())
        raise


if __name__ == "__main__":
    try:
        print("Starting HuggingFace Space Analyzer...")
        demo = create_ui()
        print("UI created successfully.")
        print("Configuring Gradio queue...")
        demo.queue()
        print("Gradio queue configured.")
        print("Launching Gradio app...")
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            debug=True,
            show_api=False
        )
        print("Gradio app launched successfully.")
    except Exception as e:
        print(f"Error in main: {str(e)}")
        print("Detailed error information:")
        print(traceback.format_exc())
        raise