from crawl4ai import AsyncWebCrawler
from urllib.parse import urlparse
import aiohttp
import asyncio
import async_timeout  # async-timeout package: provides the async context manager used in crawl_with_retry()
from fast_async import make_async
from bs4 import BeautifulSoup, NavigableString
import secrets
from datetime import datetime
import random
import os
import re
import uuid
from typing import List, Dict, Tuple, Optional
from io import BytesIO
import PyPDF2
from fake_useragent import FakeUserAgent
from htmlrag import clean_html, build_block_tree, EmbedHTMLPruner, BM25HTMLPruner
from transformers import AutoTokenizer, AutoConfig
import torch
import time


class Crawler:
    def __init__(self, user_dir=None, rate_limit=1, headless=True, verbose=False):
        self.session_pool = {}  # Track active sessions
        self.verbose = verbose
        self.rate_limit = rate_limit
        self.user_dir = user_dir
        self.headless = headless
        self.crawler = AsyncWebCrawler(
            context_options={"userDataDir": self.user_dir},
            headless=self.headless,
            verbose=self.verbose
        )

        # Browser context management
        self._browser_contexts = {}
        self._context_locks = {}

    async def get_browser_context(self, session_id):
        """Get or create a browser context with proper locking"""
        if session_id not in self._context_locks:
            self._context_locks[session_id] = asyncio.Lock()

        async with self._context_locks[session_id]:
            if session_id not in self._browser_contexts:
                context = await self.crawler.new_context()
                self._browser_contexts[session_id] = context
            return self._browser_contexts[session_id]

    async def cleanup_browser_context(self, session_id):
        """Safely clean up a browser context"""
        if session_id in self._context_locks:
            async with self._context_locks[session_id]:
                if session_id in self._browser_contexts:
                    try:
                        await asyncio.shield(
                            self._browser_contexts[session_id].close()
                        )
                    except Exception as e:
                        print(f"Error cleaning up browser context: {e}")
                    finally:
                        del self._browser_contexts[session_id]

    def create_session(self):
        """Create a new session with a secure ID"""
        session_id = secrets.token_urlsafe(32)  # Secure session ID
        self.session_pool[session_id] = {
            'created_at': datetime.now(),
            'last_used': datetime.now(),
            'requests_count': 0
        }
        return session_id

    def rotate_session(self, session_id):
        """Rotate to a fresh session once the current one has served too many requests"""
        if self.session_pool[session_id]['requests_count'] > 100:
            self.cleanup_session(session_id)
            return self.create_session()
        return session_id

    def is_dynamic_page(self, html_content: str) -> Tuple[bool, Optional[str]]:
        """Analyze HTML content to determine if a webpage is dynamically loaded"""

        def _check_structural_indicators(soup: BeautifulSoup) -> Dict[str, int]:
            """Check structural indicators of dynamic content loading."""
            scores = {
                'empty_containers': 0,
                'repeated_structures': 0,
                'api_endpoints': 0,
                'state_management': 0
            }

            # 1. Check for empty content containers
            main_containers = soup.find_all(
                ['main', 'div', 'section'],
                class_=lambda x: x and any(
                    term in str(x).lower()
                    for term in ['content', 'main', 'feed', 'list', 'container']
                )
            )
            for container in main_containers:
                # Check if container is empty or has minimal content
                if len(container.find_all()) < 3:
                    scores['empty_containers'] += 1

                # Check for repeated similar structures (common in dynamic lists)
                children = container.find_all(recursive=False)
                if children:
                    first_child_class = children[0].get('class', [])
                    similar_siblings = [
                        c for c in children[1:]
                        if c.get('class', []) == first_child_class
                    ]
                    if len(similar_siblings) > 0:
                        scores['repeated_structures'] += 1
            # 2. Check for API endpoints in scripts
            scripts = soup.find_all('script', {'src': True})
            api_patterns = ['/api/', '/graphql', '/rest/', '/v1/', '/v2/']
            for script in scripts:
                if any(pattern in script['src'] for pattern in api_patterns):
                    scores['api_endpoints'] += 1

            # 3. Look for state management setup
            state_patterns = [
                r'window\.__INITIAL_STATE__',
                r'window\.__PRELOADED_STATE__',
                r'__REDUX_STATE__',
                r'__NUXT__',
                r'__NEXT_DATA__',
                r'window\.__data'
            ]
            inline_scripts = soup.find_all('script')
            for script in inline_scripts:
                if script.string:
                    for pattern in state_patterns:
                        if re.search(pattern, script.string):
                            scores['state_management'] += 1

            return scores

        def _check_modern_framework_indicators(soup: BeautifulSoup) -> Dict[str, int]:
            """Check for indicators of modern web frameworks and dynamic loading patterns."""
            scores = {
                'framework_roots': 0,
                'hydration': 0,
                'routing': 0
            }

            # 1. Framework-specific root elements
            framework_roots = {
                'react': ['react-root', 'react-app', 'root', '__next'],
                'angular': ['ng-version', 'ng-app'],
                'vue': ['v-app', '#app', 'nuxt-app'],
                'modern': ['app-root', 'application', 'spa-root']
            }
            for framework, identifiers in framework_roots.items():
                for id_value in identifiers:
                    if (soup.find(attrs={'id': re.compile(id_value, re.I)}) or
                            soup.find(attrs={'class': re.compile(id_value, re.I)}) or
                            soup.find(attrs={'data-': re.compile(id_value, re.I)})):
                        scores['framework_roots'] += 1

            # 2. Check for hydration indicators
            hydration_patterns = [
                r'hydrate',
                r'createRoot',
                r'reactive',
                r'observable'
            ]
            scripts = soup.find_all('script')
            for script in scripts:
                if script.string:
                    for pattern in hydration_patterns:
                        if re.search(pattern, script.string):
                            scores['hydration'] += 1

            # 3. Check for dynamic routing setup
            router_patterns = [
                'router-view', 'router-link', 'route-link',
                'history.push', 'navigation'
            ]
            for pattern in router_patterns:
                if soup.find(class_=re.compile(pattern, re.I)) or \
                        soup.find(id=re.compile(pattern, re.I)):
                    scores['routing'] += 1

            return scores

        def _check_dynamic_loading_patterns(soup: BeautifulSoup) -> Dict[str, int]:
            """Check for various dynamic content loading patterns."""
            scores = {
                'infinite_scroll': 0,
                'load_more_buttons': 0,
                'pagination': 0,
                'lazy_loading': 0,
                'loading_indicators': 0
            }

            # 1. Check for infinite scroll indicators
            scroll_indicators = [
                'infinite-scroll', 'data-infinite', 'data-virtualized',
                'virtual-scroll', 'scroll-container', 'scroll-viewport'
            ]
            for indicator in scroll_indicators:
                elements = soup.find_all(
                    lambda tag: any(indicator.lower() in str(v).lower()
                                    for v in tag.attrs.values())
                )
                if elements:
                    scores['infinite_scroll'] += len(elements)

            # 2. Check for load more buttons
            button_patterns = [
                r'load[_-]?more', r'show[_-]?more', r'view[_-]?more',
                r'see[_-]?more', r'more[_-]?posts', r'more[_-]?results'
            ]
            for pattern in button_patterns:
                elements = soup.find_all(
                    ['button', 'a', 'div', 'span'],
                    text=re.compile(pattern, re.I)
                )
                if elements:
                    scores['load_more_buttons'] += len(elements)

            # 3. Check for pagination
            pagination_patterns = ['pagination', 'page-numbers', 'page-nav', 'page-links']
            for pattern in pagination_patterns:
                elements = soup.find_all(class_=re.compile(pattern, re.I))
                if elements:
                    scores['pagination'] += len(elements)

            # 4. Check for lazy loading
            lazy_patterns = ['lazy', 'data-src', 'data-lazy']
            for pattern in lazy_patterns:
                elements = soup.find_all(
                    lambda tag: any(pattern.lower() in str(v).lower()
                                    for v in tag.attrs.values())
                )
                if elements:
                    scores['lazy_loading'] += len(elements)
            # 5. Check for loading indicators
            loading_patterns = ['loading', 'spinner', 'skeleton', 'placeholder', 'shimmer']
            for pattern in loading_patterns:
                elements = soup.find_all(class_=re.compile(pattern, re.I))
                if elements:
                    scores['loading_indicators'] += len(elements)

            return scores

        def _evaluate_dynamic_indicators(
            structural: Dict[str, int],
            framework: Dict[str, int],
            loading: Dict[str, int]
        ) -> Tuple[bool, Optional[str]]:
            """Evaluate dynamic indicators and return JavaScript instructions."""
            methods = []
            js_snippets = []

            # Infinite Scroll
            if loading['infinite_scroll'] > 0:
                methods.append("scroll")
                js_snippets.append(
                    """
                    window.scrollTo(0, document.body.scrollHeight);
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    """.strip().replace('\n', '')
                )

            # Load More Buttons
            if loading['load_more_buttons'] > 0:
                methods.append("button")
                js_snippets.append(
                    """
                    const button = Array.from(document.querySelectorAll('button, a, div, span')).find(
                        el => /load[_-]?more|show[_-]?more/i.test(el.textContent)
                    );
                    if (button) {
                        button.click();
                        await new Promise(resolve => setTimeout(resolve, 1000));
                    } else {
                        console.warn("No 'Load More' button found.");
                    }
                    """.strip().replace('\n', '')
                )

            # Paginated Interfaces
            if loading.get('pagination', 0) > 0:
                methods.append("pagination")
                js_snippets.append(
                    """
                    const nextPage = document.querySelector('a[rel="next"], .pagination-next, .page-next');
                    if (nextPage) {
                        nextPage.click();
                        await new Promise(resolve => setTimeout(resolve, 1000));
                    } else {
                        console.warn("No pagination link found.");
                    }
                    """.strip().replace('\n', '')
                )

            # Lazy Loading
            if loading.get('lazy_loading', 0) > 0:
                methods.append("lazy")
                js_snippets.append(
                    """
                    if (window.__INITIAL_STATE__ || window.__REDUX_STATE__ || window.__NUXT__ || window.__NEXT_DATA__) {
                        console.log('Framework state detected. Consider monitoring network requests for further actions.');
                    }
                    """.strip().replace('\n', '')
                )

            # Framework and State Management Indicators
            if framework['framework_roots'] > 0 or structural['state_management'] > 0:
                methods.append("stateful")
                js_snippets.append(
                    """
                    if (window.__INITIAL_STATE__ || window.__REDUX_STATE__ || window.__NUXT__ || window.__NEXT_DATA__) {
                        console.log('Detected stateful framework data loading.');
                    }
                    """.strip().replace('\n', '')
                )
            # API-Driven Content
            if structural['api_endpoints'] > 0:
                methods.append("api")
                js_snippets.append(
                    """
                    console.log('API requests detected. Use browser devtools to inspect network activity for specific endpoints.');
                    """.strip().replace('\n', '')
                )

            # Aggregate and finalize
            if methods:
                js_code = "\n".join(js_snippets)
                return True, js_code

            return False, None

        # Main execution
        soup = BeautifulSoup(html_content, 'html.parser')

        # Run all checks
        structural_scores = _check_structural_indicators(soup)
        framework_scores = _check_modern_framework_indicators(soup)
        loading_scores = _check_dynamic_loading_patterns(soup)

        # Evaluate results
        return _evaluate_dynamic_indicators(structural_scores, framework_scores, loading_scores)
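
    # NOTE (added comment): is_dynamic_page() returns (True, js_code) when the
    # heuristics above detect client-side rendering; crawl() below passes that
    # js_code string to a second arun() pass so scroll/click actions run before
    # the HTML is captured.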
    async def crawl(
        self,
        url,
        depth=2,
        max_pages=5,
        session_id=None,
        human_simulation=True,
        rotate_user_agent=True,
        rotate_proxy=True,
        return_html=False
    ):
        if not session_id:
            session_id = self.create_session()

        session_id = self.rotate_session(session_id)

        # List of rotating user agents
        user_agents = [
            'Chrome/115.0.0.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Chrome/115.0.0.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Chrome/115.0.0.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Chrome/115.0.0.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Chrome/115.0.0.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'
        ]

        # List of rotating proxies
        proxies = [
            "http://50.62.183.123:80",
            "http://104.129.60.84:6516",
            "http://156.228.118.163:3128",
            "http://142.111.104.97:6107",
            "http://156.228.99.99:3128"
        ]

        try:
            async with self.crawler as crawler:
                # Rotate user agent and optimize headers for each attempt
                headers = {
                    "User-Agent": random.choice(user_agents) if rotate_user_agent else user_agents[0],
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.5",
                    "Accept-Encoding": "gzip, deflate",
                    "Connection": "keep-alive",
                    "Upgrade-Insecure-Requests": "1",
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "none",
                    "Sec-Fetch-User": "?1",
                    "Cache-Control": "max-age=0"
                }

                # Update crawler headers for rotation
                crawler.crawler_strategy.headers = headers

                if rotate_proxy:
                    # Update crawler proxy for rotation
                    crawler.crawler_strategy.proxy = random.choice(proxies)

                result_1 = await crawler.arun(
                    session_id=session_id,
                    url=url,
                    magic=human_simulation,
                    simulate_user=human_simulation,
                    override_navigator=human_simulation,
                    depth=depth,
                    max_pages=max_pages,
                    bypass_cache=True,
                    remove_overlay_elements=True,
                    delay_before_retrieve_html=1.0,
                    verbose=self.verbose
                )

                # Update session metrics
                self.session_pool[session_id]['requests_count'] += 1
                self.session_pool[session_id]['last_used'] = datetime.now()

                # Fall back to the first result unless a JS-assisted re-crawl succeeds
                result = result_1

                if result_1.success:
                    if hasattr(result_1, 'html'):
                        success, js_code = self.is_dynamic_page(result_1.html)

                        if success:
                            # Reuse the already-open crawler for the JS-assisted pass
                            # Update crawler headers for rotation
                            crawler.crawler_strategy.headers = headers

                            if rotate_proxy:
                                # Update crawler proxy for rotation
                                crawler.crawler_strategy.proxy = random.choice(proxies)

                            print(f"Executing JS code: {js_code}")

                            result_2 = await crawler.arun(
                                session_id=session_id,
                                url=url,
                                magic=human_simulation,
                                simulate_user=human_simulation,
                                override_navigator=human_simulation,
                                depth=depth,
                                max_pages=max_pages,
                                js_code=js_code,
                                bypass_cache=True,
                                remove_overlay_elements=True,
                                delay_before_retrieve_html=1.0,
                                verbose=self.verbose
                            )

                            if result_2.success:
                                result = result_2
                            else:
                                result = result_1

                            # Update session metrics
                            self.session_pool[session_id]['requests_count'] += 1
                            self.session_pool[session_id]['last_used'] = datetime.now()
                        else:
                            result = result_1

                if return_html and hasattr(result, 'html'):
                    return result.html
                elif hasattr(result, 'fit_markdown'):
                    return result.fit_markdown
                elif hasattr(result, 'markdown'):
                    return self.extract_content(result.markdown)

        except Exception as e:
            print(f"Error crawling {url}: {str(e)}")

        return None

    async def crawl_with_retry(
        self,
        url,
        depth=2,
        max_pages=5,
        max_retries=3,
        backoff_factor=1,
        session_id=None,
        human_simulation=True,
        rotate_user_agent=True,
        rotate_proxy=True,
        return_html=False,
        timeout=10.0
    ):
        """Crawl with retry logic and anti-blocking measures"""

        async def attempt_crawl(attempt):
            try:
                async with async_timeout.timeout(timeout):
                    # Warm up the per-session browser context; self.crawl() drives
                    # its own AsyncWebCrawler instance, so it is not passed along
                    await self.get_browser_context(session_id)
                    return await self.crawl(
                        url,
                        depth,
                        max_pages,
                        session_id,
                        human_simulation,
                        rotate_user_agent,
                        rotate_proxy,
                        return_html
                    )
            except asyncio.TimeoutError:
                print(f"Timeout on attempt {attempt} for {url}")
                raise
            except Exception as e:
                print(f"Error on attempt {attempt} for {url}: {e}")
                raise

        if not self.is_valid_url(url) and not self.is_html_url(url):
            print(f"Invalid URL: {url}")
            return f"No web results found for query: {url}"

        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Add delay between retries with exponential backoff
                    delay = backoff_factor * (2 ** (attempt - 1))
                    await asyncio.sleep(delay)

                return await attempt_crawl(attempt + 1)
            except Exception:
                if attempt == max_retries - 1:
                    print(f"Max retries ({max_retries}) reached for {url}")
                    return f"Failed to crawl after {max_retries} attempts: {url}"
                continue

        return f"No content found after {max_retries} attempts for: {url}"

    def extract_content(self, html_content):
        soup = BeautifulSoup(html_content, 'html.parser')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)
        return text

    def cleanup_session(self, session_id):
        """Clean up a session"""
        print(f"Cleaning up session {session_id}")
        if session_id in self.session_pool:
            self.crawler.crawler_strategy.kill_session(session_id)
            del self.session_pool[session_id]

    def cleanup_expired_sessions(self):
        """Regular cleanup of expired sessions using proper time calculation"""
        try:
            current_time = datetime.now()
            expired_sessions = []

            for sid, data in self.session_pool.items():
                # Calculate time difference in seconds
                time_diff = (current_time - data['last_used']).total_seconds()

                # Check if more than 1 hour (3600 seconds)
                if time_diff > 3600:
                    expired_sessions.append(sid)

            # Cleanup expired sessions
            for session_id in expired_sessions:
                self.cleanup_session(session_id)
        except Exception as e:
            if self.verbose:
                print(f"Error during session cleanup: {str(e)}")

    @staticmethod
    def is_valid_url(url):
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    @staticmethod
    def is_html_url(url):
        return url.endswith(".html") or url.endswith(".htm")
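
# Illustrative sketch (not part of the original module): a minimal, hypothetical
# example of driving the Crawler class above with retry logic. The function name,
# URL, and parameter values below are assumptions for illustration only.
async def example_crawler_usage():
    crawler = Crawler(rate_limit=1, headless=True, verbose=True)
    session_id = crawler.create_session()
    try:
        # Returns extracted markdown/text, or an error string after max_retries
        content = await crawler.crawl_with_retry(
            "https://example.com",  # hypothetical URL
            depth=1,
            max_pages=1,
            max_retries=3,
            session_id=session_id,
            timeout=15.0
        )
        return content
    finally:
        crawler.cleanup_session(session_id)

# Example entry point (sketch): asyncio.run(example_crawler_usage())
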

class CustomCrawler:
    def __init__(
        self,
        embed_model: str = "HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1",
        max_concurrent_requests: int = 10,
        verbose: bool = True
    ):
        print("🦀 Initializing the crawler") if verbose else None
        time.sleep(1)

        self.embed_model = embed_model
        self.max_concurrent_requests = max_concurrent_requests
        self.verbose = verbose
        self.ua = FakeUserAgent()
        self.semaphore = asyncio.Semaphore(self.max_concurrent_requests)
        self.sessions = {}  # Maps session_id -> (aiohttp.ClientSession, creation_time)

        # Initializing HTML pruners and tokenizer
        print(f"🔃 Loading HTML Pruners and Tokenizer with {self.embed_model}") if self.verbose else None
        self.bm25_html_pruner = BM25HTMLPruner()
        self.embed_html_pruner = EmbedHTMLPruner(
            embed_model=self.embed_model,
            local_inference=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.embed_model,
            use_fast=True,
            trust_remote_code=True,
            device="cuda" if torch.cuda.is_available() else "cpu"
        )

        # Get the model config and set the max context length for the model
        print(f"🛠️ Getting model configuration for {self.embed_model}") if self.verbose else None
        self.config = AutoConfig.from_pretrained(self.embed_model)
        self.tokenizer.max_seq_length = self.config.max_position_embeddings
        print(f"📏 Setting max context length to {self.tokenizer.max_seq_length}") if self.verbose else None

    async def create_session(self):
        session_id = str(uuid.uuid4())
        timeout = aiohttp.ClientTimeout(total=600)  # Set a 10-minute timeout
        connector = aiohttp.TCPConnector(limit=self.max_concurrent_requests)  # Connection pool
        # Store the session together with its creation time so expiry checks can work
        self.sessions[session_id] = (
            aiohttp.ClientSession(timeout=timeout, connector=connector),
            time.time()
        )
        print(f"🔗 Created session: {session_id}") if self.verbose else None
        return session_id

    async def close_session(self, session_id):
        entry = self.sessions.pop(session_id, None)
        if entry:
            session, _ = entry
            await session.close()
            print(f"🌂 Closed session: {session_id}") if self.verbose else None

    async def cleanup_expired_sessions(self, expiration_time: int = 600):  # Default 10 minutes
        current_time = time.time()
        expired_sessions = []

        print("🔍 Checking for expired sessions") if self.verbose else None
        for session_id, (session, creation_time) in self.sessions.items():
            if current_time - creation_time > expiration_time:
                expired_sessions.append(session_id)

        for session_id in expired_sessions:
            await self.close_session(session_id)
        print("🗑️ Successfully cleaned up all expired sessions") if self.verbose else None

    @make_async
    def html_rag(
        self,
        query: str,
        html: str,
        max_context_length: int = 32000,
        buffer: int = 2000
    ) -> str:
        if not html:
            raise Exception("No HTML contents provided.")

        # Validate HTML structure
        try:
            BeautifulSoup(html, 'html.parser')
        except Exception as e:
            raise Exception(f"Invalid HTML content: {e}")

        prompt_for_retrieval = \
            """Given a query, your task is to retrieve the most relevant passages that answer and/or are relevant to the query.
            Query:"""

        self.embed_html_pruner.query_instruction_for_retrieval = prompt_for_retrieval

        print(f"🧹 Pruning HTML for query: {query}") if self.verbose else None
        cleaned_html = clean_html(html)
        block_tree, cleaned_html = build_block_tree(cleaned_html, max_node_words=10)
        block_rankings = self.bm25_html_pruner.calculate_block_rankings(query, cleaned_html, block_tree)

        max_context_window = max_context_length - buffer
        pruned_html = self.embed_html_pruner.prune_HTML(
            cleaned_html,
            block_tree,
            block_rankings,
            self.tokenizer,
            max_context_window
        )
        print(f"👍 Successfully pruned HTML for query: {query}") if self.verbose else None

        return pruned_html

    async def fetch_page_contents(
        self,
        urls: List[str],
        query: Optional[str] = None,
        session_id: Optional[str] = None,
        max_attempts: int = 3,
        delay: float = 1.0,
        timeout: float = 10.0,
        return_type: str = "markdown",
        rotate_headers: bool = True,
    ) -> List[Optional[str]]:

        async def fetch_single_page(url, proxies, session=None, query=query):
            for attempt in range(max_attempts):
                print(f"🔍 Attempt {attempt + 1}/{max_attempts}: Fetching content from {url}") if self.verbose else None
                content = await self._fetch_page_contents(
                    url=url,
                    query=query,
                    timeout=timeout,
                    return_type=return_type,
                    rotate_headers=rotate_headers,
                    proxies=proxies,
                    session=session
                )

                if content:
                    print(f"✅ Successfully fetched content from {url}") if self.verbose else None
                    return content
                else:
                    print(f"🚫 Failed to fetch content from {url}. Retrying in {delay} seconds...") if self.verbose else None
                    await asyncio.sleep(delay)

            print(f"🚫 Failed to fetch content from {url} after {max_attempts} attempts.") if self.verbose else None
            return None

        proxy_list = self.load_proxies()  # Load proxies from environment variables

        if proxy_list:
            proxies = proxy_list
        else:
            proxies = None

        if not urls:
            raise Exception("No URLs provided!")

        if return_type == "fit_markdown" and query is None:
            raise Exception("Query must be provided when return_type is 'fit_markdown'!")

        if session_id:
            # Use existing session if provided
            if session_id not in self.sessions:
                raise ValueError(f"Invalid session ID: {session_id}")
            session, _ = self.sessions[session_id]  # Unpack (session, creation_time)
            tasks = [fetch_single_page(url, proxies, session) for url in urls]  # Pass session to tasks
        else:
            # No session handling if session_id is None
            tasks = [fetch_single_page(url, proxies) for url in urls]  # No session passed

        results = await asyncio.gather(*tasks)
        return [result for result in results if result is not None]

    async def _fetch_page_contents(
        self,
        url: str,
        query: Optional[str] = None,
        timeout: float = 10.0,
        return_type: str = "markdown",
        rotate_headers: bool = True,
        proxies: Optional[List[str]] = None,
        session: Optional[aiohttp.ClientSession] = None
    ) -> Optional[str]:

        async def get_content(response, return_type=return_type):
            print(f"📄 Getting content from {url}") if self.verbose else None
            if return_type == "html":
                return await response.text()

            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '').lower()

            if 'application/pdf' in content_type:
                content = await response.read()
                text = self.extract_text_from_pdf(content)
                return text
            elif 'text/html' in content_type:
                html_content = await response.text()
                if return_type == "fit_markdown":
                    html_content = self.html_rag(query, html_content).wait()
                soup = BeautifulSoup(html_content, "html.parser")
                for script_or_style in soup(["script", "style"]):
                    script_or_style.decompose()
                text = self.html_to_markdown(soup)
                return text.strip()
            else:
                print(f"🚫 Unsupported content type {content_type} for URL {url}") if self.verbose else None
                return None

        headers = self.get_headers() if rotate_headers else {}
        proxy = self.get_proxy(proxies) if proxies else None

        # Split timeout into connection and processing phases
        timeout_config = aiohttp.ClientTimeout(
            total=None,        # No total timeout
            connect=timeout,   # Connection timeout
            sock_read=timeout  # Socket read timeout
        )

        try:
            # Use provided session if available
            if session:
                async with session.get(url, proxy=proxy, timeout=timeout_config, headers=headers) as response:
                    return await get_content(response)
            # Otherwise, create a new session for each request
            else:
                async with aiohttp.ClientSession() as new_session:
                    async with new_session.get(url, proxy=proxy, timeout=timeout_config, headers=headers) as response:
                        return await get_content(response)
        except aiohttp.ClientError as e:
            print(f"🚫 Request Exception for {url}: {e}") if self.verbose else None
            return None
        except asyncio.TimeoutError:
            print(f"🚫 Timeout error for {url}") if self.verbose else None
            return None
        except Exception as e:
            print(f"🚫 Unexpected error fetching {url}: {e}") if self.verbose else None
            return None

    def load_proxies(self) -> Optional[List[str]]:
        # Get all environment variables
        env_vars = dict(os.environ)

        # Load proxies from environment variables
        proxy_pattern = re.compile(r"PROXY_\d+")
        proxies = [env_vars[key] for key in env_vars if proxy_pattern.match(key)]

        if proxies:
            print(f"🔌 Loaded {len(proxies)} proxies from environment variables") if self.verbose else None
            return proxies
        else:
            return None

    def get_proxy(self, proxies: List[str]) -> str:
        if proxies:  # Check if the proxies list is not empty
            return next(iter(proxies))
        return None  # Or raise an exception, handle differently, etc.

    def get_headers(self) -> Dict[str, str]:
        return {'User-Agent': self.ua.random}

    def extract_text_from_pdf(self, pdf_content: bytes) -> str:
        try:
            print("📕 Extracting text from PDF") if self.verbose else None
            pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_content))
            text = ''
            for page in pdf_reader.pages:
                text += page.extract_text()
            print("💪 Successfully extracted text from PDF") if self.verbose else None
            return text
        except Exception as e:
            print(f"🚫 Error extracting text from PDF: {e}") if self.verbose else None
            return ""

    def html_to_markdown(self, soup):
        markdown_text = ""
        print("📟 Converting HTML to Markdown") if self.verbose else None

        def process_element(element, indent=0):
            nonlocal markdown_text

            if isinstance(element, NavigableString):
                text = str(element).strip()
                if text:
                    markdown_text += text + " "
                return

            tag = element.name
            if tag == "h1":
                markdown_text += "# " + element.text.strip() + "\n\n"
            elif tag == "h2":
                markdown_text += "## " + element.text.strip() + "\n\n"
            elif tag == "h3":
                markdown_text += "### " + element.text.strip() + "\n\n"
            elif tag == "h4":
                markdown_text += "#### " + element.text.strip() + "\n\n"
            elif tag == "h5":
                markdown_text += "##### " + element.text.strip() + "\n\n"
            elif tag == "h6":
                markdown_text += "###### " + element.text.strip() + "\n\n"
            elif tag == "p":
                markdown_text += element.text.strip() + "\n\n"
            elif tag == "br":
                markdown_text += "\n"
            elif tag == "ul":
                for li in element.find_all("li", recursive=False):
                    markdown_text += " " * indent + "- "
                    process_element(li, indent + 1)
                    markdown_text += "\n"
                markdown_text += "\n"
            elif tag == "ol":
                for i, li in enumerate(element.find_all("li", recursive=False), 1):
                    markdown_text += " " * indent + f"{i}. "
                    process_element(li, indent + 1)
                    markdown_text += "\n"
                markdown_text += "\n"
            elif tag == "table":
                rows = element.find_all("tr")
                for row in rows:
                    cells = row.find_all(["td", "th"])
                    row_text = [cell.text.strip() for cell in cells]
                    markdown_text += "| " + " | ".join(row_text) + " |\n"
                    if row == rows[0]:  # Header row separator
                        markdown_text += "| " + " | ".join(["---"] * len(cells)) + " |\n"
                markdown_text += "\n"
            elif tag == "blockquote":
                markdown_text += "> " + element.text.strip().replace("\n", "\n> ") + "\n\n"
            elif tag == "strong" or tag == "b":
                markdown_text += "**" + element.text.strip() + "**"
            elif tag == "em" or tag == "i":
                markdown_text += "*" + element.text.strip() + "*"
            elif tag == "code":
                markdown_text += "`" + element.text.strip() + "`"
            elif tag == "pre":
                markdown_text += "```\n" + element.text + "\n```\n\n"
            elif tag == "hr":
                markdown_text += "---\n\n"
            else:
                for child in element.children:
                    process_element(child, indent)

        process_element(soup)
        print("👌 Successfully converted HTML to Markdown") if self.verbose else None
        return markdown_text


if __name__ == "__main__":
    import winloop

    URLS = [
        "https://en.wikipedia.org/wiki/Treaty_Principles_Bill#:~:text=The%20Treaty%20Principles%20Bill%2C%20or,of%20the%20Treaty%20of%20Waitangi.",
        "https://www.parliament.nz/en/pb/sc/make-a-submission/document/54SCJUST_SCF_227E6D0B-E632-42EB-CFFE-08DCFEB826C6/principles-of-the-treaty-of-waitangi-bill",
        "https://en.wikipedia.org/wiki/Waitangi_Tribunal",
        "https://aljazeera.com/news/2024/11/19/why-are-new-zealands-maori-protesting-over-colonial-era-treaty-bill",
        "https://downiewenjack.ca/treaty-of-waitangi-treaty-principles-bill/"
    ]  # * 10  # Uncomment the multiplier to make 50 requests

    query = "What is the Treaty of Waitangi Bill?"

    # Install winloop before creating the event loop so the policy takes effect
    winloop.install()
    loop = asyncio.get_event_loop()

    custom_crawler = CustomCrawler(max_concurrent_requests=1000)
    session_id = loop.run_until_complete(custom_crawler.create_session())

    start = time.perf_counter()
    result = loop.run_until_complete(custom_crawler.fetch_page_contents(
        URLS,
        query,
        session_id=session_id,
        timeout=20,
        max_attempts=1,
        return_type="fit_markdown",
    ))
    end = time.perf_counter()

    loop.run_until_complete(custom_crawler.close_session(session_id))
    loop.run_until_complete(custom_crawler.cleanup_expired_sessions())

    print("\n\n".join([f"Document {i+1}:\n\n{result[i]}" for i in range(len(result))]))
    print(f"\n\nTime taken: {end - start} seconds")