import re # Point 1: Pre-Compile Regular Expressions import time from abc import ABC, abstractmethod from typing import Dict, Any, Optional from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor import asyncio, requests, re, os from .config import * from bs4 import element, NavigableString, Comment from bs4 import PageElement, Tag from urllib.parse import urljoin from requests.exceptions import InvalidSchema # from .content_cleaning_strategy import ContentCleaningStrategy from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator from .models import MarkdownGenerationResult from .utils import ( extract_metadata, normalize_url, is_external_url, get_base_domain, ) # Pre-compile regular expressions for Open Graph and Twitter metadata OG_REGEX = re.compile(r'^og:') TWITTER_REGEX = re.compile(r'^twitter:') DIMENSION_REGEX = re.compile(r"(\d+)(\D*)") # Function to parse image height/width value and units def parse_dimension(dimension): if dimension: # match = re.match(r"(\d+)(\D*)", dimension) match = DIMENSION_REGEX.match(dimension) if match: number = int(match.group(1)) unit = match.group(2) or 'px' # Default unit is 'px' if not specified return number, unit return None, None # Fetch image file metadata to extract size and extension def fetch_image_file_size(img, base_url): #If src is relative path construct full URL, if not it may be CDN URL img_url = urljoin(base_url,img.get('src')) try: response = requests.head(img_url) if response.status_code == 200: return response.headers.get('Content-Length',None) else: print(f"Failed to retrieve file size for {img_url}") return None except InvalidSchema as e: return None finally: return class ContentScrapingStrategy(ABC): @abstractmethod def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: pass @abstractmethod async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: pass class WebScrapingStrategy(ContentScrapingStrategy): """ Class for web content scraping. Perhaps the most important class. How it works: 1. Extract content from HTML using BeautifulSoup. 2. Clean the extracted content using a content cleaning strategy. 3. Filter the cleaned content using a content filtering strategy. 4. Generate markdown content from the filtered content. 5. Return the markdown content. """ def __init__(self, logger=None): self.logger = logger def _log(self, level, message, tag="SCRAPE", **kwargs): """Helper method to safely use logger.""" if self.logger: log_method = getattr(self.logger, level) log_method(message=message, tag=tag, **kwargs) def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: """ Main entry point for content scraping. Args: url (str): The URL of the page to scrape. html (str): The HTML content of the page. **kwargs: Additional keyword arguments. Returns: Dict[str, Any]: A dictionary containing the scraped content. This dictionary contains the following keys: - 'markdown': The generated markdown content, type is str, however soon will become MarkdownGenerationResult via 'markdown.raw_markdown'. - 'fit_markdown': The generated markdown content with relevant content filtered, this will be removed soon and available in 'markdown.fit_markdown'. - 'fit_html': The HTML content with relevant content filtered, this will be removed soon and available in 'markdown.fit_html'. - 'markdown_v2': The generated markdown content with relevant content filtered, this is temporary and will be removed soon and replaced with 'markdown' """ return self._scrap(url, html, is_async=False, **kwargs) async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: """ Main entry point for asynchronous content scraping. Args: url (str): The URL of the page to scrape. html (str): The HTML content of the page. **kwargs: Additional keyword arguments. Returns: Dict[str, Any]: A dictionary containing the scraped content. This dictionary contains the following keys: - 'markdown': The generated markdown content, type is str, however soon will become MarkdownGenerationResult via 'markdown.raw_markdown'. - 'fit_markdown': The generated markdown content with relevant content filtered, this will be removed soon and available in 'markdown.fit_markdown'. - 'fit_html': The HTML content with relevant content filtered, this will be removed soon and available in 'markdown.fit_html'. - 'markdown_v2': The generated markdown content with relevant content filtered, this is temporary and will be removed soon and replaced with 'markdown' """ return await asyncio.to_thread(self._scrap, url, html, **kwargs) def flatten_nested_elements(self, node): """ Flatten nested elements in a HTML tree. Args: node (Tag): The root node of the HTML tree. Returns: Tag: The flattened HTML tree. """ if isinstance(node, NavigableString): return node if len(node.contents) == 1 and isinstance(node.contents[0], Tag) and node.contents[0].name == node.name: return self.flatten_nested_elements(node.contents[0]) node.contents = [self.flatten_nested_elements(child) for child in node.contents] return node def find_closest_parent_with_useful_text(self, tag, **kwargs): """ Find the closest parent with useful text. Args: tag (Tag): The starting tag to search from. **kwargs: Additional keyword arguments. Returns: Tag: The closest parent with useful text, or None if not found. """ image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) current_tag = tag while current_tag: current_tag = current_tag.parent # Get the text content of the parent tag if current_tag: text_content = current_tag.get_text(separator=' ',strip=True) # Check if the text content has at least word_count_threshold if len(text_content.split()) >= image_description_min_word_threshold: return text_content return None def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False): """ Remove unwanted attributes from an HTML element. Args: element (Tag): The HTML element to remove attributes from. important_attrs (list): List of important attributes to keep. keep_data_attributes (bool): Whether to keep data attributes. Returns: None """ attrs_to_remove = [] for attr in element.attrs: if attr not in important_attrs: if keep_data_attributes: if not attr.startswith('data-'): attrs_to_remove.append(attr) else: attrs_to_remove.append(attr) for attr in attrs_to_remove: del element[attr] def process_image(self, img, url, index, total_images, **kwargs): """ Process an image element. How it works: 1. Check if the image has valid display and inside undesired html elements. 2. Score an image for it's usefulness. 3. Extract image file metadata to extract size and extension. 4. Generate a dictionary with the processed image information. 5. Return the processed image information. Args: img (Tag): The image element to process. url (str): The URL of the page containing the image. index (int): The index of the image in the list of images. total_images (int): The total number of images in the list. **kwargs: Additional keyword arguments. Returns: dict: A dictionary containing the processed image information. """ parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w') if ' ' in u else None} for u in [f"http{p}" for p in s.split("http") if p]] # Constants for checks classes_to_check = frozenset(['button', 'icon', 'logo']) tags_to_check = frozenset(['button', 'input']) image_formats = frozenset(['jpg', 'jpeg', 'png', 'webp', 'avif', 'gif']) # Pre-fetch commonly used attributes style = img.get('style', '') alt = img.get('alt', '') src = img.get('src', '') data_src = img.get('data-src', '') srcset = img.get('srcset', '') data_srcset = img.get('data-srcset', '') width = img.get('width') height = img.get('height') parent = img.parent parent_classes = parent.get('class', []) # Quick validation checks if ('display:none' in style or parent.name in tags_to_check or any(c in cls for c in parent_classes for cls in classes_to_check) or any(c in src for c in classes_to_check) or any(c in alt for c in classes_to_check)): return None # Quick score calculation score = 0 if width and width.isdigit(): width_val = int(width) score += 1 if width_val > 150 else 0 if height and height.isdigit(): height_val = int(height) score += 1 if height_val > 150 else 0 if alt: score += 1 score += index/total_images < 0.5 # image_format = '' # if "data:image/" in src: # image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0] # else: # image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0] # if image_format in ('jpg', 'png', 'webp', 'avif'): # score += 1 # Check for image format in all possible sources def has_image_format(url): return any(fmt in url.lower() for fmt in image_formats) # Score for having proper image sources if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]): score += 1 if srcset or data_srcset: score += 1 if img.find_parent('picture'): score += 1 # Detect format from any available source detected_format = None for url in [src, data_src, srcset, data_srcset]: if url: format_matches = [fmt for fmt in image_formats if fmt in url.lower()] if format_matches: detected_format = format_matches[0] break if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD): return None # Use set for deduplication unique_urls = set() image_variants = [] # Generate a unique group ID for this set of variants group_id = index # Base image info template image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) base_info = { 'alt': alt, 'desc': self.find_closest_parent_with_useful_text(img, **kwargs), 'score': score, 'type': 'image', 'group_id': group_id, # Group ID for this set of variants 'format': detected_format, } # Inline function for adding variants def add_variant(src, width=None): if src and not src.startswith('data:') and src not in unique_urls: unique_urls.add(src) image_variants.append({**base_info, 'src': src, 'width': width}) # Process all sources add_variant(src) add_variant(data_src) # Handle srcset and data-srcset in one pass for attr in ('srcset', 'data-srcset'): if value := img.get(attr): for source in parse_srcset(value): add_variant(source['url'], source['width']) # Quick picture element check if picture := img.find_parent('picture'): for source in picture.find_all('source'): if srcset := source.get('srcset'): for src in parse_srcset(srcset): add_variant(src['url'], src['width']) # Framework-specific attributes in one pass for attr, value in img.attrs.items(): if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value: add_variant(value) return image_variants if image_variants else None def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]: """ Process an HTML element. How it works: 1. Check if the element is an image, video, or audio. 2. Extract the element's attributes and content. 3. Process the element based on its type. 4. Return the processed element information. Args: url (str): The URL of the page containing the element. element (Tag): The HTML element to process. **kwargs: Additional keyword arguments. Returns: dict: A dictionary containing the processed element information. """ media = {'images': [], 'videos': [], 'audios': []} internal_links_dict = {} external_links_dict = {} self._process_element( url, element, media, internal_links_dict, external_links_dict, **kwargs ) return { 'media': media, 'internal_links_dict': internal_links_dict, 'external_links_dict': external_links_dict } def _process_element(self, url, element: PageElement, media: Dict[str, Any], internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool: """ Process an HTML element. """ try: if isinstance(element, NavigableString): if isinstance(element, Comment): element.extract() return False # if element.name == 'img': # process_image(element, url, 0, 1) # return True base_domain = kwargs.get("base_domain", get_base_domain(url)) if element.name in ['script', 'style', 'link', 'meta', 'noscript']: element.decompose() return False keep_element = False exclude_domains = kwargs.get('exclude_domains', []) # exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS)) # exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', []) # exclude_social_media_domains = list(set(exclude_social_media_domains)) try: if element.name == 'a' and element.get('href'): href = element.get('href', '').strip() if not href: # Skip empty hrefs return False url_base = url.split('/')[2] # Normalize the URL try: normalized_href = normalize_url(href, url) except ValueError as e: # logging.warning(f"Invalid URL format: {href}, Error: {str(e)}") return False link_data = { 'href': normalized_href, 'text': element.get_text().strip(), 'title': element.get('title', '').strip(), 'base_domain': base_domain } is_external = is_external_url(normalized_href, base_domain) keep_element = True # Handle external link exclusions if is_external: link_base_domain = get_base_domain(normalized_href) link_data['base_domain'] = link_base_domain if kwargs.get('exclude_external_links', False): element.decompose() return False # elif kwargs.get('exclude_social_media_links', False): # if link_base_domain in exclude_social_media_domains: # element.decompose() # return False # if any(domain in normalized_href.lower() for domain in exclude_social_media_domains): # element.decompose() # return False elif exclude_domains: if link_base_domain in exclude_domains: element.decompose() return False # if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])): # element.decompose() # return False if is_external: if normalized_href not in external_links_dict: external_links_dict[normalized_href] = link_data else: if normalized_href not in internal_links_dict: internal_links_dict[normalized_href] = link_data except Exception as e: raise Exception(f"Error processing links: {str(e)}") try: if element.name == 'img': potential_sources = ['src', 'data-src', 'srcset' 'data-lazy-src', 'data-original'] src = element.get('src', '') while not src and potential_sources: src = element.get(potential_sources.pop(0), '') if not src: element.decompose() return False # If it is srcset pick up the first image if 'srcset' in element.attrs: src = element.attrs['srcset'].split(',')[0].split(' ')[0] # If image src is internal, then skip if not is_external_url(src, base_domain): return True image_src_base_domain = get_base_domain(src) # Check flag if we should remove external images if kwargs.get('exclude_external_images', False): element.decompose() return False # src_url_base = src.split('/')[2] # url_base = url.split('/')[2] # if url_base not in src_url_base: # element.decompose() # return False # if kwargs.get('exclude_social_media_links', False): # if image_src_base_domain in exclude_social_media_domains: # element.decompose() # return False # src_url_base = src.split('/')[2] # url_base = url.split('/')[2] # if any(domain in src for domain in exclude_social_media_domains): # element.decompose() # return False # Handle exclude domains if exclude_domains: if image_src_base_domain in exclude_domains: element.decompose() return False # if any(domain in src for domain in kwargs.get('exclude_domains', [])): # element.decompose() # return False return True # Always keep image elements except Exception as e: raise "Error processing images" # Check if flag to remove all forms is set if kwargs.get('remove_forms', False) and element.name == 'form': element.decompose() return False if element.name in ['video', 'audio']: media[f"{element.name}s"].append({ 'src': element.get('src'), 'alt': element.get('alt'), 'type': element.name, 'description': self.find_closest_parent_with_useful_text(element, **kwargs) }) source_tags = element.find_all('source') for source_tag in source_tags: media[f"{element.name}s"].append({ 'src': source_tag.get('src'), 'alt': element.get('alt'), 'type': element.name, 'description': self.find_closest_parent_with_useful_text(element, **kwargs) }) return True # Always keep video and audio elements if element.name in ONLY_TEXT_ELIGIBLE_TAGS: if kwargs.get('only_text', False): element.replace_with(element.get_text()) try: self.remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False)) except Exception as e: # print('Error removing unwanted attributes:', str(e)) self._log('error', message="Error removing unwanted attributes: {error}", tag="SCRAPE", params={"error": str(e)} ) # Process children for child in list(element.children): if isinstance(child, NavigableString) and not isinstance(child, Comment): if len(child.strip()) > 0: keep_element = True else: if self._process_element(url, child, media, internal_links_dict, external_links_dict, **kwargs): keep_element = True # Check word count word_count_threshold = kwargs.get('word_count_threshold', MIN_WORD_THRESHOLD) if not keep_element: word_count = len(element.get_text(strip=True).split()) keep_element = word_count >= word_count_threshold if not keep_element: element.decompose() return keep_element except Exception as e: # print('Error processing element:', str(e)) self._log('error', message="Error processing element: {error}", tag="SCRAPE", params={"error": str(e)} ) return False def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: """ Extract content from HTML using BeautifulSoup. Args: url (str): The URL of the page to scrape. html (str): The HTML content of the page to scrape. word_count_threshold (int): The minimum word count threshold for content extraction. css_selector (str): The CSS selector to use for content extraction. **kwargs: Additional keyword arguments. Returns: dict: A dictionary containing the extracted content. """ success = True if not html: return None parser_type = kwargs.get('parser', 'lxml') soup = BeautifulSoup(html, parser_type) body = soup.body base_domain = get_base_domain(url) try: meta = extract_metadata("", soup) except Exception as e: self._log('error', message="Error extracting metadata: {error}", tag="SCRAPE", params={"error": str(e)} ) meta = {} # Handle tag-based removal first - faster than CSS selection excluded_tags = set(kwargs.get('excluded_tags', []) or []) if excluded_tags: for element in body.find_all(lambda tag: tag.name in excluded_tags): element.extract() # Handle CSS selector-based removal excluded_selector = kwargs.get('excluded_selector', '') if excluded_selector: is_single_selector = ',' not in excluded_selector and ' ' not in excluded_selector if is_single_selector: while element := body.select_one(excluded_selector): element.extract() else: for element in body.select(excluded_selector): element.extract() if css_selector: selected_elements = body.select(css_selector) if not selected_elements: return { 'markdown': '', 'cleaned_html': '', 'success': True, 'media': {'images': [], 'videos': [], 'audios': []}, 'links': {'internal': [], 'external': []}, 'metadata': {}, 'message': f"No elements found for CSS selector: {css_selector}" } # raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}") body = soup.new_tag('div') for el in selected_elements: body.append(el) kwargs['exclude_social_media_domains'] = set(kwargs.get('exclude_social_media_domains', []) + SOCIAL_MEDIA_DOMAINS) kwargs['exclude_domains'] = set(kwargs.get('exclude_domains', [])) if kwargs.get('exclude_social_media_links', False): kwargs['exclude_domains'] = kwargs['exclude_domains'].union(kwargs['exclude_social_media_domains']) result_obj = self.process_element( url, body, word_count_threshold = word_count_threshold, base_domain=base_domain, **kwargs ) links = {'internal': [], 'external': []} media = result_obj['media'] internal_links_dict = result_obj['internal_links_dict'] external_links_dict = result_obj['external_links_dict'] # Update the links dictionary with unique links links['internal'] = list(internal_links_dict.values()) links['external'] = list(external_links_dict.values()) # # Process images using ThreadPoolExecutor imgs = body.find_all('img') media['images'] = [ img for result in (self.process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs)) if result is not None for img in result ] body = self.flatten_nested_elements(body) base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') for img in imgs: src = img.get('src', '') if base64_pattern.match(src): # Replace base64 data with empty string img['src'] = base64_pattern.sub('', src) str_body = "" try: str_body = body.encode_contents().decode('utf-8') except Exception as e: # Reset body to the original HTML success = False body = BeautifulSoup(html, 'html.parser') # Create a new div with a special ID error_div = body.new_tag('div', id='crawl4ai_error_message') error_div.string = ''' Crawl4AI Error: This page is not fully supported. Possible reasons: 1. The page may have restrictions that prevent crawling. 2. The page might not be fully loaded. Suggestions: - Try calling the crawl function with these parameters: magic=True, - Set headless=False to visualize what's happening on the page. If the issue persists, please check the page's structure and any potential anti-crawling measures. ''' # Append the error div to the body body.body.append(error_div) str_body = body.encode_contents().decode('utf-8') print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.") self._log('error', message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.", tag="SCRAPE" ) cleaned_html = str_body.replace('\n\n', '\n').replace(' ', ' ') return { # **markdown_content, 'cleaned_html': cleaned_html, 'success': success, 'media': media, 'links': links, 'metadata': meta }