feat(scraping): add LXML-based scraping mode for improved performance

Adds a new ScrapingMode enum to allow switching between BeautifulSoup and LXML parsing.
LXML mode offers 10-20x better performance for large HTML documents.

Key changes:
- Added ScrapingMode enum with BEAUTIFULSOUP and LXML options
- Implemented LXMLWebScrapingStrategy class
- Added LXML-based metadata extraction
- Updated documentation with scraping mode usage and performance considerations
- Added cssselect dependency

BREAKING CHANGE: None
This commit is contained in:
UncleCode
2025-01-12 20:46:23 +08:00
parent 825c78a048
commit f3ae5a657c
12 changed files with 1366 additions and 509 deletions

View File

@@ -1,7 +1,7 @@
# __init__.py
from .async_webcrawler import AsyncWebCrawler, CacheMode
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, ScrapingMode
from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy, CosineStrategy, JsonCssExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
@@ -14,6 +14,7 @@ __all__ = [
"AsyncWebCrawler",
"CrawlResult",
"CacheMode",
"ScrapingMode",
'BrowserConfig',
'CrawlerRunConfig',
'ExtractionStrategy',

View File

@@ -12,6 +12,7 @@ from .extraction_strategy import ExtractionStrategy
from .chunking_strategy import ChunkingStrategy
from .markdown_generation_strategy import MarkdownGenerationStrategy
from typing import Union, List
from enum import Enum
class BrowserConfig:
"""
@@ -183,6 +184,12 @@ class BrowserConfig:
)
class ScrapingMode(str, Enum):
    """Enum for different scraping modes.

    Members also subclass ``str``, so each member compares equal to its
    plain string value (for example ``ScrapingMode.LXML == "lxml"``).
    """
    # Scrape with the BeautifulSoup-based strategy.
    BEAUTIFULSOUP = "beautifulsoup"
    # Scrape with the lxml-based strategy.
    LXML = "lxml"
class CrawlerRunConfig:
"""
Configuration class for controlling how the crawler runs each crawl operation.
@@ -220,6 +227,8 @@ class CrawlerRunConfig:
Default: False.
parser_type (str): Type of parser to use for HTML parsing.
Default: "lxml".
scraping_mode (ScrapingMode): Scraping mode to use.
Default: ScrapingMode.BEAUTIFULSOUP.
# Caching Parameters
cache_mode (CacheMode or None): Defines how caching is handled.
@@ -331,6 +340,7 @@ class CrawlerRunConfig:
remove_forms: bool = False,
prettiify: bool = False,
parser_type: str = "lxml",
scraping_mode: ScrapingMode = ScrapingMode.BEAUTIFULSOUP,
# SSL Parameters
fetch_ssl_certificate: bool = False,
@@ -403,6 +413,7 @@ class CrawlerRunConfig:
self.remove_forms = remove_forms
self.prettiify = prettiify
self.parser_type = parser_type
self.scraping_mode = scraping_mode
# SSL Parameters
self.fetch_ssl_certificate = fetch_ssl_certificate
@@ -489,6 +500,7 @@ class CrawlerRunConfig:
remove_forms=kwargs.get("remove_forms", False),
prettiify=kwargs.get("prettiify", False),
parser_type=kwargs.get("parser_type", "lxml"),
scraping_mode=kwargs.get("scraping_mode", ScrapingMode.BEAUTIFULSOUP),
# SSL Parameters
fetch_ssl_certificate=kwargs.get("fetch_ssl_certificate", False),
@@ -562,6 +574,7 @@ class CrawlerRunConfig:
"remove_forms": self.remove_forms,
"prettiify": self.prettiify,
"parser_type": self.parser_type,
"scraping_mode": self.scraping_mode,
"fetch_ssl_certificate": self.fetch_ssl_certificate,
"cache_mode": self.cache_mode,
"session_id": self.session_id,

View File

@@ -17,7 +17,7 @@ from .extraction_strategy import *
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy, AsyncCrawlResponse
from .cache_context import CacheMode, CacheContext, _legacy_to_cache_mode
from .markdown_generation_strategy import DefaultMarkdownGenerator, MarkdownGenerationStrategy
from .content_scraping_strategy import WebScrapingStrategy
from .content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
from .async_logger import AsyncLogger
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import *
@@ -543,8 +543,11 @@ class AsyncWebCrawler:
_url = url if not kwargs.get("is_raw_html", False) else "Raw HTML"
t1 = time.perf_counter()
# Initialize scraping strategy
scrapping_strategy = WebScrapingStrategy(logger=self.logger)
# Initialize scraping strategy based on mode
if config.scraping_mode == ScrapingMode.LXML:
scrapping_strategy = LXMLWebScrapingStrategy(logger=self.logger)
else: # Default to BeautifulSoup
scrapping_strategy = WebScrapingStrategy(logger=self.logger)
# Process HTML content
params = {k:v for k, v in config.to_dict().items() if k not in ["url"]}

View File

@@ -1,4 +1,5 @@
import re # Point 1: Pre-Compile Regular Expressions
import re
from itertools import chain
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
@@ -6,27 +7,43 @@ from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import asyncio, requests, re, os
from .config import *
from bs4 import element, NavigableString, Comment
from bs4 import NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
# from .content_cleaning_strategy import ContentCleaningStrategy
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult
from .utils import (
extract_metadata,
normalize_url,
is_external_url,
get_base_domain,
extract_metadata_using_lxml
)
from lxml import etree
from lxml import html as lhtml
from typing import Dict, Any, List, Tuple
# Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:')
TWITTER_REGEX = re.compile(r'^twitter:')
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
# Function to parse srcset
def parse_srcset(s: str) -> List[Dict]:
    """Parse an HTML ``srcset`` attribute value into image candidates.

    Candidates are tokenized the way the HTML ``srcset`` grammar defines
    them: a URL runs up to the next whitespace, so commas *inside* a URL
    (CDN transformation paths such as ``/w_300,h_200/``, data URIs, ...)
    are kept. A comma only separates candidates when it trails the URL
    directly or follows the descriptors.

    Args:
        s: Raw ``srcset`` value. ``None`` or an empty string yields ``[]``.

    Returns:
        A list of ``{'url': str, 'width': str | None}`` dicts in document
        order. ``width`` is the first descriptor with its trailing ``w``
        stripped (e.g. ``"300"``), or ``None`` when the first descriptor is
        missing or is not a width descriptor (e.g. ``2x``).
    """
    if not s:
        return []

    variants = []
    pos, end = 0, len(s)
    while True:
        # Skip separators (whitespace and stray commas) between candidates.
        while pos < end and (s[pos] == ',' or s[pos].isspace()):
            pos += 1
        if pos >= end:
            break

        # The URL is everything up to the next whitespace character.
        start = pos
        while pos < end and not s[pos].isspace():
            pos += 1
        url = s[start:pos]

        width = None
        if url.endswith(','):
            # "a.jpg, b.jpg": the comma terminates a descriptor-less candidate.
            url = url.rstrip(',')
        else:
            # Descriptors run up to the next comma (or the end of the string).
            comma = s.find(',', pos)
            stop = end if comma == -1 else comma
            descriptors = s[pos:stop].split()
            pos = stop
            if descriptors and descriptors[0].endswith('w'):
                width = descriptors[0].rstrip('w')

        variants.append({'url': url, 'width': width})
    return variants
# Function to parse image height/width value and units
def parse_dimension(dimension):
if dimension:
@@ -207,9 +224,9 @@ class WebScrapingStrategy(ContentScrapingStrategy):
Returns:
dict: A dictionary containing the processed image information.
"""
parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
if ' ' in u else None}
for u in [f"http{p}" for p in s.split("http") if p]]
# parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
# if ' ' in u else None}
# for u in [f"http{p}" for p in s.split("http") if p]]
# Constants for checks
classes_to_check = frozenset(['button', 'icon', 'logo'])
@@ -290,7 +307,6 @@ class WebScrapingStrategy(ContentScrapingStrategy):
group_id = index
# Base image info template
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
base_info = {
'alt': alt,
'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
@@ -661,7 +677,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
imgs = body.find_all('img')
media['images'] = [
img for result in (self.process_image(img, url, i, len(imgs))
img for result in (self.process_image(img, url, i, len(imgs), **kwargs)
for i, img in enumerate(imgs))
if result is not None
for img in result
@@ -701,7 +717,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'''
# Append the error div to the body
body.body.append(error_div)
body.append(error_div)
str_body = body.encode_contents().decode('utf-8')
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.")
@@ -721,3 +737,462 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'links': links,
'metadata': meta
}
class LXMLWebScrapingStrategy(WebScrapingStrategy):
    """Scraping strategy that parses and cleans HTML with ``lxml``.

    Subclasses ``WebScrapingStrategy`` and provides lxml-tree versions of
    link/media collection, image scoring, element/attribute pruning and the
    top-level ``_scrap`` entry point.
    """

    def __init__(self, logger=None):
        super().__init__(logger)
        self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
        self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')

    @staticmethod
    def _remove_keep_tail(node) -> bool:
        """Detach ``node`` from its parent without discarding its tail text.

        In lxml the text that follows an element is stored on that element
        as ``.tail``, so a plain ``parent.remove(node)`` silently deletes
        it. The tail is therefore merged into the previous sibling's tail
        (or the parent's text) before the node is removed.

        Returns:
            True if the node was removed, False if it has no parent (for
            example a document-level comment) and was left untouched.
        """
        parent = node.getparent()
        if parent is None:
            return False
        tail = node.tail
        if tail:
            previous = node.getprevious()
            if previous is None:
                parent.text = (parent.text or '') + tail
            else:
                previous.tail = (previous.tail or '') + tail
        parent.remove(node)
        return True

    def _process_element(self, url: str, element: lhtml.HtmlElement, media: Dict[str, List],
                         internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any],
                         **kwargs) -> bool:
        """Collect links and media below ``element`` and prune excluded nodes.

        ``media``, ``internal_links_dict`` and ``external_links_dict`` are
        filled in place; link dicts are keyed by normalized href so each URL
        is recorded once. Always returns True.
        """
        base_domain = kwargs.get("base_domain", get_base_domain(url))
        # `or []` tolerates an explicit None for exclude_domains.
        exclude_domains = set(kwargs.get('exclude_domains') or [])

        # Process links
        for link in element.xpath('.//a[@href]'):
            href = link.get('href', '').strip()
            if not href:
                continue
            try:
                normalized_href = normalize_url(href, url)
                link_data = {
                    'href': normalized_href,
                    'text': link.text_content().strip(),
                    'title': link.get('title', '').strip(),
                    'base_domain': base_domain
                }
                if is_external_url(normalized_href, base_domain):
                    link_base_domain = get_base_domain(normalized_href)
                    link_data['base_domain'] = link_base_domain
                    if kwargs.get('exclude_external_links', False) or link_base_domain in exclude_domains:
                        self._remove_keep_tail(link)
                        continue
                    if normalized_href not in external_links_dict:
                        external_links_dict[normalized_href] = link_data
                elif normalized_href not in internal_links_dict:
                    internal_links_dict[normalized_href] = link_data
            except Exception as e:
                self._log('error', f"Error processing link: {str(e)}", "SCRAPE")
                continue

        # Process images
        images = element.xpath('.//img')
        total_images = len(images)
        for idx, img in enumerate(images):
            src = img.get('src') or ''
            img_domain = get_base_domain(src)
            # Drop the image when its domain is explicitly excluded, or when
            # external images are excluded and this one is external.
            if (img_domain in exclude_domains) or (
                kwargs.get('exclude_external_images', False) and is_external_url(src, base_domain)
            ):
                self._remove_keep_tail(img)
                continue
            # Otherwise, process the image as usual.
            try:
                processed_images = self.process_image(img, url, idx, total_images, **kwargs)
                if processed_images:
                    media['images'].extend(processed_images)
            except Exception as e:
                self._log('error', f"Error processing image: {str(e)}", "SCRAPE")

        # Process videos and audios
        for media_type in ['video', 'audio']:
            for elem in element.xpath(f'.//{media_type}'):
                media_info = {
                    'src': elem.get('src'),
                    'alt': elem.get('alt'),
                    'type': media_type,
                    'description': self.find_closest_parent_with_useful_text(elem, **kwargs)
                }
                media[f"{media_type}s"].append(media_info)
                # Each nested <source> with a src becomes its own entry.
                for source in elem.xpath('.//source'):
                    if src := source.get('src'):
                        media[f"{media_type}s"].append({**media_info, 'src': src})

        # Clean up unwanted elements
        if kwargs.get('remove_forms', False):
            for form in element.xpath('.//form'):
                self._remove_keep_tail(form)
        if excluded_tags := kwargs.get('excluded_tags', []):
            for tag in excluded_tags:
                for elem in element.xpath(f'.//{tag}'):
                    self._remove_keep_tail(elem)
        if excluded_selector := kwargs.get('excluded_selector', ''):
            try:
                for elem in element.cssselect(excluded_selector):
                    self._remove_keep_tail(elem)
            except Exception as e:
                # Best effort: an invalid selector must not abort scraping,
                # but it should not disappear silently either.
                self._log('error', f"Error with excluded CSS selector: {str(e)}", "SCRAPE")
        return True

    def find_closest_parent_with_useful_text(self, element: lhtml.HtmlElement, **kwargs) -> Optional[str]:
        """Walk up from ``element`` and return the first ancestor-or-self text
        that has at least ``image_description_min_word_threshold`` words.

        Only nodes with non-empty leading ``.text`` are considered. Returns
        None when no node qualifies.
        """
        image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold',
                                                          IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
        current = element
        while current is not None:
            if current.text:
                text = current.text_content()
                if len(text.split()) >= image_description_min_word_threshold:
                    return text.strip()
            current = current.getparent()
        return None

    def flatten_nested_elements(self, element: lhtml.HtmlElement) -> lhtml.HtmlElement:
        """Flatten nested elements of the same type in LXML tree"""
        if len(element) == 1 and element.tag == element[0].tag:
            return self.flatten_nested_elements(element[0])
        for child in element:
            child_idx = element.index(child)
            flattened_child = self.flatten_nested_elements(child)
            if flattened_child is not child:  # Only replace if actually flattened
                element[child_idx] = flattened_child
        return element

    def process_image(self, img: lhtml.HtmlElement, url: str, index: int, total_images: int,
                      **kwargs) -> Optional[List[Dict]]:
        """Score an ``<img>`` element and return its source variants.

        Returns None when the image is hidden, looks like a button/icon/logo,
        scores at or below ``image_score_threshold``, or yields no usable
        (non-``data:``) source URL. Otherwise returns one dict per unique
        source URL found in ``src``, ``data-src``, ``srcset``,
        ``data-srcset``, an enclosing ``<picture>`` and other ``data-*src*``
        attributes.
        """
        # Quick validation checks
        style = img.get('style', '')
        alt = img.get('alt', '')
        src = img.get('src', '')
        data_src = img.get('data-src', '')
        srcset = img.get('srcset', '')
        data_srcset = img.get('data-srcset', '')

        if 'display:none' in style:
            return None

        parent = img.getparent()
        if parent is not None:  # a detached <img> simply has no parent hints
            if parent.tag in ['button', 'input']:
                return None
            parent_classes = parent.get('class', '').split()
            if any('button' in cls or 'icon' in cls or 'logo' in cls for cls in parent_classes):
                return None

        # If src is in class or alt, likely an icon
        if (src and any(c in src for c in ['button', 'icon', 'logo'])) or \
           (alt and any(c in alt for c in ['button', 'icon', 'logo'])):
            return None

        # Score calculation
        score = 0
        if (width := img.get('width')) and width.isdigit():
            score += 1 if int(width) > 150 else 0
        if (height := img.get('height')) and height.isdigit():
            score += 1 if int(height) > 150 else 0
        if alt:
            score += 1
        # Images in the first half of the document get a bonus point.
        if total_images and index / total_images < 0.5:
            score += 1

        # Check formats in all possible sources. The loop variable is
        # deliberately not called `url`, so the page URL parameter is not
        # shadowed.
        image_formats = {'jpg', 'jpeg', 'png', 'webp', 'avif', 'gif'}
        detected_format = None
        for candidate in [src, data_src, srcset, data_srcset]:
            if candidate:
                format_matches = [fmt for fmt in image_formats if fmt in candidate.lower()]
                if format_matches:
                    detected_format = format_matches[0]
                    score += 1
                    break

        if srcset or data_srcset:
            score += 1
        picture = img.xpath('./ancestor::picture[1]')
        if picture:
            score += 1

        if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD):
            return None

        # Process image variants
        unique_urls = set()
        image_variants = []
        base_info = {
            'alt': alt,
            'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
            'score': score,
            'type': 'image',
            'group_id': index,
            'format': detected_format,
        }

        def add_variant(src: str, width: Optional[str] = None):
            # Skip empty values, inline data URIs and URLs already recorded.
            if src and not src.startswith('data:') and src not in unique_urls:
                unique_urls.add(src)
                variant = {**base_info, 'src': src}
                if width:
                    variant['width'] = width
                image_variants.append(variant)

        # Add variants from different sources
        add_variant(src)
        add_variant(data_src)
        for srcset_attr in [srcset, data_srcset]:
            if srcset_attr:
                for source in parse_srcset(srcset_attr):
                    add_variant(source['url'], source['width'])

        # Handle picture element
        if picture:
            for source in picture[0].xpath('.//source[@srcset]'):
                if source_srcset := source.get('srcset'):
                    for src_data in parse_srcset(source_srcset):
                        add_variant(src_data['url'], src_data['width'])

        # Check framework-specific attributes
        for attr, value in img.attrib.items():
            if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value:
                add_variant(value)

        return image_variants if image_variants else None

    def remove_empty_elements_fast(self, root, word_count_threshold=5):
        """
        Remove elements that fall below the desired word threshold in a single pass from the bottom up.
        Skips non-element nodes like HtmlComment and bypasses certain tags that are allowed to have no content.
        Text that follows a removed element (its tail) is kept in the tree.
        """
        bypass_tags = {'a', 'img', 'br', 'hr', 'input', 'meta', 'link', 'source', 'track', 'wbr'}
        for el in reversed(list(root.iterdescendants())):
            if not isinstance(el, lhtml.HtmlElement):
                continue
            if el.tag in bypass_tags:
                continue
            text_content = (el.text_content() or "").strip()
            if len(text_content.split()) < word_count_threshold and not el.getchildren():
                self._remove_keep_tail(el)
        return root

    def remove_unwanted_attributes_fast(
        self,
        root: lhtml.HtmlElement,
        important_attrs=None,
        keep_data_attributes=False
    ) -> lhtml.HtmlElement:
        """
        Removes all attributes from each element (including root) except those in `important_attrs`.
        If `keep_data_attributes=True`, also retain any attribute starting with 'data-'.
        Returns the same root element, mutated in-place, for fluent usage.
        """
        if important_attrs is None:
            important_attrs = set(IMPORTANT_ATTRS)

        # iterdescendants() does not yield the root itself, so chain it in.
        for el in chain((root,), root.iterdescendants()):
            # Only HtmlElement nodes carry attributes; skip comments etc.
            if not isinstance(el, lhtml.HtmlElement):
                continue
            kept = {
                name: value
                for name, value in dict(el.attrib).items()
                if name in important_attrs
                or (keep_data_attributes and name.startswith('data-'))
            }
            # Clear old attributes and set the filtered set
            el.attrib.clear()
            el.attrib.update(kept)
        return root

    def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD,
               css_selector: str = None, **kwargs) -> Dict[str, Any]:
        """Parse ``html`` with lxml, extract links/media/metadata and clean it.

        Returns:
            None when ``html`` is empty or the ``css_selector`` is invalid.
            Otherwise a dict with ``cleaned_html``, ``success``, ``media``,
            ``links`` and ``metadata``. When ``css_selector`` matches nothing
            the dict additionally carries ``markdown`` and ``message``. Any
            unexpected error yields ``success=False`` with an error notice
            as ``cleaned_html``.
        """
        if not html:
            return None

        success = True
        try:
            doc = lhtml.document_fromstring(html)
            # Work on the whole document rather than only <body>.
            body = doc
            base_domain = get_base_domain(url)

            # Add comment removal. Document-level comments have no parent
            # and are skipped by the helper instead of raising.
            if kwargs.get('remove_comments', False):
                for comment in body.xpath('//comment()'):
                    self._remove_keep_tail(comment)

            # Handle tag-based removal first
            excluded_tags = set(kwargs.get('excluded_tags', []) or [])
            for tag in excluded_tags:
                for element in body.xpath(f'.//{tag}'):
                    self._remove_keep_tail(element)

            # Handle CSS selector-based exclusion
            excluded_selector = kwargs.get('excluded_selector', '')
            if excluded_selector:
                try:
                    for element in body.cssselect(excluded_selector):
                        self._remove_keep_tail(element)
                except Exception as e:
                    self._log('error', f"Error with excluded CSS selector: {str(e)}", "SCRAPE")

            # Extract metadata before any content filtering
            try:
                meta = extract_metadata_using_lxml("", doc)
            except Exception as e:
                self._log('error', f"Error extracting metadata: {str(e)}", "SCRAPE")
                meta = {}

            # Handle CSS selector targeting
            if css_selector:
                try:
                    selected_elements = body.cssselect(css_selector)
                    if not selected_elements:
                        return {
                            'markdown': '',
                            'cleaned_html': '',
                            'success': True,
                            'media': {'images': [], 'videos': [], 'audios': []},
                            'links': {'internal': [], 'external': []},
                            'metadata': meta,
                            'message': f"No elements found for CSS selector: {css_selector}"
                        }
                    # Re-home the selected elements under a fresh wrapper.
                    body = lhtml.Element('div')
                    body.extend(selected_elements)
                except Exception as e:
                    self._log('error', f"Error with CSS selector: {str(e)}", "SCRAPE")
                    return None

            # Remove script and style tags
            for tag in ['script', 'style', 'link', 'meta', 'noscript']:
                for element in body.xpath(f'.//{tag}'):
                    self._remove_keep_tail(element)

            # Handle social media and domain exclusions (`or []` tolerates None).
            kwargs['exclude_domains'] = set(kwargs.get('exclude_domains') or [])
            if kwargs.get('exclude_social_media_links', False):
                kwargs['exclude_social_media_domains'] = (
                    set(kwargs.get('exclude_social_media_domains') or []) | set(SOCIAL_MEDIA_DOMAINS)
                )
                kwargs['exclude_domains'].update(kwargs['exclude_social_media_domains'])

            # Process forms if needed
            if kwargs.get('remove_forms', False):
                for form in body.xpath('.//form'):
                    self._remove_keep_tail(form)

            # Process content
            media = {'images': [], 'videos': [], 'audios': []}
            internal_links_dict = {}
            external_links_dict = {}
            # Merge base_domain into kwargs so a caller-supplied 'base_domain'
            # cannot collide with the explicit keyword.
            element_kwargs = {**kwargs, 'base_domain': base_domain}
            self._process_element(
                url,
                body,
                media,
                internal_links_dict,
                external_links_dict,
                **element_kwargs
            )

            # Handle only_text option
            if kwargs.get('only_text', False):
                for tag in ONLY_TEXT_ELIGIBLE_TAGS:
                    for element in body.xpath(f'.//{tag}'):
                        if element.text:
                            new_text = lhtml.Element('span')
                            new_text.text = element.text_content()
                            # Carry over the text following the replaced element.
                            new_text.tail = element.tail
                            if element.getparent() is not None:
                                element.getparent().replace(element, new_text)

            # Clean base64 images
            for img in body.xpath('.//img[@src]'):
                src = img.get('src', '')
                if self.BASE64_PATTERN.match(src):
                    img.set('src', self.BASE64_PATTERN.sub('', src))

            # Remove empty elements
            self.remove_empty_elements_fast(body, 1)

            # Remove unneeded attributes
            self.remove_unwanted_attributes_fast(body, keep_data_attributes=kwargs.get('keep_data_attributes', False))

            # Generate output HTML
            cleaned_html = lhtml.tostring(body, encoding='unicode',
                                          pretty_print=True,
                                          method='html',
                                          with_tail=False).strip()
            return {
                'cleaned_html': cleaned_html,
                'success': success,
                'media': media,
                'links': {
                    'internal': list(internal_links_dict.values()),
                    'external': list(external_links_dict.values())
                },
                'metadata': meta
            }
        except Exception as e:
            self._log('error', f"Error processing HTML: {str(e)}", "SCRAPE")
            # Create error message in case of failure
            error_body = lhtml.Element('div')
            # Use etree.SubElement rather than lhtml.SubElement
            error_div = etree.SubElement(error_body, 'div', id='crawl4ai_error_message')
            error_div.text = f'''
Crawl4AI Error: This page is not fully supported.
Error Message: {str(e)}
Possible reasons:
1. The page may have restrictions that prevent crawling.
2. The page might not be fully loaded.
Suggestions:
- Try calling the crawl function with these parameters:
magic=True,
- Set headless=False to visualize what's happening on the page.
If the issue persists, please check the page's structure and any potential anti-crawling measures.
'''
            cleaned_html = lhtml.tostring(error_body, encoding='unicode', pretty_print=True)
            return {
                'cleaned_html': cleaned_html,
                'success': False,
                'media': {'images': [], 'videos': [], 'audios': []},
                'links': {'internal': [], 'external': []},
                'metadata': {}
            }

View File

@@ -1,490 +0,0 @@
from typing import Dict, Optional, Any, List, Tuple
from .models import CrawlResult
from .async_webcrawler import AsyncWebCrawler
from .async_configs import BrowserConfig, CrawlerRunConfig
from .markdown_generation_strategy import DefaultMarkdownGenerator
from .content_filter_strategy import PruningContentFilter
from rich.live import Live
from rich.table import Table
from rich.console import Console
from rich.style import Style
from rich import box
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
import time
import psutil
import asyncio
import uuid
from urllib.parse import urlparse
import random
@dataclass
class DomainState:
    """Per-domain rate-limiting state tracked by ``RateLimiter``."""
    # time.time() value recorded when the domain was last waited on; 0 until then.
    last_request_time: float = 0
    # Delay in seconds enforced between requests; 0 means "not chosen yet".
    current_delay: float = 0
    # Number of rate-limited responses seen since the last non-rate-limited one.
    fail_count: int = 0
@dataclass
class CrawlerTaskResult:
    """Outcome of one dispatched crawl task, as returned by the dispatcher."""
    # Identifier assigned to the task by the dispatcher.
    task_id: str
    # URL that was crawled.
    url: str
    # Crawl result object for this URL.
    result: CrawlResult
    # Memory figures reported by the dispatcher for this task.
    memory_usage: float
    peak_memory: float
    # Wall-clock start and end of the task.
    start_time: datetime
    end_time: datetime
    # Empty string when no error was recorded.
    error_message: str = ""
class CrawlStatus(Enum):
    """Lifecycle states a crawl task can be shown in by the monitor."""
    # Registered with the monitor but not started.
    QUEUED = "QUEUED"
    # Currently being crawled.
    IN_PROGRESS = "IN_PROGRESS"
    # Finished and reported as successful.
    COMPLETED = "COMPLETED"
    # Finished with an error or unsuccessful result.
    FAILED = "FAILED"
@dataclass
class CrawlStats:
    """Live statistics for a single crawl task shown by the monitor."""
    task_id: str
    url: str
    status: CrawlStatus
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    memory_usage: float = 0.0
    peak_memory: float = 0.0
    error_message: str = ""

    @property
    def duration(self) -> str:
        """Elapsed time as ``H:MM:SS``, or ``"0:00"`` before the task starts.

        A task without an ``end_time`` is measured up to the current time.
        """
        started = self.start_time
        if not started:
            return "0:00"
        finished = self.end_time if self.end_time else datetime.now()
        whole_seconds = int((finished - started).total_seconds())
        return str(timedelta(seconds=whole_seconds))
class DisplayMode(Enum):
    """Selects which table layout ``CrawlerMonitor`` renders."""
    # Summary row plus one row per task.
    DETAILED = "DETAILED"
    # Compact table with aggregated counts, memory and runtime only.
    AGGREGATED = "AGGREGATED"
class RateLimiter:
    """Per-domain request pacing with exponential backoff on rate-limit responses."""

    def __init__(
        self,
        base_delay: Tuple[float, float] = (1.0, 3.0),
        max_delay: float = 60.0,
        max_retries: int = 3,
        rate_limit_codes: Optional[List[int]] = None
    ):
        """
        Args:
            base_delay: ``(low, high)`` range in seconds from which the
                initial per-domain delay is drawn uniformly at random.
            max_delay: Upper bound in seconds for the backoff delay.
            max_retries: Number of consecutive rate-limited responses
                tolerated before ``update_delay`` reports failure.
            rate_limit_codes: HTTP status codes treated as rate limiting.
                Defaults to ``[429, 503]``.
        """
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_retries = max_retries
        # A fresh list per instance: a list literal as the parameter default
        # would be shared (and mutable) across every RateLimiter.
        self.rate_limit_codes = rate_limit_codes if rate_limit_codes is not None else [429, 503]
        self.domains: Dict[str, DomainState] = {}

    def get_domain(self, url: str) -> str:
        """Return the network location (host[:port]) part of ``url``."""
        return urlparse(url).netloc

    async def wait_if_needed(self, url: str) -> None:
        """Sleep until the domain's current delay has elapsed since its last request."""
        domain = self.get_domain(url)
        state = self.domains.get(domain)
        if not state:
            state = self.domains[domain] = DomainState()

        now = time.time()
        if state.last_request_time:
            wait_time = max(0, state.current_delay - (now - state.last_request_time))
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        # Random delay within base range if no current delay
        if state.current_delay == 0:
            state.current_delay = random.uniform(*self.base_delay)
        state.last_request_time = time.time()

    def update_delay(self, url: str, status_code: int) -> bool:
        """Adjust the domain's delay after a response.

        Returns:
            False once the domain has produced more than ``max_retries``
            consecutive rate-limited responses, True otherwise.
        """
        domain = self.get_domain(url)
        state = self.domains.get(domain)
        if state is None:
            # Tolerate a response for a domain that was never waited on
            # instead of raising KeyError.
            state = self.domains[domain] = DomainState()

        if status_code in self.rate_limit_codes:
            state.fail_count += 1
            if state.fail_count > self.max_retries:
                return False
            # Exponential backoff with random jitter. Seed from the base
            # range when no delay is set yet; doubling 0 would stay at 0.
            current = state.current_delay or random.uniform(*self.base_delay)
            state.current_delay = min(
                current * 2 * random.uniform(0.75, 1.25),
                self.max_delay
            )
        else:
            # Gradually reduce delay on success
            state.current_delay = max(
                random.uniform(*self.base_delay),
                state.current_delay * 0.75
            )
            state.fail_count = 0
        return True
class CrawlerMonitor:
    """Live terminal view of crawl task progress rendered with ``rich``."""

    def __init__(self, max_visible_rows: int = 15, display_mode: DisplayMode = DisplayMode.DETAILED):
        """
        Args:
            max_visible_rows: Maximum number of task rows in the detailed table.
            display_mode: Which table layout to render.
        """
        self.console = Console()
        self.max_visible_rows = max_visible_rows
        self.display_mode = display_mode
        self.stats: Dict[str, CrawlStats] = {}
        self.process = psutil.Process()
        self.start_time = datetime.now()
        self.live = Live(self._create_table(), refresh_per_second=2)

    def start(self):
        """Start the live display."""
        self.live.start()

    def stop(self):
        """Stop the live display."""
        self.live.stop()

    def add_task(self, task_id: str, url: str):
        """Register a new task in the QUEUED state and refresh the display."""
        self.stats[task_id] = CrawlStats(task_id=task_id, url=url, status=CrawlStatus.QUEUED)
        self.live.update(self._create_table())

    def update_task(self, task_id: str, **kwargs):
        """Set the given ``CrawlStats`` attributes on a known task and refresh.

        Unknown task ids are ignored.
        """
        if task_id in self.stats:
            for key, value in kwargs.items():
                setattr(self.stats[task_id], key, value)
            self.live.update(self._create_table())

    def _count(self, status: CrawlStatus) -> int:
        """Return how many tracked tasks currently have ``status``."""
        return sum(1 for stat in self.stats.values() if stat.status == status)

    def _create_aggregated_table(self) -> Table:
        """Creates a compact table showing only aggregated statistics"""
        table = Table(
            box=box.ROUNDED,
            title="Crawler Status Overview",
            title_style="bold magenta",
            header_style="bold blue",
            show_lines=True
        )

        # Calculate statistics
        total_tasks = len(self.stats)
        queued = self._count(CrawlStatus.QUEUED)
        in_progress = self._count(CrawlStatus.IN_PROGRESS)
        completed = self._count(CrawlStatus.COMPLETED)
        failed = self._count(CrawlStatus.FAILED)

        # Memory statistics (process RSS converted from bytes to MB)
        current_memory = self.process.memory_info().rss / (1024 * 1024)
        total_task_memory = sum(stat.memory_usage for stat in self.stats.values())
        peak_memory = max((stat.peak_memory for stat in self.stats.values()), default=0.0)

        # Duration
        duration = datetime.now() - self.start_time

        def percent(count: int) -> str:
            # Guard against division by zero before any task is registered.
            return f"{(count/total_tasks*100):.1f}%" if total_tasks > 0 else "0%"

        # Create status rows
        table.add_column("Status", style="bold cyan")
        table.add_column("Count", justify="right")
        table.add_column("Percentage", justify="right")
        table.add_row("Total Tasks", str(total_tasks), "100%")
        table.add_row("[yellow]In Queue[/yellow]", str(queued), percent(queued))
        table.add_row("[blue]In Progress[/blue]", str(in_progress), percent(in_progress))
        table.add_row("[green]Completed[/green]", str(completed), percent(completed))
        table.add_row("[red]Failed[/red]", str(failed), percent(failed))

        # Add memory information
        table.add_section()
        table.add_row("[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", "")
        table.add_row("[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", "")
        table.add_row("[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", "")
        table.add_row(
            "[yellow]Runtime[/yellow]",
            str(timedelta(seconds=int(duration.total_seconds()))),
            ""
        )
        return table

    def _create_detailed_table(self) -> Table:
        """Creates a table with a summary row followed by one row per task."""
        table = Table(
            box=box.ROUNDED,
            title="Crawler Performance Monitor",
            title_style="bold magenta",
            header_style="bold blue"
        )

        # Add columns
        table.add_column("Task ID", style="cyan", no_wrap=True)
        table.add_column("URL", style="cyan", no_wrap=True)
        table.add_column("Status", style="bold")
        table.add_column("Memory (MB)", justify="right")
        table.add_column("Peak (MB)", justify="right")
        table.add_column("Duration", justify="right")
        table.add_column("Info", style="italic")

        # Add summary row
        total_memory = sum(stat.memory_usage for stat in self.stats.values())
        active_count = self._count(CrawlStatus.IN_PROGRESS)
        completed_count = self._count(CrawlStatus.COMPLETED)
        failed_count = self._count(CrawlStatus.FAILED)
        table.add_row(
            "[bold yellow]SUMMARY",
            f"Total: {len(self.stats)}",
            f"Active: {active_count}",
            f"{total_memory:.1f}",
            f"{self.process.memory_info().rss / (1024 * 1024):.1f}",
            str(timedelta(seconds=int((datetime.now() - self.start_time).total_seconds()))),
            # Label both counts: printed back to back they read as a single
            # number (12 completed + 3 failed would show as "123").
            f"completed: {completed_count}, failed: {failed_count}",
            style="bold"
        )
        table.add_section()

        # Add rows for each task: in-progress first, then queued, then
        # finished tasks ordered by end time; truncated to max_visible_rows.
        visible_stats = sorted(
            self.stats.values(),
            key=lambda x: (
                x.status != CrawlStatus.IN_PROGRESS,
                x.status != CrawlStatus.QUEUED,
                x.end_time or datetime.max
            )
        )[:self.max_visible_rows]

        status_styles = {
            CrawlStatus.QUEUED: "white",
            CrawlStatus.IN_PROGRESS: "yellow",
            CrawlStatus.COMPLETED: "green",
            CrawlStatus.FAILED: "red"
        }
        for stat in visible_stats:
            status_style = status_styles[stat.status]
            table.add_row(
                stat.task_id[:8],  # Show first 8 chars of task ID
                stat.url[:40] + "..." if len(stat.url) > 40 else stat.url,
                f"[{status_style}]{stat.status.value}[/{status_style}]",
                f"{stat.memory_usage:.1f}",
                f"{stat.peak_memory:.1f}",
                stat.duration,
                stat.error_message[:40] if stat.error_message else ""
            )
        return table

    def _create_table(self) -> Table:
        """Creates the appropriate table based on display mode"""
        if self.display_mode == DisplayMode.AGGREGATED:
            return self._create_aggregated_table()
        return self._create_detailed_table()
class MemoryAdaptiveDispatcher:
def __init__(
self,
crawler: AsyncWebCrawler,
memory_threshold_percent: float = 70.0,
check_interval: float = 1.0,
max_session_permit: int = 20,
enable_rate_limiting: bool = False,
rate_limit_config: Optional[Dict[str, Any]] = None
):
self.crawler = crawler
self.memory_threshold_percent = memory_threshold_percent
self.check_interval = check_interval
self.max_session_permit = max_session_permit
self.concurrent_sessions = 0
self.enable_rate_limiting = enable_rate_limiting
self.rate_limiter = RateLimiter(**(rate_limit_config or {})) if enable_rate_limiting else None
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
task_id: str,
monitor: Optional[CrawlerMonitor] = None
) -> CrawlerTaskResult:
start_time = datetime.now()
error_message = ""
memory_usage = peak_memory = 0.0
try:
if monitor:
monitor.update_task(task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time)
self.concurrent_sessions += 1
if self.enable_rate_limiting:
await self.rate_limiter.wait_if_needed(url)
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
result = await self.crawler.arun(url, config=config, session_id=task_id)
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
if self.enable_rate_limiting and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if monitor:
monitor.update_task(task_id, status=CrawlStatus.FAILED)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=datetime.now(),
error_message=error_message
)
if not result.success:
error_message = result.error_message
if monitor:
monitor.update_task(task_id, status=CrawlStatus.FAILED)
elif monitor:
monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
except Exception as e:
error_message = str(e)
if monitor:
monitor.update_task(task_id, status=CrawlStatus.FAILED)
result = CrawlResult(url = url, html = "", metadata = {}, success=False, error_message=str(e))
finally:
end_time = datetime.now()
if monitor:
monitor.update_task(
task_id,
end_time=end_time,
memory_usage=memory_usage,
peak_memory=peak_memory,
error_message=error_message
)
self.concurrent_sessions -= 1
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=end_time,
error_message=error_message
)
async def run_urls(
    self,
    urls: List[str],
    config: CrawlerRunConfig,
    monitor: Optional[CrawlerMonitor] = None
) -> List[CrawlerTaskResult]:
    """Crawl every URL, throttled by session count and system memory usage.

    New crawls are started only while fewer than ``self.max_session_permit``
    are running and system memory usage is below
    ``self.memory_threshold_percent``. When nothing is running and memory is
    too high, the loop sleeps for ``self.check_interval`` seconds and retries.

    Args:
        urls: Addresses to crawl.
        config: Run configuration passed to each ``crawl_url`` call.
        monitor: Optional monitor; started before and stopped after the run.

    Returns:
        The task results, collected in the order tasks were observed to
        finish (not necessarily the order of ``urls``).
    """
    if monitor:
        monitor.start()

    try:
        # Register every URL up front, each with a fresh task id.
        waiting = []
        for target in urls:
            new_id = str(uuid.uuid4())
            if monitor:
                monitor.add_task(new_id, target)
            waiting.append((target, new_id))

        in_flight = []
        completed = []

        while waiting or in_flight:
            # Top up running crawls until the session cap or memory limit is hit.
            while waiting and len(in_flight) < self.max_session_permit:
                if psutil.virtual_memory().percent >= self.memory_threshold_percent:
                    break
                target, new_id = waiting.pop(0)
                in_flight.append(
                    asyncio.create_task(self.crawl_url(target, config, new_id, monitor))
                )

            if not in_flight:
                # Nothing could be started (memory pressure); wait and retry.
                await asyncio.sleep(self.check_interval)
                continue

            finished, still_running = await asyncio.wait(
                in_flight,
                return_when=asyncio.FIRST_COMPLETED
            )
            completed.extend(finished)
            in_flight = list(still_running)

        # All tasks are already done here; gather just unwraps their results.
        return await asyncio.gather(*completed)
    finally:
        if monitor:
            monitor.stop()
async def main():
    """Example: crawl a batch of URLs through a MemoryAdaptiveDispatcher."""
    browser_config = BrowserConfig(headless=True, verbose=False)

    pruning_filter = PruningContentFilter(threshold=0.48)
    markdown_generator = DefaultMarkdownGenerator(content_filter=pruning_filter)
    run_config = CrawlerRunConfig(
        markdown_generator=markdown_generator,
        cache_mode=CacheMode.BYPASS
    )

    urls = ["https://example.com/page1" for _ in range(10)]

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # First variant: memory/session limits only.
        dispatcher = MemoryAdaptiveDispatcher(
            crawler=crawler,
            memory_threshold_percent=70.0,
            check_interval=1.0,
            max_session_permit=10
        )

        # Second variant: rate limiting enabled. This rebinds `dispatcher`,
        # so it is the instance actually used for the run below.
        rate_limit_settings = {
            'base_delay': (1.0, 3.0),  # Random range
            'max_delay': 60.0,
            'max_retries': 3,
            'rate_limit_codes': [429, 503]
        }
        dispatcher = MemoryAdaptiveDispatcher(
            crawler=crawler,
            enable_rate_limiting=True,
            rate_limit_config=rate_limit_settings
        )

        # Optional monitor
        monitor = CrawlerMonitor(max_visible_rows=15, display_mode=DisplayMode.DETAILED)

        results = await dispatcher.run_urls(urls, run_config, monitor=monitor)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -868,6 +868,63 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
'metadata': meta
}
def extract_metadata_using_lxml(html, doc=None):
    """
    Collect page metadata from the document's <head> using lxml XPath queries.

    Args:
        html: Raw HTML text; only parsed when `doc` is not supplied.
        doc: Optional pre-parsed document exposing an `xpath` method.

    Returns:
        dict: `title`, `description`, `keywords` and `author` (each a stripped
        string or None), plus one entry per non-empty `og:*` property and
        `twitter:*` name found in <head>. An empty dict is returned when there
        is nothing to parse, parsing fails, or no <head> element exists.
    """
    if doc is None:
        if not html:
            return {}
        try:
            doc = lhtml.document_fromstring(html)
        except Exception:
            return {}

    heads = doc.xpath('//head')
    if not heads:
        return {}
    head_el = heads[0]

    def first_stripped(query):
        # First XPath hit with surrounding whitespace removed, or None.
        hits = head_el.xpath(query)
        return hits[0].strip() if hits else None

    metadata = {
        'title': first_stripped('.//title/text()'),
        'description': first_stripped('.//meta[@name="description"]/@content'),
        'keywords': first_stripped('.//meta[@name="keywords"]/@content'),
        'author': first_stripped('.//meta[@name="author"]/@content'),
    }

    # Open Graph tags are keyed by `property`, Twitter Card tags by `name`.
    prefixed_queries = (
        ('.//meta[starts-with(@property, "og:")]', 'property'),
        ('.//meta[starts-with(@name, "twitter:")]', 'name'),
    )
    for query, key_attr in prefixed_queries:
        for tag in head_el.xpath(query):
            key = tag.get(key_attr, '').strip()
            value = tag.get('content', '').strip()
            if key and value:
                metadata[key] = value

    return metadata
def extract_metadata(html, soup=None):
"""
Extract optimized content, media, and links from website HTML.

View File

@@ -318,7 +318,45 @@ if __name__ == "__main__":
---
## 6. Conclusion
## 6. Scraping Modes
Crawl4AI provides two different scraping modes for HTML content processing: BeautifulSoup (default) and LXML. The LXML mode offers significantly better performance, especially for large HTML documents.
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, ScrapingMode
async def main():
config = CrawlerRunConfig(
scraping_mode=ScrapingMode.LXML # Faster alternative to default BeautifulSoup
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
config=config
)
```
### Performance Considerations
The LXML mode can be up to 10-20x faster than BeautifulSoup mode, particularly when processing large HTML documents. However, please note:
1. LXML mode is currently experimental
2. In some edge cases, the parsing results might differ slightly from BeautifulSoup
3. If you encounter any inconsistencies between LXML and BeautifulSoup results, please [raise an issue](https://github.com/unclecode/crawl4ai/issues) with a reproducible example
Choose LXML mode when:
- Processing large HTML documents (recommended for >100KB)
- Performance is critical
- Working with well-formed HTML
Stick to BeautifulSoup mode (default) when:
- Maximum compatibility is needed
- Working with malformed HTML
- Exact parsing behavior is critical
---
## 7. Conclusion
By mixing **css_selector** scoping, **content filtering** parameters, and advanced **extraction strategies**, you can precisely **choose** which data to keep. Key parameters in **`CrawlerRunConfig`** for content selection include:

View File

@@ -35,6 +35,7 @@ dependencies = [
"playwright",
"aiofiles",
"rich>=13.9.4",
"cssselect>=1.2.0",
]
classifiers = [
"Development Status :: 3 - Alpha",

View File

@@ -20,3 +20,4 @@ pyOpenSSL>=24.3.0
psutil>=6.1.1
nltk>=3.9.1
rich>=13.9.4
cssselect>=1.2.0

View File

@@ -0,0 +1,16 @@
{
"tests": [
{
"case": "complicated_exclude_all_links",
"lxml_mode": {
"differences": {},
"execution_time": 0.0019578933715820312
},
"original_time": 0.0059909820556640625
}
],
"summary": {
"passed": 1,
"failed": 0
}
}

52
scraper_evaluation.json Normal file
View File

@@ -0,0 +1,52 @@
{
"original": {
"performance": [],
"differences": []
},
"batch": {
"performance": [
{
"case": "basic",
"metrics": {
"time": 0.8874530792236328,
"memory": 98.328125
}
}
],
"differences": [
{
"case": "basic",
"differences": {
"images_count": {
"old": 50,
"new": 0,
"diff": -50
}
}
}
]
},
"lxml": {
"performance": [
{
"case": "basic",
"metrics": {
"time": 1.210719108581543,
"memory": 99.921875
}
}
],
"differences": [
{
"case": "basic",
"differences": {
"images_count": {
"old": 50,
"new": 0,
"diff": -50
}
}
}
]
}
}

View File

@@ -0,0 +1,690 @@
import json
import time
from bs4 import BeautifulSoup
from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
from typing import Dict, Any, List, Tuple
import difflib
from lxml import html as lhtml, etree
def normalize_dom(element):
    """
    Normalize an lxml HTML tree in place so two trees can be compared fairly.

    - Removes comment nodes (keeping any text that followed them)
    - Re-inserts each node's attributes in sorted order

    Args:
        element: An lxml element; the whole document it belongs to is
            searched for comments, and `element` plus its descendants have
            their attributes sorted.

    Returns:
        The same element (mutated).
    """
    # Remove comment nodes
    for comment in element.xpath('//comment()'):
        parent = comment.getparent()
        if parent is None:
            continue
        # In lxml the text after a node is stored as that node's `tail`, and
        # remove() drops it together with the node. Re-attach the tail to the
        # preceding sibling (or the parent's text) so real content survives.
        if comment.tail:
            previous = comment.getprevious()
            if previous is not None:
                previous.tail = (previous.tail or "") + comment.tail
            else:
                parent.text = (parent.text or "") + comment.tail
        parent.remove(comment)

    # If you'd like to remove <head>, or unify <html>/<body>, this is the
    # place to do it (e.g. drop every node matched by element.xpath('//head')).

    # Sort attributes (to avoid false positives due to attr order)
    for node in element.iter():
        if node.attrib:
            # Snapshot as a sorted list of (key, value), then rebuild in that order.
            ordered = sorted(node.attrib.items())
            node.attrib.clear()
            for key, value in ordered:
                node.set(key, value)

    return element
def strip_html_body(root):
    """
    Unwrap the <html>/<body> shell so only the page content is compared.

    - <html> root with a <body> child: the body's children are moved into a
      new <div>, which is returned.
    - <html> root without a <body>: the root is returned unchanged.
    - <body> root: its children are moved into a new <div>, which is returned.
    - Any other root: returned as-is.
    """
    def rehome_children(container):
        # Move every child of `container` under a freshly created <div>.
        wrapper = lhtml.Element("div")
        for child in container:
            wrapper.append(child)
        return wrapper

    tag = (root.tag or "").lower()

    if tag == 'body':
        return rehome_children(root)

    if tag == 'html':
        found = root.xpath('./body')
        # Without a <body> there is nothing to unwrap.
        return rehome_children(found[0]) if found else root

    return root
def compare_nodes(node1, node2, differences, path="/"):
    """
    Walk two element trees in lockstep and record every mismatch found.

    Human-readable messages are appended to `differences`; nothing is
    returned. `path` is the XPath-like location of the parent, used only to
    label messages. A tag mismatch or a differing child count stops the
    comparison of that subtree (its tails are then not compared).
    """
    # Tags must agree before anything else is worth comparing.
    if node1.tag != node2.tag:
        differences.append(f"Tag mismatch at {path}: '{node1.tag}' vs. '{node2.tag}'")
        return

    here = f"{path}/{node1.tag}"

    # Attributes are compared as ordered lists (order-sensitive).
    left_attrs = list(node1.attrib.items())
    right_attrs = list(node2.attrib.items())
    if left_attrs != right_attrs:
        differences.append(f"Attribute mismatch at {here}: {left_attrs} vs. {right_attrs}")

    # Leading text, with all whitespace runs collapsed to single spaces.
    left_text = " ".join((node1.text or "").split())
    right_text = " ".join((node2.text or "").split())
    if left_text != right_text:
        differences.append(f"Text mismatch at {here}: '{left_text}' vs. '{right_text}'")

    left_children = list(node1)
    right_children = list(node2)
    if len(left_children) != len(right_children):
        differences.append(
            f"Child count mismatch at {here}: {len(left_children)} vs. {len(right_children)}"
        )
        return  # pairing children up is meaningless when the counts differ

    for index, (left_child, right_child) in enumerate(zip(left_children, right_children)):
        compare_nodes(left_child, right_child, differences, f"{here}[{index}]")

    # Trailing text is only trimmed, not whitespace-collapsed.
    left_tail = (node1.tail or "").strip()
    right_tail = (node2.tail or "").strip()
    if left_tail != right_tail:
        differences.append(f"Tail mismatch after {here}: '{left_tail}' vs. '{right_tail}'")
def compare_html_structurally(html1, html2):
    """
    Structurally diff two HTML strings with lxml.

    Both inputs are parsed, normalized (comments removed, attributes sorted),
    stripped of their <html>/<body> wrappers and then compared node by node.

    Returns:
        A list of difference messages; an empty list means the documents are
        effectively the same. If either input cannot be parsed, the list
        holds a single parse-error message instead.
    """
    parsed = []
    for label, markup in (("HTML1", html1), ("HTML2", html2)):
        try:
            parsed.append(lhtml.fromstring(markup))
        except etree.ParserError:
            return [f"Error parsing {label}"]

    # Normalize both trees first, then unwrap them, for an apples-to-apples comparison.
    normalized = [normalize_dom(tree) for tree in parsed]
    left, right = [strip_html_body(tree) for tree in normalized]

    found = []
    compare_nodes(left, right, found, path="")
    return found
def generate_large_html(n_elements=1000):
    """Build a synthetic HTML page containing `n_elements` article blocks.

    Each block holds a heading, a paragraph with a link, an image and a
    two-item list, all numbered with the block's index.
    """
    def article(idx):
        # One numbered article block, surrounded by newlines.
        return f'''
<div class="article">
<h2>Heading {idx}</h2>
<p>This is paragraph {idx} with some content and a <a href="http://example.com/{idx}">link</a></p>
<img src="image{idx}.jpg" alt="Image {idx}">
<ul>
<li>List item {idx}.1</li>
<li>List item {idx}.2</li>
</ul>
</div>
'''

    body = ''.join(article(idx) for idx in range(n_elements))
    return '<!DOCTYPE html><html><head></head><body>' + body + '</body></html>'
def generate_complicated_html():
    """
    Return a fixed HTML page that exercises many scraper options at once.

    The page mixes links to several domains (plus a javascript: link and a
    local link), a form, data-* attributes, remote/hidden/base64 images,
    HTML comments, <style>, <script> and <noscript> blocks, and a hidden div.
    """
    page = """
<!DOCTYPE html>
<html>
<head>
<title>Complicated Test Page</title>
<meta name="description" content="A very complicated page for testing.">
<style>
.hidden { display: none; }
.highlight { color: red; }
</style>
</head>
<body>
<!-- This is a comment that we may remove if remove_comments=True -->
<header>
<h1>Main Title of the Page</h1>
<nav>
<a href="http://example.com/home">Home</a>
<a href="http://social.com/profile">Social Profile</a>
<a href="javascript:void(0)">JS Void Link</a>
</nav>
</header>
<noscript>
<p>JavaScript is disabled or not supported.</p>
</noscript>
<form action="submit.php" method="post">
<input type="text" name="username" />
<button type="submit">Submit</button>
</form>
<section>
<article>
<h2>Article Title</h2>
<p>
This paragraph has a good amount of text to exceed word_count_threshold if it's
set to something small. But it might not exceed a very high threshold.
</p>
<img src="http://images.example.com/photo.jpg" alt="Descriptive alt text"
style="width:200px;height:150px;" data-lazy="true">
<img src="icon.png" alt="Icon" style="display:none;">
<p>Another short text. <a href="/local-link">Local Link</a></p>
</article>
</section>
<section id="promo-section">
<p>Promo text <a href="http://ads.example.com/ad">Ad Link</a></p>
</section>
<aside class="sidebar">
<img src="data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAA..." alt="Base64 Image">
<div data-info="secret" class="social-widget">
<p>Follow us on <a href="http://facebook.com/brand">Facebook</a></p>
</div>
</aside>
<!-- Another comment below this line -->
<script>console.log("script that might be removed");</script>
<div style="display:none;">
<p>This is hidden</p>
</div>
<footer>
<small>Footer Info &copy; 2025</small>
</footer>
</body>
</html>
"""
    return page
def get_test_scenarios():
    """
    Return the scenario table used to exercise the scrapers.

    Keys are scenario names; values are keyword-argument dicts that get
    passed into scrap(). Every scenario below is currently commented out,
    so the returned dictionary is empty; uncomment entries to enable them.
    """
    scenarios = {
        # "default": {},
        # "exclude_domains": {
        #     "exclude_domains": {"images.example.com", "ads.example.com"}
        # },
        # "exclude_social_media_links": {
        #     "exclude_social_media_links": True
        # },
        # "high_word_threshold": {
        #     "word_count_threshold": 100
        # },
        # "keep_data_attrs": {
        #     "keep_data_attributes": True
        # },
        # "remove_forms_and_comments": {
        #     "remove_forms": True,
        #     "remove_comments": True
        # },
        # "exclude_tags_and_selector": {
        #     "excluded_tags": ["aside", "script"],
        #     "excluded_selector": ".social-widget"
        # },
        # "only_text_mode": {
        #     "only_text": True
        # },
        # "combo_mode": {
        #     "exclude_domains": {"images.example.com", "ads.example.com"},
        #     "exclude_social_media_links": True,
        #     "remove_forms": True,
        #     "remove_comments": True,
        #     "excluded_tags": ["aside"],
        #     "excluded_selector": "#promo-section",
        #     "only_text": False,
        #     "keep_data_attributes": True,
        #     "word_count_threshold": 20
        # },
        # "exclude_external_images": {
        #     "exclude_external_images": True,
        #     "exclude_social_media_links": True
        # },
        # "strict_image_scoring": {
        #     "image_score_threshold": 3,
        #     "image_description_min_word_threshold": 10
        # },
        # "custom_css_selector": {
        #     "css_selector": "section#promo-section"
        # },
        # "remove_noscript": {
        #     "excluded_tags": ["noscript"]
        # },
        # "exclude_external_links": {
        #     "exclude_external_links": True
        # },
        # "large_word_count": {
        #     "word_count_threshold": 500
        # },
        # "super_strict_images": {
        #     "image_score_threshold": 5,
        #     "image_description_min_word_threshold": 15
        # },
        # "exclude_style_and_script": {
        #     "excluded_tags": ["style", "script"]
        # },
        # "keep_data_and_remove_forms": {
        #     "keep_data_attributes": True,
        #     "remove_forms": True
        # },
        # "only_text_high_word_count": {
        #     "only_text": True,
        #     "word_count_threshold": 40
        # },
        # "reduce_to_selector": {
        #     "css_selector": "section > article"
        # },
        # "exclude_all_links": {
        #     # Removes all external links and also excludes example.com & social.com
        #     "exclude_domains": {"example.com", "social.com", "facebook.com"},
        #     "exclude_external_links": True
        # },
        # "comprehensive_removal": {
        #     # Exclude multiple tags, remove forms & comments,
        #     # and also remove targeted selectors
        #     "excluded_tags": ["aside", "noscript", "script"],
        #     "excluded_selector": "#promo-section, .social-widget",
        #     "remove_comments": True,
        #     "remove_forms": True
        # }
    }
    return scenarios
class ScraperEquivalenceTester:
    """Checks that LXMLWebScrapingStrategy matches WebScrapingStrategy output.

    Both scrapers are run on the same HTML; their links, media and cleaned
    HTML are compared and the findings collected into a report dictionary.
    """

    def __init__(self):
        # Built-in sample documents, keyed by case name.
        self.test_cases = {
            'basic': self.generate_basic_html(),
            'complex': self.generate_complex_html(),
            'malformed': self.generate_malformed_html(),
            # 'real_world': self.load_real_samples()
        }

    def generate_basic_html(self):
        """Return a large, regular page made of 1000 article blocks."""
        return generate_large_html(1000)

    def generate_complex_html(self):
        """Return a small page with nested content, srcset/lazy images, an iframe and nav links."""
        return """
<html><body>
<div class="nested-content">
<article>
<h1>Main Title</h1>
<img src="test.jpg" srcset="test-1x.jpg 1x, test-2x.jpg 2x" data-src="lazy.jpg">
<p>Text with <a href="http://test.com">mixed <b>formatting</b></a></p>
<iframe src="embedded.html"></iframe>
</article>
<nav>
<ul>
<li><a href="/page1">Link 1</a></li>
<li><a href="javascript:void(0)">JS Link</a></li>
</ul>
</nav>
</div>
</body></html>
"""

    def generate_malformed_html(self):
        """Return deliberately broken markup (unclosed tags, unquoted attribute, odd comment, CDATA)."""
        return """
<div>Unclosed div
<p>Unclosed paragraph
<a href="test.com">Link</a>
<img src=no-quotes>
<script>document.write("<div>Dynamic</div>");</script>
<!-- Malformed comment -- > -->
<![CDATA[Test CDATA]]>
"""

    def load_real_samples(self):
        """Load the real-world HTML samples from tests/samples/.

        Returns:
            Dict mapping 'article', 'product' and 'blog' to the file contents.

        Raises:
            OSError: If a sample file cannot be opened.
        """
        samples = {}
        for name in ('article', 'product', 'blog'):
            # Use a context manager so each file handle is closed promptly.
            with open(f'tests/samples/{name}.html') as sample_file:
                samples[name] = sample_file.read()
        return samples

    @staticmethod
    def _index_first(items, key):
        """Map item[key] -> the first item carrying that value."""
        index = {}
        for item in items:
            index.setdefault(item[key], item)
        return index

    def deep_compare_links(self, old_links: Dict, new_links: Dict) -> List[str]:
        """Detailed comparison of link structures.

        Args:
            old_links: Links from the reference scraper, with 'internal' and
                'external' lists of dicts that each contain an 'href'.
            new_links: Links from the scraper under test, same layout.

        Returns:
            Messages for links missing from / extra in `new_links`, and for
            'text' or 'title' values that differ on links present in both.
        """
        differences = []
        for category in ['internal', 'external']:
            old_by_url = self._index_first(old_links[category], 'href')
            new_by_url = self._index_first(new_links[category], 'href')

            missing = old_by_url.keys() - new_by_url.keys()
            extra = new_by_url.keys() - old_by_url.keys()
            if missing:
                differences.append(f"Missing {category} links: {missing}")
            if extra:
                differences.append(f"Extra {category} links: {extra}")

            # Compare link attributes for common URLs
            for url in old_by_url.keys() & new_by_url.keys():
                old_link = old_by_url[url]
                new_link = new_by_url[url]
                for attr in ['text', 'title']:
                    # .get() mirrors deep_compare_media: a link lacking the
                    # attribute is reported as a mismatch instead of raising.
                    old_value = old_link.get(attr)
                    new_value = new_link.get(attr)
                    if old_value != new_value:
                        differences.append(
                            f"Link attribute mismatch for {url} - {attr}:"
                            f" old='{old_value}' vs new='{new_value}'"
                        )
        return differences

    def deep_compare_media(self, old_media: Dict, new_media: Dict) -> List[str]:
        """Detailed comparison of media elements.

        Args:
            old_media: Media from the reference scraper, with 'images',
                'videos' and 'audios' lists of dicts that each contain a 'src'.
            new_media: Media from the scraper under test, same layout.

        Returns:
            Messages for sources missing from / extra in `new_media`, and for
            'alt' or 'description' values that differ on shared sources.
        """
        differences = []
        for media_type in ['images', 'videos', 'audios']:
            old_by_src = self._index_first(old_media[media_type], 'src')
            new_by_src = self._index_first(new_media[media_type], 'src')

            missing = old_by_src.keys() - new_by_src.keys()
            extra = new_by_src.keys() - old_by_src.keys()
            if missing:
                differences.append(f"Missing {media_type}: {missing}")
            if extra:
                differences.append(f"Extra {media_type}: {extra}")

            # Compare media attributes for common sources
            for src in old_by_src.keys() & new_by_src.keys():
                old_item = old_by_src[src]
                new_item = new_by_src[src]
                for attr in ['alt', 'description']:
                    if old_item.get(attr) != new_item.get(attr):
                        differences.append(
                            f"{media_type} attribute mismatch for {src} - {attr}:"
                            f" old='{old_item.get(attr)}' vs new='{new_item.get(attr)}'"
                        )
        return differences

    def compare_html_content(self, old_html: str, new_html: str) -> List[str]:
        """Compare HTML content structure and text.

        The comparison is deliberately loose: a difference is only reported
        when the tag-name sequence or the visible text differs in length by
        more than 100 characters. (compare_html_structurally is the strict
        alternative.)

        Returns:
            Zero, one or two messages, each embedding a unified diff.
        """
        differences = []

        def normalize_html(html: str) -> Tuple[str, str]:
            soup = BeautifulSoup(html, 'lxml')
            # Get both structure (tag names in document order) and text
            structure = ' '.join(tag.name for tag in soup.find_all())
            text = ' '.join(soup.get_text().split())
            return structure, text

        old_structure, old_text = normalize_html(old_html)
        new_structure, new_text = normalize_html(new_html)

        # Compare structure
        if abs(len(old_structure) - len(new_structure)) > 100:
            diff = difflib.unified_diff(
                old_structure.split(),
                new_structure.split(),
                lineterm=''
            )
            differences.append("HTML structure differences:\n" + '\n'.join(diff))

        # Compare text content
        if abs(len(old_text) - len(new_text)) > 100:
            text_diff = difflib.unified_diff(
                old_text.split(),
                new_text.split(),
                lineterm=''
            )
            differences.append("Text content differences:\n" + '\n'.join(text_diff))

        return differences

    def compare_results(self, old_result: Dict, new_result: Dict) -> Dict[str, List[str]]:
        """Comprehensive comparison of scraper outputs.

        Returns:
            Dict with any of the keys 'links', 'media' and 'html', each
            mapping to its list of difference messages. Categories without
            differences are omitted, so an empty dict means equivalence.
        """
        differences = {}

        # Compare links
        link_differences = self.deep_compare_links(old_result['links'], new_result['links'])
        if link_differences:
            differences['links'] = link_differences

        # Compare media
        media_differences = self.deep_compare_media(old_result['media'], new_result['media'])
        if media_differences:
            differences['media'] = media_differences

        # Compare HTML
        html_differences = self.compare_html_content(
            old_result['cleaned_html'],
            new_result['cleaned_html']
        )
        if html_differences:
            differences['html'] = html_differences

        return differences

    def run_tests(self) -> Dict:
        """Run comparison tests using the complicated HTML with multiple parameter scenarios.

        Returns:
            Dict with 'tests' (one entry per scenario holding the case name,
            the LXML differences and both execution times) and 'summary'
            (passed/failed counts).
        """
        results = {
            'tests': [],
            'summary': {'passed': 0, 'failed': 0}
        }

        # NOTE: the built-in cases in self.test_cases (basic, complex,
        # malformed) are not exercised here; only the parameter scenarios
        # from get_test_scenarios() run against the complicated HTML.
        complicated_html = generate_complicated_html()
        print("\n=== Testing complicated HTML with multiple parameter scenarios ===")

        # Create the scrapers once and reuse them for every scenario.
        original = WebScrapingStrategy()
        lxml_scraper = LXMLWebScrapingStrategy()

        for scenario_name, params in get_test_scenarios().items():
            print(f"\nScenario: {scenario_name}")

            # perf_counter() is the clock intended for measuring short durations.
            start = time.perf_counter()
            orig_result = original.scrap("http://test.com", complicated_html, **params)
            orig_time = time.perf_counter() - start

            start = time.perf_counter()
            lxml_result = lxml_scraper.scrap("http://test.com", complicated_html, **params)
            lxml_time = time.perf_counter() - start

            diffs = self.compare_results(orig_result, lxml_result)

            results['tests'].append({
                'case': f"complicated_{scenario_name}",
                'lxml_mode': {
                    'differences': diffs,
                    'execution_time': lxml_time
                },
                'original_time': orig_time
            })

            if not diffs:
                results['summary']['passed'] += 1
                print(f"✅ [OK] No differences found. Time(Orig: {orig_time:.3f}s, LXML: {lxml_time:.3f}s)")
            else:
                results['summary']['failed'] += 1
                print("❌ Differences found:")
                for category, dlist in diffs.items():
                    print(f" {category}:")
                    for d in dlist:
                        print(f" - {d}")

        return results

    def print_report(self, results: Dict):
        """Generate detailed equivalence report (printed to stdout).

        Args:
            results: The dictionary produced by run_tests().
        """
        print("\n=== Scraper Equivalence Test Report ===\n")
        print(f"Total Cases: {len(results['tests'])}")
        print(f"Passed: {results['summary']['passed']}")
        print(f"Failed: {results['summary']['failed']}")

        for test in results['tests']:
            print(f"\nTest Case: {test['case']}")
            differences = test['lxml_mode']['differences']

            if not differences:
                print("✅ All implementations produced identical results")
                print(f"Times - Original: {test['original_time']:.3f}s, "
                      f"LXML: {test['lxml_mode']['execution_time']:.3f}s")
                continue

            print("❌ Differences found:")
            print("\nLXML Mode Differences:")
            for category, diffs in differences.items():
                print(f"\n{category}:")
                for diff in diffs:
                    print(f" - {diff}")
def main():
    """Run the equivalence suite, print the report and save the raw results as JSON."""
    equivalence_tester = ScraperEquivalenceTester()
    report = equivalence_tester.run_tests()
    equivalence_tester.print_report(report)

    # Persist the detailed results so failures can be inspected afterwards.
    with open('scraper_equivalence_results.json', 'w') as results_file:
        json.dump(report, results_file, indent=2)


if __name__ == "__main__":
    main()