diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index 2c6c2f29..ae0b31fb 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ -6,7 +6,7 @@ from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, L from .content_scraping_strategy import ( ContentScrapingStrategy, - WebScrapingStrategy, + # WebScrapingStrategy, LXMLWebScrapingStrategy, ) from .async_logger import ( @@ -100,7 +100,7 @@ __all__ = [ "CrawlerHub", "CacheMode", "ContentScrapingStrategy", - "WebScrapingStrategy", + # "WebScrapingStrategy", "LXMLWebScrapingStrategy", "BrowserConfig", "CrawlerRunConfig", diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 3fcd9911..3dc80dae 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -17,7 +17,7 @@ from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy from .chunking_strategy import ChunkingStrategy, RegexChunking from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator -from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy +from .content_scraping_strategy import ContentScrapingStrategy, LXMLWebScrapingStrategy from .deep_crawling import DeepCrawlStrategy from .cache_context import CacheMode @@ -725,7 +725,7 @@ class CrawlerRunConfig(): parser_type (str): Type of parser to use for HTML parsing. Default: "lxml". scraping_strategy (ContentScrapingStrategy): Scraping strategy to use. - Default: WebScrapingStrategy. + Default: LXMLWebScrapingStrategy. proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}. If None, no additional proxy config. Default: None. @@ -979,7 +979,7 @@ class CrawlerRunConfig(): self.remove_forms = remove_forms self.prettiify = prettiify self.parser_type = parser_type - self.scraping_strategy = scraping_strategy or WebScrapingStrategy() + self.scraping_strategy = scraping_strategy or LXMLWebScrapingStrategy() self.proxy_config = proxy_config self.proxy_rotation_strategy = proxy_rotation_strategy diff --git a/crawl4ai/content_scraping_strategy.py b/crawl4ai/content_scraping_strategy.py index 1dfbce84..3510f64f 100644 --- a/crawl4ai/content_scraping_strategy.py +++ b/crawl4ai/content_scraping_strategy.py @@ -2,7 +2,7 @@ import re from itertools import chain from abc import ABC, abstractmethod from typing import Dict, Any, Optional -from bs4 import BeautifulSoup +# from bs4 import BeautifulSoup import asyncio import requests from .config import ( @@ -13,12 +13,12 @@ from .config import ( IMPORTANT_ATTRS, SOCIAL_MEDIA_DOMAINS, ) -from bs4 import NavigableString, Comment -from bs4 import PageElement, Tag +# from bs4 import NavigableString, Comment +# from bs4 import PageElement, Tag from urllib.parse import urljoin from requests.exceptions import InvalidSchema from .utils import ( - extract_metadata, + # extract_metadata, normalize_url, is_external_url, get_base_domain, @@ -96,20 +96,16 @@ class ContentScrapingStrategy(ABC): pass -class WebScrapingStrategy(ContentScrapingStrategy): - """ - Class for web content scraping. Perhaps the most important class. - - How it works: - 1. Extract content from HTML using BeautifulSoup. - 2. Clean the extracted content using a content cleaning strategy. - 3. Filter the cleaned content using a content filtering strategy. - 4. Generate markdown content from the filtered content. - 5. Return the markdown content. - """ - +class LXMLWebScrapingStrategy(ContentScrapingStrategy): def __init__(self, logger=None): self.logger = logger + self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)") + self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)') + + # Constants for image processing + self.classes_to_check = frozenset(["button", "icon", "logo"]) + self.tags_to_check = frozenset(["button", "input"]) + self.image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"]) def _log(self, level, message, tag="SCRAPE", **kwargs): """Helper method to safely use logger.""" @@ -130,7 +126,8 @@ class WebScrapingStrategy(ContentScrapingStrategy): ScrapingResult: A structured result containing the scraped content. """ actual_url = kwargs.get("redirected_url", url) - raw_result = self._scrap(actual_url, html, is_async=False, **kwargs) + raw_result = self._scrap(actual_url, html, **kwargs) + if raw_result is None: return ScrapingResult( cleaned_html="", @@ -194,388 +191,15 @@ class WebScrapingStrategy(ContentScrapingStrategy): Returns: ScrapingResult: A structured result containing the scraped content. """ - return await asyncio.to_thread(self._scrap, url, html, **kwargs) + return await asyncio.to_thread(self.scrap, url, html, **kwargs) - def is_data_table(self, table: Tag, **kwargs) -> bool: - """ - Determine if a table element is a data table (not a layout table). - - Args: - table (Tag): BeautifulSoup Tag representing a table element - **kwargs: Additional keyword arguments including table_score_threshold - - Returns: - bool: True if the table is a data table, False otherwise - """ - score = 0 - - # Check for thead and tbody - has_thead = len(table.select('thead')) > 0 - has_tbody = len(table.select('tbody')) > 0 - if has_thead: - score += 2 - if has_tbody: - score += 1 - - # Check for th elements - th_count = len(table.select('th')) - if th_count > 0: - score += 2 - if has_thead or len(table.select('tr:first-child th')) > 0: - score += 1 - - # Check for nested tables - if len(table.select('table')) > 0: - score -= 3 - - # Role attribute check - role = table.get('role', '').lower() - if role in {'presentation', 'none'}: - score -= 3 - - # Column consistency - rows = table.select('tr') - if not rows: - return False - - col_counts = [len(row.select('td, th')) for row in rows] - avg_cols = sum(col_counts) / len(col_counts) - variance = sum((c - avg_cols)**2 for c in col_counts) / len(col_counts) - if variance < 1: - score += 2 - - # Caption and summary - if table.select('caption'): - score += 2 - if table.has_attr('summary') and table['summary']: - score += 1 - - # Text density - total_text = sum(len(cell.get_text().strip()) for row in rows for cell in row.select('td, th')) - total_tags = sum(1 for _ in table.descendants if isinstance(_, Tag)) - text_ratio = total_text / (total_tags + 1e-5) - if text_ratio > 20: - score += 3 - elif text_ratio > 10: - score += 2 - - # Data attributes - data_attrs = sum(1 for attr in table.attrs if attr.startswith('data-')) - score += data_attrs * 0.5 - - # Size check - if avg_cols >= 2 and len(rows) >= 2: - score += 2 - - threshold = kwargs.get('table_score_threshold', 7) - return score >= threshold - - def extract_table_data(self, table: Tag) -> dict: - """ - Extract structured data from a table element. - - Args: - table (Tag): BeautifulSoup Tag representing a table element - - Returns: - dict: Dictionary containing table data (headers, rows, caption, summary) - """ - caption_elem = table.select_one('caption') - caption = caption_elem.get_text().strip() if caption_elem else "" - summary = table.get('summary', '').strip() - - # Extract headers with colspan handling - headers = [] - thead_rows = table.select('thead tr') - if thead_rows: - header_cells = thead_rows[0].select('th') - for cell in header_cells: - text = cell.get_text().strip() - colspan = int(cell.get('colspan', 1)) - headers.extend([text] * colspan) - else: - first_row = table.select('tr:first-child') - if first_row: - for cell in first_row[0].select('th, td'): - text = cell.get_text().strip() - colspan = int(cell.get('colspan', 1)) - headers.extend([text] * colspan) - - # Extract rows with colspan handling - rows = [] - all_rows = table.select('tr') - thead = table.select_one('thead') - tbody_rows = [] - - if thead: - thead_rows = thead.select('tr') - tbody_rows = [row for row in all_rows if row not in thead_rows] - else: - if all_rows and all_rows[0].select('th'): - tbody_rows = all_rows[1:] - else: - tbody_rows = all_rows - - for row in tbody_rows: - # for row in table.select('tr:not(:has(ancestor::thead))'): - row_data = [] - for cell in row.select('td'): - text = cell.get_text().strip() - colspan = int(cell.get('colspan', 1)) - row_data.extend([text] * colspan) - if row_data: - rows.append(row_data) - - # Align rows with headers - max_columns = len(headers) if headers else (max(len(row) for row in rows) if rows else 0) - aligned_rows = [] - for row in rows: - aligned = row[:max_columns] + [''] * (max_columns - len(row)) - aligned_rows.append(aligned) - - if not headers: - headers = [f"Column {i+1}" for i in range(max_columns)] - - return { - "headers": headers, - "rows": aligned_rows, - "caption": caption, - "summary": summary, - } - - def flatten_nested_elements(self, node): - """ - Flatten nested elements in a HTML tree. - - Args: - node (Tag): The root node of the HTML tree. - - Returns: - Tag: The flattened HTML tree. - """ - if isinstance(node, NavigableString): - return node - if ( - len(node.contents) == 1 - and isinstance(node.contents[0], Tag) - and node.contents[0].name == node.name - ): - return self.flatten_nested_elements(node.contents[0]) - node.contents = [self.flatten_nested_elements(child) for child in node.contents] - return node - - def find_closest_parent_with_useful_text(self, tag, **kwargs): - """ - Find the closest parent with useful text. - - Args: - tag (Tag): The starting tag to search from. - **kwargs: Additional keyword arguments. - - Returns: - Tag: The closest parent with useful text, or None if not found. - """ - image_description_min_word_threshold = kwargs.get( - "image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD - ) - current_tag = tag - while current_tag: - current_tag = current_tag.parent - # Get the text content of the parent tag - if current_tag: - text_content = current_tag.get_text(separator=" ", strip=True) - # Check if the text content has at least word_count_threshold - if len(text_content.split()) >= image_description_min_word_threshold: - return text_content - return None - - def remove_unwanted_attributes( - self, element, important_attrs, keep_data_attributes=False - ): - """ - Remove unwanted attributes from an HTML element. - - Args: - element (Tag): The HTML element to remove attributes from. - important_attrs (list): List of important attributes to keep. - keep_data_attributes (bool): Whether to keep data attributes. - - Returns: - None - """ - attrs_to_remove = [] - for attr in element.attrs: - if attr not in important_attrs: - if keep_data_attributes: - if not attr.startswith("data-"): - attrs_to_remove.append(attr) - else: - attrs_to_remove.append(attr) - - for attr in attrs_to_remove: - del element[attr] - - def process_image(self, img, url, index, total_images, **kwargs): - """ - Process an image element. - - How it works: - 1. Check if the image has valid display and inside undesired html elements. - 2. Score an image for it's usefulness. - 3. Extract image file metadata to extract size and extension. - 4. Generate a dictionary with the processed image information. - 5. Return the processed image information. - - Args: - img (Tag): The image element to process. - url (str): The URL of the page containing the image. - index (int): The index of the image in the list of images. - total_images (int): The total number of images in the list. - **kwargs: Additional keyword arguments. - - Returns: - dict: A dictionary containing the processed image information. - """ - # parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w') - # if ' ' in u else None} - # for u in [f"http{p}" for p in s.split("http") if p]] - - # Constants for checks - classes_to_check = frozenset(["button", "icon", "logo"]) - tags_to_check = frozenset(["button", "input"]) - image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"]) - - # Pre-fetch commonly used attributes - style = img.get("style", "") - alt = img.get("alt", "") - src = img.get("src", "") - data_src = img.get("data-src", "") - srcset = img.get("srcset", "") - data_srcset = img.get("data-srcset", "") - width = img.get("width") - height = img.get("height") - parent = img.parent - parent_classes = parent.get("class", []) - - # Quick validation checks - if ( - "display:none" in style - or parent.name in tags_to_check - or any(c in cls for c in parent_classes for cls in classes_to_check) - or any(c in src for c in classes_to_check) - or any(c in alt for c in classes_to_check) - ): - return None - - # Quick score calculation - score = 0 - if width and width.isdigit(): - width_val = int(width) - score += 1 if width_val > 150 else 0 - if height and height.isdigit(): - height_val = int(height) - score += 1 if height_val > 150 else 0 - if alt: - score += 1 - score += index / total_images < 0.5 - - # image_format = '' - # if "data:image/" in src: - # image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0] - # else: - # image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0] - - # if image_format in ('jpg', 'png', 'webp', 'avif'): - # score += 1 - - # Check for image format in all possible sources - def has_image_format(url): - return any(fmt in url.lower() for fmt in image_formats) - - # Score for having proper image sources - if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]): - score += 1 - if srcset or data_srcset: - score += 1 - if img.find_parent("picture"): - score += 1 - - # Detect format from any available source - detected_format = None - for url in [src, data_src, srcset, data_srcset]: - if url: - format_matches = [fmt for fmt in image_formats if fmt in url.lower()] - if format_matches: - detected_format = format_matches[0] - break - - if score <= kwargs.get("image_score_threshold", IMAGE_SCORE_THRESHOLD): - return None - - # Use set for deduplication - unique_urls = set() - image_variants = [] - - # Generate a unique group ID for this set of variants - group_id = index - - # Base image info template - base_info = { - "alt": alt, - "desc": self.find_closest_parent_with_useful_text(img, **kwargs), - "score": score, - "type": "image", - "group_id": group_id, # Group ID for this set of variants - "format": detected_format, - } - - # Inline function for adding variants - def add_variant(src, width=None): - if src and not src.startswith("data:") and src not in unique_urls: - unique_urls.add(src) - image_variants.append({**base_info, "src": src, "width": width}) - - # Process all sources - add_variant(src) - add_variant(data_src) - - # Handle srcset and data-srcset in one pass - for attr in ("srcset", "data-srcset"): - if value := img.get(attr): - for source in parse_srcset(value): - add_variant(source["url"], source["width"]) - - # Quick picture element check - if picture := img.find_parent("picture"): - for source in picture.find_all("source"): - if srcset := source.get("srcset"): - for src in parse_srcset(srcset): - add_variant(src["url"], src["width"]) - - # Framework-specific attributes in one pass - for attr, value in img.attrs.items(): - if ( - attr.startswith("data-") - and ("src" in attr or "srcset" in attr) - and "http" in value - ): - add_variant(value) - - return image_variants if image_variants else None - - def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]: + def process_element(self, url: str, element: lhtml.HtmlElement, **kwargs) -> Dict[str, Any]: """ Process an HTML element. - How it works: - 1. Check if the element is an image, video, or audio. - 2. Extract the element's attributes and content. - 3. Process the element based on its type. - 4. Return the processed element information. - Args: url (str): The URL of the page containing the element. - element (Tag): The HTML element to process. + element (lhtml.HtmlElement): The HTML element to process. **kwargs: Additional keyword arguments. Returns: @@ -584,451 +208,40 @@ class WebScrapingStrategy(ContentScrapingStrategy): media = {"images": [], "videos": [], "audios": [], "tables": []} internal_links_dict = {} external_links_dict = {} + self._process_element( url, element, media, internal_links_dict, external_links_dict, **kwargs ) + return { "media": media, "internal_links_dict": internal_links_dict, "external_links_dict": external_links_dict, } - def _process_element( - self, - url, - element: PageElement, - media: Dict[str, Any], - internal_links_dict: Dict[str, Any], - external_links_dict: Dict[str, Any], - **kwargs, - ) -> bool: + def remove_unwanted_attributes(self, element: lhtml.HtmlElement, important_attrs: List[str], keep_data_attributes: bool = False): """ - Process an HTML element. - """ - try: - if isinstance(element, NavigableString): - if isinstance(element, Comment): - element.extract() - return False - - # if element.name == 'img': - # process_image(element, url, 0, 1) - # return True - base_domain = kwargs.get("base_domain", get_base_domain(url)) - - if element.name in ["script", "style", "link", "meta", "noscript"]: - element.decompose() - return False - - keep_element = False - # Special case for table elements - always preserve structure - if element.name in ["tr", "td", "th"]: - keep_element = True - - exclude_domains = kwargs.get("exclude_domains", []) - # exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS)) - # exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', []) - # exclude_social_media_domains = list(set(exclude_social_media_domains)) - - try: - if element.name == "a" and element.get("href"): - href = element.get("href", "").strip() - if not href: # Skip empty hrefs - return False - - # url_base = url.split("/")[2] - - # Normalize the URL - try: - normalized_href = normalize_url(href, url) - except ValueError: - # logging.warning(f"Invalid URL format: {href}, Error: {str(e)}") - return False - - link_data = { - "href": normalized_href, - "text": element.get_text().strip(), - "title": element.get("title", "").strip(), - "base_domain": base_domain, - } - - is_external = is_external_url(normalized_href, base_domain) - - keep_element = True - - # Handle external link exclusions - if is_external: - link_base_domain = get_base_domain(normalized_href) - link_data["base_domain"] = link_base_domain - if kwargs.get("exclude_external_links", False): - element.decompose() - return False - # elif kwargs.get('exclude_social_media_links', False): - # if link_base_domain in exclude_social_media_domains: - # element.decompose() - # return False - # if any(domain in normalized_href.lower() for domain in exclude_social_media_domains): - # element.decompose() - # return False - elif exclude_domains: - if link_base_domain in exclude_domains: - element.decompose() - return False - # if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])): - # element.decompose() - # return False - - if is_external: - if normalized_href not in external_links_dict: - external_links_dict[normalized_href] = link_data - else: - if kwargs.get("exclude_internal_links", False): - element.decompose() - return False - if normalized_href not in internal_links_dict: - internal_links_dict[normalized_href] = link_data - - except Exception as e: - raise Exception(f"Error processing links: {str(e)}") - - try: - if element.name == "img": - potential_sources = [ - "src", - "data-src", - "srcset" "data-lazy-src", - "data-original", - ] - src = element.get("src", "") - while not src and potential_sources: - src = element.get(potential_sources.pop(0), "") - if not src: - element.decompose() - return False - - # If it is srcset pick up the first image - if "srcset" in element.attrs: - src = element.attrs["srcset"].split(",")[0].split(" ")[0] - - # If image src is internal, then skip - if not is_external_url(src, base_domain): - return True - - image_src_base_domain = get_base_domain(src) - - # Check flag if we should remove external images - if kwargs.get("exclude_external_images", False): - element.decompose() - return False - # src_url_base = src.split('/')[2] - # url_base = url.split('/')[2] - # if url_base not in src_url_base: - # element.decompose() - # return False - - # if kwargs.get('exclude_social_media_links', False): - # if image_src_base_domain in exclude_social_media_domains: - # element.decompose() - # return False - # src_url_base = src.split('/')[2] - # url_base = url.split('/')[2] - # if any(domain in src for domain in exclude_social_media_domains): - # element.decompose() - # return False - - # Handle exclude domains - if exclude_domains: - if image_src_base_domain in exclude_domains: - element.decompose() - return False - # if any(domain in src for domain in kwargs.get('exclude_domains', [])): - # element.decompose() - # return False - - return True # Always keep image elements - except Exception: - raise "Error processing images" - - # Check if flag to remove all forms is set - if kwargs.get("remove_forms", False) and element.name == "form": - element.decompose() - return False - - if element.name in ["video", "audio"]: - media[f"{element.name}s"].append( - { - "src": element.get("src"), - "alt": element.get("alt"), - "type": element.name, - "description": self.find_closest_parent_with_useful_text( - element, **kwargs - ), - } - ) - source_tags = element.find_all("source") - for source_tag in source_tags: - media[f"{element.name}s"].append( - { - "src": source_tag.get("src"), - "alt": element.get("alt"), - "type": element.name, - "description": self.find_closest_parent_with_useful_text( - element, **kwargs - ), - } - ) - return True # Always keep video and audio elements - - if element.name in ONLY_TEXT_ELIGIBLE_TAGS: - if kwargs.get("only_text", False): - element.replace_with(element.get_text()) - - try: - self.remove_unwanted_attributes( - element, IMPORTANT_ATTRS + kwargs.get("keep_attrs", []) , kwargs.get("keep_data_attributes", False) - ) - except Exception as e: - # print('Error removing unwanted attributes:', str(e)) - self._log( - "error", - message="Error removing unwanted attributes: {error}", - tag="SCRAPE", - params={"error": str(e)}, - ) - # Process children - for child in list(element.children): - if isinstance(child, NavigableString) and not isinstance( - child, Comment - ): - if len(child.strip()) > 0: - keep_element = True - else: - if self._process_element( - url, - child, - media, - internal_links_dict, - external_links_dict, - **kwargs, - ): - keep_element = True - - # Check word count - word_count_threshold = kwargs.get( - "word_count_threshold", MIN_WORD_THRESHOLD - ) - if not keep_element: - word_count = len(element.get_text(strip=True).split()) - keep_element = word_count >= word_count_threshold - - if not keep_element: - element.decompose() - - return keep_element - except Exception as e: - # print('Error processing element:', str(e)) - self._log( - "error", - message="Error processing element: {error}", - tag="SCRAPE", - params={"error": str(e)}, - ) - return False - - def _scrap( - self, - url: str, - html: str, - word_count_threshold: int = MIN_WORD_THRESHOLD, - css_selector: str = None, - target_elements: List[str] = None, - **kwargs, - ) -> Dict[str, Any]: - """ - Extract content from HTML using BeautifulSoup. + Remove unwanted attributes from an HTML element. Args: - url (str): The URL of the page to scrape. - html (str): The HTML content of the page to scrape. - word_count_threshold (int): The minimum word count threshold for content extraction. - css_selector (str): The CSS selector to use for content extraction. - **kwargs: Additional keyword arguments. + element (lhtml.HtmlElement): The HTML element to remove attributes from. + important_attrs (List[str]): List of important attributes to keep. + keep_data_attributes (bool): Whether to keep data attributes. Returns: - dict: A dictionary containing the extracted content. + None """ - success = True - if not html: - return None + attrs_to_remove = [] + for attr in element.attrib: + if attr not in important_attrs: + if keep_data_attributes: + if not attr.startswith("data-"): + attrs_to_remove.append(attr) + else: + attrs_to_remove.append(attr) - parser_type = kwargs.get("parser", "lxml") - soup = BeautifulSoup(html, parser_type) - body = soup.body - if body is None: - raise Exception("'' tag is not found in fetched html. Consider adding wait_for=\"css:body\" to wait for body tag to be loaded into DOM.") - base_domain = get_base_domain(url) - - # Early removal of all images if exclude_all_images is set - # This happens before any processing to minimize memory usage - if kwargs.get("exclude_all_images", False): - for img in body.find_all('img'): - img.decompose() - - try: - meta = extract_metadata("", soup) - except Exception as e: - self._log( - "error", - message="Error extracting metadata: {error}", - tag="SCRAPE", - params={"error": str(e)}, - ) - meta = {} - - # Handle tag-based removal first - faster than CSS selection - excluded_tags = set(kwargs.get("excluded_tags", []) or []) - if excluded_tags: - for element in body.find_all(lambda tag: tag.name in excluded_tags): - element.extract() - - # Handle CSS selector-based removal - excluded_selector = kwargs.get("excluded_selector", "") - if excluded_selector: - is_single_selector = ( - "," not in excluded_selector and " " not in excluded_selector - ) - if is_single_selector: - while element := body.select_one(excluded_selector): - element.extract() - else: - for element in body.select(excluded_selector): - element.extract() - - content_element = None - if target_elements: - try: - for_content_targeted_element = [] - for target_element in target_elements: - for_content_targeted_element.extend(body.select(target_element)) - content_element = soup.new_tag("div") - for el in for_content_targeted_element: - content_element.append(copy.deepcopy(el)) - except Exception as e: - self._log("error", f"Error with target element detection: {str(e)}", "SCRAPE") - return None - else: - content_element = body - - kwargs["exclude_social_media_domains"] = set( - kwargs.get("exclude_social_media_domains", []) + SOCIAL_MEDIA_DOMAINS - ) - kwargs["exclude_domains"] = set(kwargs.get("exclude_domains", [])) - if kwargs.get("exclude_social_media_links", False): - kwargs["exclude_domains"] = kwargs["exclude_domains"].union( - kwargs["exclude_social_media_domains"] - ) - - result_obj = self.process_element( - url, - body, - word_count_threshold=word_count_threshold, - base_domain=base_domain, - **kwargs, - ) - - links = {"internal": [], "external": []} - media = result_obj["media"] - internal_links_dict = result_obj["internal_links_dict"] - external_links_dict = result_obj["external_links_dict"] - - # Update the links dictionary with unique links - links["internal"] = list(internal_links_dict.values()) - links["external"] = list(external_links_dict.values()) - - # # Process images using ThreadPoolExecutor - imgs = body.find_all("img") - - media["images"] = [ - img - for result in ( - self.process_image(img, url, i, len(imgs), **kwargs) - for i, img in enumerate(imgs) - ) - if result is not None - for img in result - ] - - # Process tables if not excluded - excluded_tags = set(kwargs.get("excluded_tags", []) or []) - if 'table' not in excluded_tags: - tables = body.find_all('table') - for table in tables: - if self.is_data_table(table, **kwargs): - table_data = self.extract_table_data(table) - media["tables"].append(table_data) - - body = self.flatten_nested_elements(body) - base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') - for img in imgs: - src = img.get("src", "") - if base64_pattern.match(src): - # Replace base64 data with empty string - img["src"] = base64_pattern.sub("", src) - - str_body = "" - try: - str_body = content_element.encode_contents().decode("utf-8") - except Exception: - # Reset body to the original HTML - success = False - body = BeautifulSoup(html, "html.parser") - - # Create a new div with a special ID - error_div = body.new_tag("div", id="crawl4ai_error_message") - error_div.string = """ - Crawl4AI Error: This page is not fully supported. - - Possible reasons: - 1. The page may have restrictions that prevent crawling. - 2. The page might not be fully loaded. - - Suggestions: - - Try calling the crawl function with these parameters: - magic=True, - - Set headless=False to visualize what's happening on the page. - - If the issue persists, please check the page's structure and any potential anti-crawling measures. - """ - - # Append the error div to the body - body.append(error_div) - str_body = body.encode_contents().decode("utf-8") - - print( - "[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details." - ) - self._log( - "error", - message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.", - tag="SCRAPE", - ) - - cleaned_html = str_body.replace("\n\n", "\n").replace(" ", " ") - - return { - "cleaned_html": cleaned_html, - "success": success, - "media": media, - "links": links, - "metadata": meta, - } - - -class LXMLWebScrapingStrategy(WebScrapingStrategy): - def __init__(self, logger=None): - super().__init__(logger) - self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)") - self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)') + for attr in attrs_to_remove: + del element.attrib[attr] def _process_element( self, @@ -1190,7 +403,7 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy): return None parent = img.getparent() - if parent.tag in ["button", "input"]: + if parent.tag in self.tags_to_check: return None parent_classes = parent.get("class", "").split() @@ -1200,8 +413,8 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy): return None # If src is in class or alt, likely an icon - if (src and any(c in src for c in ["button", "icon", "logo"])) or ( - alt and any(c in alt for c in ["button", "icon", "logo"]) + if (src and any(c in src for c in self.classes_to_check)) or ( + alt and any(c in alt for c in self.classes_to_check) ): return None @@ -1216,11 +429,10 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy): score += index / total_images < 0.5 # Check formats in all possible sources - image_formats = {"jpg", "jpeg", "png", "webp", "avif", "gif"} detected_format = None for url in [src, data_src, srcset, data_srcset]: if url: - format_matches = [fmt for fmt in image_formats if fmt in url.lower()] + format_matches = [fmt for fmt in self.image_formats if fmt in url.lower()] if format_matches: detected_format = format_matches[0] score += 1 @@ -1611,7 +823,9 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy): # Remove unneeded attributes self.remove_unwanted_attributes_fast( - body, keep_data_attributes=kwargs.get("keep_data_attributes", False) + body, + important_attrs=IMPORTANT_ATTRS + kwargs.get("keep_attrs", []), + keep_data_attributes=kwargs.get("keep_data_attributes", False) ) # Generate output HTML diff --git a/crawl4ai/types.py b/crawl4ai/types.py index 63fd45ba..bf341f35 100644 --- a/crawl4ai/types.py +++ b/crawl4ai/types.py @@ -19,7 +19,7 @@ LLMConfig = Union['LLMConfigType'] # Content scraping types ContentScrapingStrategy = Union['ContentScrapingStrategyType'] -WebScrapingStrategy = Union['WebScrapingStrategyType'] +# WebScrapingStrategy = Union['WebScrapingStrategyType'] LXMLWebScrapingStrategy = Union['LXMLWebScrapingStrategyType'] # Proxy types @@ -106,7 +106,7 @@ if TYPE_CHECKING: # Content scraping imports from .content_scraping_strategy import ( ContentScrapingStrategy as ContentScrapingStrategyType, - WebScrapingStrategy as WebScrapingStrategyType, + # WebScrapingStrategy as WebScrapingStrategyType, LXMLWebScrapingStrategy as LXMLWebScrapingStrategyType, ) diff --git a/docs/examples/scraping_strategies_performance.py b/docs/examples/scraping_strategies_performance.py index 87fb8ac5..2c3f491e 100644 --- a/docs/examples/scraping_strategies_performance.py +++ b/docs/examples/scraping_strategies_performance.py @@ -1,6 +1,12 @@ -import time, re -from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy import time +import os +import sys + +parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.append(parent_dir) +__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) + +from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy import functools from collections import defaultdict @@ -57,7 +63,7 @@ methods_to_profile = [ # Apply decorators to both strategies -for strategy, name in [(WebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]: +for strategy, name in [(LXMLWebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]: for method in methods_to_profile: apply_decorators(strategy, method, name) @@ -85,7 +91,7 @@ def generate_large_html(n_elements=1000): def test_scraping(): # Initialize both scrapers - original_scraper = WebScrapingStrategy() + original_scraper = LXMLWebScrapingStrategy() selected_scraper = LXMLWebScrapingStrategy() # Generate test HTML diff --git a/tests/async/test_content_scraper_strategy.py b/tests/async/test_content_scraper_strategy.py index e6caf240..7a88a66f 100644 --- a/tests/async/test_content_scraper_strategy.py +++ b/tests/async/test_content_scraper_strategy.py @@ -12,10 +12,10 @@ parent_dir = os.path.dirname( sys.path.append(parent_dir) __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) -from crawl4ai.content_scraping_strategy import WebScrapingStrategy -from crawl4ai.content_scraping_strategy import ( - WebScrapingStrategy as WebScrapingStrategyCurrent, -) +from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy +# from crawl4ai.content_scraping_strategy import ( +# WebScrapingStrategy as WebScrapingStrategyCurrent, +# ) # from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent @@ -32,8 +32,8 @@ class TestResult: class StrategyTester: def __init__(self): - self.new_scraper = WebScrapingStrategy() - self.current_scraper = WebScrapingStrategyCurrent() + self.new_scraper = LXMLWebScrapingStrategy() + self.current_scraper = LXMLWebScrapingStrategy() with open(__location__ + "/sample_wikipedia.html", "r", encoding="utf-8") as f: self.WIKI_HTML = f.read() self.results = {"new": [], "current": []} diff --git a/tests/async/test_evaluation_scraping_methods_performance.configs.py b/tests/async/test_evaluation_scraping_methods_performance.configs.py index 797cf681..65ca8684 100644 --- a/tests/async/test_evaluation_scraping_methods_performance.configs.py +++ b/tests/async/test_evaluation_scraping_methods_performance.configs.py @@ -2,7 +2,6 @@ import json import time from bs4 import BeautifulSoup from crawl4ai.content_scraping_strategy import ( - WebScrapingStrategy, LXMLWebScrapingStrategy, ) from typing import Dict, List, Tuple @@ -274,7 +273,7 @@ def get_test_scenarios(): that will be passed into scrap() for testing various features. """ TEST_SCENARIOS = { - # "default": {}, + "default": {}, # "exclude_domains": { # "exclude_domains": {"images.example.com", "ads.example.com"} # }, @@ -609,19 +608,26 @@ class ScraperEquivalenceTester: print("\n=== Testing complicated HTML with multiple parameter scenarios ===") # Create the scrapers once (or you can re-create if needed) - original = WebScrapingStrategy() + # original = WebScrapingStrategy() + original = LXMLWebScrapingStrategy() lxml = LXMLWebScrapingStrategy() + + # Base URL for testing + url = "http://test.com" + url = "https://kidocode.com" for scenario_name, params in get_test_scenarios().items(): print(f"\nScenario: {scenario_name}") start = time.time() - orig_result = original.scrap("http://test.com", complicated_html, **params) + orig_result = original.scrap(url, complicated_html, **params) orig_time = time.time() - start + orig_result = orig_result.model_dump() start = time.time() - lxml_result = lxml.scrap("http://test.com", complicated_html, **params) + lxml_result = lxml.scrap(url, complicated_html, **params) lxml_time = time.time() - start + lxml_result = lxml_result.model_dump() diffs = {} link_diff = self.deep_compare_links(