|
import asyncio
import os
import re
import time
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Optional
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup, Comment, NavigableString, PageElement, Tag, element
from requests.exceptions import InvalidSchema

from .config import *
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult
from .utils import (
    extract_metadata,
    normalize_url,
    is_external_url,
    get_base_domain,
)
|
|
|
|
|
|
|
OG_REGEX = re.compile(r'^og:') |
|
TWITTER_REGEX = re.compile(r'^twitter:') |
|
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)") |
|
|
|
|
|
def parse_dimension(dimension): |
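    # Expected behaviour (illustrative examples):
    #   parse_dimension("300px") -> (300, "px")
    #   parse_dimension("50%")   -> (50, "%")
    #   parse_dimension(None)    -> (None, None)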
|
if dimension: |
|
|
|
match = DIMENSION_REGEX.match(dimension) |
|
if match: |
|
number = int(match.group(1)) |
|
unit = match.group(2) or 'px' |
|
return number, unit |
|
return None, None |
|
|
|
|
|
def fetch_image_file_size(img, base_url):
    # If src is a relative path, build an absolute URL; urljoin leaves absolute URLs unchanged.
    img_url = urljoin(base_url, img.get('src'))
    try:
        response = requests.head(img_url)
        if response.status_code == 200:
            return response.headers.get('Content-Length', None)
        print(f"Failed to retrieve file size for {img_url}")
    except InvalidSchema:
        # Non-HTTP schemes (e.g. data: URIs) cannot be HEAD-requested.
        pass
    return None
|
|
|
class ContentScrapingStrategy(ABC):
    """Abstract base class for content scraping strategies."""

    @abstractmethod
    def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        """Synchronously scrape content from the given HTML."""
        pass

    @abstractmethod
    async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        """Asynchronously scrape content from the given HTML."""
        pass
|
|
|
class WebScrapingStrategy(ContentScrapingStrategy): |
|
""" |
|
    Class for web content scraping; the core scraping strategy of this module.
|
|
|
How it works: |
|
1. Extract content from HTML using BeautifulSoup. |
|
2. Clean the extracted content using a content cleaning strategy. |
|
3. Filter the cleaned content using a content filtering strategy. |
|
4. Generate markdown content from the filtered content. |
|
5. Return the markdown content. |
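
    Example (an illustrative sketch; assumes ``html`` already holds the fetched page source):

        scraper = WebScrapingStrategy()
        result = scraper.scrap("https://example.com/article", html)
        print(result["cleaned_html"][:200])
        print(len(result["links"]["internal"]), "internal links")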
|
""" |
|
|
|
def __init__(self, logger=None): |
|
self.logger = logger |
|
|
|
def _log(self, level, message, tag="SCRAPE", **kwargs): |
|
"""Helper method to safely use logger.""" |
|
if self.logger: |
|
log_method = getattr(self.logger, level) |
|
log_method(message=message, tag=tag, **kwargs) |
|
|
|
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: |
|
""" |
|
Main entry point for content scraping. |
|
|
|
Args: |
|
url (str): The URL of the page to scrape. |
|
html (str): The HTML content of the page. |
|
**kwargs: Additional keyword arguments. |
|
|
|
Returns: |
|
            Dict[str, Any]: A dictionary containing the scraped content, with the following keys:

            - 'markdown' (str): The generated markdown content. This will soon become a MarkdownGenerationResult, with the raw text available via 'markdown.raw_markdown'.
            - 'fit_markdown': The markdown generated from the relevance-filtered content. Deprecated; it will soon be available only as 'markdown.fit_markdown'.
            - 'fit_html': The HTML remaining after relevance filtering. Deprecated; it will soon be available only as 'markdown.fit_html'.
            - 'markdown_v2': Temporary key holding the filtered markdown result; it will be removed and replaced by 'markdown'.
|
""" |
|
return self._scrap(url, html, is_async=False, **kwargs) |
|
|
|
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: |
|
""" |
|
Main entry point for asynchronous content scraping. |
|
|
|
Args: |
|
url (str): The URL of the page to scrape. |
|
html (str): The HTML content of the page. |
|
**kwargs: Additional keyword arguments. |
|
|
|
Returns: |
|
            Dict[str, Any]: A dictionary containing the scraped content, with the following keys:

            - 'markdown' (str): The generated markdown content. This will soon become a MarkdownGenerationResult, with the raw text available via 'markdown.raw_markdown'.
            - 'fit_markdown': The markdown generated from the relevance-filtered content. Deprecated; it will soon be available only as 'markdown.fit_markdown'.
            - 'fit_html': The HTML remaining after relevance filtering. Deprecated; it will soon be available only as 'markdown.fit_html'.
            - 'markdown_v2': Temporary key holding the filtered markdown result; it will be removed and replaced by 'markdown'.
|
""" |
|
return await asyncio.to_thread(self._scrap, url, html, **kwargs) |
|
|
|
def flatten_nested_elements(self, node): |
|
""" |
|
        Flatten nested elements in an HTML tree.
|
|
|
Args: |
|
node (Tag): The root node of the HTML tree. |
|
|
|
Returns: |
|
Tag: The flattened HTML tree. |
|
""" |
|
if isinstance(node, NavigableString): |
|
return node |
|
if len(node.contents) == 1 and isinstance(node.contents[0], Tag) and node.contents[0].name == node.name: |
|
return self.flatten_nested_elements(node.contents[0]) |
|
node.contents = [self.flatten_nested_elements(child) for child in node.contents] |
|
return node |
|
|
|
def find_closest_parent_with_useful_text(self, tag, **kwargs): |
|
""" |
|
Find the closest parent with useful text. |
|
|
|
Args: |
|
tag (Tag): The starting tag to search from. |
|
**kwargs: Additional keyword arguments. |
|
|
|
Returns: |
|
            str: The text of the closest parent that contains enough useful text, or None if not found.
|
""" |
|
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) |
|
current_tag = tag |
|
while current_tag: |
|
current_tag = current_tag.parent |
|
|
|
if current_tag: |
|
text_content = current_tag.get_text(separator=' ',strip=True) |
|
|
|
if len(text_content.split()) >= image_description_min_word_threshold: |
|
return text_content |
|
return None |
|
|
|
def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False): |
|
""" |
|
Remove unwanted attributes from an HTML element. |
|
|
|
Args: |
|
element (Tag): The HTML element to remove attributes from. |
|
important_attrs (list): List of important attributes to keep. |
|
keep_data_attributes (bool): Whether to keep data attributes. |
|
|
|
Returns: |
|
None |
|
""" |
|
attrs_to_remove = [] |
|
for attr in element.attrs: |
|
if attr not in important_attrs: |
|
if keep_data_attributes: |
|
if not attr.startswith('data-'): |
|
attrs_to_remove.append(attr) |
|
else: |
|
attrs_to_remove.append(attr) |
|
|
|
for attr in attrs_to_remove: |
|
del element[attr] |
|
|
|
def process_image(self, img, url, index, total_images, **kwargs): |
|
""" |
|
Process an image element. |
|
|
|
How it works: |
|
        1. Check whether the image is visible (not display:none) and not nested in an undesired element such as a button or icon.
        2. Score the image for its usefulness.
        3. Detect the image format and collect size hints (width/height attributes, srcset widths).
|
4. Generate a dictionary with the processed image information. |
|
5. Return the processed image information. |
|
|
|
Args: |
|
img (Tag): The image element to process. |
|
url (str): The URL of the page containing the image. |
|
index (int): The index of the image in the list of images. |
|
total_images (int): The total number of images in the list. |
|
**kwargs: Additional keyword arguments. |
|
|
|
Returns: |
|
dict: A dictionary containing the processed image information. |
|
""" |
|
        def parse_srcset(s):
            # srcset entries are comma-separated "<url> [<width>w]" candidates.
            candidates = [c.strip().split() for c in s.split(',') if c.strip()]
            return [{'url': c[0], 'width': c[1].rstrip('w') if len(c) > 1 else None}
                    for c in candidates]
|
|
|
|
|
classes_to_check = frozenset(['button', 'icon', 'logo']) |
|
tags_to_check = frozenset(['button', 'input']) |
|
image_formats = frozenset(['jpg', 'jpeg', 'png', 'webp', 'avif', 'gif']) |
|
|
|
|
|
style = img.get('style', '') |
|
alt = img.get('alt', '') |
|
src = img.get('src', '') |
|
data_src = img.get('data-src', '') |
|
srcset = img.get('srcset', '') |
|
data_srcset = img.get('data-srcset', '') |
|
width = img.get('width') |
|
height = img.get('height') |
|
parent = img.parent |
|
parent_classes = parent.get('class', []) |
|
|
|
|
|
if ('display:none' in style or |
|
parent.name in tags_to_check or |
|
any(c in cls for c in parent_classes for cls in classes_to_check) or |
|
any(c in src for c in classes_to_check) or |
|
any(c in alt for c in classes_to_check)): |
|
return None |
|
|
|
|
|
score = 0 |
|
if width and width.isdigit(): |
|
width_val = int(width) |
|
score += 1 if width_val > 150 else 0 |
|
if height and height.isdigit(): |
|
height_val = int(height) |
|
score += 1 if height_val > 150 else 0 |
|
if alt: |
|
score += 1 |
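        # Images appearing in the first half of the page get a small positional bonus.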
|
        score += index/total_images < 0.5

|
def has_image_format(url): |
|
return any(fmt in url.lower() for fmt in image_formats) |
|
|
|
|
|
if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]): |
|
score += 1 |
|
if srcset or data_srcset: |
|
score += 1 |
|
if img.find_parent('picture'): |
|
score += 1 |
|
|
|
|
|
        # Detect the image format from the first source attribute that reveals one;
        # a distinct loop variable avoids shadowing the page `url` parameter.
        detected_format = None
        for candidate in (src, data_src, srcset, data_srcset):
            if candidate:
                format_matches = [fmt for fmt in image_formats if fmt in candidate.lower()]
                if format_matches:
                    detected_format = format_matches[0]
                    break
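
        # Discard images whose score does not clear the configured threshold (icons, trackers, decorative art).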
|
|
|
if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD): |
|
return None |
|
|
|
|
|
unique_urls = set() |
|
image_variants = [] |
|
|
|
|
|
group_id = index |
|
|
|
|
|
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) |
|
base_info = { |
|
'alt': alt, |
|
'desc': self.find_closest_parent_with_useful_text(img, **kwargs), |
|
'score': score, |
|
'type': 'image', |
|
'group_id': group_id, |
|
'format': detected_format, |
|
} |
|
|
|
|
|
def add_variant(src, width=None): |
|
if src and not src.startswith('data:') and src not in unique_urls: |
|
unique_urls.add(src) |
|
image_variants.append({**base_info, 'src': src, 'width': width}) |
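
        # Collect candidate sources: plain src / data-src, srcset entries, <picture> <source> tags,
        # and any other data-* attributes that appear to carry absolute URLs.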
|
|
|
|
|
add_variant(src) |
|
add_variant(data_src) |
|
|
|
|
|
for attr in ('srcset', 'data-srcset'): |
|
if value := img.get(attr): |
|
for source in parse_srcset(value): |
|
add_variant(source['url'], source['width']) |
|
|
|
|
|
if picture := img.find_parent('picture'): |
|
for source in picture.find_all('source'): |
|
if srcset := source.get('srcset'): |
|
for src in parse_srcset(srcset): |
|
add_variant(src['url'], src['width']) |
|
|
|
|
|
for attr, value in img.attrs.items(): |
|
if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value: |
|
add_variant(value) |
|
|
|
return image_variants if image_variants else None |
|
|
|
def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]: |
|
""" |
|
Process an HTML element. |
|
|
|
How it works: |
|
1. Check if the element is an image, video, or audio. |
|
2. Extract the element's attributes and content. |
|
3. Process the element based on its type. |
|
4. Return the processed element information. |
|
|
|
Args: |
|
url (str): The URL of the page containing the element. |
|
element (Tag): The HTML element to process. |
|
**kwargs: Additional keyword arguments. |
|
|
|
Returns: |
|
dict: A dictionary containing the processed element information. |
|
""" |
|
media = {'images': [], 'videos': [], 'audios': []} |
|
internal_links_dict = {} |
|
external_links_dict = {} |
|
self._process_element( |
|
url, |
|
element, |
|
media, |
|
internal_links_dict, |
|
external_links_dict, |
|
**kwargs |
|
) |
|
return { |
|
'media': media, |
|
'internal_links_dict': internal_links_dict, |
|
'external_links_dict': external_links_dict |
|
} |
|
|
|
def _process_element(self, url, element: PageElement, media: Dict[str, Any], internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool: |
|
""" |
|
Process an HTML element. |
|
""" |
|
try: |
|
if isinstance(element, NavigableString): |
|
if isinstance(element, Comment): |
|
element.extract() |
|
return False |
|
|
|
|
|
|
|
|
|
base_domain = kwargs.get("base_domain", get_base_domain(url)) |
|
|
|
if element.name in ['script', 'style', 'link', 'meta', 'noscript']: |
|
element.decompose() |
|
return False |
|
|
|
keep_element = False |
|
|
|
exclude_domains = kwargs.get('exclude_domains', []) |
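
            # Handle anchor tags: normalize the href, classify it as internal or external,
            # and honour the exclude_external_links / exclude_domains settings.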
|
|
|
|
|
|
|
|
|
try: |
|
if element.name == 'a' and element.get('href'): |
|
href = element.get('href', '').strip() |
|
if not href: |
|
return False |
|
|
|
url_base = url.split('/')[2] |
|
|
|
|
|
try: |
|
normalized_href = normalize_url(href, url) |
|
except ValueError as e: |
|
|
|
return False |
|
|
|
link_data = { |
|
'href': normalized_href, |
|
'text': element.get_text().strip(), |
|
'title': element.get('title', '').strip(), |
|
'base_domain': base_domain |
|
} |
|
|
|
is_external = is_external_url(normalized_href, base_domain) |
|
|
|
keep_element = True |
|
|
|
|
|
if is_external: |
|
link_base_domain = get_base_domain(normalized_href) |
|
link_data['base_domain'] = link_base_domain |
|
if kwargs.get('exclude_external_links', False): |
|
element.decompose() |
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
elif exclude_domains: |
|
if link_base_domain in exclude_domains: |
|
element.decompose() |
|
return False |
|
|
|
|
|
|
|
|
|
if is_external: |
|
if normalized_href not in external_links_dict: |
|
external_links_dict[normalized_href] = link_data |
|
else: |
|
if normalized_href not in internal_links_dict: |
|
internal_links_dict[normalized_href] = link_data |
|
|
|
|
|
except Exception as e: |
|
raise Exception(f"Error processing links: {str(e)}") |
|
|
|
try: |
|
if element.name == 'img': |
|
                    potential_sources = ['src', 'data-src', 'srcset', 'data-lazy-src', 'data-original']
|
src = element.get('src', '') |
|
while not src and potential_sources: |
|
src = element.get(potential_sources.pop(0), '') |
|
if not src: |
|
element.decompose() |
|
return False |
|
|
|
|
|
if 'srcset' in element.attrs: |
|
src = element.attrs['srcset'].split(',')[0].split(' ')[0] |
|
|
|
|
|
if not is_external_url(src, base_domain): |
|
return True |
|
|
|
image_src_base_domain = get_base_domain(src) |
|
|
|
|
|
if kwargs.get('exclude_external_images', False): |
|
element.decompose() |
|
                        return False

                    if exclude_domains:
|
if image_src_base_domain in exclude_domains: |
|
element.decompose() |
|
return False |
|
|
|
|
|
|
|
|
|
return True |
|
            except Exception as e:
                # Raising a bare string is invalid in Python 3; wrap the error properly.
                raise Exception(f"Error processing images: {str(e)}") from e
|
|
|
|
|
|
|
if kwargs.get('remove_forms', False) and element.name == 'form': |
|
element.decompose() |
|
return False |
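
            # Collect media entries for <video>/<audio> elements and their <source> children.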
|
|
|
if element.name in ['video', 'audio']: |
|
media[f"{element.name}s"].append({ |
|
'src': element.get('src'), |
|
'alt': element.get('alt'), |
|
'type': element.name, |
|
'description': self.find_closest_parent_with_useful_text(element, **kwargs) |
|
}) |
|
source_tags = element.find_all('source') |
|
for source_tag in source_tags: |
|
media[f"{element.name}s"].append({ |
|
'src': source_tag.get('src'), |
|
'alt': element.get('alt'), |
|
'type': element.name, |
|
'description': self.find_closest_parent_with_useful_text(element, **kwargs) |
|
}) |
|
return True |
|
|
|
if element.name in ONLY_TEXT_ELIGIBLE_TAGS: |
|
if kwargs.get('only_text', False): |
|
element.replace_with(element.get_text()) |
|
|
|
try: |
|
self.remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False)) |
|
except Exception as e: |
|
|
|
self._log('error', |
|
message="Error removing unwanted attributes: {error}", |
|
tag="SCRAPE", |
|
params={"error": str(e)} |
|
) |
|
|
|
for child in list(element.children): |
|
if isinstance(child, NavigableString) and not isinstance(child, Comment): |
|
if len(child.strip()) > 0: |
|
keep_element = True |
|
else: |
|
if self._process_element(url, child, media, internal_links_dict, external_links_dict, **kwargs): |
|
keep_element = True |
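
            # If no child made this element worth keeping, fall back to a simple word-count check
            # before deciding whether to drop it from the tree.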
|
|
|
|
|
|
|
word_count_threshold = kwargs.get('word_count_threshold', MIN_WORD_THRESHOLD) |
|
if not keep_element: |
|
word_count = len(element.get_text(strip=True).split()) |
|
keep_element = word_count >= word_count_threshold |
|
|
|
if not keep_element: |
|
element.decompose() |
|
|
|
return keep_element |
|
except Exception as e: |
|
|
|
self._log('error', |
|
message="Error processing element: {error}", |
|
tag="SCRAPE", |
|
params={"error": str(e)} |
|
) |
|
return False |
|
|
|
def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: |
|
""" |
|
Extract content from HTML using BeautifulSoup. |
|
|
|
Args: |
|
url (str): The URL of the page to scrape. |
|
html (str): The HTML content of the page to scrape. |
|
word_count_threshold (int): The minimum word count threshold for content extraction. |
|
css_selector (str): The CSS selector to use for content extraction. |
|
**kwargs: Additional keyword arguments. |
|
|
|
Returns: |
|
dict: A dictionary containing the extracted content. |
|
""" |
|
success = True |
|
if not html: |
|
return None |
|
|
|
parser_type = kwargs.get('parser', 'lxml') |
|
soup = BeautifulSoup(html, parser_type) |
|
body = soup.body |
|
base_domain = get_base_domain(url) |
|
|
|
try: |
|
meta = extract_metadata("", soup) |
|
except Exception as e: |
|
self._log('error', |
|
message="Error extracting metadata: {error}", |
|
tag="SCRAPE", |
|
params={"error": str(e)} |
|
) |
|
meta = {} |
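
        # Strip tags listed in excluded_tags, then any elements matching excluded_selector.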
|
|
|
|
|
excluded_tags = set(kwargs.get('excluded_tags', []) or []) |
|
if excluded_tags: |
|
for element in body.find_all(lambda tag: tag.name in excluded_tags): |
|
element.extract() |
|
|
|
|
|
excluded_selector = kwargs.get('excluded_selector', '') |
|
if excluded_selector: |
|
is_single_selector = ',' not in excluded_selector and ' ' not in excluded_selector |
|
if is_single_selector: |
|
while element := body.select_one(excluded_selector): |
|
element.extract() |
|
else: |
|
for element in body.select(excluded_selector): |
|
element.extract() |
|
|
|
if css_selector: |
|
selected_elements = body.select(css_selector) |
|
if not selected_elements: |
|
return { |
|
'markdown': '', |
|
'cleaned_html': '', |
|
'success': True, |
|
'media': {'images': [], 'videos': [], 'audios': []}, |
|
'links': {'internal': [], 'external': []}, |
|
'metadata': {}, |
|
'message': f"No elements found for CSS selector: {css_selector}" |
|
} |
|
|
|
body = soup.new_tag('div') |
|
for el in selected_elements: |
|
body.append(el) |
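
        # Social-media link exclusion is implemented by folding the social-media domain list
        # into exclude_domains when exclude_social_media_links is set.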
|
|
|
kwargs['exclude_social_media_domains'] = set(kwargs.get('exclude_social_media_domains', []) + SOCIAL_MEDIA_DOMAINS) |
|
kwargs['exclude_domains'] = set(kwargs.get('exclude_domains', [])) |
|
if kwargs.get('exclude_social_media_links', False): |
|
kwargs['exclude_domains'] = kwargs['exclude_domains'].union(kwargs['exclude_social_media_domains']) |
|
|
|
result_obj = self.process_element( |
|
url, |
|
body, |
|
word_count_threshold = word_count_threshold, |
|
base_domain=base_domain, |
|
**kwargs |
|
) |
|
|
|
links = {'internal': [], 'external': []} |
|
media = result_obj['media'] |
|
internal_links_dict = result_obj['internal_links_dict'] |
|
external_links_dict = result_obj['external_links_dict'] |
|
|
|
|
|
links['internal'] = list(internal_links_dict.values()) |
|
links['external'] = list(external_links_dict.values()) |
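
        # Process every <img> in the (possibly selector-filtered) body and keep the variants
        # whose score clears the image score threshold.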
|
|
|
|
|
imgs = body.find_all('img') |
|
|
|
media['images'] = [ |
|
img for result in (self.process_image(img, url, i, len(imgs)) |
|
for i, img in enumerate(imgs)) |
|
if result is not None |
|
for img in result |
|
] |
|
|
|
body = self.flatten_nested_elements(body) |
|
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') |
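        # Blank out inline base64 image sources so large data: URIs don't bloat the cleaned HTML.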
|
for img in imgs: |
|
src = img.get('src', '') |
|
if base64_pattern.match(src): |
|
|
|
img['src'] = base64_pattern.sub('', src) |
|
|
|
str_body = "" |
|
try: |
|
str_body = body.encode_contents().decode('utf-8') |
|
except Exception as e: |
|
|
|
success = False |
|
body = BeautifulSoup(html, 'html.parser') |
|
|
|
|
|
error_div = body.new_tag('div', id='crawl4ai_error_message') |
|
error_div.string = ''' |
|
Crawl4AI Error: This page is not fully supported. |
|
|
|
Possible reasons: |
|
1. The page may have restrictions that prevent crawling. |
|
2. The page might not be fully loaded. |
|
|
|
Suggestions: |
|
- Try calling the crawl function with these parameters: |
|
magic=True, |
|
- Set headless=False to visualize what's happening on the page. |
|
|
|
If the issue persists, please check the page's structure and any potential anti-crawling measures. |
|
''' |
|
|
|
|
|
body.body.append(error_div) |
|
str_body = body.encode_contents().decode('utf-8') |
|
|
|
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.") |
|
self._log('error', |
|
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.", |
|
tag="SCRAPE" |
|
) |
|
|
|
        cleaned_html = str_body.replace('\n\n', '\n').replace('  ', ' ')
|
|
|
|
|
return { |
|
|
|
'cleaned_html': cleaned_html, |
|
'success': success, |
|
'media': media, |
|
'links': links, |
|
'metadata': meta |
|
} |
|
|