Compare commits

..

2 Commits

Author SHA1 Message Date
Ahmed-Tawfik94
2b2ef12e25 #1156: Refactor completion function calls to use asynchronous version 2025-05-27 15:10:34 +08:00
Ahmed-Tawfik94
d9b3db925a Refactor extraction and completion functions to support asynchronous execution 2025-05-26 16:01:38 +08:00
14 changed files with 908 additions and 191 deletions

View File

@@ -6,7 +6,7 @@ from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, L
from .content_scraping_strategy import (
ContentScrapingStrategy,
# WebScrapingStrategy,
WebScrapingStrategy,
LXMLWebScrapingStrategy,
)
from .async_logger import (
@@ -100,7 +100,7 @@ __all__ = [
"CrawlerHub",
"CacheMode",
"ContentScrapingStrategy",
# "WebScrapingStrategy",
"WebScrapingStrategy",
"LXMLWebScrapingStrategy",
"BrowserConfig",
"CrawlerRunConfig",

View File

@@ -17,7 +17,7 @@ from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .content_scraping_strategy import ContentScrapingStrategy, LXMLWebScrapingStrategy
from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy
from .deep_crawling import DeepCrawlStrategy
from .cache_context import CacheMode
@@ -725,7 +725,7 @@ class CrawlerRunConfig():
parser_type (str): Type of parser to use for HTML parsing.
Default: "lxml".
scraping_strategy (ContentScrapingStrategy): Scraping strategy to use.
Default: LXMLWebScrapingStrategy.
Default: WebScrapingStrategy.
proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
If None, no additional proxy config. Default: None.
@@ -979,7 +979,7 @@ class CrawlerRunConfig():
self.remove_forms = remove_forms
self.prettiify = prettiify
self.parser_type = parser_type
self.scraping_strategy = scraping_strategy or LXMLWebScrapingStrategy()
self.scraping_strategy = scraping_strategy or WebScrapingStrategy()
self.proxy_config = proxy_config
self.proxy_rotation_strategy = proxy_rotation_strategy

View File

@@ -607,7 +607,7 @@ class AsyncWebCrawler:
else config.chunking_strategy
)
sections = chunking.chunk(content)
extracted_content = config.extraction_strategy.run(url, sections)
extracted_content = await config.extraction_strategy.run(url, sections)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)

View File

@@ -9,7 +9,7 @@ from bs4 import NavigableString, Comment
from .utils import (
clean_tokens,
perform_completion_with_backoff,
aperform_completion_with_backoff,
escape_json_string,
sanitize_html,
get_home_folder,
@@ -953,7 +953,7 @@ class LLMContentFilter(RelevantContentFilter):
for var, value in prompt_variables.items():
prompt = prompt.replace("{" + var + "}", value)
def _proceed_with_chunk(
async def _proceed_with_chunk(
provider: str,
prompt: str,
api_token: str,
@@ -966,7 +966,7 @@ class LLMContentFilter(RelevantContentFilter):
tag="CHUNK",
params={"chunk_num": i + 1},
)
return perform_completion_with_backoff(
return await aperform_completion_with_backoff(
provider,
prompt,
api_token,

View File

@@ -2,7 +2,7 @@ import re
from itertools import chain
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
# from bs4 import BeautifulSoup
from bs4 import BeautifulSoup
import asyncio
import requests
from .config import (
@@ -13,12 +13,12 @@ from .config import (
IMPORTANT_ATTRS,
SOCIAL_MEDIA_DOMAINS,
)
# from bs4 import NavigableString, Comment
# from bs4 import PageElement, Tag
from bs4 import NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
from .utils import (
# extract_metadata,
extract_metadata,
normalize_url,
is_external_url,
get_base_domain,
@@ -96,16 +96,20 @@ class ContentScrapingStrategy(ABC):
pass
class LXMLWebScrapingStrategy(ContentScrapingStrategy):
class WebScrapingStrategy(ContentScrapingStrategy):
"""
Class for web content scraping. Perhaps the most important class.
How it works:
1. Extract content from HTML using BeautifulSoup.
2. Clean the extracted content using a content cleaning strategy.
3. Filter the cleaned content using a content filtering strategy.
4. Generate markdown content from the filtered content.
5. Return the markdown content.
"""
def __init__(self, logger=None):
self.logger = logger
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
# Constants for image processing
self.classes_to_check = frozenset(["button", "icon", "logo"])
self.tags_to_check = frozenset(["button", "input"])
self.image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
def _log(self, level, message, tag="SCRAPE", **kwargs):
"""Helper method to safely use logger."""
@@ -126,8 +130,7 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
ScrapingResult: A structured result containing the scraped content.
"""
actual_url = kwargs.get("redirected_url", url)
raw_result = self._scrap(actual_url, html, **kwargs)
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
if raw_result is None:
return ScrapingResult(
cleaned_html="",
@@ -191,48 +194,217 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
Returns:
ScrapingResult: A structured result containing the scraped content.
"""
return await asyncio.to_thread(self.scrap, url, html, **kwargs)
return await asyncio.to_thread(self._scrap, url, html, **kwargs)
def process_element(self, url: str, element: lhtml.HtmlElement, **kwargs) -> Dict[str, Any]:
def is_data_table(self, table: Tag, **kwargs) -> bool:
"""
Process an HTML element.
Determine if a table element is a data table (not a layout table).
Args:
url (str): The URL of the page containing the element.
element (lhtml.HtmlElement): The HTML element to process.
table (Tag): BeautifulSoup Tag representing a table element
**kwargs: Additional keyword arguments including table_score_threshold
Returns:
bool: True if the table is a data table, False otherwise
"""
score = 0
# Check for thead and tbody
has_thead = len(table.select('thead')) > 0
has_tbody = len(table.select('tbody')) > 0
if has_thead:
score += 2
if has_tbody:
score += 1
# Check for th elements
th_count = len(table.select('th'))
if th_count > 0:
score += 2
if has_thead or len(table.select('tr:first-child th')) > 0:
score += 1
# Check for nested tables
if len(table.select('table')) > 0:
score -= 3
# Role attribute check
role = table.get('role', '').lower()
if role in {'presentation', 'none'}:
score -= 3
# Column consistency
rows = table.select('tr')
if not rows:
return False
col_counts = [len(row.select('td, th')) for row in rows]
avg_cols = sum(col_counts) / len(col_counts)
variance = sum((c - avg_cols)**2 for c in col_counts) / len(col_counts)
if variance < 1:
score += 2
# Caption and summary
if table.select('caption'):
score += 2
if table.has_attr('summary') and table['summary']:
score += 1
# Text density
total_text = sum(len(cell.get_text().strip()) for row in rows for cell in row.select('td, th'))
total_tags = sum(1 for _ in table.descendants if isinstance(_, Tag))
text_ratio = total_text / (total_tags + 1e-5)
if text_ratio > 20:
score += 3
elif text_ratio > 10:
score += 2
# Data attributes
data_attrs = sum(1 for attr in table.attrs if attr.startswith('data-'))
score += data_attrs * 0.5
# Size check
if avg_cols >= 2 and len(rows) >= 2:
score += 2
threshold = kwargs.get('table_score_threshold', 7)
return score >= threshold
def extract_table_data(self, table: Tag) -> dict:
"""
Extract structured data from a table element.
Args:
table (Tag): BeautifulSoup Tag representing a table element
Returns:
dict: Dictionary containing table data (headers, rows, caption, summary)
"""
caption_elem = table.select_one('caption')
caption = caption_elem.get_text().strip() if caption_elem else ""
summary = table.get('summary', '').strip()
# Extract headers with colspan handling
headers = []
thead_rows = table.select('thead tr')
if thead_rows:
header_cells = thead_rows[0].select('th')
for cell in header_cells:
text = cell.get_text().strip()
colspan = int(cell.get('colspan', 1))
headers.extend([text] * colspan)
else:
first_row = table.select('tr:first-child')
if first_row:
for cell in first_row[0].select('th, td'):
text = cell.get_text().strip()
colspan = int(cell.get('colspan', 1))
headers.extend([text] * colspan)
# Extract rows with colspan handling
rows = []
all_rows = table.select('tr')
thead = table.select_one('thead')
tbody_rows = []
if thead:
thead_rows = thead.select('tr')
tbody_rows = [row for row in all_rows if row not in thead_rows]
else:
if all_rows and all_rows[0].select('th'):
tbody_rows = all_rows[1:]
else:
tbody_rows = all_rows
for row in tbody_rows:
# for row in table.select('tr:not(:has(ancestor::thead))'):
row_data = []
for cell in row.select('td'):
text = cell.get_text().strip()
colspan = int(cell.get('colspan', 1))
row_data.extend([text] * colspan)
if row_data:
rows.append(row_data)
# Align rows with headers
max_columns = len(headers) if headers else (max(len(row) for row in rows) if rows else 0)
aligned_rows = []
for row in rows:
aligned = row[:max_columns] + [''] * (max_columns - len(row))
aligned_rows.append(aligned)
if not headers:
headers = [f"Column {i+1}" for i in range(max_columns)]
return {
"headers": headers,
"rows": aligned_rows,
"caption": caption,
"summary": summary,
}
def flatten_nested_elements(self, node):
"""
Flatten nested elements in a HTML tree.
Args:
node (Tag): The root node of the HTML tree.
Returns:
Tag: The flattened HTML tree.
"""
if isinstance(node, NavigableString):
return node
if (
len(node.contents) == 1
and isinstance(node.contents[0], Tag)
and node.contents[0].name == node.name
):
return self.flatten_nested_elements(node.contents[0])
node.contents = [self.flatten_nested_elements(child) for child in node.contents]
return node
def find_closest_parent_with_useful_text(self, tag, **kwargs):
"""
Find the closest parent with useful text.
Args:
tag (Tag): The starting tag to search from.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the processed element information.
Tag: The closest parent with useful text, or None if not found.
"""
media = {"images": [], "videos": [], "audios": [], "tables": []}
internal_links_dict = {}
external_links_dict = {}
self._process_element(
url, element, media, internal_links_dict, external_links_dict, **kwargs
image_description_min_word_threshold = kwargs.get(
"image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
)
return {
"media": media,
"internal_links_dict": internal_links_dict,
"external_links_dict": external_links_dict,
}
current_tag = tag
while current_tag:
current_tag = current_tag.parent
# Get the text content of the parent tag
if current_tag:
text_content = current_tag.get_text(separator=" ", strip=True)
# Check if the text content has at least word_count_threshold
if len(text_content.split()) >= image_description_min_word_threshold:
return text_content
return None
def remove_unwanted_attributes(self, element: lhtml.HtmlElement, important_attrs: List[str], keep_data_attributes: bool = False):
def remove_unwanted_attributes(
self, element, important_attrs, keep_data_attributes=False
):
"""
Remove unwanted attributes from an HTML element.
Args:
element (lhtml.HtmlElement): The HTML element to remove attributes from.
important_attrs (List[str]): List of important attributes to keep.
element (Tag): The HTML element to remove attributes from.
important_attrs (list): List of important attributes to keep.
keep_data_attributes (bool): Whether to keep data attributes.
Returns:
None
"""
attrs_to_remove = []
for attr in element.attrib:
for attr in element.attrs:
if attr not in important_attrs:
if keep_data_attributes:
if not attr.startswith("data-"):
@@ -241,7 +413,622 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
attrs_to_remove.append(attr)
for attr in attrs_to_remove:
del element.attrib[attr]
del element[attr]
def process_image(self, img, url, index, total_images, **kwargs):
"""
Process an image element.
How it works:
1. Check if the image has valid display and inside undesired html elements.
2. Score an image for it's usefulness.
3. Extract image file metadata to extract size and extension.
4. Generate a dictionary with the processed image information.
5. Return the processed image information.
Args:
img (Tag): The image element to process.
url (str): The URL of the page containing the image.
index (int): The index of the image in the list of images.
total_images (int): The total number of images in the list.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the processed image information.
"""
# parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
# if ' ' in u else None}
# for u in [f"http{p}" for p in s.split("http") if p]]
# Constants for checks
classes_to_check = frozenset(["button", "icon", "logo"])
tags_to_check = frozenset(["button", "input"])
image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
# Pre-fetch commonly used attributes
style = img.get("style", "")
alt = img.get("alt", "")
src = img.get("src", "")
data_src = img.get("data-src", "")
srcset = img.get("srcset", "")
data_srcset = img.get("data-srcset", "")
width = img.get("width")
height = img.get("height")
parent = img.parent
parent_classes = parent.get("class", [])
# Quick validation checks
if (
"display:none" in style
or parent.name in tags_to_check
or any(c in cls for c in parent_classes for cls in classes_to_check)
or any(c in src for c in classes_to_check)
or any(c in alt for c in classes_to_check)
):
return None
# Quick score calculation
score = 0
if width and width.isdigit():
width_val = int(width)
score += 1 if width_val > 150 else 0
if height and height.isdigit():
height_val = int(height)
score += 1 if height_val > 150 else 0
if alt:
score += 1
score += index / total_images < 0.5
# image_format = ''
# if "data:image/" in src:
# image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0]
# else:
# image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0]
# if image_format in ('jpg', 'png', 'webp', 'avif'):
# score += 1
# Check for image format in all possible sources
def has_image_format(url):
return any(fmt in url.lower() for fmt in image_formats)
# Score for having proper image sources
if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]):
score += 1
if srcset or data_srcset:
score += 1
if img.find_parent("picture"):
score += 1
# Detect format from any available source
detected_format = None
for url in [src, data_src, srcset, data_srcset]:
if url:
format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
if format_matches:
detected_format = format_matches[0]
break
if score <= kwargs.get("image_score_threshold", IMAGE_SCORE_THRESHOLD):
return None
# Use set for deduplication
unique_urls = set()
image_variants = []
# Generate a unique group ID for this set of variants
group_id = index
# Base image info template
base_info = {
"alt": alt,
"desc": self.find_closest_parent_with_useful_text(img, **kwargs),
"score": score,
"type": "image",
"group_id": group_id, # Group ID for this set of variants
"format": detected_format,
}
# Inline function for adding variants
def add_variant(src, width=None):
if src and not src.startswith("data:") and src not in unique_urls:
unique_urls.add(src)
image_variants.append({**base_info, "src": src, "width": width})
# Process all sources
add_variant(src)
add_variant(data_src)
# Handle srcset and data-srcset in one pass
for attr in ("srcset", "data-srcset"):
if value := img.get(attr):
for source in parse_srcset(value):
add_variant(source["url"], source["width"])
# Quick picture element check
if picture := img.find_parent("picture"):
for source in picture.find_all("source"):
if srcset := source.get("srcset"):
for src in parse_srcset(srcset):
add_variant(src["url"], src["width"])
# Framework-specific attributes in one pass
for attr, value in img.attrs.items():
if (
attr.startswith("data-")
and ("src" in attr or "srcset" in attr)
and "http" in value
):
add_variant(value)
return image_variants if image_variants else None
def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
"""
Process an HTML element.
How it works:
1. Check if the element is an image, video, or audio.
2. Extract the element's attributes and content.
3. Process the element based on its type.
4. Return the processed element information.
Args:
url (str): The URL of the page containing the element.
element (Tag): The HTML element to process.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the processed element information.
"""
media = {"images": [], "videos": [], "audios": [], "tables": []}
internal_links_dict = {}
external_links_dict = {}
self._process_element(
url, element, media, internal_links_dict, external_links_dict, **kwargs
)
return {
"media": media,
"internal_links_dict": internal_links_dict,
"external_links_dict": external_links_dict,
}
def _process_element(
self,
url,
element: PageElement,
media: Dict[str, Any],
internal_links_dict: Dict[str, Any],
external_links_dict: Dict[str, Any],
**kwargs,
) -> bool:
"""
Process an HTML element.
"""
try:
if isinstance(element, NavigableString):
if isinstance(element, Comment):
element.extract()
return False
# if element.name == 'img':
# process_image(element, url, 0, 1)
# return True
base_domain = kwargs.get("base_domain", get_base_domain(url))
if element.name in ["script", "style", "link", "meta", "noscript"]:
element.decompose()
return False
keep_element = False
# Special case for table elements - always preserve structure
if element.name in ["tr", "td", "th"]:
keep_element = True
exclude_domains = kwargs.get("exclude_domains", [])
# exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS))
# exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
# exclude_social_media_domains = list(set(exclude_social_media_domains))
try:
if element.name == "a" and element.get("href"):
href = element.get("href", "").strip()
if not href: # Skip empty hrefs
return False
# url_base = url.split("/")[2]
# Normalize the URL
try:
normalized_href = normalize_url(href, url)
except ValueError:
# logging.warning(f"Invalid URL format: {href}, Error: {str(e)}")
return False
link_data = {
"href": normalized_href,
"text": element.get_text().strip(),
"title": element.get("title", "").strip(),
"base_domain": base_domain,
}
is_external = is_external_url(normalized_href, base_domain)
keep_element = True
# Handle external link exclusions
if is_external:
link_base_domain = get_base_domain(normalized_href)
link_data["base_domain"] = link_base_domain
if kwargs.get("exclude_external_links", False):
element.decompose()
return False
# elif kwargs.get('exclude_social_media_links', False):
# if link_base_domain in exclude_social_media_domains:
# element.decompose()
# return False
# if any(domain in normalized_href.lower() for domain in exclude_social_media_domains):
# element.decompose()
# return False
elif exclude_domains:
if link_base_domain in exclude_domains:
element.decompose()
return False
# if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])):
# element.decompose()
# return False
if is_external:
if normalized_href not in external_links_dict:
external_links_dict[normalized_href] = link_data
else:
if kwargs.get("exclude_internal_links", False):
element.decompose()
return False
if normalized_href not in internal_links_dict:
internal_links_dict[normalized_href] = link_data
except Exception as e:
raise Exception(f"Error processing links: {str(e)}")
try:
if element.name == "img":
potential_sources = [
"src",
"data-src",
"srcset" "data-lazy-src",
"data-original",
]
src = element.get("src", "")
while not src and potential_sources:
src = element.get(potential_sources.pop(0), "")
if not src:
element.decompose()
return False
# If it is srcset pick up the first image
if "srcset" in element.attrs:
src = element.attrs["srcset"].split(",")[0].split(" ")[0]
# If image src is internal, then skip
if not is_external_url(src, base_domain):
return True
image_src_base_domain = get_base_domain(src)
# Check flag if we should remove external images
if kwargs.get("exclude_external_images", False):
element.decompose()
return False
# src_url_base = src.split('/')[2]
# url_base = url.split('/')[2]
# if url_base not in src_url_base:
# element.decompose()
# return False
# if kwargs.get('exclude_social_media_links', False):
# if image_src_base_domain in exclude_social_media_domains:
# element.decompose()
# return False
# src_url_base = src.split('/')[2]
# url_base = url.split('/')[2]
# if any(domain in src for domain in exclude_social_media_domains):
# element.decompose()
# return False
# Handle exclude domains
if exclude_domains:
if image_src_base_domain in exclude_domains:
element.decompose()
return False
# if any(domain in src for domain in kwargs.get('exclude_domains', [])):
# element.decompose()
# return False
return True # Always keep image elements
except Exception:
raise "Error processing images"
# Check if flag to remove all forms is set
if kwargs.get("remove_forms", False) and element.name == "form":
element.decompose()
return False
if element.name in ["video", "audio"]:
media[f"{element.name}s"].append(
{
"src": element.get("src"),
"alt": element.get("alt"),
"type": element.name,
"description": self.find_closest_parent_with_useful_text(
element, **kwargs
),
}
)
source_tags = element.find_all("source")
for source_tag in source_tags:
media[f"{element.name}s"].append(
{
"src": source_tag.get("src"),
"alt": element.get("alt"),
"type": element.name,
"description": self.find_closest_parent_with_useful_text(
element, **kwargs
),
}
)
return True # Always keep video and audio elements
if element.name in ONLY_TEXT_ELIGIBLE_TAGS:
if kwargs.get("only_text", False):
element.replace_with(element.get_text())
try:
self.remove_unwanted_attributes(
element, IMPORTANT_ATTRS + kwargs.get("keep_attrs", []) , kwargs.get("keep_data_attributes", False)
)
except Exception as e:
# print('Error removing unwanted attributes:', str(e))
self._log(
"error",
message="Error removing unwanted attributes: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
# Process children
for child in list(element.children):
if isinstance(child, NavigableString) and not isinstance(
child, Comment
):
if len(child.strip()) > 0:
keep_element = True
else:
if self._process_element(
url,
child,
media,
internal_links_dict,
external_links_dict,
**kwargs,
):
keep_element = True
# Check word count
word_count_threshold = kwargs.get(
"word_count_threshold", MIN_WORD_THRESHOLD
)
if not keep_element:
word_count = len(element.get_text(strip=True).split())
keep_element = word_count >= word_count_threshold
if not keep_element:
element.decompose()
return keep_element
except Exception as e:
# print('Error processing element:', str(e))
self._log(
"error",
message="Error processing element: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
return False
def _scrap(
self,
url: str,
html: str,
word_count_threshold: int = MIN_WORD_THRESHOLD,
css_selector: str = None,
target_elements: List[str] = None,
**kwargs,
) -> Dict[str, Any]:
"""
Extract content from HTML using BeautifulSoup.
Args:
url (str): The URL of the page to scrape.
html (str): The HTML content of the page to scrape.
word_count_threshold (int): The minimum word count threshold for content extraction.
css_selector (str): The CSS selector to use for content extraction.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the extracted content.
"""
success = True
if not html:
return None
parser_type = kwargs.get("parser", "lxml")
soup = BeautifulSoup(html, parser_type)
body = soup.body
if body is None:
raise Exception("'<body>' tag is not found in fetched html. Consider adding wait_for=\"css:body\" to wait for body tag to be loaded into DOM.")
base_domain = get_base_domain(url)
# Early removal of all images if exclude_all_images is set
# This happens before any processing to minimize memory usage
if kwargs.get("exclude_all_images", False):
for img in body.find_all('img'):
img.decompose()
try:
meta = extract_metadata("", soup)
except Exception as e:
self._log(
"error",
message="Error extracting metadata: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
meta = {}
# Handle tag-based removal first - faster than CSS selection
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
if excluded_tags:
for element in body.find_all(lambda tag: tag.name in excluded_tags):
element.extract()
# Handle CSS selector-based removal
excluded_selector = kwargs.get("excluded_selector", "")
if excluded_selector:
is_single_selector = (
"," not in excluded_selector and " " not in excluded_selector
)
if is_single_selector:
while element := body.select_one(excluded_selector):
element.extract()
else:
for element in body.select(excluded_selector):
element.extract()
content_element = None
if target_elements:
try:
for_content_targeted_element = []
for target_element in target_elements:
for_content_targeted_element.extend(body.select(target_element))
content_element = soup.new_tag("div")
for el in for_content_targeted_element:
content_element.append(copy.deepcopy(el))
except Exception as e:
self._log("error", f"Error with target element detection: {str(e)}", "SCRAPE")
return None
else:
content_element = body
kwargs["exclude_social_media_domains"] = set(
kwargs.get("exclude_social_media_domains", []) + SOCIAL_MEDIA_DOMAINS
)
kwargs["exclude_domains"] = set(kwargs.get("exclude_domains", []))
if kwargs.get("exclude_social_media_links", False):
kwargs["exclude_domains"] = kwargs["exclude_domains"].union(
kwargs["exclude_social_media_domains"]
)
result_obj = self.process_element(
url,
body,
word_count_threshold=word_count_threshold,
base_domain=base_domain,
**kwargs,
)
links = {"internal": [], "external": []}
media = result_obj["media"]
internal_links_dict = result_obj["internal_links_dict"]
external_links_dict = result_obj["external_links_dict"]
# Update the links dictionary with unique links
links["internal"] = list(internal_links_dict.values())
links["external"] = list(external_links_dict.values())
# # Process images using ThreadPoolExecutor
imgs = body.find_all("img")
media["images"] = [
img
for result in (
self.process_image(img, url, i, len(imgs), **kwargs)
for i, img in enumerate(imgs)
)
if result is not None
for img in result
]
# Process tables if not excluded
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
if 'table' not in excluded_tags:
tables = body.find_all('table')
for table in tables:
if self.is_data_table(table, **kwargs):
table_data = self.extract_table_data(table)
media["tables"].append(table_data)
body = self.flatten_nested_elements(body)
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
for img in imgs:
src = img.get("src", "")
if base64_pattern.match(src):
# Replace base64 data with empty string
img["src"] = base64_pattern.sub("", src)
str_body = ""
try:
str_body = content_element.encode_contents().decode("utf-8")
except Exception:
# Reset body to the original HTML
success = False
body = BeautifulSoup(html, "html.parser")
# Create a new div with a special ID
error_div = body.new_tag("div", id="crawl4ai_error_message")
error_div.string = """
Crawl4AI Error: This page is not fully supported.
Possible reasons:
1. The page may have restrictions that prevent crawling.
2. The page might not be fully loaded.
Suggestions:
- Try calling the crawl function with these parameters:
magic=True,
- Set headless=False to visualize what's happening on the page.
If the issue persists, please check the page's structure and any potential anti-crawling measures.
"""
# Append the error div to the body
body.append(error_div)
str_body = body.encode_contents().decode("utf-8")
print(
"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details."
)
self._log(
"error",
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.",
tag="SCRAPE",
)
cleaned_html = str_body.replace("\n\n", "\n").replace(" ", " ")
return {
"cleaned_html": cleaned_html,
"success": success,
"media": media,
"links": links,
"metadata": meta,
}
class LXMLWebScrapingStrategy(WebScrapingStrategy):
def __init__(self, logger=None):
super().__init__(logger)
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
def _process_element(
self,
@@ -403,7 +1190,7 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
return None
parent = img.getparent()
if parent.tag in self.tags_to_check:
if parent.tag in ["button", "input"]:
return None
parent_classes = parent.get("class", "").split()
@@ -413,8 +1200,8 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
return None
# If src is in class or alt, likely an icon
if (src and any(c in src for c in self.classes_to_check)) or (
alt and any(c in alt for c in self.classes_to_check)
if (src and any(c in src for c in ["button", "icon", "logo"])) or (
alt and any(c in alt for c in ["button", "icon", "logo"])
):
return None
@@ -429,10 +1216,11 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
score += index / total_images < 0.5
# Check formats in all possible sources
image_formats = {"jpg", "jpeg", "png", "webp", "avif", "gif"}
detected_format = None
for url in [src, data_src, srcset, data_srcset]:
if url:
format_matches = [fmt for fmt in self.image_formats if fmt in url.lower()]
format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
if format_matches:
detected_format = format_matches[0]
score += 1
@@ -696,13 +1484,6 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
success = True
try:
# Extract metadata FIRST from the original HTML to avoid issues with modified content.
try:
meta = extract_metadata_using_lxml(html, None) # Pass the original HTML
except Exception as e:
self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
meta = {}
doc = lhtml.document_fromstring(html)
# Match BeautifulSoup's behavior of using body or full doc
# body = doc.xpath('//body')[0] if doc.xpath('//body') else doc
@@ -743,14 +1524,14 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
"error", f"Error with excluded CSS selector: {str(e)}", "SCRAPE"
)
# # Extract metadata before any content filtering
# try:
# meta = extract_metadata_using_lxml(
# "", doc
# ) # Using same function as BeautifulSoup version
# except Exception as e:
# self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
# meta = {}
# Extract metadata before any content filtering
try:
meta = extract_metadata_using_lxml(
"", doc
) # Using same function as BeautifulSoup version
except Exception as e:
self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
meta = {}
content_element = None
if target_elements:
@@ -830,9 +1611,7 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
# Remove unneeded attributes
self.remove_unwanted_attributes_fast(
body,
important_attrs=IMPORTANT_ATTRS + kwargs.get("keep_attrs", []),
keep_data_attributes=kwargs.get("keep_data_attributes", False)
body, keep_data_attributes=kwargs.get("keep_data_attributes", False)
)
# Generate output HTML

View File

@@ -3,6 +3,7 @@ import inspect
from typing import Any, List, Dict, Optional, Tuple, Pattern, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import asyncio
import time
from enum import IntFlag, auto
@@ -19,7 +20,7 @@ from .utils import * # noqa: F403
from .utils import (
sanitize_html,
escape_json_string,
perform_completion_with_backoff,
aperform_completion_with_backoff,
extract_xml_data,
split_and_parse_json_objects,
sanitize_input_encode,
@@ -66,7 +67,7 @@ class ExtractionStrategy(ABC):
self.verbose = kwargs.get("verbose", False)
@abstractmethod
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
async def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
"""
Extract meaningful blocks or chunks from the given HTML.
@@ -76,7 +77,7 @@ class ExtractionStrategy(ABC):
"""
pass
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Process sections of text in parallel by default.
@@ -85,13 +86,13 @@ class ExtractionStrategy(ABC):
:return: A list of processed JSON blocks.
"""
extracted_content = []
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.extract, url, section, **kwargs)
for section in sections
]
for future in as_completed(futures):
extracted_content.extend(future.result())
tasks = [
asyncio.create_task(self.extract(url, section, **kwargs))
for section in sections
]
results = await asyncio.gather(*tasks)
for result in results:
extracted_content.extend(result)
return extracted_content
@@ -100,19 +101,18 @@ class NoExtractionStrategy(ExtractionStrategy):
A strategy that does not extract any meaningful content from the HTML. It simply returns the entire HTML as a single block.
"""
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
async def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
"""
Extract meaningful blocks or chunks from the given HTML.
"""
return [{"index": 0, "content": html}]
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
return [
{"index": i, "tags": [], "content": section}
for i, section in enumerate(sections)
]
#######################################################
# Strategies using clustering for text data extraction #
#######################################################
@@ -386,7 +386,7 @@ class CosineStrategy(ExtractionStrategy):
return filtered_clusters
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
async def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
"""
Extract clusters from HTML content using hierarchical clustering.
@@ -458,7 +458,7 @@ class CosineStrategy(ExtractionStrategy):
return cluster_list
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Process sections using hierarchical clustering.
@@ -584,7 +584,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
super().__setattr__(name, value)
def extract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
async def extract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
"""
Extract meaningful blocks or chunks from the given HTML using an LLM.
@@ -628,7 +628,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
)
try:
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
@@ -723,7 +723,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
)
return sections
def run(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
"""
Process sections sequentially with a delay for rate limiting issues, specifically for LLMExtractionStrategy.
@@ -748,35 +748,11 @@ class LLMExtractionStrategy(ExtractionStrategy):
extracted_content.extend(
extract_func(ix, sanitize_input_encode(section))
)
time.sleep(0.5) # 500 ms delay between each processing
await asyncio.sleep(0.5) # 500 ms delay between each processing
else:
# Parallel processing using ThreadPoolExecutor
# extract_func = partial(self.extract, url)
# for ix, section in enumerate(merged_sections):
# extracted_content.append(extract_func(ix, section))
with ThreadPoolExecutor(max_workers=4) as executor:
extract_func = partial(self.extract, url)
futures = [
executor.submit(extract_func, ix, sanitize_input_encode(section))
for ix, section in enumerate(merged_sections)
]
for future in as_completed(futures):
try:
extracted_content.extend(future.result())
except Exception as e:
if self.verbose:
print(f"Error in thread execution: {e}")
# Add error information to extracted_content
extracted_content.append(
{
"index": 0,
"error": True,
"tags": ["error"],
"content": str(e),
}
)
extract_func = partial(self.extract, url)
extracted_content = await asyncio.gather(*[extract_func(ix, sanitize_input_encode(section)) for ix, section in enumerate(merged_sections)])
return extracted_content
@@ -797,7 +773,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
f"{i:<10} {usage.completion_tokens:>12,} {usage.prompt_tokens:>12,} {usage.total_tokens:>12,}"
)
#######################################################
# New extraction strategies for JSON-based extraction #
#######################################################
@@ -846,7 +821,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
self.schema = schema
self.verbose = kwargs.get("verbose", False)
def extract(
async def extract(
self, url: str, html_content: str, *q, **kwargs
) -> List[Dict[str, Any]]:
"""
@@ -1044,7 +1019,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
print(f"Error computing field {field['name']}: {str(e)}")
return field.get("default")
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Run the extraction strategy on a combined HTML content.
@@ -1063,7 +1038,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
"""
combined_html = self.DEL.join(sections)
return self.extract(url, combined_html, **kwargs)
return await self.extract(url, combined_html, **kwargs)
@abstractmethod
def _get_element_text(self, element) -> str:
@@ -1086,7 +1061,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
}
@staticmethod
def generate_schema(
async def generate_schema(
html: str,
schema_type: str = "CSS", # or XPATH
query: str = None,
@@ -1112,7 +1087,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
dict: Generated schema following the JsonElementExtractionStrategy format
"""
from .prompts import JSON_SCHEMA_BUILDER
from .utils import perform_completion_with_backoff
from .utils import aperform_completion_with_backoff
for name, message in JsonElementExtractionStrategy._GENERATE_SCHEMA_UNWANTED_PROPS.items():
if locals()[name] is not None:
raise AttributeError(f"Setting '{name}' is deprecated. {message}")
@@ -1179,7 +1154,7 @@ In this scenario, use your best judgment to generate the schema. You need to exa
try:
# Call LLM with backoff handling
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_message["content"], user_message["content"]]),
json_response = True,
@@ -1858,7 +1833,7 @@ class RegexExtractionStrategy(ExtractionStrategy):
# ------------------------------------------------------------------ #
# Extraction
# ------------------------------------------------------------------ #
def extract(self, url: str, content: str, *q, **kw) -> List[Dict[str, Any]]:
async def extract(self, url: str, content: str, *q, **kw) -> List[Dict[str, Any]]:
# text = self._plain_text(html)
out: List[Dict[str, Any]] = []
@@ -1889,7 +1864,7 @@ class RegexExtractionStrategy(ExtractionStrategy):
# LLM-assisted one-off pattern builder
# ------------------------------------------------------------------ #
@staticmethod
def generate_pattern(
async def generate_pattern(
label: str,
html: str,
*,
@@ -1946,7 +1921,7 @@ class RegexExtractionStrategy(ExtractionStrategy):
user_msg = "\n\n".join(user_parts)
# ── LLM call (with retry/backoff)
resp = perform_completion_with_backoff(
resp = await aperform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_msg, user_msg]),
json_response=True,

View File

@@ -19,7 +19,7 @@ LLMConfig = Union['LLMConfigType']
# Content scraping types
ContentScrapingStrategy = Union['ContentScrapingStrategyType']
# WebScrapingStrategy = Union['WebScrapingStrategyType']
WebScrapingStrategy = Union['WebScrapingStrategyType']
LXMLWebScrapingStrategy = Union['LXMLWebScrapingStrategyType']
# Proxy types
@@ -106,7 +106,7 @@ if TYPE_CHECKING:
# Content scraping imports
from .content_scraping_strategy import (
ContentScrapingStrategy as ContentScrapingStrategyType,
# WebScrapingStrategy as WebScrapingStrategyType,
WebScrapingStrategy as WebScrapingStrategyType,
LXMLWebScrapingStrategy as LXMLWebScrapingStrategyType,
)

View File

@@ -1487,29 +1487,8 @@ def extract_metadata_using_lxml(html, doc=None):
head = head[0]
# Title - using XPath
# title = head.xpath(".//title/text()")
# metadata["title"] = title[0].strip() if title else None
# === Title Extraction - New Approach ===
# Attempt to extract <title> using XPath
title = head.xpath(".//title/text()")
title = title[0] if title else None
# Fallback: Use .find() in case XPath fails due to malformed HTML
if not title:
title_el = doc.find(".//title")
title = title_el.text if title_el is not None else None
# Final fallback: Use OpenGraph or Twitter title if <title> is missing or empty
if not title:
title_candidates = (
doc.xpath("//meta[@property='og:title']/@content") or
doc.xpath("//meta[@name='twitter:title']/@content")
)
title = title_candidates[0] if title_candidates else None
# Strip and assign title
metadata["title"] = title.strip() if title else None
metadata["title"] = title[0].strip() if title else None
# Meta description - using XPath with multiple attribute conditions
description = head.xpath('.//meta[@name="description"]/@content')
@@ -1693,7 +1672,7 @@ def extract_xml_data(tags, string):
return data
def perform_completion_with_backoff(
async def aperform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
@@ -1721,7 +1700,7 @@ def perform_completion_with_backoff(
dict: The API response or an error message after all retries.
"""
from litellm import completion
from litellm import acompletion
from litellm.exceptions import RateLimitError
max_attempts = 3
@@ -1736,7 +1715,7 @@ def perform_completion_with_backoff(
for attempt in range(max_attempts):
try:
response = completion(
response = await acompletion(
model=provider,
messages=[{"role": "user", "content": prompt_with_variables}],
**extra_args,
@@ -1775,7 +1754,7 @@ def perform_completion_with_backoff(
# ]
def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
async def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
"""
Extract content blocks from website HTML using an AI provider.
@@ -1809,7 +1788,7 @@ def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_ur
"{" + variable + "}", variable_values[variable]
)
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider, prompt_with_variables, api_token, base_url=base_url
)

View File

@@ -24,7 +24,7 @@ from crawl4ai import (
RateLimiter,
LLMConfig
)
from crawl4ai.utils import perform_completion_with_backoff
from crawl4ai.utils import aperform_completion_with_backoff
from crawl4ai.content_filter_strategy import (
PruningContentFilter,
BM25ContentFilter,
@@ -88,7 +88,7 @@ async def handle_llm_qa(
Answer:"""
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider=config["llm"]["provider"],
prompt_with_variables=prompt,
api_token=os.environ.get(config["llm"].get("api_key_env", ""))

View File

@@ -3553,7 +3553,7 @@ from .utils import * # noqa: F403
from .utils import (
sanitize_html,
escape_json_string,
perform_completion_with_backoff,
aperform_completion_with_backoff,
extract_xml_data,
split_and_parse_json_objects,
sanitize_input_encode,
@@ -4162,7 +4162,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
)
try:
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
@@ -4646,7 +4646,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
dict: Generated schema following the JsonElementExtractionStrategy format
"""
from .prompts import JSON_SCHEMA_BUILDER
from .utils import perform_completion_with_backoff
from .utils import aperform_completion_with_backoff
for name, message in JsonElementExtractionStrategy._GENERATE_SCHEMA_UNWANTED_PROPS.items():
if locals()[name] is not None:
raise AttributeError(f"Setting '{name}' is deprecated. {message}")
@@ -4709,7 +4709,7 @@ In this scenario, use your best judgment to generate the schema. You need to exa
try:
# Call LLM with backoff handling
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_message["content"], user_message["content"]]),
json_response = True,
@@ -5597,7 +5597,7 @@ from bs4 import NavigableString, Comment
from .utils import (
clean_tokens,
perform_completion_with_backoff,
aperform_completion_with_backoff,
escape_json_string,
sanitize_html,
get_home_folder,
@@ -6556,7 +6556,7 @@ class LLMContentFilter(RelevantContentFilter):
tag="CHUNK",
params={"chunk_num": i + 1},
)
return perform_completion_with_backoff(
return await aperform_completion_with_backoff(
provider,
prompt,
api_token,

View File

@@ -1,11 +1,7 @@
# Crawl4AIProspectWizard stepbystep guide
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/10nRCwmfxPjVrRUHyJsYlX7BH5bvPoGpx?usp=sharing)
A threestage demo that goes from **LinkedIn scraping****LLM reasoning****graph visualisation**.
**Try it in Google Colab!** Click the badge above to run this demo in a cloud environment with zero setup required.
```
prospectwizard/
├─ c4ai_discover.py # Stage 1 scrape companies + people

View File

@@ -1,12 +1,6 @@
import time, re
from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
import time
import os
import sys
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
import functools
from collections import defaultdict
@@ -63,7 +57,7 @@ methods_to_profile = [
# Apply decorators to both strategies
for strategy, name in [(LXMLWebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
for strategy, name in [(WebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
for method in methods_to_profile:
apply_decorators(strategy, method, name)
@@ -91,7 +85,7 @@ def generate_large_html(n_elements=1000):
def test_scraping():
# Initialize both scrapers
original_scraper = LXMLWebScrapingStrategy()
original_scraper = WebScrapingStrategy()
selected_scraper = LXMLWebScrapingStrategy()
# Generate test HTML

View File

@@ -12,10 +12,10 @@ parent_dir = os.path.dirname(
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
# from crawl4ai.content_scraping_strategy import (
# WebScrapingStrategy as WebScrapingStrategyCurrent,
# )
from crawl4ai.content_scraping_strategy import WebScrapingStrategy
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy as WebScrapingStrategyCurrent,
)
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent
@@ -32,8 +32,8 @@ class TestResult:
class StrategyTester:
def __init__(self):
self.new_scraper = LXMLWebScrapingStrategy()
self.current_scraper = LXMLWebScrapingStrategy()
self.new_scraper = WebScrapingStrategy()
self.current_scraper = WebScrapingStrategyCurrent()
with open(__location__ + "/sample_wikipedia.html", "r", encoding="utf-8") as f:
self.WIKI_HTML = f.read()
self.results = {"new": [], "current": []}

View File

@@ -2,6 +2,7 @@ import json
import time
from bs4 import BeautifulSoup
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy,
LXMLWebScrapingStrategy,
)
from typing import Dict, List, Tuple
@@ -273,7 +274,7 @@ def get_test_scenarios():
that will be passed into scrap() for testing various features.
"""
TEST_SCENARIOS = {
"default": {},
# "default": {},
# "exclude_domains": {
# "exclude_domains": {"images.example.com", "ads.example.com"}
# },
@@ -608,26 +609,19 @@ class ScraperEquivalenceTester:
print("\n=== Testing complicated HTML with multiple parameter scenarios ===")
# Create the scrapers once (or you can re-create if needed)
# original = WebScrapingStrategy()
original = LXMLWebScrapingStrategy()
original = WebScrapingStrategy()
lxml = LXMLWebScrapingStrategy()
# Base URL for testing
url = "http://test.com"
url = "https://kidocode.com"
for scenario_name, params in get_test_scenarios().items():
print(f"\nScenario: {scenario_name}")
start = time.time()
orig_result = original.scrap(url, complicated_html, **params)
orig_result = original.scrap("http://test.com", complicated_html, **params)
orig_time = time.time() - start
orig_result = orig_result.model_dump()
start = time.time()
lxml_result = lxml.scrap(url, complicated_html, **params)
lxml_result = lxml.scrap("http://test.com", complicated_html, **params)
lxml_time = time.time() - start
lxml_result = lxml_result.model_dump()
diffs = {}
link_diff = self.deep_compare_links(