"""Guidelines for Creating and Utilizing Tools in tools.py.

1. Initial Assessment
   - Review Existing Tools: Before adding new functions, thoroughly read
     through tools.py to understand the existing tools and their
     functionalities.
   - Determine if an existing tool can be adapted or extended to meet the
     current needs, avoiding redundancy.

2. Tool Creation and Function Design
   - Create Within tools.py: Add new tools as functions within tools.py. If
     tools.py doesn't exist, create it. Ensure each function is
     self-contained and focused on a single task for modularity.
   - Design for Importing: Design tools to be imported and executed via
     terminal commands. Do not include execution code that runs when
     tools.py is imported.
   - Follow Best Practices: Use clear and descriptive function names that
     reflect their general purpose. Include docstrings for each function,
     detailing the purpose, parameters, and expected outputs. Adhere to
     PEP 8 style guidelines for readable and maintainable code.

3. Generalization
   - Broad Input Handling: Design functions to handle a wide range of
     inputs, enhancing reusability for future tasks. Accept parameters that
     allow the function to be applicable in various scenarios (e.g., any
     stock ticker, URL, or data file).
   - Flexible Functionality: Ensure functions can process different data
     types and structures when applicable. Avoid hardcoding values; use
     parameters and defaults where necessary.
   - Modularity: If a task involves multiple distinct operations, split
     them into separate functions. This approach enhances clarity and
     allows for individual functions to be reused independently.

4. Execution and Script Management
   - Import and Run via Terminal: Do not execute tools.py directly.
     Instead, import the necessary functions and run them using the
     terminal. Use the command (bash):

         python -c "from tools import function_name; function_name(args)"

     Replace function_name and args with the appropriate function and
     arguments.
   - Avoid Additional Scripts: Do not create extra .py files or scripts for
     execution purposes. Keep all tool functions within tools.py and
     execute them using the import method shown above.

5. Output
   - Console Printing: Ensure that all tools print their output directly to
     the console. Format the output for readability, using clear messages
     and organizing data in a logical manner.
   - No Return Statements for Output: While functions can return values for
     internal use, the primary results should be displayed using print()
     statements.

6. Error Handling
   - Input Validation: Validate all input parameters to catch errors before
     execution. Provide informative error messages to guide correct usage.
   - Exception Management: Use try-except blocks to handle potential
     exceptions without crashing the program. Log errors where appropriate,
     and ensure they don't expose sensitive information.
   - Debugging and Testing: Test functions with various inputs, including
     edge cases, to ensure robustness. If errors are found, revise and
     debug the functions promptly.

7. Post-Creation
   - Execute to Fulfill Requests: After creating or updating tools, execute
     them as needed to fulfill user requests unless the request was solely
     for tool creation.
   - Documentation: Update any relevant documentation or comments within
     tools.py to reflect new additions or changes. Consider maintaining a
     usage example within the docstring for complex functions.

8. Maintenance and Refactoring
   - Regular Review: Periodically review tools.py to identify opportunities
     for optimization and improvement. Remove or update deprecated
     functions that are no longer effective or necessary.
   - Enhance Generalization: Refactor functions to improve their general
     applicability as new requirements emerge. Stay vigilant for patterns
     that can be abstracted into more general solutions.

9. Compliance and Security
   - Data Protection: Ensure that tools handle data securely, especially
     when dealing with sensitive information. Avoid hardcoding credentials
     or exposing private data through outputs.
   - Licensing and Dependencies: Verify that any third-party libraries used
     are properly licensed and documented. Include installation
     instructions for dependencies if they are not part of the standard
     library.
"""

# Your tools will be defined below this line

import json
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Union

# Third-party packages are optional at import time: each tool reports a clear
# error when its own dependency is missing instead of making the whole module
# unimportable. Install with: pip install PyPDF2 requests GitPython
try:
    import PyPDF2
except ImportError:
    PyPDF2 = None

try:
    import requests
except ImportError:
    requests = None

try:
    from git import Repo
except ImportError:  # GitPython also raises ImportError when git is missing
    Repo = None


# Default root that holds one sub-folder per month (e.g. ".../2024/March").
DEFAULT_BASE_PATH = r"C:\Users\admin\Dropbox\Current\2024"

# Seconds to wait for plain HTTP downloads before giving up.
DEFAULT_HTTP_TIMEOUT = 30


def get_current_month_folder(base_path: Optional[str] = None) -> str:
    """Return the path of the current month's folder.

    Args:
        base_path: Directory containing the month folders. If None, uses
            DEFAULT_BASE_PATH.

    Returns:
        ``base_path`` joined with the full English month name of today.
    """
    if base_path is None:
        base_path = DEFAULT_BASE_PATH
    current_month = datetime.now().strftime("%B")  # Full month name
    return os.path.join(base_path, current_month)


def get_pdfs_for_date(
    target_date: Optional[datetime] = None,
    folder: Optional[str] = None,
) -> List[str]:
    """Find all PDFs last modified on a given date under a folder tree.

    Args:
        target_date: Date to match against each file's modification time.
            If None, uses today's date.
        folder: Root folder to search recursively. If None, uses the current
            month's folder from get_current_month_folder().

    Returns:
        A list of full paths to matching PDF files (empty if none match or
        the folder does not exist).
    """
    if target_date is None:
        target_date = datetime.now()
    if folder is None:
        folder = get_current_month_folder()

    target_date_str = target_date.strftime("%Y-%m-%d")
    pdf_files = []

    # Walk through all subdirectories
    for root, _, files in os.walk(folder):
        for name in files:
            if not name.lower().endswith(".pdf"):
                continue
            file_path = os.path.join(root, name)
            try:
                mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            except OSError:
                # The file disappeared or is unreadable; skip it rather than
                # abort the whole scan.
                continue
            if mod_time.strftime("%Y-%m-%d") == target_date_str:
                pdf_files.append(file_path)

    return pdf_files


def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract the text content of a PDF file.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        The text of every page, each followed by a newline. Returns an empty
        string (after printing the error) if the file cannot be processed.
    """
    if PyPDF2 is None:
        print(f"Error processing {pdf_path}: PyPDF2 is not installed "
              "(pip install PyPDF2)")
        return ""
    try:
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            # extract_text() may yield None for pages without a text layer.
            return "".join(
                (page.extract_text() or "") + "\n" for page in reader.pages
            )
    except Exception as e:
        print(f"Error processing {pdf_path}: {str(e)}")
        return ""


def summarize_pdfs_for_date(
    target_date: Optional[datetime] = None,
    folder: Optional[str] = None,
    output_dir: str = "summaries",
) -> None:
    """Summarize all PDFs modified on a given date.

    Prints a summary of each PDF to the console and saves all summaries to
    ``<output_dir>/summaries_<YYYY-MM-DD>.json``.

    Args:
        target_date: Date to process. If None, uses today's date.
        folder: Root folder to search. If None, uses the current month's
            folder.
        output_dir: Directory for the JSON file (created if missing).
    """
    if target_date is None:
        target_date = datetime.now()
    date_label = target_date.strftime("%Y-%m-%d")

    pdfs = get_pdfs_for_date(target_date, folder)
    if not pdfs:
        print(f"No PDFs found for {date_label}")
        return

    summaries = {}
    for pdf_path in pdfs:
        print(f"Processing: {pdf_path}")
        text = extract_text_from_pdf(pdf_path)

        # Basic summary: first 500 characters of text
        summary = text[:500] + "..." if len(text) > 500 else text

        # Key by file name; fall back to the full path when two PDFs in
        # different sub-folders share a name so neither entry is lost.
        filename = os.path.basename(pdf_path)
        key = filename if filename not in summaries else pdf_path
        summaries[key] = {
            "path": pdf_path,
            "summary": summary,
            "processed_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"summaries_{date_label}.json")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(summaries, f, indent=4, ensure_ascii=False)

    print(f"\nProcessed {len(pdfs)} PDFs")
    print(f"Summaries saved to: {output_file}")

    # Print summaries to console
    for name, data in summaries.items():
        print(f"\n{'='*80}\n{name}")
        print(f"Path: {data['path']}")
        print(f"\nSummary:\n{data['summary'][:200]}...")


class WebSearchTool:
    """Perform web searches through the Perplexity chat-completions API.

    Designed to be the default web search mechanism for context-requiring
    queries.
    """

    API_URL = "https://api.perplexity.ai/chat/completions"
    DEFAULT_MODEL = "llama-3.1-sonar-huge-128k-online"

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        timeout: float = 60.0,
    ):
        """Initialize the WebSearchTool.

        Args:
            api_key: Perplexity API key. If None, read from the
                PERPLEXITY_API_KEY environment variable.
            model: Model name sent to the API. If None, uses DEFAULT_MODEL.
            timeout: Seconds to wait for the API before giving up.

        Raises:
            ValueError: If no API key is provided or found in the environment.
        """
        self.api_key = api_key or os.getenv("PERPLEXITY_API_KEY")
        if not self.api_key:
            raise ValueError("Perplexity API key must be provided or set in PERPLEXITY_API_KEY environment variable")

        self.model = model or self.DEFAULT_MODEL
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform a web search using the Perplexity API.

        Args:
            query: The search query.
            max_results: Maximum number of references to include.

        Returns:
            On success, a dict with "query", "timestamp", "answer",
            "references" and "metadata". On failure, a dict with "query",
            "timestamp", "error" and "metadata" (the error is also printed).
        """
        try:
            if requests is None:
                raise RuntimeError(
                    "the 'requests' package is not installed (pip install requests)"
                )

            response = requests.post(
                self.API_URL,
                headers=self.headers,
                json={
                    "model": self.model,
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are a helpful assistant that provides accurate and concise answers based on web search results.",
                        },
                        {
                            "role": "user",
                            "content": query,
                        },
                    ],
                },
                # Without a timeout a stalled connection blocks forever.
                timeout=self.timeout,
            )
            response.raise_for_status()
            data = response.json()

            # "choices" may be missing or empty; treat both as "no answer".
            choices = data.get("choices") or [{}]
            answer = choices[0].get("message", {}).get("content", "No answer found")

            # NOTE(review): assumes the response may carry a top-level
            # "citations" list of URL strings - confirm against the current
            # Perplexity API. When the key is absent no references are
            # reported, as before.
            citations = data.get("citations") or []
            references = [
                {"title": url, "url": url}
                for url in citations[:max(max_results, 0)]
                if isinstance(url, str)
            ]

            return {
                "query": query,
                "timestamp": datetime.now().isoformat(),
                "answer": answer,
                "references": references,
                "metadata": {
                    "source": "Perplexity API",
                    "model": self.model,
                },
            }

        except Exception as e:
            print(f"Error performing search: {str(e)}")
            return {
                "query": query,
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
                "metadata": {
                    "source": "Perplexity API",
                    "status": "error",
                },
            }

    def format_results(self, results: Dict, format: str = "text") -> str:
        """Format search results as plain text or markdown.

        Args:
            results: Dictionary returned by search().
            format: Output format, "text" or "markdown". Any value other
                than "markdown" produces plain text.

        Returns:
            The formatted results, or "Error: <message>" for an error result.
        """
        if "error" in results:
            return f"Error: {results['error']}"

        if format == "markdown":
            title = f"# Search Results for: {results['query']}\n\n"
            answer = f"## Answer\n{results['answer']}\n\n"
            references_header = "## References\n"
        else:
            title = f"Search Results for: {results['query']}\n\n"
            answer = f"Answer:\n{results['answer']}\n\n"
            references_header = "References:\n"

        parts = [title, answer]
        # A result without a "references" key is treated as having none.
        references = results.get("references") or []
        if references:
            parts.append(references_header)
            parts.extend(
                f"{i}. {ref['title']} - {ref['url']}\n"
                for i, ref in enumerate(references, 1)
            )
        return "".join(parts)


class MCPServerManager:
    """Install and manage Model Context Protocol (MCP) servers.

    Integrates with Claude desktop and manages server configurations.
    Installed-server metadata is persisted in ``<base_dir>/config.json``;
    handles of running server processes are kept in memory only.
    """

    DEFAULT_CONFIG_LOCATIONS = [
        "mcp.json",
        ".mcp/config.json",
        "config/mcp.json",
        "mcp_config.json",
        ".config/mcp/servers.json",
    ]

    def __init__(self, base_dir: Optional[str] = None):
        """Initialize the MCP Server Manager.

        Args:
            base_dir: Base directory for installing servers. If None, uses
                an ``mcp_servers`` directory inside the current directory.
        """
        self.base_dir = Path(base_dir) if base_dir else Path.cwd() / "mcp_servers"
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.servers_repo_url = "https://github.com/modelcontextprotocol/servers.git"
        self.installed_servers = {}
        # Popen handles are not JSON-serializable, so they are tracked apart
        # from the persisted metadata.
        self._processes = {}
        self.load_installed_servers()

    def load_installed_servers(self):
        """Load information about installed servers from the config file.

        A missing, unreadable or malformed file leaves the current
        (initially empty) registry untouched.
        """
        config_file = self.base_dir / "config.json"
        if not config_file.exists():
            return
        try:
            with open(config_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Error reading {config_file}: {str(e)}")
            return
        if isinstance(loaded, dict):
            self.installed_servers = loaded

    def save_installed_servers(self):
        """Save information about installed servers to the config file."""
        config_file = self.base_dir / "config.json"
        # Write to a sibling file first so a failed dump cannot leave a
        # truncated config.json behind.
        tmp_file = config_file.with_name(config_file.name + ".tmp")
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(self.installed_servers, f, indent=4)
        os.replace(tmp_file, config_file)

    def get_featured_servers(self) -> List[Dict]:
        """Get the list of featured servers from the MCP GitHub repository.

        Clones (or pulls) the servers repository into ``base_dir`` and reads
        ``featured.json`` if present, otherwise collects
        modelcontextprotocol GitHub links from ``README.md``.

        Returns:
            List of server information dictionaries; empty on any error.
        """
        try:
            if Repo is None:
                raise RuntimeError(
                    "GitPython is not installed (pip install GitPython)"
                )

            # Clone or update the servers repository
            repo_dir = self.base_dir / "servers_repo"
            if repo_dir.exists():
                Repo(repo_dir).remotes.origin.pull()
            else:
                Repo.clone_from(self.servers_repo_url, repo_dir)

            # First try to read from featured.json
            featured_file = repo_dir / "featured.json"
            if featured_file.exists():
                with open(featured_file, "r", encoding="utf-8") as f:
                    return json.load(f)

            # If featured.json doesn't exist, parse README.md
            readme_file = repo_dir / "README.md"
            if not readme_file.exists():
                return []

            with open(readme_file, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            # Look for server repository links
            repo_links = re.findall(r"\[([^\]]+)\]\((https://github.com/[^)]+)\)", content)
            return [
                {"name": name, "repository": url, "config": {}}
                for name, url in repo_links
                if "/modelcontextprotocol/" in url
            ]

        except Exception as e:
            print(f"Error getting featured servers: {str(e)}")
            return []

    @staticmethod
    def _read_mcp_servers(config_file: Path, label: str) -> Optional[Dict]:
        """Return the "mcpServers" entry of a JSON file, or None if absent."""
        try:
            with open(config_file, "r", encoding="utf-8") as f:
                config = json.load(f)
        except Exception as e:
            print(f"Error reading config from {label}: {str(e)}")
            return None
        if isinstance(config, dict) and "mcpServers" in config:
            return config["mcpServers"]
        return None

    def find_server_config(self, server_dir: Path) -> Optional[Dict]:
        """Search for MCP server configuration in common locations.

        Checks DEFAULT_CONFIG_LOCATIONS, then ``package.json``, then the
        ``[tool.mcp]`` table of ``pyproject.toml``.

        Args:
            server_dir: Directory to search in.

        Returns:
            Server configuration dictionary if found, None otherwise.
        """
        # First check common config file locations
        for config_path in self.DEFAULT_CONFIG_LOCATIONS:
            config_file = server_dir / config_path
            if config_file.exists():
                found = self._read_mcp_servers(config_file, str(config_file))
                if found is not None:
                    return found

        # Check package.json for Node.js projects
        package_json = server_dir / "package.json"
        if package_json.exists():
            found = self._read_mcp_servers(package_json, "package.json")
            if found is not None:
                return found

        # Check pyproject.toml for Python projects
        pyproject_toml = server_dir / "pyproject.toml"
        if pyproject_toml.exists():
            try:
                try:
                    import tomllib as toml_reader  # standard library, 3.11+
                except ImportError:
                    import tomli as toml_reader
                with open(pyproject_toml, "rb") as f:
                    config = toml_reader.load(f)
                if "tool" in config and "mcp" in config["tool"]:
                    return {"python": config["tool"]["mcp"]}
            except ImportError:
                print("tomli package not found, skipping pyproject.toml parsing")
            except Exception as e:
                print(f"Error reading config from pyproject.toml: {str(e)}")

        return None

    @staticmethod
    def _install_dependencies(server_dir: Path) -> None:
        """Best-effort install of a server's pip or npm dependencies."""
        if (server_dir / "requirements.txt").exists():
            command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]
        elif (server_dir / "package.json").exists():
            command = ["npm", "install"]
        else:
            return

        shown = " ".join(command)
        try:
            result = subprocess.run(command, cwd=server_dir)
        except OSError as e:
            # e.g. npm is not on PATH; the checkout is still usable.
            print(f"Warning: could not run '{shown}': {str(e)}")
            return
        if result.returncode != 0:
            print(f"Warning: '{shown}' exited with code {result.returncode}")

    def install_server(self, server_name: str, custom_config: Optional[Dict] = None) -> bool:
        """Install a specific MCP server.

        Args:
            server_name: Name of the server to install (as listed by
                get_featured_servers()).
            custom_config: Optional custom configuration for the server; if
                omitted, find_server_config() is used.

        Returns:
            True if installation was successful, False otherwise.
        """
        try:
            # Get server information from featured servers
            featured_servers = self.get_featured_servers()
            server_info = next((s for s in featured_servers if s["name"] == server_name), None)

            if not server_info:
                print(f"Server '{server_name}' not found in featured servers")
                return False

            # Create server directory
            server_dir = self.base_dir / server_name
            server_dir.mkdir(exist_ok=True)

            # Clone server repository
            repo = Repo.clone_from(server_info["repository"], server_dir)

            self._install_dependencies(server_dir)

            # Find or use custom server configuration
            config = custom_config or self.find_server_config(server_dir) or {}

            # Store server information
            self.installed_servers[server_name] = {
                "path": str(server_dir),
                "version": repo.head.commit.hexsha[:8],
                "install_date": datetime.now().isoformat(),
                "config": config,
            }
            self.save_installed_servers()

            print(f"Successfully installed {server_name}")
            if config:
                print(f"Found server configuration: {json.dumps(config, indent=2)}")
            else:
                print("No server configuration found. You may need to configure it manually.")
            return True

        except Exception as e:
            print(f"Error installing server '{server_name}': {str(e)}")
            return False

    def uninstall_server(self, server_name: str) -> bool:
        """Uninstall a specific MCP server.

        A server started by this manager is stopped first; its directory is
        then removed and its registry entry deleted.

        Args:
            server_name: Name of the server to uninstall.

        Returns:
            True if uninstallation was successful, False otherwise.
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False

            if server_name in self._processes:
                self.stop_server(server_name)

            # Remove server directory (it may already have been deleted by
            # hand; that must not block removing the registry entry).
            server_dir = Path(self.installed_servers[server_name]["path"])
            if server_dir.exists():
                shutil.rmtree(server_dir)

            # Remove from installed servers
            del self.installed_servers[server_name]
            self.save_installed_servers()

            print(f"Successfully uninstalled {server_name}")
            return True

        except Exception as e:
            print(f"Error uninstalling server '{server_name}': {str(e)}")
            return False

    def list_installed_servers(self) -> Dict[str, Dict]:
        """Get information about installed servers.

        Returns:
            Dictionary of installed server information, keyed by server name.
        """
        return self.installed_servers

    def update_server(self, server_name: str) -> bool:
        """Update a specific MCP server to the latest version.

        Args:
            server_name: Name of the server to update.

        Returns:
            True if update was successful, False otherwise.
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False
            if Repo is None:
                raise RuntimeError(
                    "GitPython is not installed (pip install GitPython)"
                )

            server_dir = Path(self.installed_servers[server_name]["path"])
            repo = Repo(server_dir)

            old_version = repo.head.commit.hexsha[:8]
            repo.remotes.origin.pull()
            new_version = repo.head.commit.hexsha[:8]

            # Same dependency handling as install_server (pip or npm).
            self._install_dependencies(server_dir)

            # Update stored information
            self.installed_servers[server_name]["version"] = new_version
            self.save_installed_servers()

            print(f"Updated {server_name} from {old_version} to {new_version}")
            return True

        except Exception as e:
            print(f"Error updating server '{server_name}': {str(e)}")
            return False

    def configure_server(self, server_name: str, config: Dict) -> bool:
        """Configure a specific MCP server.

        Stores the configuration in the registry and writes it to
        ``<server path>/.mcp/config.json`` under an "mcpServers" key.

        Args:
            server_name: Name of the server to configure.
            config: Configuration dictionary in the format:
                {
                    "command": str,         # Command to run the server
                    "args": List[str],      # Arguments for the command
                    "env": Dict[str, str],  # Optional environment variables
                    "cwd": str,             # Optional working directory
                }

        Returns:
            True if configuration was successful, False otherwise.
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False

            server_dir = Path(self.installed_servers[server_name]["path"])

            # Validate configuration
            if not isinstance(config, dict) or "command" not in config:
                print("Error: Server configuration must include 'command'")
                return False

            # Update configuration
            self.installed_servers[server_name]["config"] = config
            self.save_installed_servers()

            # Try to write configuration to a standard location
            config_dir = server_dir / ".mcp"
            config_dir.mkdir(parents=True, exist_ok=True)
            config_file = config_dir / "config.json"
            with open(config_file, "w", encoding="utf-8") as f:
                json.dump({"mcpServers": {server_name: config}}, f, indent=2)

            print(f"Successfully configured {server_name}")
            print(f"Configuration saved to {config_file}")
            return True

        except Exception as e:
            print(f"Error configuring server '{server_name}': {str(e)}")
            return False

    def start_server(self, server_name: str) -> bool:
        """Start a specific MCP server.

        Runs the configured command in the configured "cwd" (or the server's
        install path) with the configured "env" merged over os.environ.

        Args:
            server_name: Name of the server to start.

        Returns:
            True if the server was started, False otherwise (including when
            a process started by this manager is still running).
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False

            server_info = self.installed_servers[server_name]
            config = server_info.get("config", {})

            if not config:
                print(f"Server '{server_name}' is not configured")
                return False

            command = config.get("command")
            if not command:
                print(f"Server '{server_name}' has no 'command' configured")
                return False

            running = self._processes.get(server_name)
            if running is not None and running.poll() is None:
                print(f"Server '{server_name}' is already running (PID: {running.pid})")
                return False

            # Prepare command and arguments
            args = config.get("args") or []
            env = {**os.environ, **(config.get("env") or {})}
            cwd = config.get("cwd") or server_info["path"]

            # Start the server process
            process = subprocess.Popen(
                [command, *args],
                env=env,
                cwd=cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            # Keep the handle in memory only: a Popen object cannot be
            # written to config.json.
            self._processes[server_name] = process

            print(f"Started server '{server_name}' (PID: {process.pid})")
            return True

        except Exception as e:
            print(f"Error starting server '{server_name}': {str(e)}")
            return False

    def stop_server(self, server_name: str) -> bool:
        """Stop a specific MCP server started by this manager.

        Args:
            server_name: Name of the server to stop.

        Returns:
            True if the server was stopped, False otherwise.
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False

            process = self._processes.get(server_name)
            if process is None:
                print(f"Server '{server_name}' is not running")
                return False

            # Try to stop the process gracefully, then force it.
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()

            # Release the pipe file descriptors opened by start_server.
            for stream in (process.stdout, process.stderr):
                if stream is not None:
                    stream.close()

            del self._processes[server_name]

            print(f"Stopped server '{server_name}'")
            return True

        except Exception as e:
            print(f"Error stopping server '{server_name}': {str(e)}")
            return False


def _repo_name_from_url(repo_url: str) -> str:
    """Return the last path segment of a repo URL without a ".git" suffix."""
    name = repo_url.rstrip("/").split("/")[-1]
    # Strip only a trailing ".git"; a plain replace would also mangle names
    # such as "user.github.io".
    if name.endswith(".git"):
        name = name[:-len(".git")]
    return name


async def generate_pyflowchart(repo_path: str, output_dir: Optional[str] = None) -> None:
    """Generate flowcharts for all Python files in a repository.

    Installs pyflowchart with pip, then runs it once per non-empty ``.py``
    file, writing one HTML file per source file.

    Args:
        repo_path: Path to the repository.
        output_dir: Optional directory to save flowcharts. If None, creates
            a 'flowcharts' directory in the repo.
    """
    try:
        if not os.path.isdir(repo_path):
            print(f"Error generating flowcharts: '{repo_path}' is not a directory")
            return

        # Ensure pyflowchart is installed
        subprocess.run([sys.executable, "-m", "pip", "install", "pyflowchart"], check=True)

        # Set up output directory
        if output_dir is None:
            output_dir = os.path.join(repo_path, "flowcharts")
        os.makedirs(output_dir, exist_ok=True)

        # Generate HTML flowcharts
        for root, _, files in os.walk(repo_path):
            for file in files:
                if not file.endswith(".py"):
                    continue
                py_file = os.path.join(root, file)

                # Skip files with no content; an unreadable file is skipped
                # too instead of aborting the whole run.
                try:
                    with open(py_file, "r", encoding="utf-8") as f:
                        has_code = bool(f.read().strip())
                except (OSError, UnicodeDecodeError) as e:
                    print(f"Skipping unreadable file: {py_file} ({str(e)})")
                    continue
                if not has_code:
                    print(f"Skipping empty file: {py_file}")
                    continue

                # Name the output after the path relative to the repo so
                # same-named modules in different packages (e.g. several
                # __init__.py files) do not overwrite each other.
                rel_stem = os.path.splitext(os.path.relpath(py_file, repo_path))[0]
                flat_stem = rel_stem.replace(os.sep, "_")
                html_file = os.path.join(output_dir, f"{flat_stem}_flowchart.html")

                # Generate flowchart HTML
                print(f"Generating flowchart for {py_file}")
                try:
                    subprocess.run([
                        sys.executable, "-m", "pyflowchart", py_file,
                        "--output", html_file
                    ], check=True)
                    print(f"Saved HTML to {html_file}")
                except subprocess.CalledProcessError as e:
                    print(f"Error generating flowchart for {py_file}: {str(e)}")
                    continue

        print(f"\nFlowcharts generated in: {output_dir}")

    except subprocess.CalledProcessError as e:
        print(f"Error running pyflowchart: {str(e)}")
    except Exception as e:
        print(f"Error generating flowcharts: {str(e)}")


def extract_repo_context(repo_url: str, output_file: Optional[str] = None) -> None:
    """Extract repository context using gitingest.com.

    Downloads the page at the repository URL with "github.com" replaced by
    "gitingest.com" and saves the response text to a file.

    Args:
        repo_url: GitHub repository URL.
        output_file: Optional file to save the context. If None, uses
            ``<repo name>_context.txt`` in the current directory.
    """
    try:
        # Convert github.com URL to gitingest.com
        if "github.com" not in repo_url:
            raise ValueError("Only GitHub repositories are supported")
        if requests is None:
            raise RuntimeError(
                "the 'requests' package is not installed (pip install requests)"
            )

        ingest_url = repo_url.replace("github.com", "gitingest.com", 1)
        print(f"Fetching repository context from: {ingest_url}")

        # Make request to gitingest.com
        response = requests.get(ingest_url, timeout=DEFAULT_HTTP_TIMEOUT)
        response.raise_for_status()

        # Save to file
        if output_file is None:
            output_file = f"{_repo_name_from_url(repo_url)}_context.txt"

        with open(output_file, "w", encoding="utf-8") as f:
            f.write(response.text)

        print(f"Repository context saved to: {output_file}")

    except Exception as e:
        if requests is not None and isinstance(e, requests.RequestException):
            print(f"Error fetching repository context: {str(e)}")
        else:
            print(f"Error extracting repository context: {str(e)}")


async def post_clone_actions(repo_url: str) -> None:
    """Perform post-clone actions after a git clone operation.

    1. Generate flowcharts for all Python files using pyflowchart
    2. Extract repository context using gitingest.com

    The clone is expected in ``<current directory>/<repo name>``.

    Args:
        repo_url: URL of the repository that was just cloned.
    """
    try:
        # Get the repository name from the URL
        repo_path = os.path.join(os.getcwd(), _repo_name_from_url(repo_url))

        # Generate flowcharts
        print("\nGenerating flowcharts...")
        await generate_pyflowchart(repo_path)

        # Extract repository context
        print("\nExtracting repository context...")
        extract_repo_context(repo_url)

    except Exception as e:
        print(f"Error in post-clone actions: {str(e)}")


# Example usage:
if __name__ == "__main__":
    # Initialize the search tool
    search_tool = WebSearchTool()

    # Perform a simple search
    results = search_tool.search("Latest developments in AI technology")

    # Print formatted results
    print(search_tool.format_results(results, "markdown"))