Compare commits

..

2 Commits

Author SHA1 Message Date
ntohidi
2b3b728dcd fix(metadata): improve title extraction with fallbacks for edge cases. REF #995
Some pages include a <title> tag in HTML but lxml fails to parse it due to unusual structure.
Added fallback logic using .find() and OpenGraph/Twitter meta tags to ensure reliable title extraction.
2025-05-28 10:17:50 +02:00
ntohidi
bfec5156ad Refactor content scraping strategies: comment out WebScrapingStrategy references and update to use LXMLWebScrapingStrategy across multiple files. Bring WebScrapingStrategy methods to LXMLWebScrapingStrategy. 2025-05-27 17:32:45 +02:00
29 changed files with 1455 additions and 14951 deletions

View File

@@ -1,3 +0,0 @@
{
"enableAllProjectMcpServers": false
}

View File

@@ -5,42 +5,6 @@ All notable changes to Crawl4AI will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- **AsyncUrlSeeder**: High-performance URL discovery system for intelligent crawling at scale
- Discover URLs from sitemaps and Common Crawl index
- Extract and analyze page metadata without full crawling
- BM25 relevance scoring for query-based URL filtering
- Multi-domain parallel discovery with `many_urls()` method
- Automatic caching with TTL for discovered URLs
- Rate limiting and concurrent request management
- Live URL validation with HEAD requests
- JSON-LD and Open Graph metadata extraction
- **SeedingConfig**: Configuration class for URL seeding operations
- Support for multiple discovery sources (`sitemap`, `cc`, `sitemap+cc`)
- Pattern-based URL filtering with wildcards
- Configurable concurrency and rate limiting
- Query-based relevance scoring with BM25
- Score threshold filtering for quality control
- Comprehensive documentation for URL seeding feature
- Detailed comparison with deep crawling approaches
- Complete API reference with examples
- Integration guide with AsyncWebCrawler
- Performance benchmarks and best practices
- Example scripts demonstrating URL seeding:
- `url_seeder_demo.py`: Interactive Rich-based demonstration
- `url_seeder_quick_demo.py`: Screenshot-friendly examples
- Test suite for URL seeding with BM25 scoring
### Changed
- Updated `__init__.py` to export AsyncUrlSeeder and SeedingConfig
- Enhanced documentation with URL seeding integration examples
### Fixed
- Corrected examples to properly extract URLs from seeder results before passing to `arun_many()`
- Fixed logger color compatibility issue (changed `lightblack` to `bright_black`)
## [0.6.2] - 2025-05-02
### Added

View File

@@ -2,12 +2,11 @@
import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
# MODIFIED: Add SeedingConfig here
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig
from .content_scraping_strategy import (
ContentScrapingStrategy,
WebScrapingStrategy,
# WebScrapingStrategy,
LXMLWebScrapingStrategy,
)
from .async_logger import (
@@ -66,8 +65,6 @@ from .deep_crawling import (
DFSDeepCrawlStrategy,
DeepCrawlDecorator,
)
# NEW: Import AsyncUrlSeeder
from .async_url_seeder import AsyncUrlSeeder
from .utils import (
start_colab_display_server,
@@ -81,10 +78,6 @@ __all__ = [
"BrowserProfiler",
"LLMConfig",
"GeolocationConfig",
# NEW: Add SeedingConfig
"SeedingConfig",
# NEW: Add AsyncUrlSeeder
"AsyncUrlSeeder",
"DeepCrawlStrategy",
"BFSDeepCrawlStrategy",
"BestFirstCrawlingStrategy",
@@ -107,7 +100,7 @@ __all__ = [
"CrawlerHub",
"CacheMode",
"ContentScrapingStrategy",
"WebScrapingStrategy",
# "WebScrapingStrategy",
"LXMLWebScrapingStrategy",
"BrowserConfig",
"CrawlerRunConfig",
@@ -167,4 +160,4 @@ __all__ = [
# Disable all Pydantic warnings
warnings.filterwarnings("ignore", module="pydantic")
# pydantic_warnings.filter_warnings()
# pydantic_warnings.filter_warnings()

View File

@@ -17,7 +17,7 @@ from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy
from .content_scraping_strategy import ContentScrapingStrategy, LXMLWebScrapingStrategy
from .deep_crawling import DeepCrawlStrategy
from .cache_context import CacheMode
@@ -207,6 +207,7 @@ class GeolocationConfig:
config_dict.update(kwargs)
return GeolocationConfig.from_dict(config_dict)
class ProxyConfig:
def __init__(
self,
@@ -317,6 +318,8 @@ class ProxyConfig:
config_dict.update(kwargs)
return ProxyConfig.from_dict(config_dict)
class BrowserConfig:
"""
Configuration class for setting up a browser instance and its context in AsyncPlaywrightCrawlerStrategy.
@@ -594,6 +597,7 @@ class BrowserConfig:
return config
return BrowserConfig.from_kwargs(config)
class HTTPCrawlerConfig:
"""HTTP-specific crawler configuration"""
@@ -721,7 +725,7 @@ class CrawlerRunConfig():
parser_type (str): Type of parser to use for HTML parsing.
Default: "lxml".
scraping_strategy (ContentScrapingStrategy): Scraping strategy to use.
Default: WebScrapingStrategy.
Default: LXMLWebScrapingStrategy.
proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
If None, no additional proxy config. Default: None.
@@ -975,7 +979,7 @@ class CrawlerRunConfig():
self.remove_forms = remove_forms
self.prettiify = prettiify
self.parser_type = parser_type
self.scraping_strategy = scraping_strategy or WebScrapingStrategy()
self.scraping_strategy = scraping_strategy or LXMLWebScrapingStrategy()
self.proxy_config = proxy_config
self.proxy_rotation_strategy = proxy_rotation_strategy
@@ -1325,6 +1329,7 @@ class CrawlerRunConfig():
config_dict.update(kwargs)
return CrawlerRunConfig.from_kwargs(config_dict)
class LLMConfig:
def __init__(
self,
@@ -1409,53 +1414,4 @@ class LLMConfig:
config_dict.update(kwargs)
return LLMConfig.from_kwargs(config_dict)
class SeedingConfig:
    """
    Configuration for URL discovery and pre-validation via AsyncUrlSeeder.

    Attributes mirror the constructor parameters:
        source: Discovery source — "sitemap", "cc", or "sitemap+cc".
        pattern: Wildcard pattern used to filter discovered URLs
            (e.g., "*example.com/blog/*").
        live_check: Issue HEAD requests to verify each URL is alive.
        extract_head: Fetch and parse the <head> section for metadata.
        max_urls: Cap on discovered URLs (-1 means no limit).
        concurrency: Max concurrent requests for live checks / head extraction.
        hits_per_sec: Rate limit in requests per second.
        force: Bypass the AsyncUrlSeeder's internal .jsonl cache.
        base_directory: Base directory for the seeder's cache files (.jsonl).
        llm_config: Forwarded LLM config for future use (e.g., relevance scoring).
        verbose: Overrides the crawler's general verbose setting when not None.
        query: Search query used for relevance scoring.
        score_threshold: Minimum relevance score to keep a URL (0.0-1.0).
        scoring_method: Scoring method; "bm25" today ("semantic" planned).
        filter_nonsense_urls: Drop utility URLs such as robots.txt, sitemap.xml.
    """

    def __init__(
        self,
        source: str = "sitemap+cc",
        pattern: Optional[str] = "*",
        live_check: bool = False,
        extract_head: bool = False,
        max_urls: int = -1,
        concurrency: int = 1000,
        hits_per_sec: int = 5,
        force: bool = False,
        base_directory: Optional[str] = None,
        llm_config: Optional[LLMConfig] = None,
        verbose: Optional[bool] = None,
        query: Optional[str] = None,
        score_threshold: Optional[float] = None,
        scoring_method: str = "bm25",
        filter_nonsense_urls: bool = True,
    ):
        self.source = source
        self.pattern = pattern
        self.live_check = live_check
        self.extract_head = extract_head
        self.max_urls = max_urls
        self.concurrency = concurrency
        self.hits_per_sec = hits_per_sec
        self.force = force
        self.base_directory = base_directory
        self.llm_config = llm_config
        self.verbose = verbose
        self.query = query
        self.score_threshold = score_threshold
        self.scoring_method = scoring_method
        self.filter_nonsense_urls = filter_nonsense_urls

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, omitting llm_config when it is unset."""
        snapshot = dict(self.__dict__)
        if snapshot.get('llm_config') is None:
            del snapshot['llm_config']
        return snapshot

    @staticmethod
    def from_kwargs(kwargs: Dict[str, Any]) -> 'SeedingConfig':
        """Build a SeedingConfig from a plain keyword dict."""
        return SeedingConfig(**kwargs)

    def clone(self, **kwargs: Any) -> 'SeedingConfig':
        """Return a copy of this config with the given overrides applied."""
        merged = self.to_dict()
        merged.update(kwargs)
        return SeedingConfig.from_kwargs(merged)

View File

@@ -29,7 +29,7 @@ class LogLevel(Enum):
class LogColor(str, Enum):
"""Enum for log colors."""
DEBUG = "bright_black"
DEBUG = "lightblack"
INFO = "cyan"
SUCCESS = "green"
WARNING = "yellow"

File diff suppressed because it is too large Load Diff

View File

@@ -35,10 +35,9 @@ from .markdown_generation_strategy import (
)
from .deep_crawling import DeepCrawlDecorator
from .async_logger import AsyncLogger, AsyncLoggerBase
from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig, SeedingConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .async_url_seeder import AsyncUrlSeeder
from .utils import (
sanitize_input_encode,
@@ -164,8 +163,6 @@ class AsyncWebCrawler:
# Decorate arun method with deep crawling capabilities
self._deep_handler = DeepCrawlDecorator(self)
self.arun = self._deep_handler(self.arun)
self.url_seeder: Optional[AsyncUrlSeeder] = None
async def start(self):
"""
@@ -747,94 +744,3 @@ class AsyncWebCrawler:
else:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
async def aseed_urls(
    self,
    domain_or_domains: Union[str, List[str]],
    config: Optional[SeedingConfig] = None,
    **kwargs
) -> Union[List[str], Dict[str, List[Union[str, Dict[str, Any]]]]]:
    """
    Discovers, filters, and optionally validates URLs for a given domain(s)
    using sitemaps and Common Crawl archives.

    Args:
        domain_or_domains: A single domain string (e.g., "iana.org") or a list of domains.
        config: A SeedingConfig object to control the seeding process.
            Parameters passed directly via kwargs will override those in 'config'.
        **kwargs: Additional parameters (e.g., `source`, `live_check`, `extract_head`,
            `pattern`, `concurrency`, `hits_per_sec`, `force`, `verbose`)
            that will be used to construct or update the SeedingConfig.

    Returns:
        If `extract_head` is False:
            - For a single domain: `List[str]` of discovered URLs.
            - For multiple domains: `Dict[str, List[str]]` mapping each domain to its URLs.
        If `extract_head` is True:
            - For a single domain: `List[Dict[str, Any]]` where each dict contains 'url'
              and 'head_data' (parsed <head> metadata).
            - For multiple domains: `Dict[str, List[Dict[str, Any]]]` mapping each domain
              to a list of URL data dictionaries.

    Raises:
        ValueError: If `domain_or_domains` is not a string or a list of strings.
        Exception: Any underlying exceptions from AsyncUrlSeeder or network operations.

    Example:
        >>> # Discover URLs from sitemap with live check for 'example.com'
        >>> result = await crawler.aseed_urls("example.com", source="sitemap", live_check=True, hits_per_sec=10)

        >>> # Discover URLs from Common Crawl, extract head data for 'example.com' and 'python.org'
        >>> multi_domain_result = await crawler.aseed_urls(
        >>>     ["example.com", "python.org"],
        >>>     source="cc", extract_head=True, concurrency=200, hits_per_sec=50
        >>> )
    """
    # Lazily initialize the shared AsyncUrlSeeder on first use.
    if not self.url_seeder:
        # Pass the crawler's base_directory for seeder's cache management
        # and the crawler's logger for consistent logging.
        self.url_seeder = AsyncUrlSeeder(
            base_directory=self.crawl4ai_folder,
            logger=self.logger
        )
    # Merge config object with direct kwargs, giving kwargs precedence.
    seeding_config = config.clone(**kwargs) if config else SeedingConfig.from_kwargs(kwargs)
    # Ensure base_directory is set for the seeder's cache.
    seeding_config.base_directory = seeding_config.base_directory or self.crawl4ai_folder
    # Ensure the seeder uses the crawler's logger (if not already set).
    if not self.url_seeder.logger:
        self.url_seeder.logger = self.logger
    # Honor an explicit verbose override from the config/kwargs; otherwise
    # mirror the crawler's own verbose setting.
    if seeding_config.verbose is not None:
        self.url_seeder.logger.verbose = seeding_config.verbose
    else:  # Default to crawler's verbose setting
        self.url_seeder.logger.verbose = self.logger.verbose
    if isinstance(domain_or_domains, str):
        self.logger.info(
            message="Starting URL seeding for domain: {domain}",
            tag="SEED",
            params={"domain": domain_or_domains}
        )
        return await self.url_seeder.urls(
            domain_or_domains,
            seeding_config
        )
    elif isinstance(domain_or_domains, (list, tuple)):
        self.logger.info(
            message="Starting URL seeding for {count} domains",
            tag="SEED",
            params={"count": len(domain_or_domains)}
        )
        # AsyncUrlSeeder.many_urls directly accepts a list of domains and individual params.
        return await self.url_seeder.many_urls(
            domain_or_domains,
            seeding_config
        )
    else:
        raise ValueError("`domain_or_domains` must be a string or a list of strings.")

View File

@@ -1073,8 +1073,7 @@ def crawl_cmd(url: str, browser_config: str, crawler_config: str, filter_config:
crawler_cfg.markdown_generator = DefaultMarkdownGenerator(
content_filter = BM25ContentFilter(
user_query=filter_conf.get("query"),
bm25_threshold=filter_conf.get("threshold", 1.0),
use_stemming=filter_conf.get("use_stemming", True),
bm25_threshold=filter_conf.get("threshold", 1.0)
)
)
elif filter_conf["type"] == "pruning":

View File

@@ -405,7 +405,6 @@ class BM25ContentFilter(RelevantContentFilter):
user_query: str = None,
bm25_threshold: float = 1.0,
language: str = "english",
use_stemming: bool = True,
):
"""
Initializes the BM25ContentFilter class, if not provided, falls back to page metadata.
@@ -417,11 +416,9 @@ class BM25ContentFilter(RelevantContentFilter):
user_query (str): User query for filtering (optional).
bm25_threshold (float): BM25 threshold for filtering (default: 1.0).
language (str): Language for stemming (default: 'english').
use_stemming (bool): Whether to apply stemming (default: True).
"""
super().__init__(user_query=user_query)
self.bm25_threshold = bm25_threshold
self.use_stemming = use_stemming
self.priority_tags = {
"h1": 5.0,
"h2": 4.0,
@@ -435,7 +432,7 @@ class BM25ContentFilter(RelevantContentFilter):
"pre": 1.5,
"th": 1.5, # Table headers
}
self.stemmer = stemmer(language) if use_stemming else None
self.stemmer = stemmer(language)
def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]:
"""
@@ -482,19 +479,13 @@ class BM25ContentFilter(RelevantContentFilter):
# for _, chunk, _, _ in candidates]
# tokenized_query = [ps.stem(word) for word in query.lower().split()]
if self.use_stemming:
tokenized_corpus = [
[self.stemmer.stemWord(word) for word in chunk.lower().split()]
for _, chunk, _, _ in candidates
]
tokenized_query = [
self.stemmer.stemWord(word) for word in query.lower().split()
]
else:
tokenized_corpus = [
chunk.lower().split() for _, chunk, _, _ in candidates
]
tokenized_query = query.lower().split()
tokenized_corpus = [
[self.stemmer.stemWord(word) for word in chunk.lower().split()]
for _, chunk, _, _ in candidates
]
tokenized_query = [
self.stemmer.stemWord(word) for word in query.lower().split()
]
# tokenized_corpus = [[self.stemmer.stemWord(word) for word in tokenize_text(chunk.lower())]
# for _, chunk, _, _ in candidates]

View File

@@ -2,7 +2,7 @@ import re
from itertools import chain
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from bs4 import BeautifulSoup
# from bs4 import BeautifulSoup
import asyncio
import requests
from .config import (
@@ -13,12 +13,12 @@ from .config import (
IMPORTANT_ATTRS,
SOCIAL_MEDIA_DOMAINS,
)
from bs4 import NavigableString, Comment
from bs4 import PageElement, Tag
# from bs4 import NavigableString, Comment
# from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
from .utils import (
extract_metadata,
# extract_metadata,
normalize_url,
is_external_url,
get_base_domain,
@@ -96,20 +96,16 @@ class ContentScrapingStrategy(ABC):
pass
class WebScrapingStrategy(ContentScrapingStrategy):
"""
Class for web content scraping. Perhaps the most important class.
How it works:
1. Extract content from HTML using BeautifulSoup.
2. Clean the extracted content using a content cleaning strategy.
3. Filter the cleaned content using a content filtering strategy.
4. Generate markdown content from the filtered content.
5. Return the markdown content.
"""
class LXMLWebScrapingStrategy(ContentScrapingStrategy):
def __init__(self, logger=None):
    """
    Initialize the scraping strategy.

    Args:
        logger: Optional logger instance used by ``_log``; when None,
            logging calls are skipped.
    """
    self.logger = logger
    # Matches a run of digits followed by a non-digit suffix (e.g. a unit).
    self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
    # Captures the payload of inline base64 data-URI images.
    self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
    # Constants for image processing: class/tag hints that mark UI chrome,
    # and the set of recognized raster image formats.
    self.classes_to_check = frozenset(["button", "icon", "logo"])
    self.tags_to_check = frozenset(["button", "input"])
    self.image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
def _log(self, level, message, tag="SCRAPE", **kwargs):
"""Helper method to safely use logger."""
@@ -130,7 +126,8 @@ class WebScrapingStrategy(ContentScrapingStrategy):
ScrapingResult: A structured result containing the scraped content.
"""
actual_url = kwargs.get("redirected_url", url)
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
raw_result = self._scrap(actual_url, html, **kwargs)
if raw_result is None:
return ScrapingResult(
cleaned_html="",
@@ -194,388 +191,15 @@ class WebScrapingStrategy(ContentScrapingStrategy):
Returns:
ScrapingResult: A structured result containing the scraped content.
"""
return await asyncio.to_thread(self._scrap, url, html, **kwargs)
return await asyncio.to_thread(self.scrap, url, html, **kwargs)
def is_data_table(self, table: Tag, **kwargs) -> bool:
    """
    Heuristically decide whether a <table> holds tabular data rather than
    serving as a page-layout device.

    A weighted score accumulates structural signals (thead/tbody, th cells,
    caption/summary, column-count regularity, text density, data-* attributes)
    and penalties (nested tables, presentational ARIA role). The table counts
    as a data table when the score reaches ``table_score_threshold``
    (kwarg, default 7).

    Args:
        table (Tag): BeautifulSoup Tag representing a table element.
        **kwargs: May contain ``table_score_threshold``.

    Returns:
        bool: True when the table scores at or above the threshold.
    """
    rows = table.select('tr')
    if not rows:
        # A table without rows cannot be a data table.
        return False

    score = 0

    # Structural markers: explicit header/body sections.
    header_section = len(table.select('thead')) > 0
    body_section = len(table.select('tbody')) > 0
    score += 2 if header_section else 0
    score += 1 if body_section else 0

    # Header cells, with a bonus when they sit in the header position.
    if table.select('th'):
        score += 2
        if header_section or table.select('tr:first-child th'):
            score += 1

    # Nested tables strongly suggest layout use.
    if table.select('table'):
        score -= 3

    # An explicit presentational ARIA role is a strong layout signal.
    if table.get('role', '').lower() in {'presentation', 'none'}:
        score -= 3

    # Consistent column counts across rows point to real data.
    cells_per_row = [len(r.select('td, th')) for r in rows]
    mean_cols = sum(cells_per_row) / len(cells_per_row)
    spread = sum((c - mean_cols) ** 2 for c in cells_per_row) / len(cells_per_row)
    if spread < 1:
        score += 2

    # Accessibility affordances: caption and summary attribute.
    if table.select('caption'):
        score += 2
    if table.has_attr('summary') and table['summary']:
        score += 1

    # Text-dense tables (lots of cell text per descendant tag) are usually data.
    cell_text = sum(
        len(cell.get_text().strip())
        for r in rows
        for cell in r.select('td, th')
    )
    tag_count = sum(1 for node in table.descendants if isinstance(node, Tag))
    density = cell_text / (tag_count + 1e-5)
    if density > 20:
        score += 3
    elif density > 10:
        score += 2

    # Each data-* attribute nudges the score upward.
    score += 0.5 * sum(1 for a in table.attrs if a.startswith('data-'))

    # Minimum useful size: at least a 2x2 grid.
    if mean_cols >= 2 and len(rows) >= 2:
        score += 2

    return score >= kwargs.get('table_score_threshold', 7)
def extract_table_data(self, table: Tag) -> dict:
    """
    Extract structured data from a table element.

    Headers come from the first <thead> row when present, otherwise from the
    table's first row; ``colspan`` values are honored by repeating the cell
    text. Body rows are every <tr> outside <thead> (or after a leading header
    row), and each row is truncated/padded to the header width.

    Args:
        table (Tag): BeautifulSoup Tag representing a table element.

    Returns:
        dict: {"headers": list, "rows": list of lists,
               "caption": str, "summary": str}
    """
    caption_elem = table.select_one('caption')
    caption = caption_elem.get_text().strip() if caption_elem else ""
    summary = table.get('summary', '').strip()
    # Extract headers with colspan handling: a cell spanning N columns
    # contributes N copies of its text so indexes line up with body cells.
    headers = []
    thead_rows = table.select('thead tr')
    if thead_rows:
        header_cells = thead_rows[0].select('th')
        for cell in header_cells:
            text = cell.get_text().strip()
            colspan = int(cell.get('colspan', 1))
            headers.extend([text] * colspan)
    else:
        # No <thead>: fall back to the first row (th or td cells).
        first_row = table.select('tr:first-child')
        if first_row:
            for cell in first_row[0].select('th, td'):
                text = cell.get_text().strip()
                colspan = int(cell.get('colspan', 1))
                headers.extend([text] * colspan)
    # Extract body rows with the same colspan expansion.
    rows = []
    all_rows = table.select('tr')
    thead = table.select_one('thead')
    tbody_rows = []
    if thead:
        # Body rows are all rows not inside the <thead>.
        thead_rows = thead.select('tr')
        tbody_rows = [row for row in all_rows if row not in thead_rows]
    else:
        # Without <thead>, skip the first row only if it served as a header.
        if all_rows and all_rows[0].select('th'):
            tbody_rows = all_rows[1:]
        else:
            tbody_rows = all_rows
    for row in tbody_rows:
        # for row in table.select('tr:not(:has(ancestor::thead))'):
        row_data = []
        # Only <td> cells count as data here; rows of pure <th> are dropped.
        for cell in row.select('td'):
            text = cell.get_text().strip()
            colspan = int(cell.get('colspan', 1))
            row_data.extend([text] * colspan)
        if row_data:
            rows.append(row_data)
    # Align rows with headers: truncate long rows, pad short ones with "".
    max_columns = len(headers) if headers else (max(len(row) for row in rows) if rows else 0)
    aligned_rows = []
    for row in rows:
        aligned = row[:max_columns] + [''] * (max_columns - len(row))
        aligned_rows.append(aligned)
    if not headers:
        # Synthesize generic header names when none were found.
        headers = [f"Column {i+1}" for i in range(max_columns)]
    return {
        "headers": headers,
        "rows": aligned_rows,
        "caption": caption,
        "summary": summary,
    }
def flatten_nested_elements(self, node):
    """
    Collapse chains of identically-named, single-child wrapper tags.

    A tag whose only child is another tag with the same name (e.g.
    ``<div><div>…</div></div>``) is replaced by that child, repeatedly,
    and the surviving node's children are flattened the same way.

    Args:
        node: A BeautifulSoup node (Tag or NavigableString).

    Returns:
        The flattened node; text nodes are returned untouched.
    """
    # Text nodes have no structure to flatten.
    if isinstance(node, NavigableString):
        return node
    # Descend through same-named single-child wrappers.
    current = node
    while (
        len(current.contents) == 1
        and isinstance(current.contents[0], Tag)
        and current.contents[0].name == current.name
    ):
        current = current.contents[0]
    # Recurse into the surviving node's children.
    current.contents = [self.flatten_nested_elements(child) for child in current.contents]
    return current
def find_closest_parent_with_useful_text(self, tag, **kwargs):
    """
    Walk up the ancestor chain and return the first ancestor's text that
    meets the minimum word count.

    Args:
        tag: The starting BeautifulSoup tag.
        **kwargs: May contain ``image_description_min_word_threshold``
            (defaults to IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD).

    Returns:
        The qualifying ancestor's text, or None when no ancestor has
        enough words.
    """
    min_words = kwargs.get(
        "image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
    )
    ancestor = tag.parent
    while ancestor is not None:
        # Collapse the ancestor's text and count whitespace-separated words.
        text = ancestor.get_text(separator=" ", strip=True)
        if len(text.split()) >= min_words:
            return text
        ancestor = ancestor.parent
    return None
def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False):
    """
    Strip every attribute from *element* except the important ones
    (and, optionally, ``data-*`` attributes).

    Args:
        element: The tag whose attributes are pruned in place.
        important_attrs (list): Attribute names that must survive.
        keep_data_attributes (bool): When True, ``data-*`` attributes are
            also preserved.

    Returns:
        None
    """
    # Collect first, then delete — mutating while iterating is unsafe.
    doomed = [
        name
        for name in element.attrs
        if name not in important_attrs
        and not (keep_data_attributes and name.startswith("data-"))
    ]
    for name in doomed:
        del element[name]
def process_image(self, img, url, index, total_images, **kwargs):
    """
    Process an <img> element into a list of scored image-variant dicts.

    How it works:
        1. Reject images that are hidden, sit inside button/input parents,
           or whose src/alt/parent classes look like UI chrome
           (button/icon/logo).
        2. Score the image on declared size, alt text, page position, and
           evidence of real image sources (known format, srcset, <picture>).
        3. Detect the file format from the first source naming a known
           extension.
        4. Collect deduplicated variants from src, data-src, srcset,
           data-srcset, <picture> sources, and framework data-* attributes.

    Args:
        img (Tag): The image element to process.
        url (str): The URL of the page containing the image.
        index (int): Position of this image among the page's images.
        total_images (int): Total number of images in the list.
        **kwargs: May contain ``image_score_threshold`` and
            ``image_description_min_word_threshold``.

    Returns:
        A list of variant dicts, or None when the image is filtered out or
        no usable (non data-URI) source is found.
    """
    # NOTE(review): these locals duplicate the frozensets built in __init__
    # (self.classes_to_check / self.tags_to_check / self.image_formats).
    classes_to_check = frozenset(["button", "icon", "logo"])
    tags_to_check = frozenset(["button", "input"])
    image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
    # Pre-fetch commonly used attributes.
    style = img.get("style", "")
    alt = img.get("alt", "")
    src = img.get("src", "")
    data_src = img.get("data-src", "")
    srcset = img.get("srcset", "")
    data_srcset = img.get("data-srcset", "")
    width = img.get("width")
    height = img.get("height")
    parent = img.parent
    parent_classes = parent.get("class", [])
    # Quick rejection: hidden images, UI-control parents, or chrome-like
    # class/src/alt substrings are not content images.
    if (
        "display:none" in style
        or parent.name in tags_to_check
        or any(c in cls for c in parent_classes for cls in classes_to_check)
        or any(c in src for c in classes_to_check)
        or any(c in alt for c in classes_to_check)
    ):
        return None
    # Quick score calculation.
    score = 0
    if width and width.isdigit():
        width_val = int(width)
        score += 1 if width_val > 150 else 0
    if height and height.isdigit():
        height_val = int(height)
        score += 1 if height_val > 150 else 0
    if alt:
        score += 1
    # Images in the first half of the page earn a bonus point
    # (bool arithmetic: True == 1).
    score += index / total_images < 0.5

    # Check for a known image format in any possible source attribute.
    # NOTE(review): the parameter name `url` is shadowed here and in the
    # detection loop below; the page URL is no longer reachable past this point.
    def has_image_format(url):
        return any(fmt in url.lower() for fmt in image_formats)

    # Score for having proper image sources.
    if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]):
        score += 1
    if srcset or data_srcset:
        score += 1
    if img.find_parent("picture"):
        score += 1
    # Detect format from the first source that names a known extension.
    detected_format = None
    for url in [src, data_src, srcset, data_srcset]:
        if url:
            format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
            if format_matches:
                detected_format = format_matches[0]
                break
    # A score strictly greater than the threshold is required to keep the image.
    if score <= kwargs.get("image_score_threshold", IMAGE_SCORE_THRESHOLD):
        return None
    # Deduplicate variant URLs while preserving discovery order.
    unique_urls = set()
    image_variants = []
    # Generate a unique group ID for this set of variants.
    group_id = index
    # Shared template copied into every variant of this image.
    base_info = {
        "alt": alt,
        "desc": self.find_closest_parent_with_useful_text(img, **kwargs),
        "score": score,
        "type": "image",
        "group_id": group_id,  # Group ID for this set of variants
        "format": detected_format,
    }

    # Record a variant unless it is a data: URI or already seen.
    def add_variant(src, width=None):
        if src and not src.startswith("data:") and src not in unique_urls:
            unique_urls.add(src)
            image_variants.append({**base_info, "src": src, "width": width})

    # Process plain sources first.
    add_variant(src)
    add_variant(data_src)
    # Handle srcset and data-srcset in one pass.
    # NOTE(review): parse_srcset is not defined in this block — presumably
    # imported from .utils; confirm.
    for attr in ("srcset", "data-srcset"):
        if value := img.get(attr):
            for source in parse_srcset(value):
                add_variant(source["url"], source["width"])
    # <picture> parents may carry additional <source srcset="..."> variants.
    if picture := img.find_parent("picture"):
        for source in picture.find_all("source"):
            if srcset := source.get("srcset"):
                for src in parse_srcset(srcset):
                    add_variant(src["url"], src["width"])
    # Framework-specific lazy-load attributes (data-*src* / data-*srcset*)
    # holding an absolute URL.
    for attr, value in img.attrs.items():
        if (
            attr.startswith("data-")
            and ("src" in attr or "srcset" in attr)
            and "http" in value
        ):
            add_variant(value)
    return image_variants if image_variants else None
def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
def process_element(self, url: str, element: lhtml.HtmlElement, **kwargs) -> Dict[str, Any]:
"""
Process an HTML element.
How it works:
1. Check if the element is an image, video, or audio.
2. Extract the element's attributes and content.
3. Process the element based on its type.
4. Return the processed element information.
Args:
url (str): The URL of the page containing the element.
element (Tag): The HTML element to process.
element (lhtml.HtmlElement): The HTML element to process.
**kwargs: Additional keyword arguments.
Returns:
@@ -584,451 +208,40 @@ class WebScrapingStrategy(ContentScrapingStrategy):
media = {"images": [], "videos": [], "audios": [], "tables": []}
internal_links_dict = {}
external_links_dict = {}
self._process_element(
url, element, media, internal_links_dict, external_links_dict, **kwargs
)
return {
"media": media,
"internal_links_dict": internal_links_dict,
"external_links_dict": external_links_dict,
}
def _process_element(
self,
url,
element: PageElement,
media: Dict[str, Any],
internal_links_dict: Dict[str, Any],
external_links_dict: Dict[str, Any],
**kwargs,
) -> bool:
def remove_unwanted_attributes(self, element: lhtml.HtmlElement, important_attrs: List[str], keep_data_attributes: bool = False):
"""
Process an HTML element.
"""
try:
if isinstance(element, NavigableString):
if isinstance(element, Comment):
element.extract()
return False
# if element.name == 'img':
# process_image(element, url, 0, 1)
# return True
base_domain = kwargs.get("base_domain", get_base_domain(url))
if element.name in ["script", "style", "link", "meta", "noscript"]:
element.decompose()
return False
keep_element = False
# Special case for table elements - always preserve structure
if element.name in ["tr", "td", "th"]:
keep_element = True
exclude_domains = kwargs.get("exclude_domains", [])
# exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS))
# exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
# exclude_social_media_domains = list(set(exclude_social_media_domains))
try:
if element.name == "a" and element.get("href"):
href = element.get("href", "").strip()
if not href: # Skip empty hrefs
return False
# url_base = url.split("/")[2]
# Normalize the URL
try:
normalized_href = normalize_url(href, url)
except ValueError:
# logging.warning(f"Invalid URL format: {href}, Error: {str(e)}")
return False
link_data = {
"href": normalized_href,
"text": element.get_text().strip(),
"title": element.get("title", "").strip(),
"base_domain": base_domain,
}
is_external = is_external_url(normalized_href, base_domain)
keep_element = True
# Handle external link exclusions
if is_external:
link_base_domain = get_base_domain(normalized_href)
link_data["base_domain"] = link_base_domain
if kwargs.get("exclude_external_links", False):
element.decompose()
return False
# elif kwargs.get('exclude_social_media_links', False):
# if link_base_domain in exclude_social_media_domains:
# element.decompose()
# return False
# if any(domain in normalized_href.lower() for domain in exclude_social_media_domains):
# element.decompose()
# return False
elif exclude_domains:
if link_base_domain in exclude_domains:
element.decompose()
return False
# if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])):
# element.decompose()
# return False
if is_external:
if normalized_href not in external_links_dict:
external_links_dict[normalized_href] = link_data
else:
if kwargs.get("exclude_internal_links", False):
element.decompose()
return False
if normalized_href not in internal_links_dict:
internal_links_dict[normalized_href] = link_data
except Exception as e:
raise Exception(f"Error processing links: {str(e)}")
try:
if element.name == "img":
potential_sources = [
"src",
"data-src",
"srcset" "data-lazy-src",
"data-original",
]
src = element.get("src", "")
while not src and potential_sources:
src = element.get(potential_sources.pop(0), "")
if not src:
element.decompose()
return False
# If it is srcset pick up the first image
if "srcset" in element.attrs:
src = element.attrs["srcset"].split(",")[0].split(" ")[0]
# If image src is internal, then skip
if not is_external_url(src, base_domain):
return True
image_src_base_domain = get_base_domain(src)
# Check flag if we should remove external images
if kwargs.get("exclude_external_images", False):
element.decompose()
return False
# src_url_base = src.split('/')[2]
# url_base = url.split('/')[2]
# if url_base not in src_url_base:
# element.decompose()
# return False
# if kwargs.get('exclude_social_media_links', False):
# if image_src_base_domain in exclude_social_media_domains:
# element.decompose()
# return False
# src_url_base = src.split('/')[2]
# url_base = url.split('/')[2]
# if any(domain in src for domain in exclude_social_media_domains):
# element.decompose()
# return False
# Handle exclude domains
if exclude_domains:
if image_src_base_domain in exclude_domains:
element.decompose()
return False
# if any(domain in src for domain in kwargs.get('exclude_domains', [])):
# element.decompose()
# return False
return True # Always keep image elements
except Exception:
raise "Error processing images"
# Check if flag to remove all forms is set
if kwargs.get("remove_forms", False) and element.name == "form":
element.decompose()
return False
if element.name in ["video", "audio"]:
media[f"{element.name}s"].append(
{
"src": element.get("src"),
"alt": element.get("alt"),
"type": element.name,
"description": self.find_closest_parent_with_useful_text(
element, **kwargs
),
}
)
source_tags = element.find_all("source")
for source_tag in source_tags:
media[f"{element.name}s"].append(
{
"src": source_tag.get("src"),
"alt": element.get("alt"),
"type": element.name,
"description": self.find_closest_parent_with_useful_text(
element, **kwargs
),
}
)
return True # Always keep video and audio elements
if element.name in ONLY_TEXT_ELIGIBLE_TAGS:
if kwargs.get("only_text", False):
element.replace_with(element.get_text())
try:
self.remove_unwanted_attributes(
element, IMPORTANT_ATTRS + kwargs.get("keep_attrs", []) , kwargs.get("keep_data_attributes", False)
)
except Exception as e:
# print('Error removing unwanted attributes:', str(e))
self._log(
"error",
message="Error removing unwanted attributes: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
# Process children
for child in list(element.children):
if isinstance(child, NavigableString) and not isinstance(
child, Comment
):
if len(child.strip()) > 0:
keep_element = True
else:
if self._process_element(
url,
child,
media,
internal_links_dict,
external_links_dict,
**kwargs,
):
keep_element = True
# Check word count
word_count_threshold = kwargs.get(
"word_count_threshold", MIN_WORD_THRESHOLD
)
if not keep_element:
word_count = len(element.get_text(strip=True).split())
keep_element = word_count >= word_count_threshold
if not keep_element:
element.decompose()
return keep_element
except Exception as e:
# print('Error processing element:', str(e))
self._log(
"error",
message="Error processing element: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
return False
def _scrap(
self,
url: str,
html: str,
word_count_threshold: int = MIN_WORD_THRESHOLD,
css_selector: str = None,
target_elements: List[str] = None,
**kwargs,
) -> Dict[str, Any]:
"""
Extract content from HTML using BeautifulSoup.
Remove unwanted attributes from an HTML element.
Args:
url (str): The URL of the page to scrape.
html (str): The HTML content of the page to scrape.
word_count_threshold (int): The minimum word count threshold for content extraction.
css_selector (str): The CSS selector to use for content extraction.
**kwargs: Additional keyword arguments.
element (lhtml.HtmlElement): The HTML element to remove attributes from.
important_attrs (List[str]): List of important attributes to keep.
keep_data_attributes (bool): Whether to keep data attributes.
Returns:
dict: A dictionary containing the extracted content.
None
"""
success = True
if not html:
return None
attrs_to_remove = []
for attr in element.attrib:
if attr not in important_attrs:
if keep_data_attributes:
if not attr.startswith("data-"):
attrs_to_remove.append(attr)
else:
attrs_to_remove.append(attr)
parser_type = kwargs.get("parser", "lxml")
soup = BeautifulSoup(html, parser_type)
body = soup.body
if body is None:
raise Exception("'<body>' tag is not found in fetched html. Consider adding wait_for=\"css:body\" to wait for body tag to be loaded into DOM.")
base_domain = get_base_domain(url)
# Early removal of all images if exclude_all_images is set
# This happens before any processing to minimize memory usage
if kwargs.get("exclude_all_images", False):
for img in body.find_all('img'):
img.decompose()
try:
meta = extract_metadata("", soup)
except Exception as e:
self._log(
"error",
message="Error extracting metadata: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
meta = {}
# Handle tag-based removal first - faster than CSS selection
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
if excluded_tags:
for element in body.find_all(lambda tag: tag.name in excluded_tags):
element.extract()
# Handle CSS selector-based removal
excluded_selector = kwargs.get("excluded_selector", "")
if excluded_selector:
is_single_selector = (
"," not in excluded_selector and " " not in excluded_selector
)
if is_single_selector:
while element := body.select_one(excluded_selector):
element.extract()
else:
for element in body.select(excluded_selector):
element.extract()
content_element = None
if target_elements:
try:
for_content_targeted_element = []
for target_element in target_elements:
for_content_targeted_element.extend(body.select(target_element))
content_element = soup.new_tag("div")
for el in for_content_targeted_element:
content_element.append(copy.deepcopy(el))
except Exception as e:
self._log("error", f"Error with target element detection: {str(e)}", "SCRAPE")
return None
else:
content_element = body
kwargs["exclude_social_media_domains"] = set(
kwargs.get("exclude_social_media_domains", []) + SOCIAL_MEDIA_DOMAINS
)
kwargs["exclude_domains"] = set(kwargs.get("exclude_domains", []))
if kwargs.get("exclude_social_media_links", False):
kwargs["exclude_domains"] = kwargs["exclude_domains"].union(
kwargs["exclude_social_media_domains"]
)
result_obj = self.process_element(
url,
body,
word_count_threshold=word_count_threshold,
base_domain=base_domain,
**kwargs,
)
links = {"internal": [], "external": []}
media = result_obj["media"]
internal_links_dict = result_obj["internal_links_dict"]
external_links_dict = result_obj["external_links_dict"]
# Update the links dictionary with unique links
links["internal"] = list(internal_links_dict.values())
links["external"] = list(external_links_dict.values())
# # Process images using ThreadPoolExecutor
imgs = body.find_all("img")
media["images"] = [
img
for result in (
self.process_image(img, url, i, len(imgs), **kwargs)
for i, img in enumerate(imgs)
)
if result is not None
for img in result
]
# Process tables if not excluded
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
if 'table' not in excluded_tags:
tables = body.find_all('table')
for table in tables:
if self.is_data_table(table, **kwargs):
table_data = self.extract_table_data(table)
media["tables"].append(table_data)
body = self.flatten_nested_elements(body)
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
for img in imgs:
src = img.get("src", "")
if base64_pattern.match(src):
# Replace base64 data with empty string
img["src"] = base64_pattern.sub("", src)
str_body = ""
try:
str_body = content_element.encode_contents().decode("utf-8")
except Exception:
# Reset body to the original HTML
success = False
body = BeautifulSoup(html, "html.parser")
# Create a new div with a special ID
error_div = body.new_tag("div", id="crawl4ai_error_message")
error_div.string = """
Crawl4AI Error: This page is not fully supported.
Possible reasons:
1. The page may have restrictions that prevent crawling.
2. The page might not be fully loaded.
Suggestions:
- Try calling the crawl function with these parameters:
magic=True,
- Set headless=False to visualize what's happening on the page.
If the issue persists, please check the page's structure and any potential anti-crawling measures.
"""
# Append the error div to the body
body.append(error_div)
str_body = body.encode_contents().decode("utf-8")
print(
"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details."
)
self._log(
"error",
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.",
tag="SCRAPE",
)
cleaned_html = str_body.replace("\n\n", "\n").replace(" ", " ")
return {
"cleaned_html": cleaned_html,
"success": success,
"media": media,
"links": links,
"metadata": meta,
}
class LXMLWebScrapingStrategy(WebScrapingStrategy):
def __init__(self, logger=None):
super().__init__(logger)
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
for attr in attrs_to_remove:
del element.attrib[attr]
def _process_element(
self,
@@ -1190,7 +403,7 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
return None
parent = img.getparent()
if parent.tag in ["button", "input"]:
if parent.tag in self.tags_to_check:
return None
parent_classes = parent.get("class", "").split()
@@ -1200,8 +413,8 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
return None
# If src is in class or alt, likely an icon
if (src and any(c in src for c in ["button", "icon", "logo"])) or (
alt and any(c in alt for c in ["button", "icon", "logo"])
if (src and any(c in src for c in self.classes_to_check)) or (
alt and any(c in alt for c in self.classes_to_check)
):
return None
@@ -1216,11 +429,10 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
score += index / total_images < 0.5
# Check formats in all possible sources
image_formats = {"jpg", "jpeg", "png", "webp", "avif", "gif"}
detected_format = None
for url in [src, data_src, srcset, data_srcset]:
if url:
format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
format_matches = [fmt for fmt in self.image_formats if fmt in url.lower()]
if format_matches:
detected_format = format_matches[0]
score += 1
@@ -1484,6 +696,13 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
success = True
try:
# Extract metadata FIRST from the original HTML to avoid issues with modified content.
try:
meta = extract_metadata_using_lxml(html, None) # Pass the original HTML
except Exception as e:
self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
meta = {}
doc = lhtml.document_fromstring(html)
# Match BeautifulSoup's behavior of using body or full doc
# body = doc.xpath('//body')[0] if doc.xpath('//body') else doc
@@ -1524,14 +743,14 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
"error", f"Error with excluded CSS selector: {str(e)}", "SCRAPE"
)
# Extract metadata before any content filtering
try:
meta = extract_metadata_using_lxml(
"", doc
) # Using same function as BeautifulSoup version
except Exception as e:
self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
meta = {}
# # Extract metadata before any content filtering
# try:
# meta = extract_metadata_using_lxml(
# "", doc
# ) # Using same function as BeautifulSoup version
# except Exception as e:
# self._log("error", f"Error extracting metadata: {str(e)}", "SCRAPE")
# meta = {}
content_element = None
if target_elements:
@@ -1611,7 +830,9 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
# Remove unneeded attributes
self.remove_unwanted_attributes_fast(
body, keep_data_attributes=kwargs.get("keep_data_attributes", False)
body,
important_attrs=IMPORTANT_ATTRS + kwargs.get("keep_attrs", []),
keep_data_attributes=kwargs.get("keep_data_attributes", False)
)
# Generate output HTML

View File

@@ -10,20 +10,16 @@ CacheMode = Union['CacheModeType']
CrawlResult = Union['CrawlResultType']
CrawlerHub = Union['CrawlerHubType']
BrowserProfiler = Union['BrowserProfilerType']
# NEW: Add AsyncUrlSeederType
AsyncUrlSeeder = Union['AsyncUrlSeederType']
# Configuration types
BrowserConfig = Union['BrowserConfigType']
CrawlerRunConfig = Union['CrawlerRunConfigType']
HTTPCrawlerConfig = Union['HTTPCrawlerConfigType']
LLMConfig = Union['LLMConfigType']
# NEW: Add SeedingConfigType
SeedingConfig = Union['SeedingConfigType']
# Content scraping types
ContentScrapingStrategy = Union['ContentScrapingStrategyType']
WebScrapingStrategy = Union['WebScrapingStrategyType']
# WebScrapingStrategy = Union['WebScrapingStrategyType']
LXMLWebScrapingStrategy = Union['LXMLWebScrapingStrategyType']
# Proxy types
@@ -98,8 +94,6 @@ if TYPE_CHECKING:
from .models import CrawlResult as CrawlResultType
from .hub import CrawlerHub as CrawlerHubType
from .browser_profiler import BrowserProfiler as BrowserProfilerType
# NEW: Import AsyncUrlSeeder for type checking
from .async_url_seeder import AsyncUrlSeeder as AsyncUrlSeederType
# Configuration imports
from .async_configs import (
@@ -107,14 +101,12 @@ if TYPE_CHECKING:
CrawlerRunConfig as CrawlerRunConfigType,
HTTPCrawlerConfig as HTTPCrawlerConfigType,
LLMConfig as LLMConfigType,
# NEW: Import SeedingConfig for type checking
SeedingConfig as SeedingConfigType,
)
# Content scraping imports
from .content_scraping_strategy import (
ContentScrapingStrategy as ContentScrapingStrategyType,
WebScrapingStrategy as WebScrapingStrategyType,
# WebScrapingStrategy as WebScrapingStrategyType,
LXMLWebScrapingStrategy as LXMLWebScrapingStrategyType,
)
@@ -192,4 +184,4 @@ if TYPE_CHECKING:
def create_llm_config(*args, **kwargs) -> 'LLMConfigType':
from .async_configs import LLMConfig
return LLMConfig(*args, **kwargs)
return LLMConfig(*args, **kwargs)

View File

@@ -1487,8 +1487,29 @@ def extract_metadata_using_lxml(html, doc=None):
head = head[0]
# Title - using XPath
# title = head.xpath(".//title/text()")
# metadata["title"] = title[0].strip() if title else None
# === Title Extraction - New Approach ===
# Attempt to extract <title> using XPath
title = head.xpath(".//title/text()")
metadata["title"] = title[0].strip() if title else None
title = title[0] if title else None
# Fallback: Use .find() in case XPath fails due to malformed HTML
if not title:
title_el = doc.find(".//title")
title = title_el.text if title_el is not None else None
# Final fallback: Use OpenGraph or Twitter title if <title> is missing or empty
if not title:
title_candidates = (
doc.xpath("//meta[@property='og:title']/@content") or
doc.xpath("//meta[@name='twitter:title']/@content")
)
title = title_candidates[0] if title_candidates else None
# Strip and assign title
metadata["title"] = title.strip() if title else None
# Meta description - using XPath with multiple attribute conditions
description = head.xpath('.//meta[@name="description"]/@content')

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -447,7 +447,10 @@
dragNodes: true,
dragView: true,
zoomView: true,
zoomSpeed: 0.15 // Reduced from default 1.0
mouseWheel: {
speed: 0.15, // Reduced from default 1.0
smooth: true // Enable smooth zooming
}
},
nodes: {
font: {

View File

@@ -1,6 +1,12 @@
import time, re
from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
import time
import os
import sys
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
import functools
from collections import defaultdict
@@ -57,7 +63,7 @@ methods_to_profile = [
# Apply decorators to both strategies
for strategy, name in [(WebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
for strategy, name in [(LXMLWebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
for method in methods_to_profile:
apply_decorators(strategy, method, name)
@@ -85,7 +91,7 @@ def generate_large_html(n_elements=1000):
def test_scraping():
# Initialize both scrapers
original_scraper = WebScrapingStrategy()
original_scraper = LXMLWebScrapingStrategy()
selected_scraper = LXMLWebScrapingStrategy()
# Generate test HTML

File diff suppressed because it is too large Load Diff

View File

@@ -1,807 +0,0 @@
"""
BBC Sport Research Assistant Pipeline
=====================================
This example demonstrates how URLSeeder helps create an efficient research pipeline:
1. Discover all available URLs without crawling
2. Filter and rank them based on relevance
3. Crawl only the most relevant content
4. Generate comprehensive research insights
Pipeline Steps:
1. Get user query
2. Optionally enhance query using LLM
3. Use URLSeeder to discover and rank URLs
4. Crawl top K URLs with BM25 filtering
5. Generate detailed response with citations
Requirements:
- pip install crawl4ai
- pip install litellm
- export GEMINI_API_KEY="your-api-key"
Usage:
- Run normally: python bbc_sport_research_assistant.py
- Run test mode: python bbc_sport_research_assistant.py test
Note: AsyncUrlSeeder now uses context manager for automatic cleanup.
"""
import asyncio
import json
import os
import hashlib
import pickle
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
# Rich for colored output
from rich.console import Console
from rich.text import Text
from rich.panel import Panel
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
# Crawl4AI imports
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
AsyncUrlSeeder,
SeedingConfig,
AsyncLogger
)
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# LiteLLM for AI communication
import litellm
# Initialize Rich console
console = Console()
# Get the current directory where this script is located
SCRIPT_DIR = Path(__file__).parent.resolve()
# Cache configuration - relative to script directory
CACHE_DIR = SCRIPT_DIR / "temp_cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Testing limits
TESTING_MODE = True
MAX_URLS_DISCOVERY = 100 if TESTING_MODE else 1000
MAX_URLS_TO_CRAWL = 5 if TESTING_MODE else 10
def get_cache_key(prefix: str, *args) -> str:
    """Build a deterministic cache key from a prefix and arbitrary arguments.

    The prefix and the stringified arguments are joined into one string,
    which is then hashed with MD5 to give a fixed-length, filesystem-safe
    hex digest suitable for use as a cache filename.
    """
    parts = [str(arg) for arg in args]
    payload = prefix + ":" + "|".join(parts)
    digest = hashlib.md5(payload.encode())
    return digest.hexdigest()
def load_from_cache(cache_key: str) -> Optional[object]:
    """Return the cached object stored under *cache_key*, or None on a miss.

    Fixes: the original annotated the return as ``Optional[any]`` — ``any``
    is the builtin function, not a type; ``object`` is the honest type here.
    A corrupt or truncated cache file is now treated as a cache miss instead
    of crashing the pipeline, since the cache is purely an optimization.

    NOTE: pickle is only safe because this cache directory is written
    exclusively by this script; never point it at untrusted data.
    """
    cache_path = CACHE_DIR / f"{cache_key}.pkl"
    if not cache_path.exists():
        return None
    try:
        with open(cache_path, 'rb') as f:
            return pickle.load(f)
    except (pickle.UnpicklingError, EOFError, OSError):
        # Unreadable entry: behave as if it were never cached.
        return None
def save_to_cache(cache_key: str, data: object) -> None:
    """Pickle *data* into the cache directory under *cache_key*.

    Fixes: the original annotated *data* as ``any``, which is the builtin
    function rather than a type; ``object`` expresses "anything" correctly.
    """
    cache_path = CACHE_DIR / f"{cache_key}.pkl"
    with open(cache_path, 'wb') as f:
        pickle.dump(data, f)
@dataclass
class ResearchConfig:
    """Configuration for the research pipeline.

    Fixes: ``output_dir`` was annotated ``str`` while defaulting to ``None``;
    it is now correctly ``Optional[str]`` (resolved in ``__post_init__``).
    """
    # Core settings
    domain: str = "www.bbc.com/sport"
    max_urls_discovery: int = 100
    max_urls_to_crawl: int = 10
    top_k_urls: int = 10
    # Scoring and filtering
    score_threshold: float = 0.1
    scoring_method: str = "bm25"
    # Processing options
    use_llm_enhancement: bool = True
    extract_head_metadata: bool = True
    live_check: bool = True
    force_refresh: bool = False
    # Crawler settings
    max_concurrent_crawls: int = 5
    timeout: int = 30000  # milliseconds
    headless: bool = True
    # Output settings
    save_json: bool = True
    save_markdown: bool = True
    output_dir: Optional[str] = None  # resolved to a default path in __post_init__
    # Development settings
    test_mode: bool = False
    interactive_mode: bool = False
    verbose: bool = True

    def __post_init__(self):
        """Shrink limits in test mode and resolve the default output directory."""
        if self.test_mode:
            # Smaller limits so test runs finish quickly
            self.max_urls_discovery = 50
            self.max_urls_to_crawl = 3
            self.top_k_urls = 5
        # Default output directory lives next to the script itself
        if self.output_dir is None:
            self.output_dir = str(SCRIPT_DIR / "research_results")
@dataclass
class ResearchQuery:
    """Container for a research query and its derived metadata.

    Fixes: ``search_patterns`` and ``timestamp`` were annotated as
    non-optional while defaulting to ``None``; both are now ``Optional``.
    """
    original_query: str  # raw user input
    enhanced_query: Optional[str] = None  # LLM-improved version of the query
    search_patterns: Optional[List[str]] = None  # URL glob patterns, e.g. "*funding*"
    timestamp: Optional[str] = None  # ISO-8601 creation time
@dataclass
class ResearchResult:
    """Aggregated output of one complete research run.

    Bundles the driving query, the URL-discovery hits, the crawled page
    content, the LLM synthesis text, the citation list, and run-level
    metadata (e.g. timing statistics).
    """
    query: ResearchQuery          # the (possibly enhanced) query for this run
    discovered_urls: List[Dict]   # URLSeeder results, ranked by relevance
    crawled_content: List[Dict]   # cleaned markdown + metadata per page
    synthesis: str                # LLM-generated research summary
    citations: List[Dict]         # sources actually referenced by the synthesis
    metadata: Dict                # run statistics such as duration
async def get_user_query() -> str:
    """Prompt the user on stdin for a research query and return it stripped."""
    raw = input("\n🔍 Enter your research query: ")
    return raw.strip()
async def enhance_query_with_llm(query: str) -> ResearchQuery:
    """
    Use LLM to enhance the research query:
    - Extract key terms
    - Generate search patterns
    - Identify related topics

    Results are memoized on disk keyed by the raw query; any LLM failure
    falls back to simple keyword tokenization so the pipeline never stalls.
    """
    # Check cache first — repeated runs with the same query skip the LLM call
    cache_key = get_cache_key("enhanced_query", query)
    cached_result = load_from_cache(cache_key)
    if cached_result:
        console.print("[dim cyan]📦 Using cached enhanced query[/dim cyan]")
        return cached_result
    try:
        # Ask the model for key terms, related terms and an enhanced query,
        # forcing a JSON object response so parsing below is reliable.
        response = await litellm.acompletion(
            model="gemini/gemini-2.5-flash-preview-04-17",
            messages=[{
                "role": "user",
                "content": f"""Given this research query: "{query}"
Extract:
1. Key terms and concepts (as a list)
2. Related search terms
3. A more specific/enhanced version of the query
Return as JSON:
{{
    "key_terms": ["term1", "term2"],
    "related_terms": ["related1", "related2"],
    "enhanced_query": "enhanced version of query"
}}"""
            }],
            # reasoning_effort="low",
            temperature=0.3,
            response_format={"type": "json_object"}
        )
        data = json.loads(response.choices[0].message.content)
        # Create search patterns: wrap every term in wildcards for URL matching
        all_terms = data["key_terms"] + data["related_terms"]
        patterns = [f"*{term.lower()}*" for term in all_terms]
        result = ResearchQuery(
            original_query=query,
            enhanced_query=data["enhanced_query"],
            search_patterns=patterns[:10],  # Limit patterns
            timestamp=datetime.now().isoformat()
        )
        # Cache the result so the next run with this query is free
        save_to_cache(cache_key, result)
        return result
    except Exception as e:
        # Broad catch is deliberate: enhancement is best-effort and any
        # LLM/network/parse error should degrade gracefully, not crash.
        console.print(f"[yellow]⚠️ LLM enhancement failed: {e}[/yellow]")
        # Fallback to simple tokenization
        return ResearchQuery(
            original_query=query,
            enhanced_query=query,
            search_patterns=tokenize_query_to_patterns(query),
            timestamp=datetime.now().isoformat()
        )
def tokenize_query_to_patterns(query: str) -> List[str]:
    """Turn a free-text query into URL glob patterns for URLSeeder.

    Example: "crypto startups funding" -> ["*startups*", "*funding*", ...]
    Stop words and tokens of two characters or fewer are dropped, and at
    most 8 patterns are returned.
    """
    stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'that'}
    patterns: List[str] = []
    for token in query.lower().split():
        if len(token) > 2 and token not in stop_words:
            patterns.append(f"*{token}*")
        if len(patterns) == 8:
            break  # cap reached — same limit as patterns[:8]
    return patterns
async def discover_urls(domain: str, query: str, config: ResearchConfig) -> List[Dict]:
    """
    Use URLSeeder to discover and rank URLs:
    1. Fetch all URLs from domain
    2. Filter by patterns
    3. Extract metadata (titles, descriptions)
    4. Rank by BM25 relevance score
    5. Return top K URLs

    Returns a list of URL-info dicts; each entry is expected to carry at
    least 'url' and 'relevance_score' keys (score defaults to 0 when
    absent). An empty list is returned on any discovery failure.
    """
    # Check cache — keyed on domain, query and top-K so parameter changes
    # invalidate the entry
    cache_key = get_cache_key("discovered_urls", domain, query, config.top_k_urls)
    cached_result = load_from_cache(cache_key)
    if cached_result and not config.force_refresh:
        console.print("[dim cyan]📦 Using cached URL discovery[/dim cyan]")
        return cached_result
    console.print(f"\n[cyan]🔍 Discovering URLs from {domain}...[/cyan]")
    # Initialize URL seeder with context manager for automatic cleanup
    async with AsyncUrlSeeder(logger=AsyncLogger(verbose=config.verbose)) as seeder:
        # Configure seeding
        seeding_config = SeedingConfig(
            source="sitemap+cc",  # Use both sitemap and Common Crawl
            extract_head=config.extract_head_metadata,
            query=query,
            scoring_method=config.scoring_method,
            score_threshold=config.score_threshold,
            max_urls=config.max_urls_discovery,
            live_check=config.live_check,
            force=config.force_refresh
        )
        try:
            # Discover URLs
            urls = await seeder.urls(domain, seeding_config)
            # Sort by relevance score (descending); missing scores rank last
            sorted_urls = sorted(
                urls,
                key=lambda x: x.get('relevance_score', 0),
                reverse=True
            )
            # Take top K
            top_urls = sorted_urls[:config.top_k_urls]
            console.print(f"[green]✅ Discovered {len(urls)} URLs, selected top {len(top_urls)}[/green]")
            # Cache the result for subsequent runs
            save_to_cache(cache_key, top_urls)
            return top_urls
        except Exception as e:
            # Best-effort: a failed discovery yields an empty list rather
            # than aborting the whole pipeline.
            console.print(f"[red]❌ URL discovery failed: {e}[/red]")
            return []
async def crawl_selected_urls(urls: List[str], query: str, config: ResearchConfig) -> List[Dict]:
    """
    Crawl selected URLs with content filtering:
    - Use AsyncWebCrawler.arun_many()
    - Apply content filter
    - Generate clean markdown

    *urls* is actually the list of discovery-result dicts (each with a
    'url' key) despite the List[str] annotation — TODO confirm and align
    the annotation with the caller.
    Per-URL results are cached keyed on (url, query); only uncached URLs
    are actually crawled.
    """
    # Extract just URLs from the discovery results, capped at the crawl limit
    url_list = [u['url'] for u in urls if 'url' in u][:config.max_urls_to_crawl]
    if not url_list:
        console.print("[red]❌ No URLs to crawl[/red]")
        return []
    console.print(f"\n[cyan]🕷️ Crawling {len(url_list)} URLs...[/cyan]")
    # Check cache for each URL; split into already-cached vs still-to-crawl
    crawled_results = []
    urls_to_crawl = []
    for url in url_list:
        cache_key = get_cache_key("crawled_content", url, query)
        cached_content = load_from_cache(cache_key)
        if cached_content and not config.force_refresh:
            crawled_results.append(cached_content)
        else:
            urls_to_crawl.append(url)
    if urls_to_crawl:
        console.print(f"[cyan]📥 Crawling {len(urls_to_crawl)} new URLs (cached: {len(crawled_results)})[/cyan]")
        # Configure markdown generator with content filter to prune boilerplate
        md_generator = DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(
                threshold=0.48,
                threshold_type="dynamic",
                min_word_threshold=10
            ),
        )
        # Configure crawler: internal links only, skip chrome-like page regions
        crawler_config = CrawlerRunConfig(
            markdown_generator=md_generator,
            exclude_external_links=True,
            excluded_tags=['nav', 'header', 'footer', 'aside'],
        )
        # Create crawler with browser config
        async with AsyncWebCrawler(
            config=BrowserConfig(
                headless=config.headless,
                verbose=config.verbose
            )
        ) as crawler:
            # Crawl URLs concurrently
            results = await crawler.arun_many(
                urls_to_crawl,
                config=crawler_config,
                max_concurrent=config.max_concurrent_crawls
            )
            # Process results — assumes arun_many preserves input order so
            # zip pairs each URL with its result (TODO confirm)
            for url, result in zip(urls_to_crawl, results):
                if result.success:
                    content_data = {
                        'url': url,
                        'title': result.metadata.get('title', ''),
                        # Prefer filtered ("fit") markdown; fall back to raw
                        'markdown': result.markdown.fit_markdown or result.markdown.raw_markdown,
                        'raw_length': len(result.markdown.raw_markdown),
                        'fit_length': len(result.markdown.fit_markdown) if result.markdown.fit_markdown else len(result.markdown.raw_markdown),
                        'metadata': result.metadata
                    }
                    crawled_results.append(content_data)
                    # Cache the result for future runs with the same query
                    cache_key = get_cache_key("crawled_content", url, query)
                    save_to_cache(cache_key, content_data)
                else:
                    console.print(f" [red]❌ Failed: {url[:50]}... - {result.error}[/red]")
    console.print(f"[green]✅ Successfully crawled {len(crawled_results)} URLs[/green]")
    return crawled_results
async def generate_research_synthesis(
    query: str,
    crawled_content: List[Dict]
) -> Tuple[str, List[Dict]]:
    """
    Use LLM to synthesize research findings:
    - Analyze all crawled content
    - Generate comprehensive answer
    - Extract citations and references

    Returns a (synthesis_text, citations) tuple. Citations are the subset
    of sources the model actually referenced via "[Source N]" notation.
    On LLM failure a plain URL summary is returned with no citations.
    """
    if not crawled_content:
        return "No content available for synthesis.", []
    console.print("\n[cyan]🤖 Generating research synthesis...[/cyan]")
    # Prepare content for LLM — each source truncated to a 1500-char preview
    # to keep the prompt within budget
    content_sections = []
    for i, content in enumerate(crawled_content, 1):
        section = f"""
SOURCE {i}:
Title: {content['title']}
URL: {content['url']}
Content Preview:
{content['markdown'][:1500]}...
"""
        content_sections.append(section)
    combined_content = "\n---\n".join(content_sections)
    try:
        response = await litellm.acompletion(
            model="gemini/gemini-2.5-flash-preview-04-17",
            messages=[{
                "role": "user",
                "content": f"""Research Query: "{query}"
Based on the following sources, provide a comprehensive research synthesis.
{combined_content}
Please provide:
1. An executive summary (2-3 sentences)
2. Key findings (3-5 bullet points)
3. Detailed analysis (2-3 paragraphs)
4. Future implications or trends
Format your response with clear sections and cite sources using [Source N] notation.
Keep the total response under 800 words."""
            }],
            # reasoning_effort="medium",
            temperature=0.7
        )
        synthesis = response.choices[0].message.content
        # Extract citations from the synthesis by scanning for source mentions
        citations = []
        for i, content in enumerate(crawled_content, 1):
            if f"[Source {i}]" in synthesis or f"Source {i}" in synthesis:
                citations.append({
                    'source_id': i,
                    'title': content['title'],
                    'url': content['url']
                })
        return synthesis, citations
    except Exception as e:
        # Best-effort: never let an LLM/network error kill the pipeline
        console.print(f"[red]❌ Synthesis generation failed: {e}[/red]")
        # Fallback to simple summary listing the first three sources
        summary = f"Research on '{query}' found {len(crawled_content)} relevant articles:\n\n"
        for content in crawled_content[:3]:
            summary += f"- {content['title']}\n {content['url']}\n\n"
        return summary, []
def format_research_output(result: ResearchResult) -> str:
    """Render a ResearchResult as a plain-text report.

    The report contains a banner, the query (plus its enhanced form when it
    differs), discovery statistics, the LLM synthesis, and — when present —
    the cited sources.
    """
    lines = []
    add = lines.append

    # Banner
    add("\n" + "=" * 60)
    add("🔬 RESEARCH RESULTS")
    add("=" * 60)

    # Query info (enhanced form shown only when it actually differs)
    add(f"\n📋 Query: {result.query.original_query}")
    if result.query.enhanced_query != result.query.original_query:
        add(f"   Enhanced: {result.query.enhanced_query}")

    # Discovery stats
    add(f"\n📊 Statistics:")
    add(f"   - URLs discovered: {len(result.discovered_urls)}")
    add(f"   - URLs crawled: {len(result.crawled_content)}")
    add(f"   - Processing time: {result.metadata.get('duration', 'N/A')}")

    # Synthesis
    add(f"\n📝 SYNTHESIS")
    add("-" * 60)
    add(result.synthesis)

    # Citations (only when the synthesis referenced sources)
    if result.citations:
        add(f"\n📚 SOURCES")
        add("-" * 60)
        for cite in result.citations:
            add(f"[{cite['source_id']}] {cite['title']}")
            add(f"    {cite['url']}")

    return "\n".join(lines)
async def save_research_results(result: ResearchResult, config: ResearchConfig) -> Tuple[str, str]:
    """Persist a ResearchResult as JSON and/or Markdown under config.output_dir.

    Filenames are '<timestamp>_<slugged query>' so repeated runs never collide.

    Args:
        result: Completed pipeline output to serialize.
        config: Controls the output directory and which formats are written.

    Returns:
        Tuple of (json_path, markdown_path); either entry is None when that
        format is disabled.
    """
    # Create output directory
    output_dir = Path(config.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate filename based on query and timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    query_slug = result.query.original_query[:50].replace(" ", "_").replace("/", "_")
    base_filename = f"{timestamp}_{query_slug}"

    json_path = None
    md_path = None

    # Save JSON (explicit utf-8 so emoji/unicode survive on any platform)
    if config.save_json:
        json_path = output_dir / f"{base_filename}.json"
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(result), f, indent=2, default=str)
        console.print(f"\n[green]💾 JSON saved: {json_path}[/green]")

    # Save Markdown
    if config.save_markdown:
        md_path = output_dir / f"{base_filename}.md"
        # Create formatted markdown
        md_content = [
            f"# Research Report: {result.query.original_query}",
            f"\n**Generated on:** {result.metadata.get('timestamp', 'N/A')}",
            f"\n**Domain:** {result.metadata.get('domain', 'N/A')}",
            f"\n**Processing time:** {result.metadata.get('duration', 'N/A')}",
            "\n---\n",
            "## Query Information",
            f"- **Original Query:** {result.query.original_query}",
            f"- **Enhanced Query:** {result.query.enhanced_query or 'N/A'}",
            f"- **Search Patterns:** {', '.join(result.query.search_patterns or [])}",
            "\n## Statistics",
            f"- **URLs Discovered:** {len(result.discovered_urls)}",
            f"- **URLs Crawled:** {len(result.crawled_content)}",
            f"- **Sources Cited:** {len(result.citations)}",
            "\n## Research Synthesis\n",
            result.synthesis,
            "\n## Sources\n"
        ]
        # Add citations
        for citation in result.citations:
            md_content.append(f"### [{citation['source_id']}] {citation['title']}")
            md_content.append(f"- **URL:** [{citation['url']}]({citation['url']})")
            md_content.append("")
        # Add discovered URLs summary
        md_content.extend([
            "\n## Discovered URLs (Top 10)\n",
            "| Score | URL | Title |",
            "|-------|-----|-------|"
        ])
        for url_data in result.discovered_urls[:10]:
            score = url_data.get('relevance_score', 0)
            url = url_data.get('url', '')
            # Guard against missing/None titles and only append an ellipsis
            # when the text was actually truncated (the previous code added
            # '...' unconditionally and crashed when 'title' was None).
            head = url_data.get('head_data') or {}
            raw_title = head.get('title') or 'N/A'
            title = raw_title[:60] + ('...' if len(raw_title) > 60 else '')
            url_display = url[:50] + ('...' if len(url) > 50 else '')
            md_content.append(f"| {score:.3f} | {url_display} | {title} |")
        # Write markdown
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(md_content))
        console.print(f"[green]📄 Markdown saved: {md_path}[/green]")

    return str(json_path) if json_path else None, str(md_path) if md_path else None
async def wait_for_user(message: str = "\nPress Enter to continue..."):
    """Pause until the user presses Enter (interactive mode).

    Runs the blocking input() in a worker thread via asyncio.to_thread so the
    event loop keeps servicing other tasks while waiting — the previous
    version called input() directly and froze the whole loop.
    """
    await asyncio.to_thread(input, message)
async def research_pipeline(
    query: str,
    config: ResearchConfig
) -> ResearchResult:
    """Run the full research flow for one query.

    Steps: (1) optional LLM query enhancement, (2) URL discovery,
    (3) crawling the selected URLs, (4) LLM synthesis with citations,
    (5) packaging everything into a ResearchResult. In interactive mode the
    pipeline pauses for Enter between steps.

    Args:
        query: Raw user query.
        config: Pipeline settings (domain, limits, interactivity, output).

    Returns:
        ResearchResult; when discovery finds nothing, a result with empty
        URL/content lists and an explanatory synthesis string.
    """
    start_time = datetime.now()
    # Display pipeline header
    header = Panel(
        f"[bold cyan]Research Pipeline[/bold cyan]\n\n"
        f"[dim]Domain:[/dim] {config.domain}\n"
        f"[dim]Mode:[/dim] {'Test' if config.test_mode else 'Production'}\n"
        f"[dim]Interactive:[/dim] {'Yes' if config.interactive_mode else 'No'}",
        title="🚀 Starting",
        border_style="cyan"
    )
    console.print(header)
    # Step 1: Enhance query (optional)
    console.print(f"\n[bold cyan]📝 Step 1: Query Processing[/bold cyan]")
    if config.interactive_mode:
        await wait_for_user()
    if config.use_llm_enhancement:
        research_query = await enhance_query_with_llm(query)
    else:
        # No LLM pass: keep the query as-is and derive search patterns locally.
        research_query = ResearchQuery(
            original_query=query,
            enhanced_query=query,
            search_patterns=tokenize_query_to_patterns(query),
            timestamp=datetime.now().isoformat()
        )
    console.print(f"   [green]✅ Query ready:[/green] {research_query.enhanced_query or query}")
    # Step 2: Discover URLs
    console.print(f"\n[bold cyan]🔍 Step 2: URL Discovery[/bold cyan]")
    if config.interactive_mode:
        await wait_for_user()
    discovered_urls = await discover_urls(
        domain=config.domain,
        query=research_query.enhanced_query or query,
        config=config
    )
    if not discovered_urls:
        # Nothing to crawl: short-circuit with an empty result.
        return ResearchResult(
            query=research_query,
            discovered_urls=[],
            crawled_content=[],
            synthesis="No relevant URLs found for the given query.",
            citations=[],
            metadata={'duration': str(datetime.now() - start_time)}
        )
    console.print(f"   [green]✅ Found {len(discovered_urls)} relevant URLs[/green]")
    # Step 3: Crawl selected URLs
    console.print(f"\n[bold cyan]🕷️ Step 3: Content Crawling[/bold cyan]")
    if config.interactive_mode:
        await wait_for_user()
    crawled_content = await crawl_selected_urls(
        urls=discovered_urls,
        query=research_query.enhanced_query or query,
        config=config
    )
    console.print(f"   [green]✅ Successfully crawled {len(crawled_content)} pages[/green]")
    # Step 4: Generate synthesis
    console.print(f"\n[bold cyan]🤖 Step 4: Synthesis Generation[/bold cyan]")
    if config.interactive_mode:
        await wait_for_user()
    synthesis, citations = await generate_research_synthesis(
        query=research_query.enhanced_query or query,
        crawled_content=crawled_content
    )
    console.print(f"   [green]✅ Generated synthesis with {len(citations)} citations[/green]")
    # Step 5: Create result (config snapshot stored for reproducibility)
    result = ResearchResult(
        query=research_query,
        discovered_urls=discovered_urls,
        crawled_content=crawled_content,
        synthesis=synthesis,
        citations=citations,
        metadata={
            'duration': str(datetime.now() - start_time),
            'domain': config.domain,
            'timestamp': datetime.now().isoformat(),
            'config': asdict(config)
        }
    )
    duration = datetime.now() - start_time
    console.print(f"\n[bold green]✅ Research completed in {duration}[/bold green]")
    return result
async def main():
    """Interactive entry point for the BBC Sport Research Assistant.

    Prompts for a run configuration (test/standard/comprehensive/custom) and
    a query, runs the research pipeline, renders the report, and optionally
    persists JSON/Markdown outputs.
    """
    # Example queries offered in the menu (index 1-5)
    example_queries = [
        "Premier League transfer news and rumors",
        "Champions League match results and analysis",
        "World Cup qualifying updates",
        "Football injury reports and return dates",
        "Tennis grand slam tournament results"
    ]
    # Display header
    console.print(Panel.fit(
        "[bold cyan]BBC Sport Research Assistant[/bold cyan]\n\n"
        "This tool demonstrates efficient research using URLSeeder:\n"
        "[dim]• Discover all URLs without crawling\n"
        "• Filter and rank by relevance\n"
        "• Crawl only the most relevant content\n"
        "• Generate AI-powered insights with citations[/dim]\n\n"
        f"[dim]📁 Working directory: {SCRIPT_DIR}[/dim]",
        title="🔬 Welcome",
        border_style="cyan"
    ))
    # Configuration options table
    config_table = Table(title="\n⚙️ Configuration Options", show_header=False, box=None)
    config_table.add_column(style="bold cyan", width=3)
    config_table.add_column()
    config_table.add_row("1", "Quick Test Mode (3 URLs, fast)")
    config_table.add_row("2", "Standard Mode (10 URLs, balanced)")
    config_table.add_row("3", "Comprehensive Mode (20 URLs, thorough)")
    config_table.add_row("4", "Custom Configuration")
    console.print(config_table)
    config_choice = input("\nSelect configuration (1-4): ").strip()
    # Create config based on choice; any unrecognized input falls through to
    # the custom-configuration branch.
    if config_choice == "1":
        config = ResearchConfig(test_mode=True, interactive_mode=False)
    elif config_choice == "2":
        config = ResearchConfig(max_urls_to_crawl=10, top_k_urls=10)
    elif config_choice == "3":
        config = ResearchConfig(max_urls_to_crawl=20, top_k_urls=20, max_urls_discovery=200)
    else:
        # Custom configuration
        config = ResearchConfig()
        config.test_mode = input("\nTest mode? (y/n): ").lower() == 'y'
        config.interactive_mode = input("Interactive mode (pause between steps)? (y/n): ").lower() == 'y'
        config.use_llm_enhancement = input("Use AI to enhance queries? (y/n): ").lower() == 'y'
        if not config.test_mode:
            try:
                config.max_urls_to_crawl = int(input("Max URLs to crawl (default 10): ") or "10")
                config.top_k_urls = int(input("Top K URLs to select (default 10): ") or "10")
            except ValueError:
                # Non-numeric input: keep the ResearchConfig defaults.
                console.print("[yellow]Using default values[/yellow]")
    # Display example queries
    query_table = Table(title="\n📋 Example Queries", show_header=False, box=None)
    query_table.add_column(style="bold cyan", width=3)
    query_table.add_column()
    for i, q in enumerate(example_queries, 1):
        query_table.add_row(str(i), q)
    console.print(query_table)
    query_input = input("\nSelect a query (1-5) or enter your own: ").strip()
    # A digit in range picks an example; anything else is a custom query
    # (empty input falls back to the first example).
    if query_input.isdigit() and 1 <= int(query_input) <= len(example_queries):
        query = example_queries[int(query_input) - 1]
    else:
        query = query_input if query_input else example_queries[0]
    console.print(f"\n[bold cyan]📝 Selected Query:[/bold cyan] {query}")
    # Run the research pipeline
    result = await research_pipeline(query=query, config=config)
    # Display results
    formatted_output = format_research_output(result)
    # print(formatted_output)
    console.print(Panel.fit(
        formatted_output,
        title="🔬 Research Results",
        border_style="green"
    ))
    # Save results
    if config.save_json or config.save_markdown:
        json_path, md_path = await save_research_results(result, config)
        # print(f"\n✅ Results saved successfully!")
        if json_path:
            console.print(f"[green]JSON saved at:[/green] {json_path}")
        if md_path:
            console.print(f"[green]Markdown saved at:[/green] {md_path}")

if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python3
"""
Convert Crawl4AI URL Seeder tutorial markdown to Colab notebook format
"""
import json
import re
from pathlib import Path
def parse_markdown_to_cells(markdown_content):
    """Parse markdown annotated with '# cell N type:{markdown|code}' markers
    into a list of Jupyter/Colab notebook cell dicts.

    Content before the first marker becomes a leading markdown cell; each
    marker starts a new cell of the given type. Empty cells are dropped.

    Args:
        markdown_content: Full markdown text of the tutorial.

    Returns:
        List of nbformat-style cell dicts.
    """
    def _make_cell(cell_type, content_lines):
        # Build one notebook cell from raw lines; returns None when the
        # stripped content is empty (such cells are dropped).
        text = '\n'.join(content_lines).strip()
        if not text:
            return None
        if cell_type == 'code':
            return {
                "cell_type": "code",
                "execution_count": None,
                "metadata": {},
                "outputs": [],
                "source": text.split('\n')
            }
        return {
            "cell_type": "markdown",
            "metadata": {},
            "source": text.split('\n')
        }

    cells = []
    lines = markdown_content.split('\n')

    # Header: everything before the first cell marker becomes a markdown cell.
    header_lines = []
    i = 0
    while i < len(lines) and not lines[i].startswith('# cell'):
        header_lines.append(lines[i])
        i += 1
    header_cell = _make_cell('markdown', header_lines)
    if header_cell:
        cells.append(header_cell)

    # Process cells marked with '# cell X type:Y'. Lines before the first
    # valid marker (but after the header scan) have no cell type and are
    # discarded, matching the original behavior.
    current_cell_type = None
    current_cell_content = []
    while i < len(lines):
        cell_match = re.match(r'^# cell (\d+) type:(markdown|code)$', lines[i])
        if cell_match:
            # Marker found: flush the previous cell and start a new one.
            if current_cell_type:
                cell = _make_cell(current_cell_type, current_cell_content)
                if cell:
                    cells.append(cell)
            current_cell_type = cell_match.group(2)
            current_cell_content = []
        else:
            current_cell_content.append(lines[i])
        i += 1

    # Flush the trailing cell, if any.
    if current_cell_type:
        cell = _make_cell(current_cell_type, current_cell_content)
        if cell:
            cells.append(cell)

    return cells
def create_colab_notebook(cells):
    """Wrap notebook cells in a Colab-flavoured nbformat-4 document."""
    colab_meta = {
        "name": "Crawl4AI_URL_Seeder_Tutorial.ipynb",
        "provenance": [],
        "collapsed_sections": [],
        "toc_visible": True,
    }
    return {
        "nbformat": 4,
        "nbformat_minor": 0,
        "metadata": {
            "colab": colab_meta,
            "kernelspec": {"name": "python3", "display_name": "Python 3"},
            "language_info": {"name": "python"},
        },
        "cells": cells,
    }
def main():
    """Convert tutorial_url_seeder.md into a Colab notebook file on disk."""
    source = Path("tutorial_url_seeder.md")
    if not source.exists():
        print(f"Error: {source} not found!")
        return

    print(f"Reading {source}...")
    markdown_content = source.read_text(encoding='utf-8')

    # Parse markdown to cells
    print("Parsing markdown content...")
    cells = parse_markdown_to_cells(markdown_content)
    print(f"Created {len(cells)} cells")

    # Create notebook
    print("Creating Colab notebook...")
    notebook = create_colab_notebook(cells)

    # Save notebook
    output_path = Path("Crawl4AI_URL_Seeder_Tutorial.ipynb")
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(notebook, f, indent=2, ensure_ascii=False)

    markdown_count = sum(1 for c in cells if c['cell_type'] == 'markdown')
    code_count = sum(1 for c in cells if c['cell_type'] == 'code')
    print(f"✅ Successfully created {output_path}")
    print(f"   - Total cells: {len(cells)}")
    print(f"   - Markdown cells: {markdown_count}")
    print(f"   - Code cells: {code_count}")

if __name__ == "__main__":
    main()

File diff suppressed because it is too large Load Diff

View File

@@ -1,263 +0,0 @@
"""
URL Seeder Demo - Interactive showcase of Crawl4AI's URL discovery capabilities
This demo shows:
1. Basic URL discovery from sitemaps and Common Crawl
2. Cache management and forced refresh
3. Live URL validation and metadata extraction
4. BM25 relevance scoring for intelligent filtering
5. Integration with AsyncWebCrawler for the complete pipeline
6. Multi-domain discovery across multiple sites
Note: The AsyncUrlSeeder now supports context manager protocol for automatic cleanup.
"""
import asyncio
import time
from datetime import datetime
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, BarColumn, TimeElapsedColumn
from rich.prompt import Prompt, Confirm
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
AsyncUrlSeeder,
SeedingConfig
)
console = Console()
console.rule("[bold green]🌐 Crawl4AI URL Seeder: Interactive Demo")
DOMAIN = "crawl4ai.com"
# Utils
def print_head_info(head_data):
    """Pretty-print extracted <head> metadata (title, charset, meta, links)."""
    if not head_data:
        console.print("[yellow]No head data found.")
        return

    table = Table(title="<head> Metadata", expand=True)
    table.add_column("Key", style="cyan", no_wrap=True)
    table.add_column("Value", style="magenta")

    # Scalar fields first, then the meta and link maps.
    for field in ("title", "charset"):
        if head_data.get(field):
            table.add_row(field, head_data[field])
    for name, value in head_data.get("meta", {}).items():
        table.add_row(f"meta:{name}", value)
    for rel, entries in head_data.get("link", {}).items():
        for entry in entries:
            table.add_row(f"link:{rel}", entry.get("href", ""))

    console.print(table)
async def section_1_basic_exploration(seed: AsyncUrlSeeder):
    """Demo 1: fetch every known URL for DOMAIN from Common Crawl + sitemap."""
    console.rule("[bold cyan]1. Basic Seeding")
    cfg = SeedingConfig(source="cc+sitemap", pattern="*", verbose=True)

    started = time.time()
    with Progress(SpinnerColumn(), "[progress.description]{task.description}") as progress:
        progress.add_task(description="Fetching from Common Crawl + Sitemap...", total=None)
        urls = await seed.urls(DOMAIN, cfg)
    took = time.time() - started

    console.print(f"[green]✓ Fetched {len(urls)} URLs in {took:.2f} seconds")
    console.print(f"[dim]  Speed: {len(urls)/took:.0f} URLs/second[/dim]\n")

    console.print("[bold]Sample URLs:[/bold]")
    for entry in urls[:5]:
        console.print(f"  • {entry['url']}")
async def section_2_cache_demo(seed: AsyncUrlSeeder):
    """Demo 2: bypass the seeder's cache with force=True to fetch fresh data.

    The result is intentionally discarded — the point is only to show that
    the fetch goes out to the network instead of hitting the cache.
    """
    console.rule("[bold cyan]2. Caching Demonstration")
    console.print("[yellow]Using `force=True` to bypass cache and fetch fresh data.[/yellow]")
    # PEP 8: no spaces around '=' in keyword arguments (was `force = True`).
    cfg = SeedingConfig(source="cc", pattern="*crawl4ai.com/core/*", verbose=False, force=True)
    await seed.urls(DOMAIN, cfg)
async def section_3_live_head(seed: AsyncUrlSeeder):
    """Demo 3: validate URLs live and show <head> metadata for the first hit."""
    console.rule("[bold cyan]3. Live Check + Head Extraction")
    cfg = SeedingConfig(
        extract_head=True,
        concurrency=10,
        hits_per_sec=5,
        pattern="*crawl4ai.com/*",
        max_urls=10,
        verbose=False,
    )
    results = await seed.urls(DOMAIN, cfg)

    # Only URLs whose live HEAD check succeeded count as valid.
    valid = [entry for entry in results if entry["status"] == "valid"]
    console.print(f"[green]Valid: {len(valid)} / {len(results)}")
    if valid:
        print_head_info(valid[0]["head_data"])
async def section_4_bm25_scoring(seed: AsyncUrlSeeder):
    """Demo 4: rank discovered URLs against a query with BM25 scoring.

    Only URLs scoring above 0.3 are kept; the top five are shown in a table.
    """
    console.rule("[bold cyan]4. BM25 Relevance Scoring")
    console.print("[yellow]Using AI-powered relevance scoring to find the most relevant content[/yellow]")
    query = "markdown generation extraction strategies"
    cfg = SeedingConfig(
        source="sitemap",
        extract_head=True,
        query=query,
        scoring_method="bm25",
        score_threshold=0.3,  # Only URLs with >30% relevance
        max_urls=20,
        verbose=False
    )
    with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
        p.add_task(description=f"Searching for: '{query}'", total=None)
        urls = await seed.urls(DOMAIN, cfg)
    console.print(f"[green]Found {len(urls)} relevant URLs (score > 0.3)")
    # Show top results with scores
    table = Table(title="Top 5 Most Relevant Pages", expand=True)
    table.add_column("Score", style="cyan", width=8)
    table.add_column("Title", style="magenta")
    table.add_column("URL", style="blue", overflow="fold")
    for url in urls[:5]:
        score = f"{url['relevance_score']:.2f}"
        # Fix: tolerate a missing/None title and only append an ellipsis when
        # the title was actually truncated (previously '...' was appended
        # unconditionally, even to the 'No title' fallback).
        raw_title = (url['head_data'] or {}).get('title') or 'No title'
        title = raw_title[:60] + ("..." if len(raw_title) > 60 else "")
        table.add_row(score, title, url['url'])
    console.print(table)
async def section_5_keyword_filter_to_agent(seed: AsyncUrlSeeder):
    """Demo 5: full pipeline — discover URLs, keyword-filter them, then crawl
    the survivors with AsyncWebCrawler.arun_many() in streaming mode."""
    console.rule("[bold cyan]5. Complete Pipeline: Discover → Filter → Crawl")
    cfg = SeedingConfig(
        extract_head=True,
        concurrency=20,
        hits_per_sec=10,
        max_urls=10,
        pattern="*crawl4ai.com/*",
        force=True,
    )
    urls = await seed.urls(DOMAIN, cfg)
    # Cheap relevance filter: keep URLs whose stringified <head> metadata
    # mentions any keyword (case-insensitive).
    keywords = ["deep crawling", "markdown", "llm"]
    selected = [u for u in urls if any(k in str(u["head_data"]).lower() for k in keywords)]
    console.print(f"[cyan]Selected {len(selected)} URLs with relevant keywords:")
    for u in selected[:10]:
        console.print("  •", u["url"])
    console.print("\n[yellow]Passing above URLs to arun_many() LLM agent for crawling...")
    async with AsyncWebCrawler(verbose=True) as crawler:
        crawl_run_config = CrawlerRunConfig(
            # Example crawl settings for these URLs:
            only_text=True,  # Just get text content
            screenshot=False,
            pdf=False,
            word_count_threshold=50,  # Only process pages with at least 50 words
            stream=True,
            verbose=False  # Keep logs clean for arun_many in this demo
        )
        # Extract just the URLs from the selected results
        urls_to_crawl = [u["url"] for u in selected]
        # We'll stream results for large lists, but collect them here for demonstration
        crawled_results_stream = await crawler.arun_many(urls_to_crawl, config=crawl_run_config)
        final_crawled_data = []
        async for result in crawled_results_stream:
            final_crawled_data.append(result)
            # Progress heartbeat every 5 completed URLs.
            if len(final_crawled_data) % 5 == 0:
                print(f"   Processed {len(final_crawled_data)}/{len(urls_to_crawl)} URLs...")
        print(f"\n   Successfully crawled {len(final_crawled_data)} URLs.")
        if final_crawled_data:
            print("\n   Example of a crawled result's URL and Markdown (first successful one):")
            # for/else: the else branch runs only when no break fired, i.e.
            # no successful crawl produced markdown.
            for result in final_crawled_data:
                if result.success and result.markdown.raw_markdown:
                    print(f"     URL: {result.url}")
                    print(f"     Markdown snippet: {result.markdown.raw_markdown[:200]}...")
                    break
            else:
                print("     No successful crawls with markdown found.")
        else:
            print("   No successful crawls found.")
async def section_6_multi_domain(seed: AsyncUrlSeeder):
    """Demo 6: score-and-filter discovery across several domains in one call."""
    console.rule("[bold cyan]6. Multi-Domain Discovery")
    console.print("[yellow]Discovering Python tutorials across multiple educational sites[/yellow]\n")

    domains = ["docs.python.org", "realpython.com", "docs.crawl4ai.com"]
    cfg = SeedingConfig(
        source="sitemap",
        extract_head=True,
        query="python tutorial guide",
        scoring_method="bm25",
        score_threshold=0.2,
        max_urls=5  # Per domain
    )

    started = time.time()
    with Progress(SpinnerColumn(), "[progress.description]{task.description}") as progress:
        progress.add_task(description="Discovering across domains...", total=None)
        results = await seed.many_urls(domains, cfg)
    took = time.time() - started

    total_found = sum(len(found) for found in results.values())
    console.print(f"[green]✓ Found {total_found} relevant URLs across {len(domains)} domains in {took:.2f}s\n")

    # Per-domain breakdown, showing the single best-scored hit for each.
    for domain, found in results.items():
        console.print(f"[bold]{domain}:[/bold] {len(found)} relevant pages")
        if found:
            best = found[0]
            console.print(f"  Top result: [{best['relevance_score']:.2f}] {best['head_data'].get('title', 'No title')}")
async def main():
    """Interactive entry point: show the demo menu and run the chosen demo(s).

    The seeder is opened as an async context manager so its resources are
    cleaned up automatically when the demos finish.
    """
    async with AsyncUrlSeeder() as seed:
        # Interactive menu: key -> (title, coroutine). "7" is the run-all
        # sentinel and therefore maps to None.
        sections = {
            "1": ("Basic URL Discovery", section_1_basic_exploration),
            "2": ("Cache Management Demo", section_2_cache_demo),
            "3": ("Live Check & Metadata Extraction", section_3_live_head),
            "4": ("BM25 Relevance Scoring", section_4_bm25_scoring),
            "5": ("Complete Pipeline (Discover → Filter → Crawl)", section_5_keyword_filter_to_agent),
            "6": ("Multi-Domain Discovery", section_6_multi_domain),
            "7": ("Run All Demos", None)
        }
        console.print("\n[bold]Available Demos:[/bold]")
        for key, (title, _) in sections.items():
            console.print(f"  {key}. {title}")
        choice = Prompt.ask("\n[cyan]Which demo would you like to run?[/cyan]",
                            choices=list(sections.keys()),
                            default="7")
        console.print()
        if choice == "7":
            # Run all demos, pausing for confirmation between them.
            for key, (title, func) in sections.items():
                if key != "7" and func:
                    await func(seed)
                    if key != "6":  # Don't pause after the last demo
                        if not Confirm.ask("\n[yellow]Continue to next demo?[/yellow]", default=True):
                            break
                    console.print()
        else:
            # Run selected demo
            _, func = sections[choice]
            await func(seed)
    console.rule("[bold green]Demo Complete ✔︎")

if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,128 +0,0 @@
"""
🚀 URL Seeder + AsyncWebCrawler = Magic!
Quick demo showing discovery → filter → crawl pipeline
Note: Uses context manager for automatic cleanup of resources.
"""
import asyncio, os
from crawl4ai import AsyncUrlSeeder, AsyncWebCrawler, SeedingConfig, CrawlerRunConfig, AsyncLogger, DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import PruningContentFilter
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# 🔍 Example 1: Discover ALL → Filter → Crawl
async def discover_and_crawl():
    """Find Python module tutorials & extract them all!

    Three-stage demo: (1) dump the full sitemap to show scale, (2) narrow to
    'python-modules' tutorials with a live check, (3) crawl the first five
    and save their pruned markdown next to this script.
    """
    async with AsyncUrlSeeder(logger=AsyncLogger()) as seeder:
        # Step 1: See how many URLs exist (spoiler: A LOT!)
        print("📊 Let's see what RealPython has...")
        all_urls = await seeder.urls("realpython.com",
            SeedingConfig(source="sitemap"))
        print(f"😱 Found {len(all_urls)} total URLs!")
        # Step 2: Filter for Python modules (perfect size ~13)
        print("\n🎯 Filtering for 'python-modules' tutorials...")
        module_urls = await seeder.urls("realpython.com",
            SeedingConfig(
                source="sitemap",
                pattern="*python-modules*",
                live_check=True  # Make sure they're alive!
            ))
        print(f"✨ Found {len(module_urls)} module tutorials")
        for url in module_urls[:3]:  # Show first 3
            # NOTE(review): both status markers are empty strings here —
            # likely emoji lost in a copy/paste; confirm against git history.
            status = "" if url["status"] == "valid" else ""
            print(f"   {status} {url['url']}")
        # Step 3: Crawl them all with pruning (keep it lean!)
        print("\n🕷️ Crawling all module tutorials...")
        async with AsyncWebCrawler() as crawler:
            config = CrawlerRunConfig(
                markdown_generator=DefaultMarkdownGenerator(
                    content_filter=PruningContentFilter(  # Smart filtering!
                        threshold=0.48,  # Remove fluff
                        threshold_type="fixed",
                    ),
                ),
                only_text=True,
                stream=True,
            )
            # Extract just the URLs from the seeder results (first 5 only)
            urls_to_crawl = [u["url"] for u in module_urls[:5]]
            results = await crawler.arun_many(urls_to_crawl, config=config)
            # Process & save
            saved = 0
            async for result in results:
                if result.success:
                    # Save each tutorial (name from URL's last path segment)
                    name = result.url.split("/")[-2] + ".md"
                    name = os.path.join(CURRENT_DIR, name)
                    with open(name, "w") as f:
                        f.write(result.markdown.fit_markdown)
                    saved += 1
                    print(f"💾 Saved: {name}")
            print(f"\n🎉 Successfully saved {saved} tutorials!")
# 🔍 Example 2: Beautiful Soup articles with metadata peek
async def explore_beautifulsoup():
    """Discover BeautifulSoup articles and print a metadata preview of each.

    Uses extract_head=True so each result carries the page's <head> data.
    Head extraction can fail per-URL, so all metadata access falls back
    gracefully instead of raising KeyError (the previous version indexed
    head_data/meta/title directly).
    """
    async with AsyncUrlSeeder(logger=AsyncLogger()) as seeder:
        print("🍲 Looking for Beautiful Soup articles...")
        soup_urls = await seeder.urls("realpython.com",
            SeedingConfig(
                source="sitemap",
                pattern="*beautiful-soup*",
                extract_head=True  # Get the metadata!
            ))
        print(f"\n📚 Found {len(soup_urls)} Beautiful Soup articles:\n")
        # Show what we discovered
        for i, url in enumerate(soup_urls, 1):
            head = url.get("head_data") or {}
            meta = head.get("meta") or {}
            desc = meta.get('description') or 'No description'
            # Ellipsis only when the description was actually truncated.
            preview = desc[:60] + ('...' if len(desc) > 60 else '')
            print(f"{i}. {head.get('title') or 'No title'}")
            print(f"   📝 {preview}")
            print(f"   👤 By: {meta.get('author', 'Unknown')}")
            print(f"   🔗 {url['url']}\n")
# 🔍 Example 3: Smart search with BM25 relevance scoring
async def smart_search_with_bm25():
    """Use AI-powered relevance scoring to find the best content."""
    async with AsyncUrlSeeder(logger=AsyncLogger()) as seeder:
        print("🧠 Smart search: 'web scraping tutorial quiz'")

        # BM25 ranks sitemap entries by how well their <head> metadata
        # matches the query; the threshold drops weak matches.
        search_cfg = SeedingConfig(
            source="sitemap",
            pattern="*beautiful-soup*",
            extract_head=True,
            query="web scraping tutorial quiz",  # Our search
            scoring_method="bm25",
            score_threshold=0.2  # Quality filter
        )
        ranked = await seeder.urls("realpython.com", search_cfg)

        print(f"\n🎯 Top {len(ranked)} most relevant results:\n")
        for rank, hit in enumerate(ranked[:3], 1):
            print(f"{rank}. [{hit['relevance_score']:.2f}] {hit['head_data']['title']}")
            print(f"   🔗 {hit['url'][:60]}...")

        print("\n✨ BM25 automatically ranked by relevance!")
# 🎬 Run the show!
async def main():
    """Run the three seeder demos back to back, separated by rules."""
    separator = "=" * 60
    print(separator)
    await discover_and_crawl()
    print("\n" + separator + "\n")
    await explore_beautifulsoup()
    print("\n" + separator + "\n")
    await smart_search_with_bm25()

if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -16,10 +16,22 @@
--mono-font-stack: Menlo, Monaco, Lucida Console, Liberation Mono, DejaVu Sans Mono, Bitstream Vera Sans Mono,
Courier New, monospace, serif;
--background-color: #151515; /* Dark background */
--font-color: #eaeaea; /* Light font color for contrast */
--invert-font-color: #151515; /* Dark color for inverted elements */
--primary-color: #1a95e0; /* Primary color can remain the same or be adjusted for better contrast */
--secondary-color: #727578; /* Secondary color for less important text */
--secondary-dimmed-color: #8b857a; /* Dimmed secondary color */
--error-color: #ff5555; /* Bright color for errors */
--progress-bar-background: #444; /* Darker background for progress bar */
--progress-bar-fill: #1a95e0; /* Bright color for progress bar fill */
--code-bg-color: #1e1e1e; /* Darker background for code blocks */
--input-style: solid; /* Keeping input style solid */
--block-background-color: #202020; /* Darker background for block elements */
--global-font-color: #eaeaea; /* Light font color for global elements */
--background-color: #222225;
--background-color: #070708;
--page-width: 70em;
--font-color: #e8e9ed;
@@ -28,7 +40,7 @@
--secondary-color: #d5cec0;
--tertiary-color: #a3abba;
--primary-dimmed-color: #09b5a5; /* Updated to the brand color */
--primary-color: #0fbbaa; /* Updated to the brand color */
--primary-color: #50ffff; /* Updated to the brand color */
--accent-color: rgb(243, 128, 245);
--error-color: #ff3c74;
--progress-bar-background: #3f3f44;

View File

@@ -200,7 +200,7 @@ config = CrawlerRunConfig(markdown_generator=md_generator)
- **`user_query`**: The term you want to focus on. BM25 tries to keep only content blocks relevant to that query.
- **`bm25_threshold`**: Raise it to keep fewer blocks; lower it to keep more.
- **`use_stemming`** *(default `True`)*: If enabled, variations of words match (e.g., “learn,” “learning,” “learnt”).
- **`use_stemming`**: If `True`, variations of words match (e.g., “learn,” “learning,” “learnt”).
**No query provided?** BM25 tries to glean a context from page metadata, or you can simply treat it as a scorched-earth approach that discards text with low generic score. Realistically, you want to supply a query for best results.

File diff suppressed because it is too large Load Diff

View File

@@ -12,10 +12,10 @@ parent_dir = os.path.dirname(
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import WebScrapingStrategy
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy as WebScrapingStrategyCurrent,
)
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
# from crawl4ai.content_scraping_strategy import (
# WebScrapingStrategy as WebScrapingStrategyCurrent,
# )
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent
@@ -32,8 +32,8 @@ class TestResult:
class StrategyTester:
def __init__(self):
self.new_scraper = WebScrapingStrategy()
self.current_scraper = WebScrapingStrategyCurrent()
self.new_scraper = LXMLWebScrapingStrategy()
self.current_scraper = LXMLWebScrapingStrategy()
with open(__location__ + "/sample_wikipedia.html", "r", encoding="utf-8") as f:
self.WIKI_HTML = f.read()
self.results = {"new": [], "current": []}

View File

@@ -2,7 +2,6 @@ import json
import time
from bs4 import BeautifulSoup
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy,
LXMLWebScrapingStrategy,
)
from typing import Dict, List, Tuple
@@ -274,7 +273,7 @@ def get_test_scenarios():
that will be passed into scrap() for testing various features.
"""
TEST_SCENARIOS = {
# "default": {},
"default": {},
# "exclude_domains": {
# "exclude_domains": {"images.example.com", "ads.example.com"}
# },
@@ -609,19 +608,26 @@ class ScraperEquivalenceTester:
print("\n=== Testing complicated HTML with multiple parameter scenarios ===")
# Create the scrapers once (or you can re-create if needed)
original = WebScrapingStrategy()
# original = WebScrapingStrategy()
original = LXMLWebScrapingStrategy()
lxml = LXMLWebScrapingStrategy()
# Base URL for testing
url = "http://test.com"
url = "https://kidocode.com"
for scenario_name, params in get_test_scenarios().items():
print(f"\nScenario: {scenario_name}")
start = time.time()
orig_result = original.scrap("http://test.com", complicated_html, **params)
orig_result = original.scrap(url, complicated_html, **params)
orig_time = time.time() - start
orig_result = orig_result.model_dump()
start = time.time()
lxml_result = lxml.scrap("http://test.com", complicated_html, **params)
lxml_result = lxml.scrap(url, complicated_html, **params)
lxml_time = time.time() - start
lxml_result = lxml_result.model_dump()
diffs = {}
link_diff = self.deep_compare_links(

View File

@@ -1,711 +0,0 @@
"""
Comprehensive test cases for AsyncUrlSeeder with BM25 scoring functionality.
Tests cover all features including query-based scoring, metadata extraction,
edge cases, and integration scenarios.
"""
import asyncio
import pytest
from typing import List, Dict, Any
from crawl4ai import AsyncUrlSeeder, SeedingConfig, AsyncLogger
import json
from datetime import datetime
# Test domain - using docs.crawl4ai.com as it has the actual documentation
TEST_DOMAIN = "kidocode.com"
TEST_DOMAIN = "docs.crawl4ai.com"
TEST_DOMAIN = "www.bbc.com/sport"
class TestAsyncUrlSeederBM25:
"""Comprehensive test suite for AsyncUrlSeeder with BM25 scoring."""
async def create_seeder(self):
    """Create an AsyncUrlSeeder instance for testing.

    NOTE(review): the test methods in this class accept a `seeder` argument,
    but this helper is not registered as a pytest fixture (no
    @pytest_asyncio.fixture, and a different name), so pytest cannot inject
    it — confirm a `seeder` fixture exists elsewhere (e.g. in conftest.py),
    otherwise every test will error with "fixture 'seeder' not found".
    """
    logger = AsyncLogger()
    return AsyncUrlSeeder(logger=logger)
# ============================================
# Basic BM25 Scoring Tests
# ============================================
@pytest.mark.asyncio
async def test_basic_bm25_scoring(self, seeder):
    """BM25 results must be scored, normalized to [0, 1], and sorted.

    The query is hoisted into a variable so the config and the diagnostic
    print can never drift apart again (the print previously claimed
    'web crawling tutorial' while the config queried
    'premier league highlights').
    """
    query = "premier league highlights"
    config = SeedingConfig(
        source="sitemap",
        extract_head=True,
        query=query,
        scoring_method="bm25",
        max_urls=200,
        verbose=True,
        force=True  # Force fresh fetch
    )
    results = await seeder.urls(TEST_DOMAIN, config)
    # Verify results have relevance scores
    assert all("relevance_score" in r for r in results)
    # Verify scores are normalized between 0 and 1
    scores = [r["relevance_score"] for r in results]
    assert all(0.0 <= s <= 1.0 for s in scores)
    # Verify results are sorted by relevance (descending)
    assert scores == sorted(scores, reverse=True)
    # Print top 5 results for manual verification
    print(f"\nTop 5 results for '{query}':")
    for i, r in enumerate(results[:5]):
        print(f"{i+1}. Score: {r['relevance_score']:.3f} - {r['url']}")
@pytest.mark.asyncio
async def test_query_variations(self, seeder):
"""Test BM25 scoring with different query variations."""
queries = [
"VAR controversy",
"player ratings",
"live score update",
"transfer rumours",
"post match analysis",
"injury news"
]
for query in queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=100,
# force=True
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify each query produces scored results
assert len(results) > 0
assert all("relevance_score" in r for r in results)
print(f"\nTop result for '{query}':")
if results:
top = results[0]
print(f" Score: {top['relevance_score']:.3f} - {top['url']}")
# ============================================
# Score Threshold Tests
# ============================================
@pytest.mark.asyncio
async def test_score_threshold_filtering(self, seeder):
"""Test filtering results by minimum relevance score."""
thresholds = [0.1, 0.3, 0.5, 0.7]
for threshold in thresholds:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="league standings",
score_threshold=threshold,
scoring_method="bm25",
max_urls=50
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify all results meet threshold
if results:
assert all(r["relevance_score"] >= threshold for r in results)
print(f"\nThreshold {threshold}: {len(results)} URLs passed")
@pytest.mark.asyncio
async def test_extreme_thresholds(self, seeder):
"""Test edge cases with extreme threshold values."""
# Very low threshold - should return many results
config_low = SeedingConfig(
source="sitemap",
extract_head=True,
query="match",
score_threshold=0.001,
scoring_method="bm25"
)
results_low = await seeder.urls(TEST_DOMAIN, config_low)
# Very high threshold - might return few or no results
config_high = SeedingConfig(
source="sitemap",
extract_head=True,
query="match",
score_threshold=0.99,
scoring_method="bm25"
)
results_high = await seeder.urls(TEST_DOMAIN, config_high)
# Low threshold should return more results than high
assert len(results_low) >= len(results_high)
print(f"\nLow threshold (0.001): {len(results_low)} results")
print(f"High threshold (0.99): {len(results_high)} results")
# ============================================
# Metadata Extraction Tests
# ============================================
@pytest.mark.asyncio
async def test_comprehensive_metadata_extraction(self, seeder):
"""Test extraction of all metadata types including JSON-LD."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="match report",
scoring_method="bm25",
max_urls=5,
verbose=True
)
results = await seeder.urls(TEST_DOMAIN, config)
for result in results:
head_data = result.get("head_data", {})
# Check for various metadata fields
print(f"\nMetadata for {result['url']}:")
print(f" Title: {head_data.get('title', 'N/A')}")
print(f" Charset: {head_data.get('charset', 'N/A')}")
print(f" Lang: {head_data.get('lang', 'N/A')}")
# Check meta tags
meta = head_data.get("meta", {})
if meta:
print(" Meta tags found:")
for key in ["description", "keywords", "author", "viewport"]:
if key in meta:
print(f" {key}: {meta[key][:50]}...")
# Check for Open Graph tags
og_tags = {k: v for k, v in meta.items() if k.startswith("og:")}
if og_tags:
print(" Open Graph tags found:")
for k, v in list(og_tags.items())[:3]:
print(f" {k}: {v[:50]}...")
# Check JSON-LD
if head_data.get("jsonld"):
print(f" JSON-LD schemas found: {len(head_data['jsonld'])}")
@pytest.mark.asyncio
async def test_jsonld_extraction_scoring(self, seeder):
"""Test that JSON-LD data contributes to BM25 scoring."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="Premier League match report highlights",
scoring_method="bm25",
max_urls=20
)
results = await seeder.urls(TEST_DOMAIN, config)
# Find results with JSON-LD data
jsonld_results = [r for r in results if r.get("head_data", {}).get("jsonld")]
if jsonld_results:
print(f"\nFound {len(jsonld_results)} URLs with JSON-LD data")
for r in jsonld_results[:3]:
print(f" Score: {r['relevance_score']:.3f} - {r['url']}")
jsonld_data = r["head_data"]["jsonld"]
print(f" JSON-LD types: {[item.get('@type', 'Unknown') for item in jsonld_data if isinstance(item, dict)]}")
# ============================================
# Edge Cases and Error Handling
# ============================================
@pytest.mark.asyncio
async def test_empty_query(self, seeder):
"""Test behavior with empty query string."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="",
scoring_method="bm25",
max_urls=10
)
results = await seeder.urls(TEST_DOMAIN, config)
# Should return results but all with zero scores
assert len(results) > 0
assert all(r.get("relevance_score", 0) == 0 for r in results)
@pytest.mark.asyncio
async def test_query_without_extract_head(self, seeder):
"""Test query scoring when extract_head is False."""
config = SeedingConfig(
source="sitemap",
extract_head=False, # This should trigger a warning
query="Premier League match report highlights",
scoring_method="bm25",
max_urls=10
)
results = await seeder.urls(TEST_DOMAIN, config)
# Results should not have relevance scores
assert all("relevance_score" not in r for r in results)
print("\nVerified: No scores added when extract_head=False")
@pytest.mark.asyncio
async def test_special_characters_in_query(self, seeder):
"""Test queries with special characters and symbols."""
special_queries = [
"premier league + analytics",
"injury/rehab routines",
"AI-powered scouting",
"match stats & xG",
"tactical@breakdown",
"transfer-window.yml"
]
for query in special_queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=5
)
try:
results = await seeder.urls(TEST_DOMAIN, config)
assert isinstance(results, list)
print(f"\n✓ Query '{query}' processed successfully")
except Exception as e:
pytest.fail(f"Failed on query '{query}': {str(e)}")
@pytest.mark.asyncio
async def test_unicode_query(self, seeder):
"""Test queries with Unicode characters."""
unicode_queries = [
"网页爬虫", # Chinese
"веб-краулер", # Russian
"🚀 crawl4ai", # Emoji
"naïve implementation", # Accented characters
]
for query in unicode_queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=5
)
try:
results = await seeder.urls(TEST_DOMAIN, config)
assert isinstance(results, list)
print(f"\n✓ Unicode query '{query}' processed successfully")
except Exception as e:
print(f"\n✗ Unicode query '{query}' failed: {str(e)}")
# ============================================
# Performance and Scalability Tests
# ============================================
@pytest.mark.asyncio
async def test_large_scale_scoring(self, seeder):
"""Test BM25 scoring with many URLs."""
config = SeedingConfig(
source="cc+sitemap", # Use both sources for more URLs
extract_head=True,
query="world cup group standings",
scoring_method="bm25",
max_urls=100,
concurrency=20,
hits_per_sec=10
)
start_time = asyncio.get_event_loop().time()
results = await seeder.urls(TEST_DOMAIN, config)
elapsed = asyncio.get_event_loop().time() - start_time
print(f"\nProcessed {len(results)} URLs in {elapsed:.2f} seconds")
print(f"Average time per URL: {elapsed/len(results)*1000:.1f}ms")
# Verify scoring worked at scale
assert all("relevance_score" in r for r in results)
# Check score distribution
scores = [r["relevance_score"] for r in results]
print(f"Score distribution:")
print(f" Min: {min(scores):.3f}")
print(f" Max: {max(scores):.3f}")
print(f" Avg: {sum(scores)/len(scores):.3f}")
@pytest.mark.asyncio
async def test_concurrent_scoring_consistency(self, seeder):
"""Test that concurrent requests produce consistent scores."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="live score update",
scoring_method="bm25",
max_urls=20,
concurrency=10
)
# Run the same query multiple times
results_list = []
for _ in range(3):
results = await seeder.urls(TEST_DOMAIN, config)
results_list.append(results)
# Compare scores across runs (they should be identical for same URLs)
url_scores = {}
for results in results_list:
for r in results:
url = r["url"]
score = r["relevance_score"]
if url in url_scores:
# Scores should be very close (allowing for tiny float differences)
assert abs(url_scores[url] - score) < 0.001
else:
url_scores[url] = score
print(f"\n✓ Consistent scores across {len(results_list)} runs")
# ============================================
# Multi-Domain Tests
# ============================================
@pytest.mark.asyncio
async def test_many_urls_with_scoring(self, seeder):
"""Test many_urls method with BM25 scoring."""
domains = [TEST_DOMAIN, "docs.crawl4ai.com", "example.com"]
config = SeedingConfig(
source="sitemap",
extract_head=True,
# live_check=True,
query="fixture list",
scoring_method="bm25",
score_threshold=0.2,
max_urls=10,
force=True, # Force fresh fetch
)
results_dict = await seeder.many_urls(domains, config)
for domain, results in results_dict.items():
print(f"\nDomain: {domain}")
print(f" Found {len(results)} URLs above threshold")
if results:
top = results[0]
print(f" Top result: {top['relevance_score']:.3f} - {top['url']}")
# ============================================
# Complex Query Tests
# ============================================
@pytest.mark.asyncio
async def test_multi_word_complex_queries(self, seeder):
"""Test complex multi-word queries."""
complex_queries = [
"how to follow live match commentary",
"extract expected goals stats from match data",
"premier league match report analysis",
"transfer rumours and confirmed signings tracker",
"tactical breakdown of high press strategy"
]
for query in complex_queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=5
)
results = await seeder.urls(TEST_DOMAIN, config)
if results:
print(f"\nQuery: '{query}'")
print(f"Top match: {results[0]['relevance_score']:.3f} - {results[0]['url']}")
# Extract matched terms from metadata
head_data = results[0].get("head_data", {})
title = head_data.get("title", "")
description = head_data.get("meta", {}).get("description", "")
# Simple term matching for verification
query_terms = set(query.lower().split())
title_terms = set(title.lower().split())
desc_terms = set(description.lower().split())
matched_terms = query_terms & (title_terms | desc_terms)
if matched_terms:
print(f"Matched terms: {', '.join(matched_terms)}")
# ============================================
# Cache and Force Tests
# ============================================
@pytest.mark.asyncio
async def test_scoring_with_cache(self, seeder):
"""Test that scoring works correctly with cached results."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="injury update timeline",
scoring_method="bm25",
max_urls=10,
force=False # Use cache
)
# First run - populate cache
results1 = await seeder.urls(TEST_DOMAIN, config)
# Second run - should use cache
results2 = await seeder.urls(TEST_DOMAIN, config)
# Results should be identical
assert len(results1) == len(results2)
for r1, r2 in zip(results1, results2):
assert r1["url"] == r2["url"]
assert abs(r1["relevance_score"] - r2["relevance_score"]) < 0.001
print("\n✓ Cache produces consistent scores")
@pytest.mark.asyncio
async def test_force_refresh_scoring(self, seeder):
"""Test force=True bypasses cache for fresh scoring."""
config_cached = SeedingConfig(
source="sitemap",
extract_head=True,
query="transfer window",
scoring_method="bm25",
max_urls=5,
force=False
)
config_forced = SeedingConfig(
source="sitemap",
extract_head=True,
query="transfer window",
scoring_method="bm25",
max_urls=5,
force=True
)
# Run with cache
start1 = asyncio.get_event_loop().time()
results1 = await seeder.urls(TEST_DOMAIN, config_cached)
time1 = asyncio.get_event_loop().time() - start1
# Run with force (should be slower due to fresh fetch)
start2 = asyncio.get_event_loop().time()
results2 = await seeder.urls(TEST_DOMAIN, config_forced)
time2 = asyncio.get_event_loop().time() - start2
print(f"\nCached run: {time1:.2f}s")
print(f"Forced run: {time2:.2f}s")
# Both should produce scored results
assert all("relevance_score" in r for r in results1)
assert all("relevance_score" in r for r in results2)
# ============================================
# Source Combination Tests
# ============================================
@pytest.mark.asyncio
async def test_scoring_with_multiple_sources(self, seeder):
"""Test BM25 scoring with combined sources (cc+sitemap)."""
config = SeedingConfig(
source="cc+sitemap",
extract_head=True,
query="match highlights video",
scoring_method="bm25",
score_threshold=0.3,
max_urls=30,
concurrency=15
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify we got results from both sources
print(f"\nCombined sources returned {len(results)} URLs above threshold")
# Check URL diversity
unique_paths = set()
for r in results:
path = r["url"].replace("https://", "").replace("http://", "").split("/", 1)[-1]
unique_paths.add(path.split("?")[0]) # Remove query params
print(f"Unique paths found: {len(unique_paths)}")
# All should be scored and above threshold
assert all(r["relevance_score"] >= 0.3 for r in results)
# ============================================
# Integration Tests
# ============================================
@pytest.mark.asyncio
async def test_full_workflow_integration(self, seeder):
"""Test complete workflow: discover -> score -> filter -> use."""
# Step 1: Discover and score URLs
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="premier league opening fixtures",
scoring_method="bm25",
score_threshold=0.4,
max_urls=10,
verbose=True
)
results = await seeder.urls(TEST_DOMAIN, config)
print(f"\nStep 1: Found {len(results)} relevant URLs")
# Step 2: Analyze top results
if results:
top_urls = results[:3]
print("\nStep 2: Top 3 URLs for crawling:")
for i, r in enumerate(top_urls):
print(f"{i+1}. Score: {r['relevance_score']:.3f}")
print(f" URL: {r['url']}")
print(f" Title: {r['head_data'].get('title', 'N/A')}")
# Check metadata quality
meta = r['head_data'].get('meta', {})
if 'description' in meta:
print(f" Description: {meta['description'][:80]}...")
# Step 3: Verify these URLs would be good for actual crawling
assert all(r["status"] == "valid" for r in results[:3])
print("\nStep 3: All top URLs are valid for crawling ✓")
# ============================================
# Report Generation
# ============================================
@pytest.mark.asyncio
async def test_generate_scoring_report(self, seeder):
"""Generate a comprehensive report of BM25 scoring effectiveness."""
queries = {
"beginner": "match schedule",
"advanced": "tactical analysis pressing",
"api": "VAR decision explanation",
"deployment": "fixture changes due to weather",
"extraction": "expected goals statistics"
}
report = {
"timestamp": datetime.now().isoformat(),
"domain": TEST_DOMAIN,
"results": {}
}
for category, query in queries.items():
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=10
)
results = await seeder.urls(TEST_DOMAIN, config)
report["results"][category] = {
"query": query,
"total_results": len(results),
"top_results": [
{
"url": r["url"],
"score": r["relevance_score"],
"title": r["head_data"].get("title", "")
}
for r in results[:3]
],
"score_distribution": {
"min": min(r["relevance_score"] for r in results) if results else 0,
"max": max(r["relevance_score"] for r in results) if results else 0,
"avg": sum(r["relevance_score"] for r in results) / len(results) if results else 0
}
}
# Print report
print("\n" + "="*60)
print("BM25 SCORING EFFECTIVENESS REPORT")
print("="*60)
print(f"Domain: {report['domain']}")
print(f"Timestamp: {report['timestamp']}")
print("\nResults by Category:")
for category, data in report["results"].items():
print(f"\n{category.upper()}: '{data['query']}'")
print(f" Total results: {data['total_results']}")
print(f" Score range: {data['score_distribution']['min']:.3f} - {data['score_distribution']['max']:.3f}")
print(f" Average score: {data['score_distribution']['avg']:.3f}")
print(" Top matches:")
for i, result in enumerate(data['top_results']):
print(f" {i+1}. [{result['score']:.3f}] {result['title']}")
# ============================================
# Standalone test runner
# ============================================
async def run_all_tests():
    """Run all tests standalone (without pytest)."""
    print("Running AsyncUrlSeeder BM25 Tests...")
    print("=" * 60)

    suite = TestAsyncUrlSeederBM25()
    seeder = await suite.create_seeder()

    # Currently enabled subset; commented entries can be re-activated as needed.
    enabled_tests = [
        # suite.test_basic_bm25_scoring,
        # suite.test_query_variations,
        # suite.test_score_threshold_filtering,
        # suite.test_extreme_thresholds,
        # suite.test_comprehensive_metadata_extraction,
        # suite.test_jsonld_extraction_scoring,
        # suite.test_empty_query,
        # suite.test_query_without_extract_head,
        # suite.test_special_characters_in_query,
        # suite.test_unicode_query,
        # suite.test_large_scale_scoring,
        # suite.test_concurrent_scoring_consistency,
        # suite.test_many_urls_with_scoring,
        suite.test_multi_word_complex_queries,
        suite.test_scoring_with_cache,
        suite.test_force_refresh_scoring,
        suite.test_scoring_with_multiple_sources,
        suite.test_full_workflow_integration,
        suite.test_generate_scoring_report,
    ]

    for case in enabled_tests:
        print(f"\nRunning {case.__name__}...")
        try:
            await case(seeder)
        except Exception as e:
            import traceback
            print(f"{case.__name__} failed: {str(e)}")
            print(f"  Error type: {type(e).__name__}")
            traceback.print_exc()
        else:
            print(f"{case.__name__} passed")

    print("\n" + "=" * 60)
    print("Test suite completed!")
if __name__ == "__main__":
# Run tests directly
asyncio.run(run_all_tests())