Compare commits
2 Commits
main
...
feature/sc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b3b728dcd | ||
|
|
bfec5156ad |
@@ -6,7 +6,7 @@ from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, L
|
||||
|
||||
from .content_scraping_strategy import (
|
||||
ContentScrapingStrategy,
|
||||
WebScrapingStrategy,
|
||||
# WebScrapingStrategy,
|
||||
LXMLWebScrapingStrategy,
|
||||
)
|
||||
from .async_logger import (
|
||||
@@ -100,7 +100,7 @@ __all__ = [
|
||||
"CrawlerHub",
|
||||
"CacheMode",
|
||||
"ContentScrapingStrategy",
|
||||
"WebScrapingStrategy",
|
||||
# "WebScrapingStrategy",
|
||||
"LXMLWebScrapingStrategy",
|
||||
"BrowserConfig",
|
||||
"CrawlerRunConfig",
|
||||
|
||||
@@ -17,7 +17,7 @@ from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy
|
||||
from .chunking_strategy import ChunkingStrategy, RegexChunking
|
||||
|
||||
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
|
||||
from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy
|
||||
from .content_scraping_strategy import ContentScrapingStrategy, LXMLWebScrapingStrategy
|
||||
from .deep_crawling import DeepCrawlStrategy
|
||||
|
||||
from .cache_context import CacheMode
|
||||
@@ -725,7 +725,7 @@ class CrawlerRunConfig():
|
||||
parser_type (str): Type of parser to use for HTML parsing.
|
||||
Default: "lxml".
|
||||
scraping_strategy (ContentScrapingStrategy): Scraping strategy to use.
|
||||
Default: WebScrapingStrategy.
|
||||
Default: LXMLWebScrapingStrategy.
|
||||
proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
|
||||
If None, no additional proxy config. Default: None.
|
||||
|
||||
@@ -979,7 +979,7 @@ class CrawlerRunConfig():
|
||||
self.remove_forms = remove_forms
|
||||
self.prettiify = prettiify
|
||||
self.parser_type = parser_type
|
||||
self.scraping_strategy = scraping_strategy or WebScrapingStrategy()
|
||||
self.scraping_strategy = scraping_strategy or LXMLWebScrapingStrategy()
|
||||
self.proxy_config = proxy_config
|
||||
self.proxy_rotation_strategy = proxy_rotation_strategy
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import re
|
||||
from itertools import chain
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, Any, Optional
|
||||
from bs4 import BeautifulSoup
|
||||
# from bs4 import BeautifulSoup
|
||||
import asyncio
|
||||
import requests
|
||||
from .config import (
|
||||
@@ -13,12 +13,12 @@ from .config import (
|
||||
IMPORTANT_ATTRS,
|
||||
SOCIAL_MEDIA_DOMAINS,
|
||||
)
|
||||
from bs4 import NavigableString, Comment
|
||||
from bs4 import PageElement, Tag
|
||||
# from bs4 import NavigableString, Comment
|
||||
# from bs4 import PageElement, Tag
|
||||
from urllib.parse import urljoin
|
||||
from requests.exceptions import InvalidSchema
|
||||
from .utils import (
|
||||
extract_metadata,
|
||||
# extract_metadata,
|
||||
normalize_url,
|
||||
is_external_url,
|
||||
get_base_domain,
|
||||
@@ -96,20 +96,16 @@ class ContentScrapingStrategy(ABC):
|
||||
pass
|
||||
|
||||
|
||||
class WebScrapingStrategy(ContentScrapingStrategy):
|
||||
"""
|
||||
Class for web content scraping. Perhaps the most important class.
|
||||
|
||||
How it works:
|
||||
1. Extract content from HTML using BeautifulSoup.
|
||||
2. Clean the extracted content using a content cleaning strategy.
|
||||
3. Filter the cleaned content using a content filtering strategy.
|
||||
4. Generate markdown content from the filtered content.
|
||||
5. Return the markdown content.
|
||||
"""
|
||||
|
||||
class LXMLWebScrapingStrategy(ContentScrapingStrategy):
|
||||
def __init__(self, logger=None):
|
||||
self.logger = logger
|
||||
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
|
||||
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
|
||||
|
||||
# Constants for image processing
|
||||
self.classes_to_check = frozenset(["button", "icon", "logo"])
|
||||
self.tags_to_check = frozenset(["button", "input"])
|
||||
self.image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
|
||||
|
||||
def _log(self, level, message, tag="SCRAPE", **kwargs):
|
||||
"""Helper method to safely use logger."""
|
||||
@@ -130,7 +126,8 @@ class WebScrapingStrategy(ContentScrapingStrategy):
|
||||
ScrapingResult: A structured result containing the scraped content.
|
||||
"""
|
||||
actual_url = kwargs.get("redirected_url", url)
|
||||
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
|
||||
raw_result = self._scrap(actual_url, html, **kwargs)
|
||||
|
||||
if raw_result is None:
|
||||
return ScrapingResult(
|
||||
cleaned_html="",
|
||||
@@ -194,388 +191,15 @@ class WebScrapingStrategy(ContentScrapingStrategy):
|
||||
Returns:
|
||||
ScrapingResult: A structured result containing the scraped content.
|
||||
"""
|
||||
return await asyncio.to_thread(self._scrap, url, html, **kwargs)
|
||||
return await asyncio.to_thread(self.scrap, url, html, **kwargs)
|
||||
|
||||
def is_data_table(self, table: Tag, **kwargs) -> bool:
|
||||
"""
|
||||
Determine if a table element is a data table (not a layout table).
|
||||
|
||||
Args:
|
||||
table (Tag): BeautifulSoup Tag representing a table element
|
||||
**kwargs: Additional keyword arguments including table_score_threshold
|
||||
|
||||
Returns:
|
||||
bool: True if the table is a data table, False otherwise
|
||||
"""
|
||||
score = 0
|
||||
|
||||
# Check for thead and tbody
|
||||
has_thead = len(table.select('thead')) > 0
|
||||
has_tbody = len(table.select('tbody')) > 0
|
||||
if has_thead:
|
||||
score += 2
|
||||
if has_tbody:
|
||||
score += 1
|
||||
|
||||
# Check for th elements
|
||||
th_count = len(table.select('th'))
|
||||
if th_count > 0:
|
||||
score += 2
|
||||
if has_thead or len(table.select('tr:first-child th')) > 0:
|
||||
score += 1
|
||||
|
||||
# Check for nested tables
|
||||
if len(table.select('table')) > 0:
|
||||
score -= 3
|
||||
|
||||
# Role attribute check
|
||||
role = table.get('role', '').lower()
|
||||
if role in {'presentation', 'none'}:
|
||||
score -= 3
|
||||
|
||||
# Column consistency
|
||||
rows = table.select('tr')
|
||||
if not rows:
|
||||
return False
|
||||
|
||||
col_counts = [len(row.select('td, th')) for row in rows]
|
||||
avg_cols = sum(col_counts) / len(col_counts)
|
||||
variance = sum((c - avg_cols)**2 for c in col_counts) / len(col_counts)
|
||||
if variance < 1:
|
||||
score += 2
|
||||
|
||||
# Caption and summary
|
||||
if table.select('caption'):
|
||||
score += 2
|
||||
if table.has_attr('summary') and table['summary']:
|
||||
score += 1
|
||||
|
||||
# Text density
|
||||
total_text = sum(len(cell.get_text().strip()) for row in rows for cell in row.select('td, th'))
|
||||
total_tags = sum(1 for _ in table.descendants if isinstance(_, Tag))
|
||||
text_ratio = total_text / (total_tags + 1e-5)
|
||||
if text_ratio > 20:
|
||||
score += 3
|
||||
elif text_ratio > 10:
|
||||
score += 2
|
||||
|
||||
# Data attributes
|
||||
data_attrs = sum(1 for attr in table.attrs if attr.startswith('data-'))
|
||||
score += data_attrs * 0.5
|
||||
|
||||
# Size check
|
||||
if avg_cols >= 2 and len(rows) >= 2:
|
||||
score += 2
|
||||
|
||||
threshold = kwargs.get('table_score_threshold', 7)
|
||||
return score >= threshold
|
||||
|
||||
def extract_table_data(self, table: Tag) -> dict:
|
||||
"""
|
||||
Extract structured data from a table element.
|
||||
|
||||
Args:
|
||||
table (Tag): BeautifulSoup Tag representing a table element
|
||||
|
||||
Returns:
|
||||
dict: Dictionary containing table data (headers, rows, caption, summary)
|
||||
"""
|
||||
caption_elem = table.select_one('caption')
|
||||
caption = caption_elem.get_text().strip() if caption_elem else ""
|
||||
summary = table.get('summary', '').strip()
|
||||
|
||||
# Extract headers with colspan handling
|
||||
headers = []
|
||||
thead_rows = table.select('thead tr')
|
||||
if thead_rows:
|
||||
header_cells = thead_rows[0].select('th')
|
||||
for cell in header_cells:
|
||||
text = cell.get_text().strip()
|
||||
colspan = int(cell.get('colspan', 1))
|
||||
headers.extend([text] * colspan)
|
||||
else:
|
||||
first_row = table.select('tr:first-child')
|
||||
if first_row:
|
||||
for cell in first_row[0].select('th, td'):
|
||||
text = cell.get_text().strip()
|
||||
colspan = int(cell.get('colspan', 1))
|
||||
headers.extend([text] * colspan)
|
||||
|
||||
# Extract rows with colspan handling
|
||||
rows = []
|
||||
all_rows = table.select('tr')
|
||||
thead = table.select_one('thead')
|
||||
tbody_rows = []
|
||||
|
||||
if thead:
|
||||
thead_rows = thead.select('tr')
|
||||
tbody_rows = [row for row in all_rows if row not in thead_rows]
|
||||
else:
|
||||
if all_rows and all_rows[0].select('th'):
|
||||
tbody_rows = all_rows[1:]
|
||||
else:
|
||||
tbody_rows = all_rows
|
||||
|
||||
for row in tbody_rows:
|
||||
# for row in table.select('tr:not(:has(ancestor::thead))'):
|
||||
row_data = []
|
||||
for cell in row.select('td'):
|
||||
text = cell.get_text().strip()
|
||||
colspan = int(cell.get('colspan', 1))
|
||||
row_data.extend([text] * colspan)
|
||||
if row_data:
|
||||
rows.append(row_data)
|
||||
|
||||
# Align rows with headers
|
||||
max_columns = len(headers) if headers else (max(len(row) for row in rows) if rows else 0)
|
||||
aligned_rows = []
|
||||
for row in rows:
|
||||
aligned = row[:max_columns] + [''] * (max_columns - len(row))
|
||||
aligned_rows.append(aligned)
|
||||
|
||||
if not headers:
|
||||
headers = [f"Column {i+1}" for i in range(max_columns)]
|
||||
|
||||
return {
|
||||
"headers": headers,
|
||||
"rows": aligned_rows,
|
||||
"caption": caption,
|
||||
"summary": summary,
|
||||
}
|
||||
|
||||
def flatten_nested_elements(self, node):
|
||||
"""
|
||||
Flatten nested elements in a HTML tree.
|
||||
|
||||
Args:
|
||||
node (Tag): The root node of the HTML tree.
|
||||
|
||||
Returns:
|
||||
Tag: The flattened HTML tree.
|
||||
"""
|
||||
if isinstance(node, NavigableString):
|
||||
return node
|
||||
if (
|
||||
len(node.contents) == 1
|
||||
and isinstance(node.contents[0], Tag)
|
||||
and node.contents[0].name == node.name
|
||||
):
|
||||
return self.flatten_nested_elements(node.contents[0])
|
||||
node.contents = [self.flatten_nested_elements(child) for child in node.contents]
|
||||
return node
|
||||
|
||||
def find_closest_parent_with_useful_text(self, tag, **kwargs):
|
||||
"""
|
||||
Find the closest parent with useful text.
|
||||
|
||||
Args:
|
||||
tag (Tag): The starting tag to search from.
|
||||
**kwargs: Additional keyword arguments.
|
||||
|
||||
Returns:
|
||||
Tag: The closest parent with useful text, or None if not found.
|
||||
"""
|
||||
image_description_min_word_threshold = kwargs.get(
|
||||
"image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
|
||||
)
|
||||
current_tag = tag
|
||||
while current_tag:
|
||||
current_tag = current_tag.parent
|
||||
# Get the text content of the parent tag
|
||||
if current_tag:
|
||||
text_content = current_tag.get_text(separator=" ", strip=True)
|
||||
# Check if the text content has at least word_count_threshold
|
||||
if len(text_content.split()) >= image_description_min_word_threshold:
|
||||
return text_content
|
||||
return None
|
||||
|
||||
def remove_unwanted_attributes(
|
||||
self, element, important_attrs, keep_data_attributes=False
|
||||
):
|
||||
"""
|
||||
Remove unwanted attributes from an HTML element.
|
||||
|
||||
Args:
|
||||
element (Tag): The HTML element to remove attributes from.
|
||||
important_attrs (list): List of important attributes to keep.
|
||||
keep_data_attributes (bool): Whether to keep data attributes.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
attrs_to_remove = []
|
||||
for attr in element.attrs:
|
||||
if attr not in important_attrs:
|
||||
if keep_data_attributes:
|
||||
if not attr.startswith("data-"):
|
||||
attrs_to_remove.append(attr)
|
||||
else:
|
||||
attrs_to_remove.append(attr)
|
||||
|
||||
for attr in attrs_to_remove:
|
||||
del element[attr]
|
||||
|
||||
def process_image(self, img, url, index, total_images, **kwargs):
|
||||
"""
|
||||
Process an image element.
|
||||
|
||||
How it works:
|
||||
1. Check if the image has valid display and inside undesired html elements.
|
||||
2. Score an image for it's usefulness.
|
||||
3. Extract image file metadata to extract size and extension.
|
||||
4. Generate a dictionary with the processed image information.
|
||||
5. Return the processed image information.
|
||||
|
||||
Args:
|
||||
img (Tag): The image element to process.
|
||||
url (str): The URL of the page containing the image.
|
||||
index (int): The index of the image in the list of images.
|
||||
total_images (int): The total number of images in the list.
|
||||
**kwargs: Additional keyword arguments.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the processed image information.
|
||||
"""
|
||||
# parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
|
||||
# if ' ' in u else None}
|
||||
# for u in [f"http{p}" for p in s.split("http") if p]]
|
||||
|
||||
# Constants for checks
|
||||
classes_to_check = frozenset(["button", "icon", "logo"])
|
||||
tags_to_check = frozenset(["button", "input"])
|
||||
image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
|
||||
|
||||
# Pre-fetch commonly used attributes
|
||||
style = img.get("style", "")
|
||||
alt = img.get("alt", "")
|
||||
src = img.get("src", "")
|
||||
data_src = img.get("data-src", "")
|
||||
srcset = img.get("srcset", "")
|
||||
data_srcset = img.get("data-srcset", "")
|
||||
width = img.get("width")
|
||||
height = img.get("height")
|
||||
parent = img.parent
|
||||
parent_classes = parent.get("class", [])
|
||||
|
||||
# Quick validation checks
|
||||
if (
|
||||
"display:none" in style
|
||||
or parent.name in tags_to_check
|
||||
or any(c in cls for c in parent_classes for cls in classes_to_check)
|
||||
or any(c in src for c in classes_to_check)
|
||||
or any(c in alt for c in classes_to_check)
|
||||
):
|
||||
return None
|
||||
|
||||
# Quick score calculation
|
||||
score = 0
|
||||
if width and width.isdigit():
|
||||
width_val = int(width)
|
||||
score += 1 if width_val > 150 else 0
|
||||
if height and height.isdigit():
|
||||
height_val = int(height)
|
||||
score += 1 if height_val > 150 else 0
|
||||
if alt:
|
||||
score += 1
|
||||
score += index / total_images < 0.5
|
||||
|
||||
# image_format = ''
|
||||
# if "data:image/" in src:
|
||||
# image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0]
|
||||
# else:
|
||||
# image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0]
|
||||
|
||||
# if image_format in ('jpg', 'png', 'webp', 'avif'):
|
||||
# score += 1
|
||||
|
||||
# Check for image format in all possible sources
|
||||
def has_image_format(url):
|
||||
return any(fmt in url.lower() for fmt in image_formats)
|
||||
|
||||
# Score for having proper image sources
|
||||
if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]):
|
||||
score += 1
|
||||
if srcset or data_srcset:
|
||||
score += 1
|
||||
if img.find_parent("picture"):
|
||||
score += 1
|
||||
|
||||
# Detect format from any available source
|
||||
detected_format = None
|
||||
for url in [src, data_src, srcset, data_srcset]:
|
||||
if url:
|
||||
format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
|
||||
if format_matches:
|
||||
detected_format = format_matches[0]
|
||||
break
|
||||
|
||||
if score <= kwargs.get("image_score_threshold", IMAGE_SCORE_THRESHOLD):
|
||||
return None
|
||||
|
||||
# Use set for deduplication
|
||||
unique_urls = set()
|
||||
image_variants = []
|
||||
|
||||
# Generate a unique group ID for this set of variants
|
||||
group_id = index
|
||||
|
||||
# Base image info template
|
||||
base_info = {
|
||||
"alt": alt,
|
||||
"desc": self.find_closest_parent_with_useful_text(img, **kwargs),
|
||||
"score": score,
|
||||
"type": "image",
|
||||
"group_id": group_id, # Group ID for this set of variants
|
||||
"format": detected_format,
|
||||
}
|
||||
|
||||
# Inline function for adding variants
|
||||
def add_variant(src, width=None):
|
||||
if src and not src.startswith("data:") and src not in unique_urls:
|
||||
unique_urls.add(src)
|
||||
image_variants.append({**base_info, "src": src, "width": width})
|
||||
|
||||
# Process all sources
|
||||
add_variant(src)
|
||||
add_variant(data_src)
|
||||
|
||||
# Handle srcset and data-srcset in one pass
|
||||
for attr in ("srcset", "data-srcset"):
|
||||
if value := img.get(attr):
|
||||
for source in parse_srcset(value):
|
||||
add_variant(source["url"], source["width"])
|
||||
|
||||
# Quick picture element check
|
||||
if picture := img.find_parent("picture"):
|
||||
for source in picture.find_all("source"):
|
||||
if srcset := source.get("srcset"):
|
||||
for src in parse_srcset(srcset):
|
||||
add_variant(src["url"], src["width"])
|
||||
|
||||
# Framework-specific attributes in one pass
|
||||
for attr, value in img.attrs.items():
|
||||
if (
|
||||
attr.startswith("data-")
|
||||
and ("src" in attr or "srcset" in attr)
|
||||
and "http" in value
|
||||
):
|
||||
add_variant(value)
|
||||
|
||||
return image_variants if image_variants else None
|
||||
|
||||
def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
|
||||
def process_element(self, url: str, element: lhtml.HtmlElement, **kwargs) -> Dict[str, Any]:
|
||||
"""
|
||||
Process an HTML element.
|
||||
|
||||
How it works:
|
||||
1. Check if the element is an image, video, or audio.
|
||||
2. Extract the element's attributes and content.
|
||||
3. Process the element based on its type.
|
||||
4. Return the processed element information.
|
||||
|
||||
Args:
|
||||
url (str): The URL of the page containing the element.
|
||||
element (Tag): The HTML element to process.
|
||||
element (lhtml.HtmlElement): The HTML element to process.
|
||||
**kwargs: Additional keyword arguments.
|
||||
|
||||
Returns:
|
||||
@@ -584,451 +208,40 @@ class WebScrapingStrategy(ContentScrapingStrategy):
|
||||
media = {"images": [], "videos": [], "audios": [], "tables": []}
|
||||
internal_links_dict = {}
|
||||
external_links_dict = {}
|
||||
|
||||
self._process_element(
|
||||
url, element, media, internal_links_dict, external_links_dict, **kwargs
|
||||
)
|
||||
|
||||
return {
|
||||
"media": media,
|
||||
"internal_links_dict": internal_links_dict,
|
||||
"external_links_dict": external_links_dict,
|
||||
}
|
||||
|
||||
def _process_element(
|
||||
self,
|
||||
url,
|
||||
element: PageElement,
|
||||
media: Dict[str, Any],
|
||||
internal_links_dict: Dict[str, Any],
|
||||
external_links_dict: Dict[str, Any],
|
||||
**kwargs,
|
||||
) -> bool:
|
||||
def remove_unwanted_attributes(self, element: lhtml.HtmlElement, important_attrs: List[str], keep_data_attributes: bool = False):
|
||||
"""
|
||||
Process an HTML element.
|
||||
"""
|
||||
try:
|
||||
if isinstance(element, NavigableString):
|
||||
if isinstance(element, Comment):
|
||||
element.extract()
|
||||
return False
|
||||
|
||||
# if element.name == 'img':
|
||||
# process_image(element, url, 0, 1)
|
||||
# return True
|
||||
base_domain = kwargs.get("base_domain", get_base_domain(url))
|
||||
|
||||
if element.name in ["script", "style", "link", "meta", "noscript"]:
|
||||
element.decompose()
|
||||
return False
|
||||
|
||||
keep_element = False
|
||||
# Special case for table elements - always preserve structure
|
||||
if element.name in ["tr", "td", "th"]:
|
||||
keep_element = True
|
||||
|
||||
exclude_domains = kwargs.get("exclude_domains", [])
|
||||
# exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS))
|
||||
# exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
|
||||
# exclude_social_media_domains = list(set(exclude_social_media_domains))
|
||||
|
||||
try:
|
||||
if element.name == "a" and element.get("href"):
|
||||
href = element.get("href", "").strip()
|
||||
if not href: # Skip empty hrefs
|
||||
return False
|
||||
|
||||
# url_base = url.split("/")[2]
|
||||
|
||||
# Normalize the URL
|
||||
try:
|
||||
normalized_href = normalize_url(href, url)
|
||||
except ValueError:
|
||||
# logging.warning(f"Invalid URL format: {href}, Error: {str(e)}")
|
||||
return False
|
||||
|
||||
link_data = {
|
||||
"href": normalized_href,
|
||||
"text": element.get_text().strip(),
|
||||
"title": element.get("title", "").strip(),
|
||||
"base_domain": base_domain,
|
||||
}
|
||||
|
||||
is_external = is_external_url(normalized_href, base_domain)
|
||||
|
||||
keep_element = True
|
||||
|
||||
# Handle external link exclusions
|
||||
if is_external:
|
||||
link_base_domain = get_base_domain(normalized_href)
|
||||
link_data["base_domain"] = link_base_domain
|
||||
if kwargs.get("exclude_external_links", False):
|
||||
element.decompose()
|
||||
return False
|
||||
# elif kwargs.get('exclude_social_media_links', False):
|
||||
# if link_base_domain in exclude_social_media_domains:
|
||||
# element.decompose()
|
||||
# return False
|
||||
# if any(domain in normalized_href.lower() for domain in exclude_social_media_domains):
|
||||
# element.decompose()
|
||||
# return False
|
||||
elif exclude_domains:
|
||||
if link_base_domain in exclude_domains:
|
||||
element.decompose()
|
||||
return False
|
||||
# if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])):
|
||||
# element.decompose()
|
||||
# return False
|
||||
|
||||
if is_external:
|
||||
if normalized_href not in external_links_dict:
|
||||
external_links_dict[normalized_href] = link_data
|
||||
else:
|
||||
if kwargs.get("exclude_internal_links", False):
|
||||
element.decompose()
|
||||
return False
|
||||
if normalized_href not in internal_links_dict:
|
||||
internal_links_dict[normalized_href] = link_data
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Error processing links: {str(e)}")
|
||||
|
||||
try:
|
||||
if element.name == "img":
|
||||
potential_sources = [
|
||||
"src",
|
||||
"data-src",
|
||||
"srcset" "data-lazy-src",
|
||||
"data-original",
|
||||
]
|
||||
src = element.get("src", "")
|
||||
while not src and potential_sources:
|
||||
src = element.get(potential_sources.pop(0), "")
|
||||
if not src:
|
||||
element.decompose()
|
||||
return False
|
||||
|
||||
# If it is srcset pick up the first image
|
||||
if "srcset" in element.attrs:
|
||||
src = element.attrs["srcset"].split(",")[0].split(" ")[0]
|
||||
|
||||
# If image src is internal, then skip
|
||||
if not is_external_url(src, base_domain):
|
||||
return True
|
||||
|
||||
image_src_base_domain = get_base_domain(src)
|
||||
|
||||
# Check flag if we should remove external images
|
||||
if kwargs.get("exclude_external_images", False):
|
||||
element.decompose()
|
||||
return False
|
||||
# src_url_base = src.split('/')[2]
|
||||
# url_base = url.split('/')[2]
|
||||
# if url_base not in src_url_base:
|
||||
# element.decompose()
|
||||
# return False
|
||||
|
||||
# if kwargs.get('exclude_social_media_links', False):
|
||||
# if image_src_base_domain in exclude_social_media_domains:
|
||||
# element.decompose()
|
||||
# return False
|
||||
# src_url_base = src.split('/')[2]
|
||||
# url_base = url.split('/')[2]
|
||||
# if any(domain in src for domain in exclude_social_media_domains):
|
||||
# element.decompose()
|
||||
# return False
|
||||
|
||||
# Handle exclude domains
|
||||
if exclude_domains:
|
||||
if image_src_base_domain in exclude_domains:
|
||||
element.decompose()
|
||||
return False
|
||||
# if any(domain in src for domain in kwargs.get('exclude_domains', [])):
|
||||
# element.decompose()
|
||||
# return False
|
||||
|
||||
return True # Always keep image elements
|
||||
except Exception:
|
||||
raise "Error processing images"
|
||||
|
||||
# Check if flag to remove all forms is set
|
||||
if kwargs.get("remove_forms", False) and element.name == "form":
|
||||
element.decompose()
|
||||
return False
|
||||
|
||||
if element.name in ["video", "audio"]:
|
||||
media[f"{element.name}s"].append(
|
||||
{
|
||||
"src": element.get("src"),
|
||||
"alt": element.get("alt"),
|
||||
"type": element.name,
|
||||
"description": self.find_closest_parent_with_useful_text(
|
||||
element, **kwargs
|
||||
),
|
||||
}
|
||||
)
|
||||
source_tags = element.find_all("source")
|
||||
for source_tag in source_tags:
|
||||
media[f"{element.name}s"].append(
|
||||
{
|
||||
"src": source_tag.get("src"),
|
||||
"alt": element.get("alt"),
|
||||
"type": element.name,
|
||||
"description": self.find_closest_parent_with_useful_text(
|
||||
element, **kwargs
|
||||
),
|
||||
}
|
||||
)
|
||||
return True # Always keep video and audio elements
|
||||
|
||||
if element.name in ONLY_TEXT_ELIGIBLE_TAGS:
|
||||
if kwargs.get("only_text", False):
|
||||
element.replace_with(element.get_text())
|
||||
|
||||
try:
|
||||
self.remove_unwanted_attributes(
|
||||
element, IMPORTANT_ATTRS + kwargs.get("keep_attrs", []) , kwargs.get("keep_data_attributes", False)
|
||||
)
|
||||
except Exception as e:
|
||||
# print('Error removing unwanted attributes:', str(e))
|
||||
self._log(
|
||||
"error",
|
||||
message="Error removing unwanted attributes: {error}",
|
||||
tag="SCRAPE",
|
||||
params={"error": str(e)},
|
||||
)
|
||||
# Process children
|
||||
for child in list(element.children):
|
||||
if isinstance(child, NavigableString) and not isinstance(
|
||||
child, Comment
|
||||
):
|
||||
if len(child.strip()) > 0:
|
||||
keep_element = True
|
||||
else:
|
||||
if self._process_element(
|
||||
url,
|
||||
child,
|
||||
media,
|
||||
internal_links_dict,
|
||||
external_links_dict,
|
||||
**kwargs,
|
||||
):
|
||||
keep_element = True
|
||||
|
||||
# Check word count
|
||||
word_count_threshold = kwargs.get(
|
||||
"word_count_threshold", MIN_WORD_THRESHOLD
|
||||
)
|
||||
if not keep_element:
|
||||
word_count = len(element.get_text(strip=True).split())
|
||||
keep_element = word_count >= word_count_threshold
|
||||
|
||||
if not keep_element:
|
||||
element.decompose()
|
||||
|
||||
return keep_element
|
||||
except Exception as e:
|
||||
# print('Error processing element:', str(e))
|
||||
self._log(
|
||||
"error",
|
||||
message="Error processing element: {error}",
|
||||
tag="SCRAPE",
|
||||
params={"error": str(e)},
|
||||
)
|
||||
return False
|
||||
|
||||
def _scrap(
|
||||
self,
|
||||
url: str,
|
||||
html: str,
|
||||
word_count_threshold: int = MIN_WORD_THRESHOLD,
|
||||
css_selector: str = None,
|
||||
target_elements: List[str] = None,
|
||||
**kwargs,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Extract content from HTML using BeautifulSoup.
|
||||
Remove unwanted attributes from an HTML element.
|
||||
|
||||
Args:
|
||||
url (str): The URL of the page to scrape.
|
||||
html (str): The HTML content of the page to scrape.
|
||||
word_count_threshold (int): The minimum word count threshold for content extraction.
|
||||
css_selector (str): The CSS selector to use for content extraction.
|
||||
**kwargs: Additional keyword arguments.
|
||||
element (lhtml.HtmlElement): The HTML element to remove attributes from.
|
||||
important_attrs (List[str]): List of important attributes to keep.
|
||||
keep_data_attributes (bool): Whether to keep data attributes.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the extracted content.
|
||||
None
|
||||
"""
|
||||
success = True
|
||||
if not html:
|
||||
return None
|
||||
attrs_to_remove = []
|
||||
for attr in element.attrib:
|
||||
if attr not in important_attrs:
|
||||
if keep_data_attributes:
|
||||
if not attr.startswith("data-"):
|
||||
attrs_to_remove.append(attr)
|
||||
else:
|
||||
attrs_to_remove.append(attr)
|
||||
|
||||
parser_type = kwargs.get("parser", "lxml")
|
||||
soup = BeautifulSoup(html, parser_type)
|
||||
body = soup.body
|
||||
if body is None:
|
||||
raise Exception("'<body>' tag is not found in fetched html. Consider adding wait_for=\"css:body\" to wait for body tag to be loaded into DOM.")
|
||||
base_domain = get_base_domain(url)
|
||||
|
||||
# Early removal of all images if exclude_all_images is set
|
||||
# This happens before any processing to minimize memory usage
|
||||
if kwargs.get("exclude_all_images", False):
|
||||
for img in body.find_all('img'):
|
||||
img.decompose()
|
||||
|
||||
try:
|
||||
meta = extract_metadata("", soup)
|
||||
except Exception as e:
|
||||
self._log(
|
||||
"error",
|
||||
message="Error extracting metadata: {error}",
|
||||
tag="SCRAPE",
|
||||
params={"error": str(e)},
|
||||
)
|
||||
meta = {}
|
||||
|
||||
# Handle tag-based removal first - faster than CSS selection
|
||||
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
|
||||
if excluded_tags:
|
||||
for element in body.find_all(lambda tag: tag.name in excluded_tags):
|
||||
element.extract()
|
||||
|
||||
# Handle CSS selector-based removal
|
||||
excluded_selector = kwargs.get("excluded_selector", "")
|
||||
if excluded_selector:
|
||||
is_single_selector = (
|
||||
"," not in excluded_selector and " " not in excluded_selector
|
||||
)
|
||||
if is_single_selector:
|
||||
while element := body.select_one(excluded_selector):
|
||||
element.extract()
|
||||
else:
|
||||
for element in body.select(excluded_selector):
|
||||
element.extract()
|
||||
|
||||
content_element = None
|
||||
if target_elements:
|
||||
try:
|
||||
for_content_targeted_element = []
|
||||
for target_element in target_elements:
|
||||
for_content_targeted_element.extend(body.select(target_element))
|
||||
content_element = soup.new_tag("div")
|
||||
for el in for_content_targeted_element:
|
||||
content_element.append(copy.deepcopy(el))
|
||||
except Exception as e:
|
||||
self._log("error", f"Error with target element detection: {str(e)}", "SCRAPE")
|
||||
return None
|
||||
else:
|
||||
content_element = body
|
||||
|
||||
kwargs["exclude_social_media_domains"] = set(
|
||||
kwargs.get("exclude_social_media_domains", []) + SOCIAL_MEDIA_DOMAINS
|
||||
)
|
||||
kwargs["exclude_domains"] = set(kwargs.get("exclude_domains", []))
|
||||
if kwargs.get("exclude_social_media_links", False):
|
||||
kwargs["exclude_domains"] = kwargs["exclude_domains"].union(
|
||||
kwargs["exclude_social_media_domains"]
|
||||
)
|
||||
|
||||
result_obj = self.process_element(
|
||||
url,
|
||||
body,
|
||||
word_count_threshold=word_count_threshold,
|
||||
base_domain=base_domain,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
links = {"internal": [], "external": []}
|
||||
media = result_obj["media"]
|
||||
internal_links_dict = result_obj["internal_links_dict"]
|
||||
external_links_dict = result_obj["external_links_dict"]
|
||||
|
||||
# Update the links dictionary with unique links
|
||||
links["internal"] = list(internal_links_dict.values())
|
||||
links["external"] = list(external_links_dict.values())
|
||||
|
||||
# # Process images using ThreadPoolExecutor
|
||||
imgs = body.find_all("img")
|
||||
|
||||
media["images"] = [
|
||||
img
|
||||
for result in (
|
||||
self.process_image(img, url, i, len(imgs), **kwargs)
|
||||
for i, img in enumerate(imgs)
|
||||
)
|
||||
if result is not None
|
||||
for img in result
|
||||
]
|
||||
|
||||
# Process tables if not excluded
|
||||
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
|
||||
if 'table' not in excluded_tags:
|
||||
tables = body.find_all('table')
|
||||
for table in tables:
|
||||
if self.is_data_table(table, **kwargs):
|
||||
table_data = self.extract_table_data(table)
|
||||
media["tables"].append(table_data)
|
||||
|
||||
body = self.flatten_nested_elements(body)
|
||||
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
|
||||
for img in imgs:
|
||||
src = img.get("src", "")
|
||||
if base64_pattern.match(src):
|
||||
# Replace base64 data with empty string
|
||||
img["src"] = base64_pattern.sub("", src)
|
||||
|
||||
str_body = ""
|
||||
try:
|
||||
str_body = content_element.encode_contents().decode("utf-8")
|
||||
except Exception:
|
||||
# Reset body to the original HTML
|
||||
success = False
|
||||
body = BeautifulSoup(html, "html.parser")
|
||||
|
||||
# Create a new div with a special ID
|
||||
error_div = body.new_tag("div", id="crawl4ai_error_message")
|
||||
error_div.string = """
|
||||
Crawl4AI Error: This page is not fully supported.
|
||||
|
||||
Possible reasons:
|
||||
1. The page may have restrictions that prevent crawling.
|
||||
2. The page might not be fully loaded.
|
||||
|
||||
Suggestions:
|
||||
- Try calling the crawl function with these parameters:
|
||||
magic=True,
|
||||
- Set headless=False to visualize what's happening on the page.
|
||||
|
||||
If the issue persists, please check the page's structure and any potential anti-crawling measures.
|
||||
"""
|
||||
|
||||
# Append the error div to the body
|
||||
body.append(error_div)
|
||||
str_body = body.encode_contents().decode("utf-8")
|
||||
|
||||
print(
|
||||
"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details."
|
||||
)
|
||||
self._log(
|
||||
"error",
|
||||
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.",
|
||||
tag="SCRAPE",
|
||||
)
|
||||
|
||||
cleaned_html = str_body.replace("\n\n", "\n").replace(" ", " ")
|
||||
|
||||
return {
|
||||
"cleaned_html": cleaned_html,
|
||||
"success": success,
|
||||
"media": media,
|
||||
"links": links,
|
||||
"metadata": meta,
|
||||
}
|
||||
|
||||
|
||||
class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
def __init__(self, logger=None):
|
||||
super().__init__(logger)
|
||||
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
|
||||
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
|
||||
for attr in attrs_to_remove:
|
||||
del element.attrib[attr]
|
||||
|
||||
def _process_element(
|
||||
self,
|
||||
@@ -1190,7 +403,7 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
return None
|
||||
|
||||
parent = img.getparent()
|
||||
if parent.tag in ["button", "input"]:
|
||||
if parent.tag in self.tags_to_check:
|
||||
return None
|
||||
|
||||
parent_classes = parent.get("class", "").split()
|
||||
@@ -1200,8 +413,8 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
return None
|
||||
|
||||
# If src is in class or alt, likely an icon
|
||||
if (src and any(c in src for c in ["button", "icon", "logo"])) or (
|
||||
alt and any(c in alt for c in ["button", "icon", "logo"])
|
||||
if (src and any(c in src for c in self.classes_to_check)) or (
|
||||
alt and any(c in alt for c in self.classes_to_check)
|
||||
):
|
||||
return None
|
||||
|
||||
@@ -1216,11 +429,10 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
score += index / total_images < 0.5
|
||||
|
||||
# Check formats in all possible sources
|
||||
image_formats = {"jpg", "jpeg", "png", "webp", "avif", "gif"}
|
||||
detected_format = None
|
||||
for url in [src, data_src, srcset, data_srcset]:
|
||||
if url:
|
||||
format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
|
||||
format_matches = [fmt for fmt in self.image_formats if fmt in url.lower()]
|
||||
if format_matches:
|
||||
detected_format = format_matches[0]
|
||||
score += 1
|
||||
@@ -1484,6 +696,13 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
|
||||
success = True
|
||||
try:
|
||||
# Extract metadata FIRST from the original HTML to avoid issues with modified content.
|
||||
try:
|
||||
meta = extract_metadata_using_lxml(html, None) # Pass the original HTML
|
||||
except Exception as e:
|
||||
self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
|
||||
meta = {}
|
||||
|
||||
doc = lhtml.document_fromstring(html)
|
||||
# Match BeautifulSoup's behavior of using body or full doc
|
||||
# body = doc.xpath('//body')[0] if doc.xpath('//body') else doc
|
||||
@@ -1524,14 +743,14 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
"error", f"Error with excluded CSS selector: {str(e)}", "SCRAPE"
|
||||
)
|
||||
|
||||
# Extract metadata before any content filtering
|
||||
try:
|
||||
meta = extract_metadata_using_lxml(
|
||||
"", doc
|
||||
) # Using same function as BeautifulSoup version
|
||||
except Exception as e:
|
||||
self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
|
||||
meta = {}
|
||||
# # Extract metadata before any content filtering
|
||||
# try:
|
||||
# meta = extract_metadata_using_lxml(
|
||||
# "", doc
|
||||
# ) # Using same function as BeautifulSoup version
|
||||
# except Exception as e:
|
||||
# self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
|
||||
# meta = {}
|
||||
|
||||
content_element = None
|
||||
if target_elements:
|
||||
@@ -1611,7 +830,9 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
|
||||
|
||||
# Remove unneeded attributes
|
||||
self.remove_unwanted_attributes_fast(
|
||||
body, keep_data_attributes=kwargs.get("keep_data_attributes", False)
|
||||
body,
|
||||
important_attrs=IMPORTANT_ATTRS + kwargs.get("keep_attrs", []),
|
||||
keep_data_attributes=kwargs.get("keep_data_attributes", False)
|
||||
)
|
||||
|
||||
# Generate output HTML
|
||||
|
||||
@@ -19,7 +19,7 @@ LLMConfig = Union['LLMConfigType']
|
||||
|
||||
# Content scraping types
|
||||
ContentScrapingStrategy = Union['ContentScrapingStrategyType']
|
||||
WebScrapingStrategy = Union['WebScrapingStrategyType']
|
||||
# WebScrapingStrategy = Union['WebScrapingStrategyType']
|
||||
LXMLWebScrapingStrategy = Union['LXMLWebScrapingStrategyType']
|
||||
|
||||
# Proxy types
|
||||
@@ -106,7 +106,7 @@ if TYPE_CHECKING:
|
||||
# Content scraping imports
|
||||
from .content_scraping_strategy import (
|
||||
ContentScrapingStrategy as ContentScrapingStrategyType,
|
||||
WebScrapingStrategy as WebScrapingStrategyType,
|
||||
# WebScrapingStrategy as WebScrapingStrategyType,
|
||||
LXMLWebScrapingStrategy as LXMLWebScrapingStrategyType,
|
||||
)
|
||||
|
||||
|
||||
@@ -1487,8 +1487,29 @@ def extract_metadata_using_lxml(html, doc=None):
|
||||
head = head[0]
|
||||
|
||||
# Title - using XPath
|
||||
# title = head.xpath(".//title/text()")
|
||||
# metadata["title"] = title[0].strip() if title else None
|
||||
|
||||
# === Title Extraction - New Approach ===
|
||||
# Attempt to extract <title> using XPath
|
||||
title = head.xpath(".//title/text()")
|
||||
metadata["title"] = title[0].strip() if title else None
|
||||
title = title[0] if title else None
|
||||
|
||||
# Fallback: Use .find() in case XPath fails due to malformed HTML
|
||||
if not title:
|
||||
title_el = doc.find(".//title")
|
||||
title = title_el.text if title_el is not None else None
|
||||
|
||||
# Final fallback: Use OpenGraph or Twitter title if <title> is missing or empty
|
||||
if not title:
|
||||
title_candidates = (
|
||||
doc.xpath("//meta[@property='og:title']/@content") or
|
||||
doc.xpath("//meta[@name='twitter:title']/@content")
|
||||
)
|
||||
title = title_candidates[0] if title_candidates else None
|
||||
|
||||
# Strip and assign title
|
||||
metadata["title"] = title.strip() if title else None
|
||||
|
||||
# Meta description - using XPath with multiple attribute conditions
|
||||
description = head.xpath('.//meta[@name="description"]/@content')
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
import time, re
|
||||
from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
|
||||
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
sys.path.append(parent_dir)
|
||||
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
|
||||
|
||||
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
|
||||
import functools
|
||||
from collections import defaultdict
|
||||
|
||||
@@ -57,7 +63,7 @@ methods_to_profile = [
|
||||
|
||||
|
||||
# Apply decorators to both strategies
|
||||
for strategy, name in [(WebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
|
||||
for strategy, name in [(LXMLWebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
|
||||
for method in methods_to_profile:
|
||||
apply_decorators(strategy, method, name)
|
||||
|
||||
@@ -85,7 +91,7 @@ def generate_large_html(n_elements=1000):
|
||||
|
||||
def test_scraping():
|
||||
# Initialize both scrapers
|
||||
original_scraper = WebScrapingStrategy()
|
||||
original_scraper = LXMLWebScrapingStrategy()
|
||||
selected_scraper = LXMLWebScrapingStrategy()
|
||||
|
||||
# Generate test HTML
|
||||
|
||||
@@ -12,10 +12,10 @@ parent_dir = os.path.dirname(
|
||||
sys.path.append(parent_dir)
|
||||
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
|
||||
|
||||
from crawl4ai.content_scraping_strategy import WebScrapingStrategy
|
||||
from crawl4ai.content_scraping_strategy import (
|
||||
WebScrapingStrategy as WebScrapingStrategyCurrent,
|
||||
)
|
||||
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
|
||||
# from crawl4ai.content_scraping_strategy import (
|
||||
# WebScrapingStrategy as WebScrapingStrategyCurrent,
|
||||
# )
|
||||
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent
|
||||
|
||||
|
||||
@@ -32,8 +32,8 @@ class TestResult:
|
||||
|
||||
class StrategyTester:
|
||||
def __init__(self):
|
||||
self.new_scraper = WebScrapingStrategy()
|
||||
self.current_scraper = WebScrapingStrategyCurrent()
|
||||
self.new_scraper = LXMLWebScrapingStrategy()
|
||||
self.current_scraper = LXMLWebScrapingStrategy()
|
||||
with open(__location__ + "/sample_wikipedia.html", "r", encoding="utf-8") as f:
|
||||
self.WIKI_HTML = f.read()
|
||||
self.results = {"new": [], "current": []}
|
||||
|
||||
@@ -2,7 +2,6 @@ import json
|
||||
import time
|
||||
from bs4 import BeautifulSoup
|
||||
from crawl4ai.content_scraping_strategy import (
|
||||
WebScrapingStrategy,
|
||||
LXMLWebScrapingStrategy,
|
||||
)
|
||||
from typing import Dict, List, Tuple
|
||||
@@ -274,7 +273,7 @@ def get_test_scenarios():
|
||||
that will be passed into scrap() for testing various features.
|
||||
"""
|
||||
TEST_SCENARIOS = {
|
||||
# "default": {},
|
||||
"default": {},
|
||||
# "exclude_domains": {
|
||||
# "exclude_domains": {"images.example.com", "ads.example.com"}
|
||||
# },
|
||||
@@ -609,19 +608,26 @@ class ScraperEquivalenceTester:
|
||||
print("\n=== Testing complicated HTML with multiple parameter scenarios ===")
|
||||
|
||||
# Create the scrapers once (or you can re-create if needed)
|
||||
original = WebScrapingStrategy()
|
||||
# original = WebScrapingStrategy()
|
||||
original = LXMLWebScrapingStrategy()
|
||||
lxml = LXMLWebScrapingStrategy()
|
||||
|
||||
# Base URL for testing
|
||||
url = "http://test.com"
|
||||
url = "https://kidocode.com"
|
||||
|
||||
for scenario_name, params in get_test_scenarios().items():
|
||||
print(f"\nScenario: {scenario_name}")
|
||||
|
||||
start = time.time()
|
||||
orig_result = original.scrap("http://test.com", complicated_html, **params)
|
||||
orig_result = original.scrap(url, complicated_html, **params)
|
||||
orig_time = time.time() - start
|
||||
orig_result = orig_result.model_dump()
|
||||
|
||||
start = time.time()
|
||||
lxml_result = lxml.scrap("http://test.com", complicated_html, **params)
|
||||
lxml_result = lxml.scrap(url, complicated_html, **params)
|
||||
lxml_time = time.time() - start
|
||||
lxml_result = lxml_result.model_dump()
|
||||
|
||||
diffs = {}
|
||||
link_diff = self.deep_compare_links(
|
||||
|
||||
Reference in New Issue
Block a user