diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py
index 972ca04e..1c33b311 100644
--- a/crawl4ai/__init__.py
+++ b/crawl4ai/__init__.py
@@ -1,7 +1,7 @@
# __init__.py
from .async_webcrawler import AsyncWebCrawler, CacheMode
-from .async_configs import BrowserConfig, CrawlerRunConfig
+from .async_configs import BrowserConfig, CrawlerRunConfig, ScrapingMode
from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy, CosineStrategy, JsonCssExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
@@ -14,6 +14,7 @@ __all__ = [
"AsyncWebCrawler",
"CrawlResult",
"CacheMode",
+ "ScrapingMode",
'BrowserConfig',
'CrawlerRunConfig',
'ExtractionStrategy',
diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py
index 5094d610..3c5c0433 100644
--- a/crawl4ai/async_configs.py
+++ b/crawl4ai/async_configs.py
@@ -12,6 +12,7 @@ from .extraction_strategy import ExtractionStrategy
from .chunking_strategy import ChunkingStrategy
from .markdown_generation_strategy import MarkdownGenerationStrategy
from typing import Union, List
+from enum import Enum
class BrowserConfig:
"""
@@ -183,6 +184,12 @@ class BrowserConfig:
)
+class ScrapingMode(str, Enum):
+ """Enum of the HTML scraping backends a crawl run can select through `scraping_mode`."""
+ BEAUTIFULSOUP = "beautifulsoup"  # default of CrawlerRunConfig; AsyncWebCrawler uses WebScrapingStrategy
+ LXML = "lxml"  # AsyncWebCrawler uses LXMLWebScrapingStrategy for this mode
+
+
class CrawlerRunConfig:
"""
Configuration class for controlling how the crawler runs each crawl operation.
@@ -220,6 +227,8 @@ class CrawlerRunConfig:
Default: False.
parser_type (str): Type of parser to use for HTML parsing.
Default: "lxml".
+ scraping_mode (ScrapingMode): HTML scraping backend to use, either
+ ScrapingMode.BEAUTIFULSOUP or ScrapingMode.LXML. Default: ScrapingMode.BEAUTIFULSOUP.
# Caching Parameters
cache_mode (CacheMode or None): Defines how caching is handled.
@@ -331,6 +340,7 @@ class CrawlerRunConfig:
remove_forms: bool = False,
prettiify: bool = False,
parser_type: str = "lxml",
+ scraping_mode: ScrapingMode = ScrapingMode.BEAUTIFULSOUP,
# SSL Parameters
fetch_ssl_certificate: bool = False,
@@ -403,6 +413,7 @@ class CrawlerRunConfig:
self.remove_forms = remove_forms
self.prettiify = prettiify
self.parser_type = parser_type
+ self.scraping_mode = scraping_mode
# SSL Parameters
self.fetch_ssl_certificate = fetch_ssl_certificate
@@ -489,6 +500,7 @@ class CrawlerRunConfig:
remove_forms=kwargs.get("remove_forms", False),
prettiify=kwargs.get("prettiify", False),
parser_type=kwargs.get("parser_type", "lxml"),
+ scraping_mode=kwargs.get("scraping_mode", ScrapingMode.BEAUTIFULSOUP),
# SSL Parameters
fetch_ssl_certificate=kwargs.get("fetch_ssl_certificate", False),
@@ -562,6 +574,7 @@ class CrawlerRunConfig:
"remove_forms": self.remove_forms,
"prettiify": self.prettiify,
"parser_type": self.parser_type,
+ "scraping_mode": self.scraping_mode,
"fetch_ssl_certificate": self.fetch_ssl_certificate,
"cache_mode": self.cache_mode,
"session_id": self.session_id,
diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index 82b96070..f13fdae1 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -17,7 +17,7 @@ from .extraction_strategy import *
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy, AsyncCrawlResponse
from .cache_context import CacheMode, CacheContext, _legacy_to_cache_mode
from .markdown_generation_strategy import DefaultMarkdownGenerator, MarkdownGenerationStrategy
-from .content_scraping_strategy import WebScrapingStrategy
+from .content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
from .async_logger import AsyncLogger
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import *
@@ -543,8 +543,11 @@ class AsyncWebCrawler:
_url = url if not kwargs.get("is_raw_html", False) else "Raw HTML"
t1 = time.perf_counter()
- # Initialize scraping strategy
- scrapping_strategy = WebScrapingStrategy(logger=self.logger)
+        from .async_configs import ScrapingMode  # local import: the module-level import only brings in BrowserConfig, CrawlerRunConfig
+        if config.scraping_mode == ScrapingMode.LXML:  # choose the scraping strategy for this run
+            scrapping_strategy = LXMLWebScrapingStrategy(logger=self.logger)
+        else:  # Default to BeautifulSoup
+            scrapping_strategy = WebScrapingStrategy(logger=self.logger)
# Process HTML content
params = {k:v for k, v in config.to_dict().items() if k not in ["url"]}
diff --git a/crawl4ai/content_scraping_strategy.py b/crawl4ai/content_scraping_strategy.py
index f3a96cf3..e9f631c7 100644
--- a/crawl4ai/content_scraping_strategy.py
+++ b/crawl4ai/content_scraping_strategy.py
@@ -1,4 +1,5 @@
-import re # Point 1: Pre-Compile Regular Expressions
+import re
+from itertools import chain
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
@@ -6,27 +7,43 @@ from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import asyncio, requests, re, os
from .config import *
-from bs4 import element, NavigableString, Comment
+from bs4 import NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
-# from .content_cleaning_strategy import ContentCleaningStrategy
-from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter
-from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
-from .models import MarkdownGenerationResult
from .utils import (
extract_metadata,
normalize_url,
is_external_url,
get_base_domain,
+ extract_metadata_using_lxml
)
-
+from lxml import etree
+from lxml import html as lhtml
+from typing import Dict, Any, List, Tuple
# Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:')
TWITTER_REGEX = re.compile(r'^twitter:')
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
+
+# Function to parse srcset
+def parse_srcset(s: str) -> List[Dict]:
+    """Split a srcset string into ``{'url', 'width'}`` dicts, one per candidate.
+
+    ``width`` is the second token minus its trailing ``w``; it is ``None``
+    when the candidate has no descriptor or a non-width one such as ``2x``.
+    """
+    candidates = []
+    for chunk in (s or '').split(','):
+        tokens = chunk.split()
+        if not tokens:
+            continue
+        has_width = len(tokens) > 1 and tokens[1].endswith('w')
+        candidates.append({'url': tokens[0], 'width': tokens[1].rstrip('w') if has_width else None})
+    return candidates
+
# Function to parse image height/width value and units
def parse_dimension(dimension):
if dimension:
@@ -207,9 +224,9 @@ class WebScrapingStrategy(ContentScrapingStrategy):
Returns:
dict: A dictionary containing the processed image information.
"""
- parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
- if ' ' in u else None}
- for u in [f"http{p}" for p in s.split("http") if p]]
+ # parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
+ # if ' ' in u else None}
+ # for u in [f"http{p}" for p in s.split("http") if p]]
# Constants for checks
classes_to_check = frozenset(['button', 'icon', 'logo'])
@@ -290,7 +307,6 @@ class WebScrapingStrategy(ContentScrapingStrategy):
group_id = index
# Base image info template
- image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
base_info = {
'alt': alt,
'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
@@ -661,7 +677,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
imgs = body.find_all('img')
media['images'] = [
- img for result in (self.process_image(img, url, i, len(imgs))
+ img for result in (self.process_image(img, url, i, len(imgs), **kwargs)
for i, img in enumerate(imgs))
if result is not None
for img in result
@@ -701,7 +717,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'''
# Append the error div to the body
- body.body.append(error_div)
+ body.append(error_div)
str_body = body.encode_contents().decode('utf-8')
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.")
@@ -721,3 +737,462 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'links': links,
'metadata': meta
}
+
+class LXMLWebScrapingStrategy(WebScrapingStrategy):
+ def __init__(self, logger=None):  # Initialise the parent strategy and compile the regexes kept on the instance.
+ super().__init__(logger)  # the logger is handed to WebScrapingStrategy
+ self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")  # same pattern as the module-level DIMENSION_REGEX
+ self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')  # used by _scrap to blank inline base64 image data
+
+ def _process_element(self, url: str, element: lhtml.HtmlElement, media: Dict[str, List],
+ internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool:  # Fills media and the two link dicts from `element`, prunes excluded nodes in place, returns True.
+ base_domain = kwargs.get("base_domain", get_base_domain(url))  # caller-supplied value wins; otherwise derived from url
+ exclude_domains = set(kwargs.get('exclude_domains', []))
+
+ # Process links
+ for link in element.xpath('.//a[@href]'):  # every anchor below element that carries an href
+ href = link.get('href', '').strip()
+ if not href:
+ continue
+
+ try:
+ normalized_href = normalize_url(href, url)  # href is normalized against the page URL
+ link_data = {
+ 'href': normalized_href,
+ 'text': link.text_content().strip(),
+ 'title': link.get('title', '').strip(),
+ 'base_domain': base_domain
+ }
+
+ is_external = is_external_url(normalized_href, base_domain)
+ if is_external:
+ link_base_domain = get_base_domain(normalized_href)
+ link_data['base_domain'] = link_base_domain  # external links report their own domain
+ if kwargs.get('exclude_external_links', False) or link_base_domain in exclude_domains:
+ link.getparent().remove(link)  # NOTE(review): lxml remove() also discards the node's tail text — confirm this is intended
+ continue
+
+ if normalized_href not in external_links_dict:  # first occurrence of a URL wins
+ external_links_dict[normalized_href] = link_data
+ else:
+ if normalized_href not in internal_links_dict:  # first occurrence of a URL wins
+ internal_links_dict[normalized_href] = link_data
+
+ except Exception as e:
+ self._log('error', f"Error processing link: {str(e)}", "SCRAPE")  # a failing link is logged and skipped
+ continue
+
+ # Process images
+ images = element.xpath('.//img')
+ total_images = len(images)
+
+ for idx, img in enumerate(images):
+ src = img.get('src') or ''
+ img_domain = get_base_domain(src)  # NOTE(review): src may be empty or relative here — verify get_base_domain copes with that
+
+ # Decide if we need to exclude this image
+ # 1) If its domain is in exclude_domains, remove.
+ # 2) Or if exclude_external_images=True and it's an external domain, remove.
+ if (img_domain in exclude_domains) or (
+ kwargs.get('exclude_external_images', False) and is_external_url(src, base_domain)
+ ):
+ parent = img.getparent()
+ if parent is not None:
+ parent.remove(img)
+ continue
+
+ # Otherwise, process the image as usual.
+ try:
+ processed_images = self.process_image(img, url, idx, total_images, **kwargs)  # returns a list of variants or None
+ if processed_images:
+ media['images'].extend(processed_images)
+ except Exception as e:
+ self._log('error', f"Error processing image: {str(e)}", "SCRAPE")  # a failing image is logged and skipped
+
+ # Process videos and audios
+ for media_type in ['video', 'audio']:
+ for elem in element.xpath(f'.//{media_type}'):
+ media_info = {
+ 'src': elem.get('src'),
+ 'alt': elem.get('alt'),
+ 'type': media_type,
+ 'description': self.find_closest_parent_with_useful_text(elem, **kwargs)
+ }
+ media[f"{media_type}s"].append(media_info)  # key is "videos" or "audios"
+
+ # Process source tags within media elements
+ for source in elem.xpath('.//source'):
+ if src := source.get('src'):
+ media[f"{media_type}s"].append({**media_info, 'src': src})  # one extra entry per <source>, reusing the parent's info
+
+ # Clean up unwanted elements
+ if kwargs.get('remove_forms', False):
+ for form in element.xpath('.//form'):
+ form.getparent().remove(form)
+
+ if excluded_tags := kwargs.get('excluded_tags', []):
+ for tag in excluded_tags:
+ for elem in element.xpath(f'.//{tag}'):
+ elem.getparent().remove(elem)
+
+ if excluded_selector := kwargs.get('excluded_selector', ''):
+ try:
+ for elem in element.cssselect(excluded_selector):
+ elem.getparent().remove(elem)
+ except Exception:
+ pass # Invalid selector  -- NOTE(review): any error raised in this loop is silently ignored, not only selector syntax errors
+
+ return True
+
+    def find_closest_parent_with_useful_text(self, element: lhtml.HtmlElement, **kwargs) -> Optional[str]:
+        """Return the stripped text of the nearest node, starting at ``element`` itself, that has enough words."""
+        min_words = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
+        node = element
+        # Climb until a node has leading text of its own and its full text content
+        # reaches min_words, or until the root's parent (None) is reached.
+        while node is not None and not (node.text and len(node.text_content().split()) >= min_words):
+            node = node.getparent()
+        return None if node is None else node.text_content().strip()
+
+    def flatten_nested_elements(self, element: lhtml.HtmlElement) -> lhtml.HtmlElement:
+        """Collapse chains of single, same-tag nested elements in an lxml tree.
+
+        Children are replaced in place; the returned element may be a descendant of ``element``.
+        """
+        if len(element) == 1 and element.tag == element[0].tag:
+            return self.flatten_nested_elements(element[0])
+        for position, child in enumerate(list(element)):  # snapshot: no index() scan per child
+            flattened = self.flatten_nested_elements(child)
+            if flattened is not child:  # only replace when something was collapsed
+                element[position] = flattened
+        return element
+
+    def process_image(self, img: lhtml.HtmlElement, url: str, index: int, total_images: int, **kwargs) -> Optional[List[Dict]]:
+        """Score an <img> element and return one dict per distinct image URL found for it.
+
+        Args:
+            img: The image element to inspect.
+            url: Page URL; kept for interface compatibility, not used by this method.
+            index, total_images: Position of the image and the number of images processed.
+            **kwargs: ``image_score_threshold`` overrides IMAGE_SCORE_THRESHOLD; all kwargs
+                are also forwarded to find_closest_parent_with_useful_text.
+
+        Returns:
+            Variant dicts (alt, desc, score, type, group_id, format, src, optional width), or
+            None if the image is hidden, looks like UI chrome, scores too low, or has no URL.
+        """
+        style = img.get('style', '')
+        alt = img.get('alt', '')
+        src = img.get('src', '')
+        data_src = img.get('data-src', '')
+        srcset = img.get('srcset', '')
+        data_srcset = img.get('data-srcset', '')
+        ui_markers = ('button', 'icon', 'logo')
+        # Ignore whitespace and case so that "display: none" is caught as well.
+        if 'display:none' in ''.join(style.split()).lower():
+            return None
+
+        # A root/detached <img> has no parent: skip the parent checks instead of raising.
+        parent = img.getparent()
+        if parent is not None:
+            if parent.tag in ('button', 'input'):
+                return None
+            if any(marker in cls for cls in parent.get('class', '').split() for marker in ui_markers):
+                return None
+        # A src or alt mentioning button/icon/logo is treated as UI chrome.
+        if any(marker in src or marker in alt for marker in ui_markers):
+            return None
+
+        score = 0
+        for dimension in ('width', 'height'):
+            value = img.get(dimension, '')
+            if value.isdigit() and int(value) > 150:
+                score += 1
+        if alt:
+            score += 1
+        if total_images > 0 and index / total_images < 0.5:  # first-half bonus; guard avoids ZeroDivisionError
+            score += 1
+        # Ordered tuple rather than a set, so the detected format is deterministic.
+        image_formats = ('jpg', 'jpeg', 'png', 'webp', 'avif', 'gif')
+        detected_format = None
+        for candidate in (src, data_src, srcset, data_srcset):
+            lowered = candidate.lower()
+            detected_format = next((fmt for fmt in image_formats if fmt in lowered), None)
+            if detected_format:
+                score += 1
+                break
+        if srcset or data_srcset:
+            score += 1
+        picture = img.xpath('./ancestor::picture[1]')
+        if picture:
+            score += 1
+        if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD):
+            return None
+
+        unique_urls = set()
+        image_variants = []
+        base_info = {
+            'alt': alt,
+            'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
+            'score': score,
+            'type': 'image',
+            'group_id': index,
+            'format': detected_format,
+        }
+
+        def add_variant(variant_src: str, width: Optional[str] = None):
+            """Record a URL once; empty values and inline data: URIs are skipped."""
+            if not variant_src or variant_src.startswith('data:') or variant_src in unique_urls:
+                return
+            unique_urls.add(variant_src)
+            variant = {**base_info, 'src': variant_src}
+            if width:
+                variant['width'] = width
+            image_variants.append(variant)
+
+        add_variant(src)
+        add_variant(data_src)
+        for srcset_attr in (srcset, data_srcset):
+            for source in parse_srcset(srcset_attr):
+                add_variant(source['url'], source['width'])
+        if picture:
+            for source in picture[0].xpath('.//source[@srcset]'):
+                for src_data in parse_srcset(source.get('srcset')):
+                    add_variant(src_data['url'], src_data['width'])
+        # Framework-specific lazy-loading attributes (data-*src*) whose value contains "http".
+        for attr, value in img.attrib.items():
+            if attr.startswith('data-') and 'src' in attr and 'http' in value:
+                add_variant(value)
+        return image_variants or None
+
+    def remove_empty_elements_fast(self, root, word_count_threshold=5):
+        """Drop leaf elements whose text has fewer than ``word_count_threshold`` words.
+
+        Walks the descendants bottom-up in one pass, so a parent emptied by the removal of
+        its children is removed too. Comments and other non-HtmlElement nodes, and tags
+        that are legitimately empty (see ``bypass_tags``), are left alone. Elements are
+        removed with ``drop_tree()`` so text following the element (its tail) is kept.
+        Returns ``root``, mutated in place.
+        """
+        bypass_tags = {'a', 'img', 'br', 'hr', 'input', 'meta', 'link', 'source', 'track', 'wbr'}
+        for el in reversed(list(root.iterdescendants())):
+            if not isinstance(el, lhtml.HtmlElement) or el.tag in bypass_tags:
+                continue
+            # Cheap child check first: text_content() is only computed for leaf elements.
+            if len(el) or len((el.text_content() or "").split()) >= word_count_threshold:
+                continue
+            if el.getparent() is not None:
+                # parent.remove(el) would also delete el.tail, i.e. real page text.
+                el.drop_tree()
+
+        return root
+
+    def remove_unwanted_attributes_fast(
+        self,
+        root: lhtml.HtmlElement,
+        important_attrs=None,
+        keep_data_attributes=False
+    ) -> lhtml.HtmlElement:
+        """
+        Strip every attribute that is not whitelisted from ``root`` and all of its descendants.
+
+        Args:
+            root: Top of the subtree to clean; its own attributes are filtered too.
+            important_attrs: Attribute names to keep. Defaults to ``IMPORTANT_ATTRS``
+                when ``None``.
+            keep_data_attributes: When ``True``, attributes whose name starts with
+                ``data-`` are kept as well.
+
+        Returns:
+            ``root`` itself; the tree is modified in place, so the return value only
+            serves call chaining. Comments and other non-HtmlElement nodes are skipped.
+        """
+        allowed = set(IMPORTANT_ATTRS) if important_attrs is None else important_attrs
+
+        def is_kept(name):
+            """Tell whether an attribute name survives the clean-up."""
+            return name in allowed or (keep_data_attributes and name.startswith('data-'))
+
+        # iterdescendants() does not yield the root itself, so it is chained in front.
+        for node in chain((root,), root.iterdescendants()):
+            if isinstance(node, lhtml.HtmlElement):
+                survivors = {
+                    name: value
+                    for name, value in node.attrib.items()
+                    if is_kept(name)
+                }
+                # Reset the attribute map, then restore only the survivors in original order.
+                node.attrib.clear()
+                node.attrib.update(survivors)
+
+        return root
+
+ def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD,
+ css_selector: str = None, **kwargs) -> Dict[str, Any]:  # Parses html with lxml, prunes it according to kwargs, returns cleaned HTML plus media, links and metadata.
+ if not html:
+ return None  # empty input yields None, not a result dict
+
+ success = True
+ try:
+ doc = lhtml.document_fromstring(html)
+ # Match BeautifulSoup's behavior of using body or full doc
+ # body = doc.xpath('//body')[0] if doc.xpath('//body') else doc
+ body = doc  # the whole document is the working tree
+
+ base_domain = get_base_domain(url)
+
+ # Add comment removal
+ if kwargs.get('remove_comments', False):
+ comments = body.xpath('//comment()')
+ for comment in comments:
+ comment.getparent().remove(comment)  # NOTE(review): lxml remove() also discards the tail text after the comment — confirm
+
+ # Handle tag-based removal first
+ excluded_tags = set(kwargs.get('excluded_tags', []) or [])
+ if excluded_tags:
+ for tag in excluded_tags:
+ for element in body.xpath(f'.//{tag}'):
+ if element.getparent() is not None:
+ element.getparent().remove(element)
+
+ # Handle CSS selector-based exclusion
+ excluded_selector = kwargs.get('excluded_selector', '')
+ if excluded_selector:
+ try:
+ for element in body.cssselect(excluded_selector):
+ if element.getparent() is not None:
+ element.getparent().remove(element)
+ except Exception as e:
+ self._log('error', f"Error with excluded CSS selector: {str(e)}", "SCRAPE")  # logged; processing continues
+
+ # Extract metadata before any content filtering
+ try:
+ meta = extract_metadata_using_lxml("", doc) # Using same function as BeautifulSoup version
+ except Exception as e:
+ self._log('error', f"Error extracting metadata: {str(e)}", "SCRAPE")
+ meta = {}  # metadata failure is not fatal
+
+ # Handle CSS selector targeting
+ if css_selector:
+ try:
+ selected_elements = body.cssselect(css_selector)
+ if not selected_elements:
+ return {  # nothing matched: empty but successful result carrying the metadata
+ 'markdown': '',
+ 'cleaned_html': '',
+ 'success': True,
+ 'media': {'images': [], 'videos': [], 'audios': []},
+ 'links': {'internal': [], 'external': []},
+ 'metadata': meta,
+ 'message': f"No elements found for CSS selector: {css_selector}"
+ }
+ body = lhtml.Element('div')  # matched elements are moved under a fresh wrapper <div>
+ body.extend(selected_elements)
+ except Exception as e:
+ self._log('error', f"Error with CSS selector: {str(e)}", "SCRAPE")
+ return None  # a failing selector aborts with None
+
+ # Remove script and style tags
+ for tag in ['script', 'style', 'link', 'meta', 'noscript']:
+ for element in body.xpath(f'.//{tag}'):
+ if element.getparent() is not None:
+ element.getparent().remove(element)  # NOTE(review): text following a removed tag (its tail) goes with it in lxml — confirm
+
+ # Handle social media and domain exclusions
+ kwargs['exclude_domains'] = set(kwargs.get('exclude_domains', []))
+ if kwargs.get('exclude_social_media_links', False):
+ kwargs['exclude_social_media_domains'] = set(kwargs.get('exclude_social_media_domains', []) + SOCIAL_MEDIA_DOMAINS)  # NOTE(review): assumes both operands are lists — confirm
+ kwargs['exclude_domains'].update(kwargs['exclude_social_media_domains'])
+
+ # Process forms if needed
+ if kwargs.get('remove_forms', False):
+ for form in body.xpath('.//form'):
+ if form.getparent() is not None:
+ form.getparent().remove(form)
+
+
+ # Process content
+ media = {'images': [], 'videos': [], 'audios': []}
+ internal_links_dict = {}
+ external_links_dict = {}
+
+ self._process_element(  # fills media and both link dicts, and prunes excluded nodes in place
+ url,
+ body,
+ media,
+ internal_links_dict,
+ external_links_dict,
+ base_domain=base_domain,
+ **kwargs
+ )
+
+ # Handle only_text option
+ if kwargs.get('only_text', False):
+ for tag in ONLY_TEXT_ELIGIBLE_TAGS:
+ for element in body.xpath(f'.//{tag}'):
+ if element.text:
+ new_text = lhtml.Element('span')  # element is swapped for a <span> holding its text content
+ new_text.text = element.text_content()
+ if element.getparent() is not None:
+ element.getparent().replace(element, new_text)
+
+ # Clean base64 images
+ for img in body.xpath('.//img[@src]'):
+ src = img.get('src', '')
+ if self.BASE64_PATTERN.match(src):  # match() anchors at the start: only src values that begin with the data URI
+ img.set('src', self.BASE64_PATTERN.sub('', src))  # the matched data URI is removed from src
+
+
+ # Remove empty elements
+ self.remove_empty_elements_fast(body, 1)  # threshold 1: childless elements without any word are dropped
+
+ # Remove unneeded attributes
+ self.remove_unwanted_attributes_fast(body, keep_data_attributes=kwargs.get('keep_data_attributes', False))
+
+
+ # Generate output HTML
+ cleaned_html = lhtml.tostring(body, encoding='unicode',
+ pretty_print=True,
+ method='html',
+ with_tail=False).strip()
+ return {
+ 'cleaned_html': cleaned_html,
+ 'success': success,
+ 'media': media,
+ 'links': {
+ 'internal': list(internal_links_dict.values()),
+ 'external': list(external_links_dict.values())
+ },
+ 'metadata': meta
+ }
+
+ except Exception as e:  # any failure above produces an error document with success=False
+ self._log('error', f"Error processing HTML: {str(e)}", "SCRAPE")
+ # Create error message in case of failure
+ error_body = lhtml.Element('div')
+ # Use etree.SubElement rather than lhtml.SubElement
+ error_div = etree.SubElement(error_body, 'div', id='crawl4ai_error_message')
+ error_div.text = f'''
+ Crawl4AI Error: This page is not fully supported.
+
+ Error Message: {str(e)}
+
+ Possible reasons:
+ 1. The page may have restrictions that prevent crawling.
+ 2. The page might not be fully loaded.
+
+ Suggestions:
+ - Try calling the crawl function with these parameters:
+ magic=True,
+ - Set headless=False to visualize what's happening on the page.
+
+ If the issue persists, please check the page's structure and any potential anti-crawling measures.
+ '''
+ cleaned_html = lhtml.tostring(error_body, encoding='unicode', pretty_print=True)
+ return {
+ 'cleaned_html': cleaned_html,
+ 'success': False,
+ 'media': {'images': [], 'videos': [], 'audios': []},
+ 'links': {'internal': [], 'external': []},
+ 'metadata': {}
+ }
\ No newline at end of file
diff --git a/crawl4ai/dispatcher copy.py b/crawl4ai/dispatcher copy.py
deleted file mode 100644
index cbdc330e..00000000
--- a/crawl4ai/dispatcher copy.py
+++ /dev/null
@@ -1,490 +0,0 @@
-from typing import Dict, Optional, Any, List, Tuple
-from .models import CrawlResult
-from .async_webcrawler import AsyncWebCrawler
-from .async_configs import BrowserConfig, CrawlerRunConfig
-from .markdown_generation_strategy import DefaultMarkdownGenerator
-from .content_filter_strategy import PruningContentFilter
-from rich.live import Live
-from rich.table import Table
-from rich.console import Console
-from rich.style import Style
-from rich import box
-from datetime import datetime, timedelta
-from dataclasses import dataclass
-from enum import Enum
-import time
-import psutil
-import asyncio
-import uuid
-from urllib.parse import urlparse
-import random
-
-
-@dataclass
-class DomainState:
- last_request_time: float = 0
- current_delay: float = 0
- fail_count: int = 0
-
-@dataclass
-class CrawlerTaskResult:
- task_id: str
- url: str
- result: CrawlResult
- memory_usage: float
- peak_memory: float
- start_time: datetime
- end_time: datetime
- error_message: str = ""
-
-class CrawlStatus(Enum):
- QUEUED = "QUEUED"
- IN_PROGRESS = "IN_PROGRESS"
- COMPLETED = "COMPLETED"
- FAILED = "FAILED"
-
-@dataclass
-class CrawlStats:
- task_id: str
- url: str
- status: CrawlStatus
- start_time: Optional[datetime] = None
- end_time: Optional[datetime] = None
- memory_usage: float = 0.0
- peak_memory: float = 0.0
- error_message: str = ""
-
- @property
- def duration(self) -> str:
- if not self.start_time:
- return "0:00"
- end = self.end_time or datetime.now()
- duration = end - self.start_time
- return str(timedelta(seconds=int(duration.total_seconds())))
-
-class DisplayMode(Enum):
- DETAILED = "DETAILED"
- AGGREGATED = "AGGREGATED"
-
-class RateLimiter:
- def __init__(
- self,
- base_delay: Tuple[float, float] = (1.0, 3.0),
- max_delay: float = 60.0,
- max_retries: int = 3,
- rate_limit_codes: List[int] = [429, 503]
- ):
- self.base_delay = base_delay
- self.max_delay = max_delay
- self.max_retries = max_retries
- self.rate_limit_codes = rate_limit_codes
- self.domains: Dict[str, DomainState] = {}
-
- def get_domain(self, url: str) -> str:
- return urlparse(url).netloc
-
- async def wait_if_needed(self, url: str) -> None:
- domain = self.get_domain(url)
- state = self.domains.get(domain)
-
- if not state:
- self.domains[domain] = DomainState()
- state = self.domains[domain]
-
- now = time.time()
- if state.last_request_time:
- wait_time = max(0, state.current_delay - (now - state.last_request_time))
- if wait_time > 0:
- await asyncio.sleep(wait_time)
-
- # Random delay within base range if no current delay
- if state.current_delay == 0:
- state.current_delay = random.uniform(*self.base_delay)
-
- state.last_request_time = time.time()
-
- def update_delay(self, url: str, status_code: int) -> bool:
- domain = self.get_domain(url)
- state = self.domains[domain]
-
- if status_code in self.rate_limit_codes:
- state.fail_count += 1
- if state.fail_count > self.max_retries:
- return False
-
- # Exponential backoff with random jitter
- state.current_delay = min(
- state.current_delay * 2 * random.uniform(0.75, 1.25),
- self.max_delay
- )
- else:
- # Gradually reduce delay on success
- state.current_delay = max(
- random.uniform(*self.base_delay),
- state.current_delay * 0.75
- )
- state.fail_count = 0
-
- return True
-
-class CrawlerMonitor:
- def __init__(self, max_visible_rows: int = 15, display_mode: DisplayMode = DisplayMode.DETAILED):
- self.console = Console()
- self.max_visible_rows = max_visible_rows
- self.display_mode = display_mode
- self.stats: Dict[str, CrawlStats] = {}
- self.process = psutil.Process()
- self.start_time = datetime.now()
- self.live = Live(self._create_table(), refresh_per_second=2)
-
- def start(self):
- self.live.start()
-
- def stop(self):
- self.live.stop()
-
- def add_task(self, task_id: str, url: str):
- self.stats[task_id] = CrawlStats(task_id=task_id, url=url, status=CrawlStatus.QUEUED)
- self.live.update(self._create_table())
-
- def update_task(self, task_id: str, **kwargs):
- if task_id in self.stats:
- for key, value in kwargs.items():
- setattr(self.stats[task_id], key, value)
- self.live.update(self._create_table())
-
- def _create_aggregated_table(self) -> Table:
- """Creates a compact table showing only aggregated statistics"""
- table = Table(
- box=box.ROUNDED,
- title="Crawler Status Overview",
- title_style="bold magenta",
- header_style="bold blue",
- show_lines=True
- )
-
- # Calculate statistics
- total_tasks = len(self.stats)
- queued = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED)
- in_progress = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS)
- completed = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED)
- failed = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED)
-
- # Memory statistics
- current_memory = self.process.memory_info().rss / (1024 * 1024)
- total_task_memory = sum(stat.memory_usage for stat in self.stats.values())
- peak_memory = max((stat.peak_memory for stat in self.stats.values()), default=0.0)
-
- # Duration
- duration = datetime.now() - self.start_time
-
- # Create status row
- table.add_column("Status", style="bold cyan")
- table.add_column("Count", justify="right")
- table.add_column("Percentage", justify="right")
-
- table.add_row(
- "Total Tasks",
- str(total_tasks),
- "100%"
- )
- table.add_row(
- "[yellow]In Queue[/yellow]",
- str(queued),
- f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%"
- )
- table.add_row(
- "[blue]In Progress[/blue]",
- str(in_progress),
- f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%"
- )
- table.add_row(
- "[green]Completed[/green]",
- str(completed),
- f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%"
- )
- table.add_row(
- "[red]Failed[/red]",
- str(failed),
- f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%"
- )
-
- # Add memory information
- table.add_section()
- table.add_row(
- "[magenta]Current Memory[/magenta]",
- f"{current_memory:.1f} MB",
- ""
- )
- table.add_row(
- "[magenta]Total Task Memory[/magenta]",
- f"{total_task_memory:.1f} MB",
- ""
- )
- table.add_row(
- "[magenta]Peak Task Memory[/magenta]",
- f"{peak_memory:.1f} MB",
- ""
- )
- table.add_row(
- "[yellow]Runtime[/yellow]",
- str(timedelta(seconds=int(duration.total_seconds()))),
- ""
- )
-
- return table
-
- def _create_detailed_table(self) -> Table:
- table = Table(
- box=box.ROUNDED,
- title="Crawler Performance Monitor",
- title_style="bold magenta",
- header_style="bold blue"
- )
-
- # Add columns
- table.add_column("Task ID", style="cyan", no_wrap=True)
- table.add_column("URL", style="cyan", no_wrap=True)
- table.add_column("Status", style="bold")
- table.add_column("Memory (MB)", justify="right")
- table.add_column("Peak (MB)", justify="right")
- table.add_column("Duration", justify="right")
- table.add_column("Info", style="italic")
-
- # Add summary row
- total_memory = sum(stat.memory_usage for stat in self.stats.values())
- active_count = sum(1 for stat in self.stats.values()
- if stat.status == CrawlStatus.IN_PROGRESS)
- completed_count = sum(1 for stat in self.stats.values()
- if stat.status == CrawlStatus.COMPLETED)
- failed_count = sum(1 for stat in self.stats.values()
- if stat.status == CrawlStatus.FAILED)
-
- table.add_row(
- "[bold yellow]SUMMARY",
- f"Total: {len(self.stats)}",
- f"Active: {active_count}",
- f"{total_memory:.1f}",
- f"{self.process.memory_info().rss / (1024 * 1024):.1f}",
- str(timedelta(seconds=int((datetime.now() - self.start_time).total_seconds()))),
- f"✓{completed_count} ✗{failed_count}",
- style="bold"
- )
-
- table.add_section()
-
- # Add rows for each task
- visible_stats = sorted(
- self.stats.values(),
- key=lambda x: (
- x.status != CrawlStatus.IN_PROGRESS,
- x.status != CrawlStatus.QUEUED,
- x.end_time or datetime.max
- )
- )[:self.max_visible_rows]
-
- for stat in visible_stats:
- status_style = {
- CrawlStatus.QUEUED: "white",
- CrawlStatus.IN_PROGRESS: "yellow",
- CrawlStatus.COMPLETED: "green",
- CrawlStatus.FAILED: "red"
- }[stat.status]
-
- table.add_row(
- stat.task_id[:8], # Show first 8 chars of task ID
- stat.url[:40] + "..." if len(stat.url) > 40 else stat.url,
- f"[{status_style}]{stat.status.value}[/{status_style}]",
- f"{stat.memory_usage:.1f}",
- f"{stat.peak_memory:.1f}",
- stat.duration,
- stat.error_message[:40] if stat.error_message else ""
- )
-
- return table
-
- def _create_table(self) -> Table:
- """Creates the appropriate table based on display mode"""
- if self.display_mode == DisplayMode.AGGREGATED:
- return self._create_aggregated_table()
- return self._create_detailed_table()
-
-class MemoryAdaptiveDispatcher:
- def __init__(
- self,
- crawler: AsyncWebCrawler,
- memory_threshold_percent: float = 70.0,
- check_interval: float = 1.0,
- max_session_permit: int = 20,
- enable_rate_limiting: bool = False,
- rate_limit_config: Optional[Dict[str, Any]] = None
- ):
- self.crawler = crawler
- self.memory_threshold_percent = memory_threshold_percent
- self.check_interval = check_interval
- self.max_session_permit = max_session_permit
- self.concurrent_sessions = 0
- self.enable_rate_limiting = enable_rate_limiting
- self.rate_limiter = RateLimiter(**(rate_limit_config or {})) if enable_rate_limiting else None
-
- async def crawl_url(
- self,
- url: str,
- config: CrawlerRunConfig,
- task_id: str,
- monitor: Optional[CrawlerMonitor] = None
- ) -> CrawlerTaskResult:
- start_time = datetime.now()
- error_message = ""
- memory_usage = peak_memory = 0.0
-
- try:
- if monitor:
- monitor.update_task(task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time)
- self.concurrent_sessions += 1
-
- if self.enable_rate_limiting:
- await self.rate_limiter.wait_if_needed(url)
-
- process = psutil.Process()
- start_memory = process.memory_info().rss / (1024 * 1024)
- result = await self.crawler.arun(url, config=config, session_id=task_id)
- end_memory = process.memory_info().rss / (1024 * 1024)
-
- memory_usage = peak_memory = end_memory - start_memory
-
- if self.enable_rate_limiting and result.status_code:
- if not self.rate_limiter.update_delay(url, result.status_code):
- error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
- if monitor:
- monitor.update_task(task_id, status=CrawlStatus.FAILED)
- return CrawlerTaskResult(
- task_id=task_id,
- url=url,
- result=result,
- memory_usage=memory_usage,
- peak_memory=peak_memory,
- start_time=start_time,
- end_time=datetime.now(),
- error_message=error_message
- )
-
- if not result.success:
- error_message = result.error_message
- if monitor:
- monitor.update_task(task_id, status=CrawlStatus.FAILED)
- elif monitor:
- monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
-
- except Exception as e:
- error_message = str(e)
- if monitor:
- monitor.update_task(task_id, status=CrawlStatus.FAILED)
- result = CrawlResult(url = url, html = "", metadata = {}, success=False, error_message=str(e))
-
- finally:
- end_time = datetime.now()
- if monitor:
- monitor.update_task(
- task_id,
- end_time=end_time,
- memory_usage=memory_usage,
- peak_memory=peak_memory,
- error_message=error_message
- )
- self.concurrent_sessions -= 1
-
- return CrawlerTaskResult(
- task_id=task_id,
- url=url,
- result=result,
- memory_usage=memory_usage,
- peak_memory=peak_memory,
- start_time=start_time,
- end_time=end_time,
- error_message=error_message
- )
-
- async def run_urls(
- self,
- urls: List[str],
- config: CrawlerRunConfig,
- monitor: Optional[CrawlerMonitor] = None
- ) -> List[CrawlerTaskResult]:
- if monitor:
- monitor.start()
-
- try:
- pending_tasks = []
- active_tasks = []
- task_queue = []
-
- # Queue all tasks
- for url in urls:
- task_id = str(uuid.uuid4())
- if monitor:
- monitor.add_task(task_id, url)
- task_queue.append((url, task_id))
-
- while task_queue or active_tasks:
- # Fill up to max_session_permit
- while len(active_tasks) < self.max_session_permit and task_queue:
- if psutil.virtual_memory().percent >= self.memory_threshold_percent:
- break
-
- url, task_id = task_queue.pop(0)
- task = asyncio.create_task(self.crawl_url(url, config, task_id, monitor))
- active_tasks.append(task)
-
- if not active_tasks:
- await asyncio.sleep(self.check_interval)
- continue
-
- done, pending = await asyncio.wait(
- active_tasks,
- return_when=asyncio.FIRST_COMPLETED
- )
-
- pending_tasks.extend(done)
- active_tasks = list(pending)
-
- return await asyncio.gather(*pending_tasks)
- finally:
- if monitor:
- monitor.stop()
-
-async def main():
- browser_config = BrowserConfig(headless=True, verbose=False)
- run_config = CrawlerRunConfig(
- markdown_generator=DefaultMarkdownGenerator(
- content_filter=PruningContentFilter(threshold=0.48)
- ),
- cache_mode=CacheMode.BYPASS
- )
-
- urls = ["https://example.com/page1"] * 10
-
- async with AsyncWebCrawler(config=browser_config) as crawler:
- dispatcher = MemoryAdaptiveDispatcher(
- crawler=crawler,
- memory_threshold_percent=70.0,
- check_interval=1.0,
- max_session_permit=10
- )
- dispatcher = MemoryAdaptiveDispatcher(
- crawler=crawler,
- enable_rate_limiting=True,
- rate_limit_config={
- 'base_delay': (1.0, 3.0), # Random range
- 'max_delay': 60.0,
- 'max_retries': 3,
- 'rate_limit_codes': [429, 503]
- }
- )
-
- # Optional monitor
- monitor = CrawlerMonitor(max_visible_rows=15, display_mode=DisplayMode.DETAILED)
- results = await dispatcher.run_urls(urls, run_config, monitor=monitor)
-
-if __name__ == "__main__":
- asyncio.run(main())
\ No newline at end of file
diff --git a/crawl4ai/utils.py b/crawl4ai/utils.py
index 6fd7429f..4dbac2a6 100644
--- a/crawl4ai/utils.py
+++ b/crawl4ai/utils.py
@@ -868,6 +868,63 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
'metadata': meta
}
+def extract_metadata_using_lxml(html, doc=None):
+    """
+    Collect page metadata from the document's <head> using lxml XPath queries.
+
+    Args:
+        html: Raw HTML markup; parsed only when ``doc`` is not supplied.
+        doc: Optional already-parsed lxml document to query instead of ``html``.
+
+    Returns:
+        dict: ``title``, ``description``, ``keywords`` and ``author`` (each a
+        stripped string or None), followed by one entry per ``og:*`` and
+        ``twitter:*`` meta tag that has non-empty content, keyed by its
+        property/name. An empty dict is returned when there is nothing to
+        parse, parsing fails, or the document has no <head>.
+    """
+    if doc is None:
+        if not html:
+            return {}
+        try:
+            doc = lhtml.document_fromstring(html)
+        except Exception:
+            # Best-effort: markup that cannot be parsed yields no metadata.
+            return {}
+
+    head_nodes = doc.xpath('//head')
+    if not head_nodes:
+        return {}
+    head = head_nodes[0]
+
+    def first_stripped(expression):
+        # First XPath hit with surrounding whitespace removed, or None.
+        hits = head.xpath(expression)
+        return hits[0].strip() if hits else None
+
+    metadata = {
+        'title': first_stripped('.//title/text()'),
+        'description': first_stripped('.//meta[@name="description"]/@content'),
+        'keywords': first_stripped('.//meta[@name="keywords"]/@content'),
+        'author': first_stripped('.//meta[@name="author"]/@content'),
+    }
+
+    # Open Graph tags are identified by @property, Twitter Card tags by @name.
+    prefixed_sources = (
+        ('.//meta[starts-with(@property, "og:")]', 'property'),
+        ('.//meta[starts-with(@name, "twitter:")]', 'name'),
+    )
+    for expression, key_attr in prefixed_sources:
+        for tag in head.xpath(expression):
+            key = tag.get(key_attr, '').strip()
+            value = tag.get('content', '').strip()
+            if key and value:
+                metadata[key] = value
+
+    return metadata
+
+
def extract_metadata(html, soup=None):
"""
Extract optimized content, media, and links from website HTML.
diff --git a/docs/md_v2/core/content-selection.md b/docs/md_v2/core/content-selection.md
index 9774f9a7..254081ae 100644
--- a/docs/md_v2/core/content-selection.md
+++ b/docs/md_v2/core/content-selection.md
@@ -318,7 +318,45 @@ if __name__ == "__main__":
---
-## 6. Conclusion
+## 6. Scraping Modes
+
+Crawl4AI provides two different scraping modes for HTML content processing: BeautifulSoup (default) and LXML. The LXML mode offers significantly better performance, especially for large HTML documents.
+
+```python
+import asyncio
+from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, ScrapingMode
+
+async def main():
+    # Opt in to the lxml-based scraper instead of the default BeautifulSoup one
+    config = CrawlerRunConfig(
+        scraping_mode=ScrapingMode.LXML  # Faster alternative to default BeautifulSoup
+    )
+    async with AsyncWebCrawler() as crawler:
+        result = await crawler.arun(
+            url="https://example.com",
+            config=config
+        )
+        print("Crawl succeeded:", result.success)
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+### Performance Considerations
+
+LXML mode can be roughly 10-20x faster than BeautifulSoup mode, particularly on large HTML documents. However, please note:
+
+1. LXML mode is currently experimental
+2. In some edge cases, the parsing results might differ slightly from BeautifulSoup
+3. If you encounter any inconsistencies between LXML and BeautifulSoup results, please [raise an issue](https://github.com/unclecode/crawl4ai/issues) with a reproducible example
+
+Choose LXML mode when:
+- Processing large HTML documents (recommended for >100KB)
+- Performance is critical
+- Working with well-formed HTML
+
+Stick to BeautifulSoup mode (default) when:
+- Maximum compatibility is needed
+- Working with malformed HTML
+- Exact parsing behavior is critical
+
+---
+
+## 7. Conclusion
By mixing **css_selector** scoping, **content filtering** parameters, and advanced **extraction strategies**, you can precisely **choose** which data to keep. Key parameters in **`CrawlerRunConfig`** for content selection include:
diff --git a/pyproject.toml b/pyproject.toml
index 5f663e92..7ca779d5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,7 @@ dependencies = [
"playwright",
"aiofiles",
"rich>=13.9.4",
+ "cssselect>=1.2.0",
]
classifiers = [
"Development Status :: 3 - Alpha",
diff --git a/requirements.txt b/requirements.txt
index 43a1fd63..19832b50 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -19,4 +19,5 @@ pydantic>=2.10
pyOpenSSL>=24.3.0
psutil>=6.1.1
nltk>=3.9.1
-rich>=13.9.4
\ No newline at end of file
+rich>=13.9.4
+cssselect>=1.2.0
\ No newline at end of file
diff --git a/scraper_equivalence_results.json b/scraper_equivalence_results.json
new file mode 100644
index 00000000..2ad1080a
--- /dev/null
+++ b/scraper_equivalence_results.json
@@ -0,0 +1,16 @@
+{
+ "tests": [
+ {
+ "case": "complicated_exclude_all_links",
+ "lxml_mode": {
+ "differences": {},
+ "execution_time": 0.0019578933715820312
+ },
+ "original_time": 0.0059909820556640625
+ }
+ ],
+ "summary": {
+ "passed": 1,
+ "failed": 0
+ }
+}
\ No newline at end of file
diff --git a/scraper_evaluation.json b/scraper_evaluation.json
new file mode 100644
index 00000000..9606d906
--- /dev/null
+++ b/scraper_evaluation.json
@@ -0,0 +1,52 @@
+{
+ "original": {
+ "performance": [],
+ "differences": []
+ },
+ "batch": {
+ "performance": [
+ {
+ "case": "basic",
+ "metrics": {
+ "time": 0.8874530792236328,
+ "memory": 98.328125
+ }
+ }
+ ],
+ "differences": [
+ {
+ "case": "basic",
+ "differences": {
+ "images_count": {
+ "old": 50,
+ "new": 0,
+ "diff": -50
+ }
+ }
+ }
+ ]
+ },
+ "lxml": {
+ "performance": [
+ {
+ "case": "basic",
+ "metrics": {
+ "time": 1.210719108581543,
+ "memory": 99.921875
+ }
+ }
+ ],
+ "differences": [
+ {
+ "case": "basic",
+ "differences": {
+ "images_count": {
+ "old": 50,
+ "new": 0,
+ "diff": -50
+ }
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/tests/async/test_evaluation_scraping_methods_performance.configs.py b/tests/async/test_evaluation_scraping_methods_performance.configs.py
new file mode 100644
index 00000000..e6305736
--- /dev/null
+++ b/tests/async/test_evaluation_scraping_methods_performance.configs.py
@@ -0,0 +1,690 @@
+import json
+import time
+from bs4 import BeautifulSoup
+from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
+from typing import Dict, Any, List, Tuple
+import difflib
+from lxml import html as lhtml, etree
+
+def normalize_dom(element):
+    """
+    Normalize an lxml HTML element in place so two trees can be compared.
+
+    - Removes comment nodes below ``element`` while keeping the text that
+      followed each comment
+    - Re-inserts every node's attributes in sorted key order
+
+    <head> is left untouched; remove it here if a comparison should ignore it.
+
+    Args:
+        element: Root lxml element of the tree to normalize.
+
+    Returns:
+        The same element (mutated).
+    """
+    # './/' keeps the search inside this element's subtree; a bare '//' would
+    # scan the whole document the element belongs to.
+    for comment in element.xpath('.//comment()'):
+        parent = comment.getparent()
+        if parent is None:
+            continue
+        # lxml stores the text after a node as that node's tail, so removing
+        # the comment would delete that text too. Re-attach it to whatever
+        # precedes it once the comment is gone.
+        if comment.tail:
+            previous = comment.getprevious()
+            if previous is not None:
+                previous.tail = (previous.tail or "") + comment.tail
+            else:
+                parent.text = (parent.text or "") + comment.tail
+        parent.remove(comment)
+
+    # Sort attributes (to avoid false positives due to attr order)
+    for el in element.iter():
+        if el.attrib:
+            # Snapshot as a sorted list of (k, v), then reassign in that order
+            sorted_attribs = sorted(el.attrib.items())
+            el.attrib.clear()
+            for k, v in sorted_attribs:
+                el.set(k, v)
+
+    return element
+
+
+def _rewrap_in_div(container):
+    """Move ``container``'s leading text and child nodes into a fresh <div>."""
+    new_div = lhtml.Element("div")
+    # Text that precedes the first child lives on the container itself, not
+    # on any child, so it has to be carried over explicitly.
+    new_div.text = container.text
+    # Snapshot first: appending re-parents each child out of ``container``.
+    for child in list(container):
+        new_div.append(child)
+    return new_div
+
+
+def strip_html_body(root):
+    """
+    Unwrap <html>/<body> so documents and fragments compare like for like.
+
+    If 'root' is <html>, find its <body> child and move all of <body>'s
+    content into a new <div>. Return that <div>.
+
+    If 'root' is <body>, similarly move all of its content into a new <div>
+    and return it.
+
+    Otherwise, return 'root' as-is.
+    """
+    # Comments and processing instructions carry a non-string tag.
+    tag_name = root.tag.lower() if isinstance(root.tag, str) else ""
+
+    # Case 1: The root is <html>
+    if tag_name == 'html':
+        bodies = root.xpath('./body')
+        # No <body> found; just return the <html> root
+        return _rewrap_in_div(bodies[0]) if bodies else root
+
+    # Case 2: The root is <body>
+    if tag_name == 'body':
+        return _rewrap_in_div(root)
+
+    # Case 3: Neither <html> nor <body>
+    return root
+
+
+def _collapse_whitespace(value):
+    """Return ``value`` ("" for None) trimmed, with whitespace runs collapsed."""
+    return " ".join((value or "").split())
+
+
+def compare_nodes(node1, node2, differences, path="/"):
+    """
+    Recursively compare two lxml nodes, appending textual differences to `differences`.
+
+    Args:
+        node1, node2: Nodes to compare (tag, attributes, text, children, tail).
+        differences: List that receives one message per mismatch found.
+        path: Location of the nodes in the tree (like an XPath), used in messages.
+
+    Text and tail are compared after whitespace normalization. A tag mismatch
+    stops the comparison of that node; a child-count mismatch skips only the
+    child-by-child comparison, so the tail is still checked.
+    """
+    # 1) Compare tag names; nothing below is comparable if they differ
+    if node1.tag != node2.tag:
+        differences.append(f"Tag mismatch at {path}: '{node1.tag}' vs. '{node2.tag}'")
+        return
+
+    here = f"{path}/{node1.tag}"
+
+    # 2) Compare attributes in stored order (normalize_dom() sorts them)
+    attrs1 = list(node1.attrib.items())
+    attrs2 = list(node2.attrib.items())
+    if attrs1 != attrs2:
+        differences.append(f"Attribute mismatch at {here}: {attrs1} vs. {attrs2}")
+
+    # 3) Compare text, ignoring differences in whitespace
+    text1 = _collapse_whitespace(node1.text)
+    text2 = _collapse_whitespace(node2.text)
+    if text1 != text2:
+        differences.append(f"Text mismatch at {here}: '{text1}' vs. '{text2}'")
+
+    # 4) Compare children; pairing them up only makes sense when counts match
+    children1 = list(node1)
+    children2 = list(node2)
+    if len(children1) != len(children2):
+        differences.append(
+            f"Child count mismatch at {here}: {len(children1)} vs. {len(children2)}"
+        )
+    else:
+        for i, (c1, c2) in enumerate(zip(children1, children2)):
+            compare_nodes(c1, c2, differences, f"{here}[{i}]")
+
+    # 5) Compare tail text with the same normalization as node text
+    tail1 = _collapse_whitespace(node1.tail)
+    tail2 = _collapse_whitespace(node2.tail)
+    if tail1 != tail2:
+        differences.append(f"Tail mismatch after {here}: '{tail1}' vs. '{tail2}'")
+
+
+def compare_html_structurally(html1, html2):
+    """
+    Report structural differences between two HTML strings using lxml.
+
+    Each input is parsed, normalized with normalize_dom(), unwrapped with
+    strip_html_body(), and the resulting trees are walked by compare_nodes().
+
+    Returns:
+        A list of difference descriptions; empty when the two documents are
+        effectively the same. If an input cannot be parsed, the list holds a
+        single parse-error entry for the first input that failed.
+    """
+    # 1) Parse both, stopping at the first input lxml rejects
+    parsed = []
+    for markup, failure in ((html1, "Error parsing HTML1"), (html2, "Error parsing HTML2")):
+        try:
+            parsed.append(lhtml.fromstring(markup))
+        except etree.ParserError:
+            return [failure]
+
+    # 2) Normalize each DOM (remove comments, sort attributes), then strip the
+    #    <html>/<body> wrappers for a better apples-to-apples comparison
+    left, right = (strip_html_body(normalize_dom(tree)) for tree in parsed)
+
+    # 3) Compare recursively
+    found = []
+    compare_nodes(left, right, found, path="")
+    return found
+
+
+
+def generate_large_html(n_elements=1000):
+    """
+    Build a synthetic HTML document made of ``n_elements`` repeated blocks.
+
+    Each block holds a heading, a paragraph containing a link, an image and a
+    two-item list, all numbered with the block index.
+
+    Args:
+        n_elements: Number of blocks to emit.
+
+    Returns:
+        str: The complete document as one string.
+    """
+    # NOTE(review): the markup below was reconstructed from the surviving text
+    # and line layout after the tags were lost; confirm tag names and
+    # attributes against the original test file.
+    html = ['<html><body>']
+    for i in range(n_elements):
+        html.append(f'''
+        <div class="article">
+            <h2>Heading {i}</h2>
+            <p>This is paragraph {i} with some content and a <a href="http://example.com/{i}">link</a></p>
+            <img src="image{i}.jpg" alt="Image {i}">
+            <ul>
+                <li>List item {i}.1</li>
+                <li>List item {i}.2</li>
+            </ul>
+        </div>
+        ''')
+    html.append('</body></html>')
+    return ''.join(html)
+
+def generate_complicated_html():
+ """
+ HTML with multiple domains, forms, data attributes,
+ various images, comments, style, and noscript to test all parameter toggles.
+ """
+ return """
+
+
+
+ Complicated Test Page
+
+
+
+
+
+
+
+
+
Main Title of the Page
+
+
+
+
+
+
+
+
+
+
Article Title
+
+ This paragraph has a good amount of text to exceed word_count_threshold if it's
+ set to something small. But it might not exceed a very high threshold.
+