|
import time |
|
from urllib.parse import urlparse, urljoin
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from bs4 import BeautifulSoup, Comment, element, Tag, NavigableString |
|
import json |
|
import html |
|
import re |
|
import os |
|
import platform |
|
from .prompts import PROMPT_EXTRACT_BLOCKS |
|
from .config import * |
|
from pathlib import Path |
|
|
import requests |
|
from requests.exceptions import InvalidSchema |
|
from typing import Optional, Tuple, Dict, Any |
|
import xxhash |
|
from colorama import Fore, Style, init |
|
import textwrap |
|
import cProfile |
|
import pstats |
|
from functools import wraps |
|
import asyncio |
|
|
|
|
|
class InvalidCSSSelectorError(Exception): |
|
pass |
|
|
|
def create_box_message(message: str, type: str = "info", width: int = 120, add_newlines: bool = True, double_line: bool = False) -> str: |
|
""" |
|
Create a styled message box with colored borders and formatted text. |
|
|
|
How it works: |
|
1. Determines box style and colors based on the message type (e.g., info, warning). |
|
2. Wraps text to fit within the specified width. |
|
3. Constructs a box using characters (single or double lines) with appropriate formatting. |
|
4. Adds optional newlines before and after the box. |
|
|
|
Args: |
|
message (str): The message to display inside the box. |
|
type (str): Type of the message (e.g., "info", "warning", "error", "success"). Defaults to "info". |
|
width (int): Width of the box. Defaults to 120. |
|
add_newlines (bool): Whether to add newlines before and after the box. Defaults to True. |
|
double_line (bool): Whether to use double lines for the box border. Defaults to False. |
|
|
|
Returns: |
|
str: A formatted string containing the styled message box. |
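    Example (a minimal usage sketch; colors render only in ANSI-capable terminals):
        ```python
        msg = create_box_message("Crawl finished.\nProcessed 12 pages.", type="success")
        print(msg)
        ```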
|
""" |
|
|
|
init() |
|
|
|
|
|
styles = { |
|
"warning": (Fore.YELLOW, Fore.LIGHTYELLOW_EX, "β "), |
|
"info": (Fore.BLUE, Fore.LIGHTBLUE_EX, "βΉ"), |
|
"success": (Fore.GREEN, Fore.LIGHTGREEN_EX, "β"), |
|
"error": (Fore.RED, Fore.LIGHTRED_EX, "Γ"), |
|
} |
|
|
|
border_color, text_color, prefix = styles.get(type.lower(), styles["info"]) |
|
|
|
|
|
box_chars = { |
|
"single": ("β", "β", "β", "β", "β", "β"), |
|
"double": ("β", "β", "β", "β", "β", "β") |
|
} |
|
line_style = "double" if double_line else "single" |
|
h_line, v_line, tl, tr, bl, br = box_chars[line_style] |
|
|
|
|
|
formatted_lines = [] |
|
raw_lines = message.split('\n') |
|
|
|
if raw_lines: |
|
first_line = f"{prefix} {raw_lines[0].strip()}" |
|
wrapped_first = textwrap.fill(first_line, width=width-4) |
|
formatted_lines.extend(wrapped_first.split('\n')) |
|
|
|
for line in raw_lines[1:]: |
|
if line.strip(): |
|
wrapped = textwrap.fill(f" {line.strip()}", width=width-4) |
|
formatted_lines.extend(wrapped.split('\n')) |
|
else: |
|
formatted_lines.append("") |
|
|
|
|
|
horizontal_line = h_line * (width - 1) |
|
box = [ |
|
f"{border_color}{tl}{horizontal_line}{tr}", |
|
*[f"{border_color}{v_line}{text_color} {line:<{width-2}}{border_color}{v_line}" for line in formatted_lines], |
|
f"{border_color}{bl}{horizontal_line}{br}{Style.RESET_ALL}" |
|
] |
|
|
|
result = "\n".join(box) |
|
if add_newlines: |
|
result = f"\n{result}\n" |
|
|
|
return result |
|
|
|
def calculate_semaphore_count(): |
|
""" |
|
Calculate the optimal semaphore count based on system resources. |
|
|
|
How it works: |
|
1. Determines the number of CPU cores and total system memory. |
|
2. Sets a base count as half of the available CPU cores. |
|
3. Limits the count based on memory, assuming 2GB per semaphore instance. |
|
4. Returns the minimum value between CPU and memory-based limits. |
|
|
|
Returns: |
|
int: The calculated semaphore count. |
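    Example (worked numbers, assuming 8 CPU cores and 16 GB of RAM):
        ```python
        # base_count       = max(1, 8 // 2) = 4
        # memory_based_cap = int(16 / 2)    = 8
        # result           = min(4, 8)      = 4
        count = calculate_semaphore_count()
        ```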
|
""" |
|
|
|
cpu_count = os.cpu_count() |
|
memory_gb = get_system_memory() / (1024 ** 3) |
|
base_count = max(1, cpu_count // 2) |
|
memory_based_cap = int(memory_gb / 2) |
|
return min(base_count, memory_based_cap) |
|
|
|
def get_system_memory(): |
|
""" |
|
Get the total system memory in bytes. |
|
|
|
How it works: |
|
1. Detects the operating system. |
|
2. Reads memory information from system-specific commands or files. |
|
3. Converts the memory to bytes for uniformity. |
|
|
|
Returns: |
|
int: The total system memory in bytes. |
|
|
|
Raises: |
|
OSError: If the operating system is unsupported. |
|
""" |
|
|
|
system = platform.system() |
|
if system == "Linux": |
|
with open('/proc/meminfo', 'r') as mem: |
|
for line in mem: |
|
if line.startswith('MemTotal:'): |
|
return int(line.split()[1]) * 1024 |
|
elif system == "Darwin": |
|
import subprocess |
|
output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8') |
|
return int(output.strip()) |
|
elif system == "Windows": |
|
import ctypes |
|
kernel32 = ctypes.windll.kernel32 |
|
c_ulonglong = ctypes.c_ulonglong |
|
class MEMORYSTATUSEX(ctypes.Structure): |
|
_fields_ = [ |
|
('dwLength', ctypes.c_ulong), |
|
('dwMemoryLoad', ctypes.c_ulong), |
|
('ullTotalPhys', c_ulonglong), |
|
('ullAvailPhys', c_ulonglong), |
|
('ullTotalPageFile', c_ulonglong), |
|
('ullAvailPageFile', c_ulonglong), |
|
('ullTotalVirtual', c_ulonglong), |
|
('ullAvailVirtual', c_ulonglong), |
|
('ullAvailExtendedVirtual', c_ulonglong), |
|
] |
|
memoryStatus = MEMORYSTATUSEX() |
|
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX) |
|
kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus)) |
|
return memoryStatus.ullTotalPhys |
|
else: |
|
raise OSError("Unsupported operating system") |
|
|
|
def get_home_folder(): |
|
""" |
|
Get or create the home folder for Crawl4AI configuration and cache. |
|
|
|
How it works: |
|
1. Uses environment variables or defaults to the user's home directory. |
|
2. Creates `.crawl4ai` and its subdirectories (`cache`, `models`) if they don't exist. |
|
3. Returns the path to the home folder. |
|
|
|
Returns: |
|
str: The path to the Crawl4AI home folder. |
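    Example (illustrative; the exact path depends on CRAWL4_AI_BASE_DIRECTORY and the user's home directory):
        ```python
        home = get_home_folder()               # e.g. "/home/alice/.crawl4ai"
        cache_dir = os.path.join(home, "cache")
        ```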
|
""" |
|
|
|
    home_folder = os.path.join(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home()), ".crawl4ai")
|
os.makedirs(home_folder, exist_ok=True) |
|
os.makedirs(f"{home_folder}/cache", exist_ok=True) |
|
os.makedirs(f"{home_folder}/models", exist_ok=True) |
|
return home_folder |
|
|
|
def beautify_html(escaped_html): |
|
""" |
|
Beautifies an escaped HTML string. |
|
|
|
Parameters: |
|
escaped_html (str): A string containing escaped HTML. |
|
|
|
Returns: |
|
str: A beautifully formatted HTML string. |
|
""" |
|
|
|
unescaped_html = html.unescape(escaped_html) |
|
|
|
|
|
soup = BeautifulSoup(unescaped_html, 'html.parser') |
|
pretty_html = soup.prettify() |
|
|
|
return pretty_html |
|
|
|
def split_and_parse_json_objects(json_string): |
|
""" |
|
Splits a JSON string which is a list of objects and tries to parse each object. |
|
|
|
Parameters: |
|
json_string (str): A string representation of a list of JSON objects, e.g., '[{...}, {...}, ...]'. |
|
|
|
Returns: |
|
tuple: A tuple containing two lists: |
|
- First list contains all successfully parsed JSON objects. |
|
- Second list contains the string representations of all segments that couldn't be parsed. |
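    Example (the second object is deliberately malformed):
        ```python
        parsed, failed = split_and_parse_json_objects('[{"a": 1}, {"b": }]')
        # parsed -> [{'a': 1}]
        # failed -> ['{"b": }']
        ```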
|
""" |
|
|
|
if json_string.startswith('[') and json_string.endswith(']'): |
|
json_string = json_string[1:-1].strip() |
|
|
|
|
|
segments = [] |
|
depth = 0 |
|
start_index = 0 |
|
|
|
for i, char in enumerate(json_string): |
|
if char == '{': |
|
if depth == 0: |
|
start_index = i |
|
depth += 1 |
|
elif char == '}': |
|
depth -= 1 |
|
if depth == 0: |
|
segments.append(json_string[start_index:i+1]) |
|
|
|
|
|
parsed_objects = [] |
|
unparsed_segments = [] |
|
|
|
for segment in segments: |
|
try: |
|
obj = json.loads(segment) |
|
parsed_objects.append(obj) |
|
except json.JSONDecodeError: |
|
unparsed_segments.append(segment) |
|
|
|
return parsed_objects, unparsed_segments |
|
|
|
def sanitize_html(html): |
|
""" |
|
Sanitize an HTML string by escaping quotes. |
|
|
|
    How it works:

    1. Escapes double and single quotes so the HTML can be embedded safely in quoted strings.
|
|
|
Args: |
|
html (str): The HTML string to sanitize. |
|
|
|
Returns: |
|
str: The sanitized HTML string. |
|
""" |
|
|
|
|
|
sanitized_html = html |
|
|
|
|
|
|
|
sanitized_html = sanitized_html.replace('"', '\\"').replace("'", "\\'") |
|
|
|
return sanitized_html |
|
|
|
def sanitize_input_encode(text: str) -> str: |
|
"""Sanitize input to handle potential encoding issues.""" |
|
try: |
|
try: |
|
if not text: |
|
return '' |
|
|
|
return text.encode('utf-8', errors='ignore').decode('utf-8') |
|
except UnicodeEncodeError as e: |
|
print(f"Warning: Encoding issue detected. Some characters may be lost. Error: {e}") |
|
|
|
return text.encode('ascii', errors='ignore').decode('ascii') |
|
except Exception as e: |
|
raise ValueError(f"Error sanitizing input: {str(e)}") from e |
|
|
|
def escape_json_string(s): |
|
""" |
|
Escapes characters in a string to be JSON safe. |
|
|
|
Parameters: |
|
s (str): The input string to be escaped. |
|
|
|
Returns: |
|
str: The escaped string, safe for JSON encoding. |
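    Example:
        ```python
        raw = 'Line1\nHe said "ok"'
        print(escape_json_string(raw))         # Line1\nHe said \"ok\"
        ```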
|
""" |
|
|
|
s = s.replace('\\', '\\\\') |
|
|
|
|
|
s = s.replace('"', '\\"') |
|
|
|
|
|
s = s.replace('\b', '\\b') |
|
s = s.replace('\f', '\\f') |
|
s = s.replace('\n', '\\n') |
|
s = s.replace('\r', '\\r') |
|
s = s.replace('\t', '\\t') |
|
|
|
|
|
|
|
s = re.sub(r'[\x00-\x1f\x7f-\x9f]', lambda x: '\\u{:04x}'.format(ord(x.group())), s) |
|
|
|
return s |
|
|
|
def replace_inline_tags(soup, tags, only_text=False): |
|
""" |
|
Replace inline HTML tags with Markdown-style equivalents. |
|
|
|
How it works: |
|
1. Maps specific tags (e.g., <b>, <i>) to Markdown syntax. |
|
2. Finds and replaces all occurrences of these tags in the provided BeautifulSoup object. |
|
3. Optionally replaces tags with their text content only. |
|
|
|
Args: |
|
soup (BeautifulSoup): Parsed HTML content. |
|
tags (List[str]): List of tags to replace. |
|
only_text (bool): Whether to replace tags with plain text. Defaults to False. |
|
|
|
Returns: |
|
BeautifulSoup: Updated BeautifulSoup object with replaced tags. |
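    Example:
        ```python
        from bs4 import BeautifulSoup
        soup = BeautifulSoup("<p>A <b>bold</b> and <em>nice</em> word.</p>", "html.parser")
        replace_inline_tags(soup, ["b", "em"])
        print(soup.get_text())                 # A **bold** and *nice* word.
        ```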
|
""" |
|
|
|
tag_replacements = { |
|
'b': lambda tag: f"**{tag.text}**", |
|
'i': lambda tag: f"*{tag.text}*", |
|
'u': lambda tag: f"__{tag.text}__", |
|
'span': lambda tag: f"{tag.text}", |
|
'del': lambda tag: f"~~{tag.text}~~", |
|
'ins': lambda tag: f"++{tag.text}++", |
|
'sub': lambda tag: f"~{tag.text}~", |
|
'sup': lambda tag: f"^^{tag.text}^^", |
|
'strong': lambda tag: f"**{tag.text}**", |
|
'em': lambda tag: f"*{tag.text}*", |
|
'code': lambda tag: f"`{tag.text}`", |
|
'kbd': lambda tag: f"`{tag.text}`", |
|
'var': lambda tag: f"_{tag.text}_", |
|
's': lambda tag: f"~~{tag.text}~~", |
|
'q': lambda tag: f'"{tag.text}"', |
|
'abbr': lambda tag: f"{tag.text} ({tag.get('title', '')})", |
|
'cite': lambda tag: f"_{tag.text}_", |
|
'dfn': lambda tag: f"_{tag.text}_", |
|
'time': lambda tag: f"{tag.text}", |
|
'small': lambda tag: f"<small>{tag.text}</small>", |
|
'mark': lambda tag: f"=={tag.text}==" |
|
} |
|
|
|
replacement_data = [(tag, tag_replacements.get(tag, lambda t: t.text)) for tag in tags] |
|
|
|
for tag_name, replacement_func in replacement_data: |
|
for tag in soup.find_all(tag_name): |
|
replacement_text = tag.text if only_text else replacement_func(tag) |
|
tag.replace_with(replacement_text) |
|
|
|
return soup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD, css_selector = None, **kwargs): |
|
""" |
|
Extract structured content, media, and links from website HTML. |
|
|
|
How it works: |
|
1. Parses the HTML content using BeautifulSoup. |
|
2. Extracts internal/external links and media (images, videos, audios). |
|
3. Cleans the content by removing unwanted tags and attributes. |
|
4. Converts cleaned HTML to Markdown. |
|
5. Collects metadata and returns the extracted information. |
|
|
|
Args: |
|
url (str): The website URL. |
|
html (str): The HTML content of the website. |
|
word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD. |
|
css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None. |
|
|
|
Returns: |
|
Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata. |
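    Example (a minimal sketch; in practice `html` comes from a crawler fetch):
        ```python
        page = "<html><body><p>" + "word " * 30 + "</p></body></html>"
        result = get_content_of_website("https://example.com/page", page)
        print(result["markdown"])
        ```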
|
""" |
|
|
|
try: |
|
if not html: |
|
return None |
|
|
|
soup = BeautifulSoup(html, 'html.parser') |
|
|
|
|
|
body = soup.body |
|
|
|
|
|
if css_selector: |
|
selected_elements = body.select(css_selector) |
|
if not selected_elements: |
|
raise InvalidCSSSelectorError(f"Invalid CSS selector , No elements found for CSS selector: {css_selector}") |
|
div_tag = soup.new_tag('div') |
|
for el in selected_elements: |
|
div_tag.append(el) |
|
body = div_tag |
|
|
|
links = { |
|
'internal': [], |
|
'external': [] |
|
} |
|
|
|
|
|
for a in body.find_all('a', href=True): |
|
href = a['href'] |
|
url_base = url.split('/')[2] |
|
if href.startswith('http') and url_base not in href: |
|
links['external'].append({ |
|
'href': href, |
|
'text': a.get_text() |
|
}) |
|
else: |
|
links['internal'].append( |
|
{ |
|
'href': href, |
|
'text': a.get_text() |
|
} |
|
) |
|
|
|
|
|
for tag in body.find_all(['script', 'style', 'link', 'meta', 'noscript']): |
|
tag.decompose() |
|
|
|
|
|
for tag in body.find_all(): |
|
if tag.name != 'img': |
|
tag.attrs = {} |
|
|
|
|
|
media = { |
|
'images': [], |
|
'videos': [], |
|
'audios': [] |
|
} |
|
for img in body.find_all('img'): |
|
media['images'].append({ |
|
'src': img.get('src'), |
|
'alt': img.get('alt'), |
|
"type": "image" |
|
}) |
|
|
|
|
|
for video in body.find_all('video'): |
|
media['videos'].append({ |
|
'src': video.get('src'), |
|
'alt': video.get('alt'), |
|
"type": "video" |
|
}) |
|
|
|
|
|
for audio in body.find_all('audio'): |
|
media['audios'].append({ |
|
'src': audio.get('src'), |
|
'alt': audio.get('alt'), |
|
"type": "audio" |
|
}) |
|
|
|
|
|
for img in body.find_all('img'): |
|
alt_text = img.get('alt') |
|
if alt_text: |
|
img.replace_with(soup.new_string(alt_text)) |
|
else: |
|
img.decompose() |
|
|
|
|
|
|
|
def replace_pre_tags_with_text(node): |
|
for child in node.find_all('pre'): |
|
|
|
child.string = child.get_text() |
|
return node |
|
|
|
|
|
body = replace_pre_tags_with_text(body) |
|
|
|
|
|
body = replace_inline_tags( |
|
body, |
|
['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark'], |
|
only_text=kwargs.get('only_text', False) |
|
) |
|
|
|
|
|
def remove_empty_and_low_word_count_elements(node, word_count_threshold): |
|
for child in node.contents: |
|
if isinstance(child, element.Tag): |
|
remove_empty_and_low_word_count_elements(child, word_count_threshold) |
|
word_count = len(child.get_text(strip=True).split()) |
|
if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold: |
|
child.decompose() |
|
return node |
|
|
|
body = remove_empty_and_low_word_count_elements(body, word_count_threshold) |
|
|
|
def remove_small_text_tags(body: Tag, word_count_threshold: int = MIN_WORD_THRESHOLD): |
|
|
|
tags_to_remove = [] |
|
|
|
|
|
for tag in body.find_all(True): |
|
|
|
if tag.string and tag.string.strip(): |
|
|
|
word_count = len(tag.string.strip().split()) |
|
|
|
if word_count < word_count_threshold: |
|
tags_to_remove.append(tag) |
|
|
|
|
|
for tag in tags_to_remove: |
|
tag.decompose() |
|
|
|
return body |
|
|
|
|
|
|
|
body = remove_small_text_tags(body, word_count_threshold) |
|
|
|
def is_empty_or_whitespace(tag: Tag): |
|
if isinstance(tag, NavigableString): |
|
return not tag.strip() |
|
|
|
if not tag.contents: |
|
return True |
|
return all(is_empty_or_whitespace(child) for child in tag.contents) |
|
|
|
def remove_empty_tags(body: Tag): |
|
|
|
changes = True |
|
while changes: |
|
changes = False |
|
|
|
empty_tags = [tag for tag in body.find_all(True) if is_empty_or_whitespace(tag)] |
|
for tag in empty_tags: |
|
|
|
tag.decompose() |
|
changes = True |
|
|
|
return body |
|
|
|
|
|
|
|
body = remove_empty_tags(body) |
|
|
|
|
|
def flatten_nested_elements(node): |
|
for child in node.contents: |
|
if isinstance(child, element.Tag): |
|
flatten_nested_elements(child) |
|
if len(child.contents) == 1 and child.contents[0].name == child.name: |
|
|
|
child_content = child.contents[0] |
|
child.replace_with(child_content) |
|
|
|
return node |
|
|
|
body = flatten_nested_elements(body) |
|
|
|
|
|
|
|
|
|
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): |
|
comment.extract() |
|
|
|
|
|
        cleaned_html = str(body).replace('\n\n', '\n').replace('  ', ' ')
|
|
|
|
|
cleaned_html = sanitize_html(cleaned_html) |
|
|
|
|
|
|
|
        h = CustomHTML2Text()
|
h.ignore_links = True |
|
markdown = h.handle(cleaned_html) |
|
markdown = markdown.replace(' ```', '```') |
|
|
|
try: |
|
meta = extract_metadata(html, soup) |
|
except Exception as e: |
|
print('Error extracting metadata:', str(e)) |
|
meta = {} |
|
|
|
|
|
|
|
        return {
|
'markdown': markdown, |
|
'cleaned_html': cleaned_html, |
|
'success': True, |
|
'media': media, |
|
'links': links, |
|
'metadata': meta |
|
} |
|
|
|
except Exception as e: |
|
print('Error processing HTML content:', str(e)) |
|
raise InvalidCSSSelectorError(f"Invalid CSS selector: {css_selector}") from e |
|
|
|
def get_content_of_website_optimized(url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: |
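    """
    Extract structured content, media, and links from website HTML (optimized variant).

    How it works:
    1. Similar to `get_content_of_website`, but optimized for performance.
    2. Filters and scores images for usefulness.
    3. Extracts contextual descriptions for media files.
    4. Handles excluded tags and CSS selectors.
    5. Cleans HTML and converts it to Markdown.

    Args:
        url (str): The website URL.
        html (str): The HTML content of the website.
        word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD.
        css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None.
        **kwargs: Additional options for customization.

    Returns:
        Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata.
    """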
|
if not html: |
|
return None |
|
|
|
soup = BeautifulSoup(html, 'html.parser') |
|
body = soup.body |
|
|
|
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) |
|
|
|
for tag in kwargs.get('excluded_tags', []) or []: |
|
for el in body.select(tag): |
|
el.decompose() |
|
|
|
if css_selector: |
|
selected_elements = body.select(css_selector) |
|
if not selected_elements: |
|
raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}") |
|
body = soup.new_tag('div') |
|
for el in selected_elements: |
|
body.append(el) |
|
|
|
links = {'internal': [], 'external': []} |
|
media = {'images': [], 'videos': [], 'audios': []} |
|
|
|
|
|
def find_closest_parent_with_useful_text(tag): |
|
current_tag = tag |
|
while current_tag: |
|
current_tag = current_tag.parent |
|
|
|
if current_tag: |
|
text_content = current_tag.get_text(separator=' ',strip=True) |
|
|
|
if len(text_content.split()) >= image_description_min_word_threshold: |
|
return text_content |
|
return None |
|
|
|
def process_image(img, url, index, total_images): |
|
|
|
def is_valid_image(img, parent, parent_classes): |
|
style = img.get('style', '') |
|
src = img.get('src', '') |
|
classes_to_check = ['button', 'icon', 'logo'] |
|
tags_to_check = ['button', 'input'] |
|
return all([ |
|
'display:none' not in style, |
|
src, |
|
not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check), |
|
parent.name not in tags_to_check |
|
]) |
|
|
|
|
|
def score_image_for_usefulness(img, base_url, index, images_count): |
|
|
|
def parse_dimension(dimension): |
|
if dimension: |
|
match = re.match(r"(\d+)(\D*)", dimension) |
|
if match: |
|
number = int(match.group(1)) |
|
unit = match.group(2) or 'px' |
|
return number, unit |
|
return None, None |
|
|
|
|
|
def fetch_image_file_size(img, base_url): |
|
|
|
img_url = urljoin(base_url,img.get('src')) |
|
try: |
|
response = requests.head(img_url) |
|
if response.status_code == 200: |
|
return response.headers.get('Content-Length',None) |
|
else: |
|
print(f"Failed to retrieve file size for {img_url}") |
|
return None |
|
                except InvalidSchema:

                    return None
|
|
|
image_height = img.get('height') |
|
height_value, height_unit = parse_dimension(image_height) |
|
image_width = img.get('width') |
|
width_value, width_unit = parse_dimension(image_width) |
|
image_size = 0 |
|
image_format = os.path.splitext(img.get('src',''))[1].lower() |
|
|
|
image_format = image_format.strip('.') |
|
score = 0 |
|
if height_value: |
|
if height_unit == 'px' and height_value > 150: |
|
score += 1 |
|
if height_unit in ['%','vh','vmin','vmax'] and height_value >30: |
|
score += 1 |
|
if width_value: |
|
if width_unit == 'px' and width_value > 150: |
|
score += 1 |
|
if width_unit in ['%','vh','vmin','vmax'] and width_value >30: |
|
score += 1 |
|
if image_size > 10000: |
|
score += 1 |
|
if img.get('alt') != '': |
|
score+=1 |
|
if any(image_format==format for format in ['jpg','png','webp']): |
|
score+=1 |
|
if index/images_count<0.5: |
|
score+=1 |
|
return score |
|
|
|
if not is_valid_image(img, img.parent, img.parent.get('class', [])): |
|
return None |
|
score = score_image_for_usefulness(img, url, index, total_images) |
|
if score <= IMAGE_SCORE_THRESHOLD: |
|
return None |
|
return { |
|
'src': img.get('src', '').replace('\\"', '"').strip(), |
|
'alt': img.get('alt', ''), |
|
'desc': find_closest_parent_with_useful_text(img), |
|
'score': score, |
|
'type': 'image' |
|
} |
|
|
|
def process_element(element: element.PageElement) -> bool: |
|
try: |
|
if isinstance(element, NavigableString): |
|
if isinstance(element, Comment): |
|
element.extract() |
|
return False |
|
|
|
if element.name in ['script', 'style', 'link', 'meta', 'noscript']: |
|
element.decompose() |
|
return False |
|
|
|
keep_element = False |
|
|
|
if element.name == 'a' and element.get('href'): |
|
href = element['href'] |
|
url_base = url.split('/')[2] |
|
link_data = {'href': href, 'text': element.get_text()} |
|
if href.startswith('http') and url_base not in href: |
|
links['external'].append(link_data) |
|
else: |
|
links['internal'].append(link_data) |
|
keep_element = True |
|
|
|
elif element.name == 'img': |
|
return True |
|
|
|
elif element.name in ['video', 'audio']: |
|
media[f"{element.name}s"].append({ |
|
'src': element.get('src'), |
|
'alt': element.get('alt'), |
|
'type': element.name, |
|
'description': find_closest_parent_with_useful_text(element) |
|
}) |
|
source_tags = element.find_all('source') |
|
for source_tag in source_tags: |
|
media[f"{element.name}s"].append({ |
|
'src': source_tag.get('src'), |
|
'alt': element.get('alt'), |
|
'type': element.name, |
|
'description': find_closest_parent_with_useful_text(element) |
|
}) |
|
return True |
|
|
|
if element.name != 'pre': |
|
if element.name in ['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark']: |
|
if kwargs.get('only_text', False): |
|
element.replace_with(element.get_text()) |
|
else: |
|
element.unwrap() |
|
elif element.name != 'img': |
|
element.attrs = {} |
|
|
|
|
|
for child in list(element.children): |
|
if isinstance(child, NavigableString) and not isinstance(child, Comment): |
|
if len(child.strip()) > 0: |
|
keep_element = True |
|
else: |
|
if process_element(child): |
|
keep_element = True |
|
|
|
|
|
|
|
if not keep_element: |
|
word_count = len(element.get_text(strip=True).split()) |
|
keep_element = word_count >= word_count_threshold |
|
|
|
if not keep_element: |
|
element.decompose() |
|
|
|
return keep_element |
|
except Exception as e: |
|
print('Error processing element:', str(e)) |
|
return False |
|
|
|
|
|
imgs = body.find_all('img') |
|
media['images'] = [ |
|
result for result in |
|
(process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs)) |
|
if result is not None |
|
] |
|
|
|
process_element(body) |
|
|
|
def flatten_nested_elements(node): |
|
if isinstance(node, NavigableString): |
|
return node |
|
if len(node.contents) == 1 and isinstance(node.contents[0], element.Tag) and node.contents[0].name == node.name: |
|
return flatten_nested_elements(node.contents[0]) |
|
node.contents = [flatten_nested_elements(child) for child in node.contents] |
|
return node |
|
|
|
body = flatten_nested_elements(body) |
|
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') |
|
for img in imgs: |
|
try: |
|
src = img.get('src', '') |
|
if base64_pattern.match(src): |
|
img['src'] = base64_pattern.sub('', src) |
|
        except Exception:
|
pass |
|
|
|
    cleaned_html = str(body).replace('\n\n', '\n').replace('  ', ' ')
|
cleaned_html = sanitize_html(cleaned_html) |
|
|
|
h = CustomHTML2Text() |
|
h.ignore_links = True |
|
markdown = h.handle(cleaned_html) |
|
markdown = markdown.replace(' ```', '```') |
|
|
|
try: |
|
meta = extract_metadata(html, soup) |
|
except Exception as e: |
|
print('Error extracting metadata:', str(e)) |
|
meta = {} |
|
|
|
return { |
|
'markdown': markdown, |
|
'cleaned_html': cleaned_html, |
|
'success': True, |
|
'media': media, |
|
'links': links, |
|
'metadata': meta |
|
} |
|
|
|
def extract_metadata(html, soup=None): |
|
""" |
|
    Extract metadata such as title, description, keywords, author, Open Graph and Twitter tags from HTML.

    How it works:

    1. Parses the page (or reuses a provided BeautifulSoup object) and locates the <head> section.

    2. Reads the <title> tag and the standard description, keywords, and author <meta> tags.

    3. Collects all Open Graph (og:*) and Twitter (twitter:*) meta properties into the same dictionary.

    Args:

        html (str): The HTML content of the page. Can be empty if `soup` is provided.

        soup (Optional[BeautifulSoup]): A pre-parsed BeautifulSoup object to reuse. Defaults to None.

    Returns:

        Dict[str, Any]: A dictionary of extracted metadata; empty if no <head> section is found.
|
""" |
|
|
|
metadata = {} |
|
|
|
if not html and not soup: |
|
return {} |
|
|
|
if not soup: |
|
soup = BeautifulSoup(html, 'lxml') |
|
|
|
head = soup.head |
|
if not head: |
|
return metadata |
|
|
|
|
|
title_tag = head.find('title') |
|
metadata['title'] = title_tag.string.strip() if title_tag and title_tag.string else None |
|
|
|
|
|
description_tag = head.find('meta', attrs={'name': 'description'}) |
|
metadata['description'] = description_tag.get('content', '').strip() if description_tag else None |
|
|
|
|
|
keywords_tag = head.find('meta', attrs={'name': 'keywords'}) |
|
metadata['keywords'] = keywords_tag.get('content', '').strip() if keywords_tag else None |
|
|
|
|
|
author_tag = head.find('meta', attrs={'name': 'author'}) |
|
metadata['author'] = author_tag.get('content', '').strip() if author_tag else None |
|
|
|
|
|
og_tags = head.find_all('meta', attrs={'property': re.compile(r'^og:')}) |
|
for tag in og_tags: |
|
property_name = tag.get('property', '').strip() |
|
content = tag.get('content', '').strip() |
|
if property_name and content: |
|
metadata[property_name] = content |
|
|
|
|
|
twitter_tags = head.find_all('meta', attrs={'name': re.compile(r'^twitter:')}) |
|
for tag in twitter_tags: |
|
property_name = tag.get('name', '').strip() |
|
content = tag.get('content', '').strip() |
|
if property_name and content: |
|
metadata[property_name] = content |
|
|
|
return metadata |
|
|
|
def extract_xml_tags(string): |
|
""" |
|
Extracts XML tags from a string. |
|
|
|
Args: |
|
string (str): The input string containing XML tags. |
|
|
|
Returns: |
|
List[str]: A list of XML tags extracted from the input string. |
|
""" |
|
tags = re.findall(r'<(\w+)>', string) |
|
return list(set(tags)) |
|
|
|
def extract_xml_data(tags, string): |
|
""" |
|
Extract data for specified XML tags from a string. |
|
|
|
How it works: |
|
1. Searches the string for each tag using regex. |
|
2. Extracts the content within the tags. |
|
3. Returns a dictionary of tag-content pairs. |
|
|
|
Args: |
|
tags (List[str]): The list of XML tags to extract. |
|
string (str): The input string containing XML data. |
|
|
|
Returns: |
|
Dict[str, str]: A dictionary with tag names as keys and extracted content as values. |
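    Example:
        ```python
        payload = "<blocks>[{\"index\": 0}]</blocks>"
        extract_xml_data(["blocks"], payload)
        # -> {'blocks': '[{"index": 0}]'}
        ```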
|
""" |
|
|
|
data = {} |
|
|
|
for tag in tags: |
|
pattern = f"<{tag}>(.*?)</{tag}>" |
|
match = re.search(pattern, string, re.DOTALL) |
|
if match: |
|
data[tag] = match.group(1).strip() |
|
else: |
|
data[tag] = "" |
|
|
|
return data |
|
|
|
def perform_completion_with_backoff( |
|
provider, |
|
prompt_with_variables, |
|
api_token, |
|
json_response = False, |
|
base_url=None, |
|
**kwargs |
|
): |
|
""" |
|
Perform an API completion request with exponential backoff. |
|
|
|
How it works: |
|
1. Sends a completion request to the API. |
|
2. Retries on rate-limit errors with exponential delays. |
|
3. Returns the API response or an error after all retries. |
|
|
|
Args: |
|
provider (str): The name of the API provider. |
|
prompt_with_variables (str): The input prompt for the completion request. |
|
api_token (str): The API token for authentication. |
|
json_response (bool): Whether to request a JSON response. Defaults to False. |
|
base_url (Optional[str]): The base URL for the API. Defaults to None. |
|
**kwargs: Additional arguments for the API request. |
|
|
|
Returns: |
|
        dict: The provider's completion response on success, or a list containing a single error block if every retry hits the rate limit.
|
""" |
|
|
|
from litellm import completion |
|
from litellm.exceptions import RateLimitError |
|
max_attempts = 3 |
|
base_delay = 2 |
|
|
|
extra_args = { |
|
"temperature": 0.01, |
|
'api_key': api_token, |
|
'base_url': base_url |
|
} |
|
if json_response: |
|
extra_args["response_format"] = { "type": "json_object" } |
|
|
|
if kwargs.get("extra_args"): |
|
extra_args.update(kwargs["extra_args"]) |
|
|
|
for attempt in range(max_attempts): |
|
try: |
|
|
|
            response = completion(
|
model=provider, |
|
messages=[ |
|
{"role": "user", "content": prompt_with_variables} |
|
], |
|
**extra_args |
|
) |
|
return response |
|
except RateLimitError as e: |
|
print("Rate limit error:", str(e)) |
|
|
|
|
|
if attempt < max_attempts - 1: |
|
|
|
delay = base_delay * (2 ** attempt) |
|
print(f"Waiting for {delay} seconds before retrying...") |
|
time.sleep(delay) |
|
else: |
|
|
|
return [{ |
|
"index": 0, |
|
"tags": ["error"], |
|
"content": ["Rate limit error. Please try again later."] |
|
}] |
|
|
|
def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None, base_url = None): |
|
""" |
|
Extract content blocks from website HTML using an AI provider. |
|
|
|
How it works: |
|
1. Prepares a prompt by sanitizing and escaping HTML. |
|
2. Sends the prompt to an AI provider with optional retries. |
|
3. Parses the response to extract structured blocks or errors. |
|
|
|
Args: |
|
url (str): The website URL. |
|
html (str): The HTML content of the website. |
|
provider (str): The AI provider for content extraction. Defaults to DEFAULT_PROVIDER. |
|
api_token (Optional[str]): The API token for authentication. Defaults to None. |
|
base_url (Optional[str]): The base URL for the API. Defaults to None. |
|
|
|
Returns: |
|
List[dict]: A list of extracted content blocks. |
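    Example (a hedged sketch; requires valid provider credentials and network access):
        ```python
        blocks = extract_blocks(
            url="https://example.com",
            html="<html><body><h1>Title</h1><p>Some text.</p></body></html>",
            provider="openai/gpt-4o-mini",   # any litellm-compatible provider string
            api_token="YOUR_API_KEY",
        )
        ```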
|
""" |
|
|
|
|
|
api_token = PROVIDER_MODELS.get(provider, None) if not api_token else api_token |
|
|
|
variable_values = { |
|
"URL": url, |
|
"HTML": escape_json_string(sanitize_html(html)), |
|
} |
|
|
|
prompt_with_variables = PROMPT_EXTRACT_BLOCKS |
|
for variable in variable_values: |
|
prompt_with_variables = prompt_with_variables.replace( |
|
"{" + variable + "}", variable_values[variable] |
|
) |
|
|
|
response = perform_completion_with_backoff(provider, prompt_with_variables, api_token, base_url=base_url) |
|
|
|
try: |
|
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] |
|
blocks = json.loads(blocks) |
|
|
|
for block in blocks: |
|
block['error'] = False |
|
except Exception as e: |
|
parsed, unparsed = split_and_parse_json_objects(response.choices[0].message.content) |
|
blocks = parsed |
|
|
|
if unparsed: |
|
blocks.append({ |
|
"index": 0, |
|
"error": True, |
|
"tags": ["error"], |
|
"content": unparsed |
|
}) |
|
return blocks |
|
|
|
def extract_blocks_batch(batch_data, provider = "groq/llama3-70b-8192", api_token = None): |
|
""" |
|
Extract content blocks from a batch of website HTMLs. |
|
|
|
How it works: |
|
1. Prepares prompts for each URL and HTML pair. |
|
2. Sends the prompts to the AI provider in a batch request. |
|
3. Parses the responses to extract structured blocks or errors. |
|
|
|
Args: |
|
batch_data (List[Tuple[str, str]]): A list of (URL, HTML) pairs. |
|
provider (str): The AI provider for content extraction. Defaults to "groq/llama3-70b-8192". |
|
api_token (Optional[str]): The API token for authentication. Defaults to None. |
|
|
|
Returns: |
|
List[dict]: A list of extracted content blocks from all batch items. |
|
""" |
|
|
|
api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token |
|
from litellm import batch_completion |
|
messages = [] |
|
|
|
for url, html in batch_data: |
|
variable_values = { |
|
"URL": url, |
|
"HTML": html, |
|
} |
|
|
|
prompt_with_variables = PROMPT_EXTRACT_BLOCKS |
|
for variable in variable_values: |
|
prompt_with_variables = prompt_with_variables.replace( |
|
"{" + variable + "}", variable_values[variable] |
|
) |
|
|
|
messages.append([{"role": "user", "content": prompt_with_variables}]) |
|
|
|
|
|
responses = batch_completion( |
|
model = provider, |
|
messages = messages, |
|
temperature = 0.01 |
|
) |
|
|
|
all_blocks = [] |
|
for response in responses: |
|
try: |
|
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] |
|
blocks = json.loads(blocks) |
|
|
|
except Exception as e: |
|
blocks = [{ |
|
"index": 0, |
|
"tags": ["error"], |
|
"content": ["Error extracting blocks from the HTML content. Choose another provider/model or try again."], |
|
"questions": ["What went wrong during the block extraction process?"] |
|
}] |
|
all_blocks.append(blocks) |
|
|
|
return sum(all_blocks, []) |
|
|
|
def merge_chunks_based_on_token_threshold(chunks, token_threshold): |
|
""" |
|
Merges small chunks into larger ones based on the total token threshold. |
|
|
|
:param chunks: List of text chunks to be merged based on token count. |
|
:param token_threshold: Max number of tokens for each merged chunk. |
|
:return: List of merged text chunks. |
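    Example (each chunk is 2 words, i.e. roughly 2.6 estimated tokens):
        ```python
        chunks = ["alpha beta", "gamma delta", "epsilon zeta"]
        merge_chunks_based_on_token_threshold(chunks, token_threshold=6)
        # -> ['alpha beta\n\ngamma delta', 'epsilon zeta']
        ```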
|
""" |
|
merged_sections = [] |
|
current_chunk = [] |
|
total_token_so_far = 0 |
|
|
|
for chunk in chunks: |
|
chunk_token_count = len(chunk.split()) * 1.3 |
|
if total_token_so_far + chunk_token_count < token_threshold: |
|
current_chunk.append(chunk) |
|
total_token_so_far += chunk_token_count |
|
else: |
|
if current_chunk: |
|
merged_sections.append('\n\n'.join(current_chunk)) |
|
current_chunk = [chunk] |
|
total_token_so_far = chunk_token_count |
|
|
|
|
|
if current_chunk: |
|
merged_sections.append('\n\n'.join(current_chunk)) |
|
|
|
return merged_sections |
|
|
|
def process_sections(url: str, sections: list, provider: str, api_token: str, base_url=None) -> list: |
|
""" |
|
Process sections of HTML content sequentially or in parallel. |
|
|
|
How it works: |
|
1. Sequentially processes sections with delays for "groq/" providers. |
|
2. Uses ThreadPoolExecutor for parallel processing with other providers. |
|
3. Extracts content blocks for each section. |
|
|
|
Args: |
|
url (str): The website URL. |
|
sections (List[str]): The list of HTML sections to process. |
|
provider (str): The AI provider for content extraction. |
|
api_token (str): The API token for authentication. |
|
base_url (Optional[str]): The base URL for the API. Defaults to None. |
|
|
|
Returns: |
|
List[dict]: The list of extracted content blocks from all sections. |
|
""" |
|
|
|
extracted_content = [] |
|
if provider.startswith("groq/"): |
|
|
|
for section in sections: |
|
extracted_content.extend(extract_blocks(url, section, provider, api_token, base_url=base_url)) |
|
time.sleep(0.5) |
|
else: |
|
|
|
with ThreadPoolExecutor() as executor: |
|
futures = [executor.submit(extract_blocks, url, section, provider, api_token, base_url=base_url) for section in sections] |
|
for future in as_completed(futures): |
|
extracted_content.extend(future.result()) |
|
|
|
return extracted_content |
|
|
|
def wrap_text(draw, text, font, max_width): |
|
""" |
|
Wrap text to fit within a specified width for rendering. |
|
|
|
How it works: |
|
1. Splits the text into words. |
|
2. Constructs lines that fit within the maximum width using the provided font. |
|
3. Returns the wrapped text as a single string. |
|
|
|
Args: |
|
draw (ImageDraw.Draw): The drawing context for measuring text size. |
|
text (str): The text to wrap. |
|
font (ImageFont.FreeTypeFont): The font to use for measuring text size. |
|
max_width (int): The maximum width for each line. |
|
|
|
Returns: |
|
str: The wrapped text. |
|
""" |
|
|
|
|
|
lines = [] |
|
words = text.split() |
|
    while words:

        line = ''

        while words and draw.textbbox((0, 0), line + words[0], font=font)[2] <= max_width:

            line += (words.pop(0) + ' ')

        if not line:

            # A single word wider than max_width: emit it on its own line to avoid an infinite loop.

            line = words.pop(0) + ' '

        lines.append(line)
|
return '\n'.join(lines) |
|
|
|
def format_html(html_string): |
|
""" |
|
Prettify an HTML string using BeautifulSoup. |
|
|
|
How it works: |
|
1. Parses the HTML string with BeautifulSoup. |
|
2. Formats the HTML with proper indentation. |
|
3. Returns the prettified HTML string. |
|
|
|
Args: |
|
html_string (str): The HTML string to format. |
|
|
|
Returns: |
|
str: The prettified HTML string. |
|
""" |
|
|
|
    soup = BeautifulSoup(html_string, 'lxml')
|
return soup.prettify() |
|
|
|
def fast_format_html(html_string): |
|
""" |
|
A fast HTML formatter that uses string operations instead of parsing. |
|
|
|
Args: |
|
html_string (str): The HTML string to format |
|
|
|
Returns: |
|
str: The formatted HTML string |
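    Example:
        ```python
        print(fast_format_html("<div><p>Hello</p></div>"))
        # <div>
        #   <p>
        #     Hello
        #   </p>
        # </div>
        ```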
|
""" |
|
|
|
indent = 0 |
|
indent_str = " " |
|
formatted = [] |
|
in_content = False |
|
|
|
|
|
parts = html_string.replace('>', '>\n').replace('<', '\n<').split('\n') |
|
|
|
for part in parts: |
|
if not part.strip(): |
|
continue |
|
|
|
|
|
if part.startswith('</'): |
|
indent -= 1 |
|
formatted.append(indent_str * indent + part) |
|
|
|
|
|
elif part.startswith('<') and part.endswith('/>'): |
|
formatted.append(indent_str * indent + part) |
|
|
|
|
|
elif part.startswith('<'): |
|
formatted.append(indent_str * indent + part) |
|
indent += 1 |
|
|
|
|
|
else: |
|
content = part.strip() |
|
if content: |
|
formatted.append(indent_str * indent + content) |
|
|
|
return '\n'.join(formatted) |
|
|
|
def normalize_url(href, base_url): |
|
"""Normalize URLs to ensure consistent format""" |
|
from urllib.parse import urljoin, urlparse |
|
|
|
|
|
parsed_base = urlparse(base_url) |
|
if not parsed_base.scheme or not parsed_base.netloc: |
|
raise ValueError(f"Invalid base URL format: {base_url}") |
|
|
|
|
|
normalized = urljoin(base_url, href.strip()) |
|
return normalized |
|
|
|
def normalize_url_tmp(href, base_url): |
|
"""Normalize URLs to ensure consistent format""" |
|
|
|
try: |
|
base_parts = base_url.split('/') |
|
protocol = base_parts[0] |
|
domain = base_parts[2] |
|
except IndexError: |
|
raise ValueError(f"Invalid base URL format: {base_url}") |
|
|
|
|
|
special_protocols = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'} |
|
if any(href.lower().startswith(proto) for proto in special_protocols): |
|
return href.strip() |
|
|
|
|
|
if href.startswith('#'): |
|
return f"{base_url}{href}" |
|
|
|
|
|
if href.startswith('//'): |
|
return f"{protocol}{href}" |
|
|
|
|
|
if href.startswith('/'): |
|
return f"{protocol}//{domain}{href}" |
|
|
|
|
|
if not href.startswith(('http://', 'https://')): |
|
|
|
href = href.lstrip('./') |
|
return f"{protocol}//{domain}/{href}" |
|
|
|
return href.strip() |
|
|
|
def get_base_domain(url: str) -> str: |
|
""" |
|
Extract the base domain from a given URL, handling common edge cases. |
|
|
|
How it works: |
|
1. Parses the URL to extract the domain. |
|
2. Removes the port number and 'www' prefix. |
|
3. Handles special domains (e.g., 'co.uk') to extract the correct base. |
|
|
|
Args: |
|
url (str): The URL to extract the base domain from. |
|
|
|
Returns: |
|
str: The extracted base domain or an empty string if parsing fails. |
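    Example:
        ```python
        get_base_domain("https://www.blog.example.co.uk:8080/post/1")
        # -> 'example.co.uk'
        ```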
|
""" |
|
try: |
|
|
|
domain = urlparse(url).netloc.lower() |
|
if not domain: |
|
return "" |
|
|
|
|
|
domain = domain.split(':')[0] |
|
|
|
|
|
domain = re.sub(r'^www\.', '', domain) |
|
|
|
|
|
parts = domain.split('.') |
|
if len(parts) > 2 and parts[-2] in { |
|
'co', 'com', 'org', 'gov', 'edu', 'net', |
|
'mil', 'int', 'ac', 'ad', 'ae', 'af', 'ag' |
|
}: |
|
return '.'.join(parts[-3:]) |
|
|
|
return '.'.join(parts[-2:]) |
|
except Exception: |
|
return "" |
|
|
|
def is_external_url(url: str, base_domain: str) -> bool: |
|
""" |
|
    Check whether a URL points outside the given base domain.

    How it works:

    1. Treats special schemes (mailto:, tel:, ftp:, file:, data:, javascript:) as external.

    2. Parses the URL; relative URLs (no network location) are treated as internal.

    3. Compares the URL's domain (ignoring a leading 'www.') against the base domain suffix.

    Args:

        url (str): The URL to check.

        base_domain (str): The base domain to compare against (e.g., "example.com").

    Returns:

        bool: True if the URL is external to the base domain, False otherwise.
|
""" |
|
special = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'} |
|
if any(url.lower().startswith(p) for p in special): |
|
return True |
|
|
|
try: |
|
parsed = urlparse(url) |
|
if not parsed.netloc: |
|
return False |
|
|
|
|
|
url_domain = parsed.netloc.lower().replace('www.', '') |
|
base = base_domain.lower().replace('www.', '') |
|
|
|
|
|
return not url_domain.endswith(base) |
|
except Exception: |
|
return False |
|
|
|
def clean_tokens(tokens: list[str]) -> list[str]: |
|
""" |
|
Clean a list of tokens by removing noise, stop words, and short tokens. |
|
|
|
How it works: |
|
1. Defines a set of noise words and stop words. |
|
2. Filters tokens based on length and exclusion criteria. |
|
    3. Excludes tokens starting with certain symbols (e.g., "↑", "▲", "⬆").
|
|
|
Args: |
|
tokens (list[str]): The list of tokens to clean. |
|
|
|
Returns: |
|
list[str]: The cleaned list of tokens. |
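    Example:
        ```python
        clean_tokens(["the", "crawler", "and", "extracted", "42", "content"])
        # -> ['crawler', 'extracted', 'content']
        ```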
|
""" |
|
|
|
|
|
    noise = {'ccp', 'up', '↑', '▲', '⬆️', 'a', 'an', 'at', 'by', 'in', 'of', 'on', 'to', 'the'}
|
|
|
STOP_WORDS = { |
|
'a', 'an', 'and', 'are', 'as', 'at', 'be', 'by', 'for', 'from', |
|
'has', 'he', 'in', 'is', 'it', 'its', 'of', 'on', 'that', 'the', |
|
'to', 'was', 'were', 'will', 'with', |
|
|
|
|
|
'i', 'you', 'he', 'she', 'it', 'we', 'they', |
|
'me', 'him', 'her', 'us', 'them', |
|
'my', 'your', 'his', 'her', 'its', 'our', 'their', |
|
'mine', 'yours', 'hers', 'ours', 'theirs', |
|
'myself', 'yourself', 'himself', 'herself', 'itself', 'ourselves', 'themselves', |
|
|
|
|
|
'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', |
|
'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', |
|
|
|
|
|
'about', 'above', 'across', 'after', 'against', 'along', 'among', 'around', |
|
'at', 'before', 'behind', 'below', 'beneath', 'beside', 'between', 'beyond', |
|
'by', 'down', 'during', 'except', 'for', 'from', 'in', 'inside', 'into', |
|
'near', 'of', 'off', 'on', 'out', 'outside', 'over', 'past', 'through', |
|
'to', 'toward', 'under', 'underneath', 'until', 'up', 'upon', 'with', 'within', |
|
|
|
|
|
'and', 'but', 'or', 'nor', 'for', 'yet', 'so', |
|
'although', 'because', 'since', 'unless', |
|
|
|
|
|
'a', 'an', 'the', |
|
|
|
|
|
'this', 'that', 'these', 'those', |
|
'what', 'which', 'who', 'whom', 'whose', |
|
'when', 'where', 'why', 'how', |
|
'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', |
|
'can', 'cannot', "can't", 'could', "couldn't", |
|
'may', 'might', 'must', "mustn't", |
|
'shall', 'should', "shouldn't", |
|
'will', "won't", 'would', "wouldn't", |
|
'not', "n't", 'no', 'nor', 'none' |
|
} |
|
|
|
|
|
return [token for token in tokens |
|
if len(token) > 2 |
|
and token not in noise |
|
and token not in STOP_WORDS |
|
            and not token.startswith('↑')

            and not token.startswith('▲')

            and not token.startswith('⬆')]
|
|
|
def profile_and_time(func): |
|
""" |
|
Decorator to profile a function's execution time and performance. |
|
|
|
How it works: |
|
1. Records the start time before executing the function. |
|
2. Profiles the function's execution using `cProfile`. |
|
3. Prints the elapsed time and profiling statistics. |
|
|
|
Args: |
|
func (Callable): The function to decorate. |
|
|
|
Returns: |
|
Callable: The decorated function with profiling and timing enabled. |
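    Example (a sketch; the wrapper expects a bound method, hence the `self` parameter):
        ```python
        class Scraper:
            @profile_and_time
            def scrape(self, url):
                return url.upper()

        Scraper().scrape("https://example.com")   # prints elapsed time and cProfile stats
        ```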
|
""" |
|
|
|
@wraps(func) |
|
def wrapper(self, *args, **kwargs): |
|
|
|
start_time = time.perf_counter() |
|
|
|
|
|
profiler = cProfile.Profile() |
|
profiler.enable() |
|
|
|
|
|
result = func(self, *args, **kwargs) |
|
|
|
|
|
profiler.disable() |
|
|
|
|
|
elapsed_time = time.perf_counter() - start_time |
|
|
|
|
|
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds") |
|
|
|
|
|
stats = pstats.Stats(profiler) |
|
stats.sort_stats('cumulative') |
|
stats.print_stats(20) |
|
|
|
return result |
|
return wrapper |
|
|
|
def generate_content_hash(content: str) -> str: |
|
"""Generate a unique hash for content""" |
|
return xxhash.xxh64(content.encode()).hexdigest() |
|
|
|
|
|
def ensure_content_dirs(base_path: str) -> Dict[str, str]: |
|
"""Create content directories if they don't exist""" |
|
dirs = { |
|
'html': 'html_content', |
|
'cleaned': 'cleaned_html', |
|
'markdown': 'markdown_content', |
|
'extracted': 'extracted_content', |
|
'screenshots': 'screenshots', |
|
'screenshot': 'screenshots' |
|
} |
|
|
|
content_paths = {} |
|
for key, dirname in dirs.items(): |
|
path = os.path.join(base_path, dirname) |
|
os.makedirs(path, exist_ok=True) |
|
content_paths[key] = path |
|
|
|
return content_paths |
|
|
|
def configure_windows_event_loop(): |
|
""" |
|
Configure the Windows event loop to use ProactorEventLoop. |
|
This resolves the NotImplementedError that occurs on Windows when using asyncio subprocesses. |
|
|
|
This function should only be called on Windows systems and before any async operations. |
|
On non-Windows systems, this function does nothing. |
|
|
|
Example: |
|
```python |
|
from crawl4ai.async_configs import configure_windows_event_loop |
|
|
|
# Call this before any async operations if you're on Windows |
|
configure_windows_event_loop() |
|
``` |
|
""" |
|
if platform.system() == 'Windows': |
|
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) |
|
|
|
def get_error_context(exc_info, context_lines: int = 5): |
|
""" |
|
Extract error context with more reliable line number tracking. |
|
|
|
Args: |
|
exc_info: The exception info from sys.exc_info() |
|
context_lines: Number of lines to show before and after the error |
|
|
|
Returns: |
|
dict: Error context information |
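    Example:
        ```python
        import sys
        try:
            1 / 0
        except ZeroDivisionError:
            ctx = get_error_context(sys.exc_info())
            print(ctx["filename"], ctx["line_no"], ctx["function"])
        ```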
|
""" |
|
import traceback |
|
import linecache |
|
import os |
|
|
|
|
|
tb = traceback.extract_tb(exc_info[2]) |
|
|
|
|
|
last_frame = tb[-1] |
|
filename = last_frame.filename |
|
line_no = last_frame.lineno |
|
func_name = last_frame.name |
|
|
|
|
|
|
|
context_start = max(1, line_no - context_lines) |
|
context_end = line_no + context_lines + 1 |
|
|
|
|
|
context_lines = [] |
|
for i in range(context_start, context_end): |
|
line = linecache.getline(filename, i) |
|
if line: |
|
|
|
line = line.rstrip() |
|
            pointer = '→' if i == line_no else ' '
|
context_lines.append(f"{i:4d} {pointer} {line}") |
|
|
|
|
|
code_context = '\n'.join(context_lines) |
|
|
|
|
|
try: |
|
rel_path = os.path.relpath(filename) |
|
except ValueError: |
|
|
|
rel_path = filename |
|
|
|
return { |
|
"filename": rel_path, |
|
"line_no": line_no, |
|
"function": func_name, |
|
"code_context": code_context |
|
} |
|
|
|
|
|
|