Compare commits

..

14 Commits

Author SHA1 Message Date
AHMET YILMAZ
9332326457 feat: Add PDF parsing documentation and navigation entry 2025-06-16 18:18:32 +08:00
ntohidi
dc85481180 refactor: Update LLM extraction example with the updated structure 2025-06-12 12:23:03 +02:00
ntohidi
5d9213a0e9 fix: Update JavaScript execution in AsyncPlaywrightCrawlerStrategy to handle script errors and add basic download test case. ref #1215 2025-06-12 12:21:40 +02:00
ntohidi
4679ee023d fix: Enhance URLPatternFilter to enforce path boundary checks for prefix matching. ref #1003 2025-06-10 11:19:18 +02:00
Nasrin
f9b7090084 Merge pull request #1186 from zimmski/fix-typo-provoder
fix, Typo
2025-06-10 10:26:45 +02:00
AHMET YILMAZ
9442597f81 #1127: Improve URL handling and normalization in scraping strategies 2025-06-10 11:57:06 +08:00
AHMET YILMAZ
74b06d4b80 #1167 Add PHP MIME types to ContentTypeFilter for better file handling 2025-06-09 11:49:33 +08:00
ntohidi
5ac19a61d7 feat: Implement max_scroll_steps parameter for full page scanning. ref: #1168 2025-06-05 16:40:34 +02:00
Markus Zimmermann
022cc2d92a fix, Typo 2025-06-05 15:30:38 +02:00
ntohidi
fcc2abe4db (fix): Update document about LLM extraction strategy to use LLMConfig. REF #1146 2025-06-03 12:53:59 +02:00
ntohidi
cc95d3abd4 Fix raw URL parsing logic to correctly handle "raw://" and "raw:" prefixes. REF #1118 2025-06-03 11:19:08 +02:00
Nasrin
5ce3e682f3 Merge pull request #752 from jl-martins/fix-raw-url-parsing
Fix `raw://` URL parsing logic. issue ref #1118
2025-06-03 11:10:29 +02:00
João Martins
58c1e17170 Merge branch 'main' into fix-raw-url-parsing 2025-05-30 13:03:25 +01:00
João Martins
27af4cc27b Fix "raw://" URL parsing logic
Closes https://github.com/unclecode/crawl4ai/issues/686
2025-02-15 15:34:59 +00:00
43 changed files with 2118 additions and 14229 deletions

View File

@@ -1,3 +0,0 @@
{
"enableAllProjectMcpServers": false
}

View File

@@ -5,42 +5,6 @@ All notable changes to Crawl4AI will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- **AsyncUrlSeeder**: High-performance URL discovery system for intelligent crawling at scale
- Discover URLs from sitemaps and Common Crawl index
- Extract and analyze page metadata without full crawling
- BM25 relevance scoring for query-based URL filtering
- Multi-domain parallel discovery with `many_urls()` method
- Automatic caching with TTL for discovered URLs
- Rate limiting and concurrent request management
- Live URL validation with HEAD requests
- JSON-LD and Open Graph metadata extraction
- **SeedingConfig**: Configuration class for URL seeding operations
- Support for multiple discovery sources (`sitemap`, `cc`, `sitemap+cc`)
- Pattern-based URL filtering with wildcards
- Configurable concurrency and rate limiting
- Query-based relevance scoring with BM25
- Score threshold filtering for quality control
- Comprehensive documentation for URL seeding feature
- Detailed comparison with deep crawling approaches
- Complete API reference with examples
- Integration guide with AsyncWebCrawler
- Performance benchmarks and best practices
- Example scripts demonstrating URL seeding:
- `url_seeder_demo.py`: Interactive Rich-based demonstration
- `url_seeder_quick_demo.py`: Screenshot-friendly examples
- Test suite for URL seeding with BM25 scoring
### Changed
- Updated `__init__.py` to export AsyncUrlSeeder and SeedingConfig
- Enhanced documentation with URL seeding integration examples
### Fixed
- Corrected examples to properly extract URLs from seeder results before passing to `arun_many()`
- Fixed logger color compatibility issue (changed `lightblack` to `bright_black`)
## [0.6.2] - 2025-05-02
### Added

View File

@@ -2,8 +2,7 @@
import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
# MODIFIED: Add SeedingConfig here
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig
from .content_scraping_strategy import (
ContentScrapingStrategy,
@@ -66,8 +65,6 @@ from .deep_crawling import (
DFSDeepCrawlStrategy,
DeepCrawlDecorator,
)
# NEW: Import AsyncUrlSeeder
from .async_url_seeder import AsyncUrlSeeder
from .utils import (
start_colab_display_server,
@@ -81,10 +78,6 @@ __all__ = [
"BrowserProfiler",
"LLMConfig",
"GeolocationConfig",
# NEW: Add SeedingConfig
"SeedingConfig",
# NEW: Add AsyncUrlSeeder
"AsyncUrlSeeder",
"DeepCrawlStrategy",
"BFSDeepCrawlStrategy",
"BestFirstCrawlingStrategy",
@@ -167,4 +160,4 @@ __all__ = [
# Disable all Pydantic warnings
warnings.filterwarnings("ignore", module="pydantic")
# pydantic_warnings.filter_warnings()
# pydantic_warnings.filter_warnings()

View File

@@ -207,6 +207,7 @@ class GeolocationConfig:
config_dict.update(kwargs)
return GeolocationConfig.from_dict(config_dict)
class ProxyConfig:
def __init__(
self,
@@ -317,6 +318,8 @@ class ProxyConfig:
config_dict.update(kwargs)
return ProxyConfig.from_dict(config_dict)
class BrowserConfig:
"""
Configuration class for setting up a browser instance and its context in AsyncPlaywrightCrawlerStrategy.
@@ -594,6 +597,7 @@ class BrowserConfig:
return config
return BrowserConfig.from_kwargs(config)
class HTTPCrawlerConfig:
"""HTTP-specific crawler configuration"""
@@ -785,6 +789,8 @@ class CrawlerRunConfig():
Default: False.
scroll_delay (float): Delay in seconds between scroll steps if scan_full_page is True.
Default: 0.2.
max_scroll_steps (Optional[int]): Maximum number of scroll steps to perform during full page scan.
If None, scrolls until the entire page is loaded. Default: None.
process_iframes (bool): If True, attempts to process and inline iframe content.
Default: False.
remove_overlay_elements (bool): If True, remove overlays/popups before extracting HTML.
@@ -915,6 +921,7 @@ class CrawlerRunConfig():
ignore_body_visibility: bool = True,
scan_full_page: bool = False,
scroll_delay: float = 0.2,
max_scroll_steps: Optional[int] = None,
process_iframes: bool = False,
remove_overlay_elements: bool = False,
simulate_user: bool = False,
@@ -1013,6 +1020,7 @@ class CrawlerRunConfig():
self.ignore_body_visibility = ignore_body_visibility
self.scan_full_page = scan_full_page
self.scroll_delay = scroll_delay
self.max_scroll_steps = max_scroll_steps
self.process_iframes = process_iframes
self.remove_overlay_elements = remove_overlay_elements
self.simulate_user = simulate_user
@@ -1154,6 +1162,7 @@ class CrawlerRunConfig():
ignore_body_visibility=kwargs.get("ignore_body_visibility", True),
scan_full_page=kwargs.get("scan_full_page", False),
scroll_delay=kwargs.get("scroll_delay", 0.2),
max_scroll_steps=kwargs.get("max_scroll_steps"),
process_iframes=kwargs.get("process_iframes", False),
remove_overlay_elements=kwargs.get("remove_overlay_elements", False),
simulate_user=kwargs.get("simulate_user", False),
@@ -1263,6 +1272,7 @@ class CrawlerRunConfig():
"ignore_body_visibility": self.ignore_body_visibility,
"scan_full_page": self.scan_full_page,
"scroll_delay": self.scroll_delay,
"max_scroll_steps": self.max_scroll_steps,
"process_iframes": self.process_iframes,
"remove_overlay_elements": self.remove_overlay_elements,
"simulate_user": self.simulate_user,
@@ -1325,6 +1335,7 @@ class CrawlerRunConfig():
config_dict.update(kwargs)
return CrawlerRunConfig.from_kwargs(config_dict)
class LLMConfig:
def __init__(
self,
@@ -1409,53 +1420,4 @@ class LLMConfig:
config_dict.update(kwargs)
return LLMConfig.from_kwargs(config_dict)
class SeedingConfig:
"""
Configuration class for URL discovery and pre-validation via AsyncUrlSeeder.
"""
def __init__(
self,
source: str = "sitemap+cc", # Options: "sitemap", "cc", "sitemap+cc"
pattern: Optional[str] = "*", # URL pattern to filter discovered URLs (e.g., "*example.com/blog/*")
live_check: bool = False, # Whether to perform HEAD requests to verify URL liveness
extract_head: bool = False, # Whether to fetch and parse <head> section for metadata
max_urls: int = -1, # Maximum number of URLs to discover (default: -1 for no limit)
concurrency: int = 1000, # Maximum concurrent requests for live checks/head extraction
hits_per_sec: int = 5, # Rate limit in requests per second
force: bool = False, # If True, bypasses the AsyncUrlSeeder's internal .jsonl cache
base_directory: Optional[str] = None, # Base directory for UrlSeeder's cache files (.jsonl)
llm_config: Optional[LLMConfig] = None, # Forward LLM config for future use (e.g., relevance scoring)
verbose: Optional[bool] = None, # Override crawler's general verbose setting
query: Optional[str] = None, # Search query for relevance scoring
score_threshold: Optional[float] = None, # Minimum relevance score to include URL (0.0-1.0)
scoring_method: str = "bm25", # Scoring method: "bm25" (default), future: "semantic"
filter_nonsense_urls: bool = True, # Filter out utility URLs like robots.txt, sitemap.xml, etc.
):
self.source = source
self.pattern = pattern
self.live_check = live_check
self.extract_head = extract_head
self.max_urls = max_urls
self.concurrency = concurrency
self.hits_per_sec = hits_per_sec
self.force = force
self.base_directory = base_directory
self.llm_config = llm_config
self.verbose = verbose
self.query = query
self.score_threshold = score_threshold
self.scoring_method = scoring_method
self.filter_nonsense_urls = filter_nonsense_urls
# Add to_dict, from_kwargs, and clone methods for consistency
def to_dict(self) -> Dict[str, Any]:
return {k: v for k, v in self.__dict__.items() if k != 'llm_config' or v is not None}
@staticmethod
def from_kwargs(kwargs: Dict[str, Any]) -> 'SeedingConfig':
return SeedingConfig(**kwargs)
def clone(self, **kwargs: Any) -> 'SeedingConfig':
config_dict = self.to_dict()
config_dict.update(kwargs)
return SeedingConfig.from_kwargs(config_dict)

View File

@@ -466,9 +466,15 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
console_messages=captured_console,
)
elif url.startswith("raw:") or url.startswith("raw://"):
#####
# Since both "raw:" and "raw://" start with "raw:", the first condition is always true for both, so "raw://" will be sliced as "//...", which is incorrect.
# Fix: Check for "raw://" first, then "raw:"
# Also, the prefix "raw://" is actually 6 characters long, not 7, so it should be sliced accordingly: url[6:]
#####
elif url.startswith("raw://") or url.startswith("raw:"):
# Process raw HTML content
raw_html = url[4:] if url[:4] == "raw:" else url[7:]
# raw_html = url[4:] if url[:4] == "raw:" else url[7:]
raw_html = url[6:] if url.startswith("raw://") else url[4:]
html = raw_html
if config.screenshot:
screenshot_data = await self._generate_screenshot_from_html(html)
@@ -896,7 +902,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Handle full page scanning
if config.scan_full_page:
await self._handle_full_page_scan(page, config.scroll_delay)
# await self._handle_full_page_scan(page, config.scroll_delay)
await self._handle_full_page_scan(page, config.scroll_delay, config.max_scroll_steps)
# Execute JavaScript if provided
# if config.js_code:
@@ -1084,7 +1091,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Close the page
await page.close()
async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1):
# async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1):
async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1, max_scroll_steps: Optional[int] = None):
"""
Helper method to handle full page scanning.
@@ -1099,6 +1107,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Args:
page (Page): The Playwright page object
scroll_delay (float): The delay between page scrolls
max_scroll_steps (Optional[int]): Maximum number of scroll steps to perform. If None, scrolls until end.
"""
try:
@@ -1123,9 +1132,21 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
dimensions = await self.get_page_dimensions(page)
total_height = dimensions["height"]
scroll_step_count = 0
while current_position < total_height:
####
# NEW FEATURE: Check if we've reached the maximum allowed scroll steps
# This prevents infinite scrolling on very long pages or infinite scroll scenarios
# If max_scroll_steps is None, this check is skipped (unlimited scrolling - original behavior)
####
if max_scroll_steps is not None and scroll_step_count >= max_scroll_steps:
break
current_position = min(current_position + viewport_height, total_height)
await self.safe_scroll(page, 0, current_position, delay=scroll_delay)
# Increment the step counter for max_scroll_steps tracking
scroll_step_count += 1
# await page.evaluate(f"window.scrollTo(0, {current_position})")
# await asyncio.sleep(scroll_delay)
@@ -1575,12 +1596,31 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# then wait for the new page to load before continuing
result = None
try:
# OLD VERSION:
# result = await page.evaluate(
# f"""
# (async () => {{
# try {{
# const script_result = {script};
# return {{ success: true, result: script_result }};
# }} catch (err) {{
# return {{ success: false, error: err.toString(), stack: err.stack }};
# }}
# }})();
# """
# )
# """ NEW VERSION:
# When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.
# """
result = await page.evaluate(
f"""
(async () => {{
try {{
const script_result = {script};
return {{ success: true, result: script_result }};
return await (async () => {{
{script}
}})();
}} catch (err) {{
return {{ success: false, error: err.toString(), stack: err.stack }};
}}

View File

@@ -126,7 +126,6 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
check_interval: float = 1.0,
max_session_permit: int = 20,
fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs
memory_wait_timeout: Optional[float] = 600.0,
rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None,
):
@@ -137,46 +136,27 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.check_interval = check_interval
self.max_session_permit = max_session_permit
self.fairness_timeout = fairness_timeout
self.memory_wait_timeout = memory_wait_timeout
self.result_queue = asyncio.Queue()
self.task_queue = asyncio.PriorityQueue() # Priority queue for better management
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode
self.current_memory_percent = 0.0 # Track current memory usage
self._high_memory_start_time: Optional[float] = None
async def _memory_monitor_task(self):
"""Background task to continuously monitor memory usage and update state"""
while True:
self.current_memory_percent = psutil.virtual_memory().percent
# Enter memory pressure mode if we cross the threshold
if self.current_memory_percent >= self.memory_threshold_percent:
if not self.memory_pressure_mode:
self.memory_pressure_mode = True
self._high_memory_start_time = time.time()
if self.monitor:
self.monitor.update_memory_status("PRESSURE")
else:
if self._high_memory_start_time is None:
self._high_memory_start_time = time.time()
if (
self.memory_wait_timeout is not None
and self._high_memory_start_time is not None
and time.time() - self._high_memory_start_time >= self.memory_wait_timeout
):
raise MemoryError(
"Memory usage exceeded threshold for"
f" {self.memory_wait_timeout} seconds"
)
if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent:
self.memory_pressure_mode = True
if self.monitor:
self.monitor.update_memory_status("PRESSURE")
# Exit memory pressure mode if we go below recovery threshold
elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent:
self.memory_pressure_mode = False
self._high_memory_start_time = None
if self.monitor:
self.monitor.update_memory_status("NORMAL")
elif self.current_memory_percent < self.memory_threshold_percent:
self._high_memory_start_time = None
# In critical mode, we might need to take more drastic action
if self.current_memory_percent >= self.critical_threshold_percent:
@@ -327,7 +307,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.monitor.start()
results = []
try:
# Initialize task queue
for url in urls:
@@ -336,18 +316,11 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.monitor.add_task(task_id, url)
# Add to queue with initial priority 0, retry count 0, and current time
await self.task_queue.put((0, (url, task_id, 0, time.time())))
active_tasks = []
# Process until both queues are empty
while not self.task_queue.empty() or active_tasks:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try:
@@ -492,14 +465,8 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
active_tasks = []
completed_count = 0
total_urls = len(urls)
while completed_count < total_urls:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try:

View File

@@ -29,7 +29,7 @@ class LogLevel(Enum):
class LogColor(str, Enum):
"""Enum for log colors."""
DEBUG = "bright_black"
DEBUG = "lightblack"
INFO = "cyan"
SUCCESS = "green"
WARNING = "yellow"

File diff suppressed because it is too large Load Diff

View File

@@ -35,10 +35,9 @@ from .markdown_generation_strategy import (
)
from .deep_crawling import DeepCrawlDecorator
from .async_logger import AsyncLogger, AsyncLoggerBase
from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig, SeedingConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .async_url_seeder import AsyncUrlSeeder
from .utils import (
sanitize_input_encode,
@@ -164,8 +163,6 @@ class AsyncWebCrawler:
# Decorate arun method with deep crawling capabilities
self._deep_handler = DeepCrawlDecorator(self)
self.arun = self._deep_handler(self.arun)
self.url_seeder: Optional[AsyncUrlSeeder] = None
async def start(self):
"""
@@ -747,94 +744,3 @@ class AsyncWebCrawler:
else:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
async def aseed_urls(
self,
domain_or_domains: Union[str, List[str]],
config: Optional[SeedingConfig] = None,
**kwargs
) -> Union[List[str], Dict[str, List[Union[str, Dict[str, Any]]]]]:
"""
Discovers, filters, and optionally validates URLs for a given domain(s)
using sitemaps and Common Crawl archives.
Args:
domain_or_domains: A single domain string (e.g., "iana.org") or a list of domains.
config: A SeedingConfig object to control the seeding process.
Parameters passed directly via kwargs will override those in 'config'.
**kwargs: Additional parameters (e.g., `source`, `live_check`, `extract_head`,
`pattern`, `concurrency`, `hits_per_sec`, `force_refresh`, `verbose`)
that will be used to construct or update the SeedingConfig.
Returns:
If `extract_head` is False:
- For a single domain: `List[str]` of discovered URLs.
- For multiple domains: `Dict[str, List[str]]` mapping each domain to its URLs.
If `extract_head` is True:
- For a single domain: `List[Dict[str, Any]]` where each dict contains 'url'
and 'head_data' (parsed <head> metadata).
- For multiple domains: `Dict[str, List[Dict[str, Any]]]` mapping each domain
to a list of URL data dictionaries.
Raises:
ValueError: If `domain_or_domains` is not a string or a list of strings.
Exception: Any underlying exceptions from AsyncUrlSeeder or network operations.
Example:
>>> # Discover URLs from sitemap with live check for 'example.com'
>>> result = await crawler.aseed_urls("example.com", source="sitemap", live_check=True, hits_per_sec=10)
>>> # Discover URLs from Common Crawl, extract head data for 'example.com' and 'python.org'
>>> multi_domain_result = await crawler.aseed_urls(
>>> ["example.com", "python.org"],
>>> source="cc", extract_head=True, concurrency=200, hits_per_sec=50
>>> )
"""
# Initialize AsyncUrlSeeder here if it hasn't been already
if not self.url_seeder:
# Pass the crawler's base_directory for seeder's cache management
# Pass the crawler's logger for consistent logging
self.url_seeder = AsyncUrlSeeder(
base_directory=self.crawl4ai_folder,
logger=self.logger
)
# Merge config object with direct kwargs, giving kwargs precedence
seeding_config = config.clone(**kwargs) if config else SeedingConfig.from_kwargs(kwargs)
# Ensure base_directory is set for the seeder's cache
seeding_config.base_directory = seeding_config.base_directory or self.crawl4ai_folder
# Ensure the seeder uses the crawler's logger (if not already set)
if not self.url_seeder.logger:
self.url_seeder.logger = self.logger
# Pass verbose setting if explicitly provided in SeedingConfig or kwargs
if seeding_config.verbose is not None:
self.url_seeder.logger.verbose = seeding_config.verbose
else: # Default to crawler's verbose setting
self.url_seeder.logger.verbose = self.logger.verbose
if isinstance(domain_or_domains, str):
self.logger.info(
message="Starting URL seeding for domain: {domain}",
tag="SEED",
params={"domain": domain_or_domains}
)
return await self.url_seeder.urls(
domain_or_domains,
seeding_config
)
elif isinstance(domain_or_domains, (list, tuple)):
self.logger.info(
message="Starting URL seeding for {count} domains",
tag="SEED",
params={"count": len(domain_or_domains)}
)
# AsyncUrlSeeder.many_urls directly accepts a list of domains and individual params.
return await self.url_seeder.many_urls(
domain_or_domains,
seeding_config
)
else:
raise ValueError("`domain_or_domains` must be a string or a list of strings.")

View File

@@ -1073,8 +1073,7 @@ def crawl_cmd(url: str, browser_config: str, crawler_config: str, filter_config:
crawler_cfg.markdown_generator = DefaultMarkdownGenerator(
content_filter = BM25ContentFilter(
user_query=filter_conf.get("query"),
bm25_threshold=filter_conf.get("threshold", 1.0),
use_stemming=filter_conf.get("use_stemming", True),
bm25_threshold=filter_conf.get("threshold", 1.0)
)
)
elif filter_conf["type"] == "pruning":

View File

@@ -405,7 +405,6 @@ class BM25ContentFilter(RelevantContentFilter):
user_query: str = None,
bm25_threshold: float = 1.0,
language: str = "english",
use_stemming: bool = True,
):
"""
Initializes the BM25ContentFilter class, if not provided, falls back to page metadata.
@@ -417,11 +416,9 @@ class BM25ContentFilter(RelevantContentFilter):
user_query (str): User query for filtering (optional).
bm25_threshold (float): BM25 threshold for filtering (default: 1.0).
language (str): Language for stemming (default: 'english').
use_stemming (bool): Whether to apply stemming (default: True).
"""
super().__init__(user_query=user_query)
self.bm25_threshold = bm25_threshold
self.use_stemming = use_stemming
self.priority_tags = {
"h1": 5.0,
"h2": 4.0,
@@ -435,7 +432,7 @@ class BM25ContentFilter(RelevantContentFilter):
"pre": 1.5,
"th": 1.5, # Table headers
}
self.stemmer = stemmer(language) if use_stemming else None
self.stemmer = stemmer(language)
def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]:
"""
@@ -482,19 +479,13 @@ class BM25ContentFilter(RelevantContentFilter):
# for _, chunk, _, _ in candidates]
# tokenized_query = [ps.stem(word) for word in query.lower().split()]
if self.use_stemming:
tokenized_corpus = [
[self.stemmer.stemWord(word) for word in chunk.lower().split()]
for _, chunk, _, _ in candidates
]
tokenized_query = [
self.stemmer.stemWord(word) for word in query.lower().split()
]
else:
tokenized_corpus = [
chunk.lower().split() for _, chunk, _, _ in candidates
]
tokenized_query = query.lower().split()
tokenized_corpus = [
[self.stemmer.stemWord(word) for word in chunk.lower().split()]
for _, chunk, _, _ in candidates
]
tokenized_query = [
self.stemmer.stemWord(word) for word in query.lower().split()
]
# tokenized_corpus = [[self.stemmer.stemWord(word) for word in tokenize_text(chunk.lower())]
# for _, chunk, _, _ in candidates]

View File

@@ -15,7 +15,7 @@ from .config import (
)
from bs4 import NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from urllib.parse import urljoin , urlparse
from requests.exceptions import InvalidSchema
from .utils import (
extract_metadata,
@@ -24,8 +24,7 @@ from .utils import (
get_base_domain,
extract_metadata_using_lxml,
)
from lxml import etree
from lxml import html as lhtml
from lxml import etree, html as lhtml
from typing import List
from .models import ScrapingResult, MediaItem, Link, Media, Links
import copy
@@ -130,7 +129,27 @@ class WebScrapingStrategy(ContentScrapingStrategy):
ScrapingResult: A structured result containing the scraped content.
"""
actual_url = kwargs.get("redirected_url", url)
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
# raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
effective_base_url = actual_url
try:
soup_for_base_check = BeautifulSoup(html, "html.parser")
base_tag = soup_for_base_check.find("base", href=True)
if base_tag:
base_href_val = base_tag.get("href")
if base_href_val is not None:
resolved_base_href = urljoin(actual_url, base_href_val)
parsed_resolved_base = urlparse(resolved_base_href)
if parsed_resolved_base.scheme and parsed_resolved_base.netloc:
effective_base_url = resolved_base_href
except Exception as e:
self._log(
"error",
message="Error resolving base URL: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
kwargs_for_scrap = {**kwargs, '_effective_base_url_override': effective_base_url }
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs_for_scrap)
if raw_result is None:
return ScrapingResult(
cleaned_html="",
@@ -1487,6 +1506,27 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
doc = lhtml.document_fromstring(html)
# Match BeautifulSoup's behavior of using body or full doc
# body = doc.xpath('//body')[0] if doc.xpath('//body') else doc
# Determine effective base URL considering <base href="...">
base_tag_element = doc.find(".//base[@href]")
if base_tag_element is not None:
base_href_value = base_tag_element.get("href")
if base_href_value is not None:
resolved_base_href = urljoin(url, base_href_value)
parse_resolved_base_href = urlparse(resolved_base_href)
if parse_resolved_base_href.scheme and parse_resolved_base_href.netloc:
effective_base_url = resolved_base_href
self._log(
"debug",
f"Using <base href='{base_href_value}'>, resolved effective base URL for links: {effective_base_url}",
url=url, # Log against original document URL
tag="SCRAPE_BASE_URL")
else:
effective_base_url = url
self._log(
"warning",
f"<base href='{base_href_value}'> resolved to non-absolute URL '{resolved_base_href}'. Using document URL '{actual_url}' as base.",
url=url, # Log against original document URL
tag="SCRAPE_BASE_URL")
body = doc
base_domain = get_base_domain(url)

View File

@@ -227,10 +227,21 @@ class URLPatternFilter(URLFilter):
# Prefix check (/foo/*)
if self._simple_prefixes:
path = url.split("?")[0]
if any(path.startswith(p) for p in self._simple_prefixes):
result = True
self._update_stats(result)
return not result if self._reverse else result
# if any(path.startswith(p) for p in self._simple_prefixes):
# result = True
# self._update_stats(result)
# return not result if self._reverse else result
####
# Modified the prefix matching logic to ensure path boundary checking:
# - Check if the matched prefix is followed by a path separator (`/`), query parameter (`?`), fragment (`#`), or is at the end of the path
# - This ensures `/api/` only matches complete path segments, not substrings like `/apiv2/`
####
for prefix in self._simple_prefixes:
if path.startswith(prefix):
if len(path) == len(prefix) or path[len(prefix)] in ['/', '?', '#']:
result = True
self._update_stats(result)
return not result if self._reverse else result
# Complex patterns
if self._path_patterns:
@@ -337,6 +348,15 @@ class ContentTypeFilter(URLFilter):
"sqlite": "application/vnd.sqlite3",
# Placeholder
"unknown": "application/octet-stream", # Fallback for unknown file types
# php
"php": "application/x-httpd-php",
"php3": "application/x-httpd-php",
"php4": "application/x-httpd-php",
"php5": "application/x-httpd-php",
"php7": "application/x-httpd-php",
"phtml": "application/x-httpd-php",
"phps": "application/x-httpd-php-source",
}
@staticmethod

View File

@@ -10,16 +10,12 @@ CacheMode = Union['CacheModeType']
CrawlResult = Union['CrawlResultType']
CrawlerHub = Union['CrawlerHubType']
BrowserProfiler = Union['BrowserProfilerType']
# NEW: Add AsyncUrlSeederType
AsyncUrlSeeder = Union['AsyncUrlSeederType']
# Configuration types
BrowserConfig = Union['BrowserConfigType']
CrawlerRunConfig = Union['CrawlerRunConfigType']
HTTPCrawlerConfig = Union['HTTPCrawlerConfigType']
LLMConfig = Union['LLMConfigType']
# NEW: Add SeedingConfigType
SeedingConfig = Union['SeedingConfigType']
# Content scraping types
ContentScrapingStrategy = Union['ContentScrapingStrategyType']
@@ -98,8 +94,6 @@ if TYPE_CHECKING:
from .models import CrawlResult as CrawlResultType
from .hub import CrawlerHub as CrawlerHubType
from .browser_profiler import BrowserProfiler as BrowserProfilerType
# NEW: Import AsyncUrlSeeder for type checking
from .async_url_seeder import AsyncUrlSeeder as AsyncUrlSeederType
# Configuration imports
from .async_configs import (
@@ -107,8 +101,6 @@ if TYPE_CHECKING:
CrawlerRunConfig as CrawlerRunConfigType,
HTTPCrawlerConfig as HTTPCrawlerConfigType,
LLMConfig as LLMConfigType,
# NEW: Import SeedingConfig for type checking
SeedingConfig as SeedingConfigType,
)
# Content scraping imports
@@ -192,4 +184,4 @@ if TYPE_CHECKING:
def create_llm_config(*args, **kwargs) -> 'LLMConfigType':
from .async_configs import LLMConfig
return LLMConfig(*args, **kwargs)
return LLMConfig(*args, **kwargs)

View File

@@ -15,9 +15,10 @@ from .html2text import html2text, CustomHTML2Text
from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, IMAGE_SCORE_THRESHOLD, DEFAULT_PROVIDER, PROVIDER_MODELS
import httpx
from socket import gaierror
from pathlib import Path
from pathlib import Path , PurePath
from typing import Dict, Any, List, Optional, Callable
from urllib.parse import urljoin
import requests
from requests.exceptions import InvalidSchema
import xxhash
@@ -2056,18 +2057,29 @@ def fast_format_html(html_string):
def normalize_url(href, base_url):
"""Normalize URLs to ensure consistent format"""
from urllib.parse import urljoin, urlparse
if href is None:
return None
href_str = str(href).strip()
if not href_str:
# Empty href, conventionally resolves to the base URL itself.
return base_url
# Parse base URL to get components
parsed_href = urlparse(href_str)
if parsed_href.scheme and parsed_href.scheme.lower() in ["mailto", "tel", "javascript", "data", "file"]:
# If href is already a full URL, return it as is
return href_str
parsed_base = urlparse(base_url)
if not parsed_base.scheme or not parsed_base.netloc:
raise ValueError(f"Invalid base URL format: {base_url}")
# Ensure base_url ends with a trailing slash if it's a directory path
if not base_url.endswith('/'):
base_url = base_url + '/'
# # Ensure base_url ends with a trailing slash if it's a directory path
# if not base_url.endswith('/'):
# base_url = base_url + '/'
# Use urljoin to handle all cases
normalized = urljoin(base_url, href.strip())
normalized = urljoin(base_url, href_str)
return normalized
@@ -2080,7 +2092,7 @@ def normalize_url_for_deep_crawl(href, base_url):
return None
# Use urljoin to handle relative URLs
full_url = urljoin(base_url, href.strip())
full_url = urljoin(base_url, str(href).strip())
# Parse the URL for normalization
parsed = urlparse(full_url)
@@ -2110,7 +2122,7 @@ def normalize_url_for_deep_crawl(href, base_url):
normalized = urlunparse((
parsed.scheme,
netloc,
parsed.path.rstrip('/'), # Normalize trailing slash
str(PurePath(parsed.path)).rstrip('/'), # Normalize path to remove duplicate slashes
parsed.params,
query,
fragment
@@ -2127,7 +2139,7 @@ def efficient_normalize_url_for_deep_crawl(href, base_url):
return None
# Resolve relative URLs
full_url = urljoin(base_url, href.strip())
full_url = urljoin(base_url, str(href).strip())
# Use proper URL parsing
parsed = urlparse(full_url)
@@ -2135,52 +2147,51 @@ def efficient_normalize_url_for_deep_crawl(href, base_url):
# Only perform the most critical normalizations
# 1. Lowercase hostname
# 2. Remove fragment
path = parsed.path
if len(path) > 1 and path.endswith('/'):
path = path.rstrip('/')
normalized = urlunparse((
parsed.scheme,
parsed.netloc.lower(),
parsed.path.rstrip('/'),
parsed.params,
parsed.query,
'' # Remove fragment
))
return normalized
def normalize_url_tmp(href, base_url):
"""Normalize URLs to ensure consistent format"""
# Extract protocol and domain from base URL
try:
base_parts = base_url.split("/")
protocol = base_parts[0]
domain = base_parts[2]
except IndexError:
raise ValueError(f"Invalid base URL format: {base_url}")
# def normalize_url_tmp(href, base_url):
# """Normalize URLs to ensure consistent format"""
# # Extract protocol and domain from base URL
# try:
# base_parts = base_url.split("/")
# protocol = base_parts[0]
# domain = base_parts[2]
# except IndexError:
# raise ValueError(f"Invalid base URL format: {base_url}")
# Handle special protocols
special_protocols = {"mailto:", "tel:", "ftp:", "file:", "data:", "javascript:"}
if any(href.lower().startswith(proto) for proto in special_protocols):
return href.strip()
# # Handle special protocols
# special_protocols = {"mailto:", "tel:", "ftp:", "file:", "data:", "javascript:"}
# if any(href.lower().startswith(proto) for proto in special_protocols):
# return href.strip()
# Handle anchor links
if href.startswith("#"):
return f"{base_url}{href}"
# # Handle anchor links
# if href.startswith("#"):
# return f"{base_url}{href}"
# Handle protocol-relative URLs
if href.startswith("//"):
return f"{protocol}{href}"
# # Handle protocol-relative URLs
# if href.startswith("//"):
# return f"{protocol}{href}"
# Handle root-relative URLs
if href.startswith("/"):
return f"{protocol}//{domain}{href}"
# # Handle root-relative URLs
# if href.startswith("/"):
# return f"{protocol}//{domain}{href}"
# Handle relative URLs
if not href.startswith(("http://", "https://")):
# Remove leading './' if present
href = href.lstrip("./")
return f"{protocol}//{domain}/{href}"
# # Handle relative URLs
# if not href.startswith(("http://", "https://")):
# # Remove leading './' if present
# href = href.lstrip("./")
# return f"{protocol}//{domain}/{href}"
return href.strip()
# return href.strip()
def get_base_domain(url: str) -> str:

View File

@@ -332,7 +332,7 @@ The `clone()` method:
### Key fields to note
1. **`provider`**:
- Which LLM provoder to use.
- Which LLM provider to use.
- Possible values are `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)*
2. **`api_token`**:
@@ -6705,7 +6705,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `600.0`)
4.**`memory_wait_timeout`** (`float`, default: `300.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`)

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -447,7 +447,10 @@
dragNodes: true,
dragView: true,
zoomView: true,
zoomSpeed: 0.15 // Reduced from default 1.0
mouseWheel: {
speed: 0.15, // Reduced from default 1.0
smooth: true // Enable smooth zooming
}
},
nodes: {
font: {

View File

@@ -1,43 +1,55 @@
from crawl4ai import LLMConfig
from crawl4ai import AsyncWebCrawler, LLMExtractionStrategy
import asyncio
import os
import json
from pydantic import BaseModel, Field
url = "https://openai.com/api/pricing/"
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, LLMConfig, BrowserConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from typing import Dict
import os
class OpenAIModelFee(BaseModel):
model_name: str = Field(..., description="Name of the OpenAI model.")
input_fee: str = Field(..., description="Fee for input token for the OpenAI model.")
output_fee: str = Field(
..., description="Fee for output token for the OpenAI model."
output_fee: str = Field(..., description="Fee for output token for the OpenAI model.")
async def extract_structured_data_using_llm(provider: str, api_token: str = None, extra_headers: Dict[str, str] = None):
print(f"\n--- Extracting Structured Data with {provider} ---")
if api_token is None and provider != "ollama":
print(f"API token is required for {provider}. Skipping this example.")
return
browser_config = BrowserConfig(headless=True)
extra_args = {"temperature": 0, "top_p": 0.9, "max_tokens": 2000}
if extra_headers:
extra_args["extra_headers"] = extra_headers
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
word_count_threshold=1,
page_timeout=80000,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider=provider, api_token=api_token),
schema=OpenAIModelFee.model_json_schema(),
extraction_type="schema",
instruction="""From the crawled content, extract all mentioned model names along with their fees for input and output tokens.
Do not miss any models in the entire content.""",
extra_args=extra_args,
),
)
async def main():
# Use AsyncWebCrawler
async with AsyncWebCrawler() as crawler:
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url=url,
word_count_threshold=1,
extraction_strategy=LLMExtractionStrategy(
# provider= "openai/gpt-4o", api_token = os.getenv('OPENAI_API_KEY'),
llm_config=LLMConfig(provider="groq/llama-3.1-70b-versatile", api_token=os.getenv("GROQ_API_KEY")),
schema=OpenAIModelFee.model_json_schema(),
extraction_type="schema",
instruction="From the crawled content, extract all mentioned model names along with their "
"fees for input and output tokens. Make sure not to miss anything in the entire content. "
"One extracted model JSON format should look like this: "
'{ "model_name": "GPT-4", "input_fee": "US$10.00 / 1M tokens", "output_fee": "US$30.00 / 1M tokens" }',
),
url="https://openai.com/api/pricing/",
config=crawler_config
)
print("Success:", result.success)
model_fees = json.loads(result.extracted_content)
print(len(model_fees))
with open(".data/data.json", "w", encoding="utf-8") as f:
f.write(result.extracted_content)
print(result.extracted_content)
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(
extract_structured_data_using_llm(
provider="openai/gpt-4o", api_token=os.getenv("OPENAI_API_KEY")
)
)

File diff suppressed because it is too large Load Diff

View File

@@ -1,807 +0,0 @@
"""
BBC Sport Research Assistant Pipeline
=====================================
This example demonstrates how URLSeeder helps create an efficient research pipeline:
1. Discover all available URLs without crawling
2. Filter and rank them based on relevance
3. Crawl only the most relevant content
4. Generate comprehensive research insights
Pipeline Steps:
1. Get user query
2. Optionally enhance query using LLM
3. Use URLSeeder to discover and rank URLs
4. Crawl top K URLs with BM25 filtering
5. Generate detailed response with citations
Requirements:
- pip install crawl4ai
- pip install litellm
- export GEMINI_API_KEY="your-api-key"
Usage:
- Run normally: python bbc_sport_research_assistant.py
- Run test mode: python bbc_sport_research_assistant.py test
Note: AsyncUrlSeeder now uses context manager for automatic cleanup.
"""
import asyncio
import json
import os
import hashlib
import pickle
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
# Rich for colored output
from rich.console import Console
from rich.text import Text
from rich.panel import Panel
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
# Crawl4AI imports
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
AsyncUrlSeeder,
SeedingConfig,
AsyncLogger
)
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# LiteLLM for AI communication
import litellm
# Initialize Rich console
console = Console()
# Get the current directory where this script is located
SCRIPT_DIR = Path(__file__).parent.resolve()
# Cache configuration - relative to script directory
CACHE_DIR = SCRIPT_DIR / "temp_cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Testing limits
TESTING_MODE = True
MAX_URLS_DISCOVERY = 100 if TESTING_MODE else 1000
MAX_URLS_TO_CRAWL = 5 if TESTING_MODE else 10
def get_cache_key(prefix: str, *args) -> str:
"""Generate cache key from prefix and arguments"""
content = f"{prefix}:{'|'.join(str(arg) for arg in args)}"
return hashlib.md5(content.encode()).hexdigest()
def load_from_cache(cache_key: str) -> Optional[any]:
"""Load data from cache if exists"""
cache_path = CACHE_DIR / f"{cache_key}.pkl"
if cache_path.exists():
with open(cache_path, 'rb') as f:
return pickle.load(f)
return None
def save_to_cache(cache_key: str, data: any) -> None:
"""Save data to cache"""
cache_path = CACHE_DIR / f"{cache_key}.pkl"
with open(cache_path, 'wb') as f:
pickle.dump(data, f)
@dataclass
class ResearchConfig:
"""Configuration for research pipeline"""
# Core settings
domain: str = "www.bbc.com/sport"
max_urls_discovery: int = 100
max_urls_to_crawl: int = 10
top_k_urls: int = 10
# Scoring and filtering
score_threshold: float = 0.1
scoring_method: str = "bm25"
# Processing options
use_llm_enhancement: bool = True
extract_head_metadata: bool = True
live_check: bool = True
force_refresh: bool = False
# Crawler settings
max_concurrent_crawls: int = 5
timeout: int = 30000
headless: bool = True
# Output settings
save_json: bool = True
save_markdown: bool = True
output_dir: str = None # Will be set in __post_init__
# Development settings
test_mode: bool = False
interactive_mode: bool = False
verbose: bool = True
def __post_init__(self):
"""Adjust settings based on test mode"""
if self.test_mode:
self.max_urls_discovery = 50
self.max_urls_to_crawl = 3
self.top_k_urls = 5
# Set default output directory relative to script location
if self.output_dir is None:
self.output_dir = str(SCRIPT_DIR / "research_results")
@dataclass
class ResearchQuery:
"""Container for research query and metadata"""
original_query: str
enhanced_query: Optional[str] = None
search_patterns: List[str] = None
timestamp: str = None
@dataclass
class ResearchResult:
"""Container for research results"""
query: ResearchQuery
discovered_urls: List[Dict]
crawled_content: List[Dict]
synthesis: str
citations: List[Dict]
metadata: Dict
async def get_user_query() -> str:
"""
Get research query from user input
"""
query = input("\n🔍 Enter your research query: ")
return query.strip()
async def enhance_query_with_llm(query: str) -> ResearchQuery:
"""
Use LLM to enhance the research query:
- Extract key terms
- Generate search patterns
- Identify related topics
"""
# Check cache
cache_key = get_cache_key("enhanced_query", query)
cached_result = load_from_cache(cache_key)
if cached_result:
console.print("[dim cyan]📦 Using cached enhanced query[/dim cyan]")
return cached_result
try:
response = await litellm.acompletion(
model="gemini/gemini-2.5-flash-preview-04-17",
messages=[{
"role": "user",
"content": f"""Given this research query: "{query}"
Extract:
1. Key terms and concepts (as a list)
2. Related search terms
3. A more specific/enhanced version of the query
Return as JSON:
{{
"key_terms": ["term1", "term2"],
"related_terms": ["related1", "related2"],
"enhanced_query": "enhanced version of query"
}}"""
}],
# reasoning_effort="low",
temperature=0.3,
response_format={"type": "json_object"}
)
data = json.loads(response.choices[0].message.content)
# Create search patterns
all_terms = data["key_terms"] + data["related_terms"]
patterns = [f"*{term.lower()}*" for term in all_terms]
result = ResearchQuery(
original_query=query,
enhanced_query=data["enhanced_query"],
search_patterns=patterns[:10], # Limit patterns
timestamp=datetime.now().isoformat()
)
# Cache the result
save_to_cache(cache_key, result)
return result
except Exception as e:
console.print(f"[yellow]⚠️ LLM enhancement failed: {e}[/yellow]")
# Fallback to simple tokenization
return ResearchQuery(
original_query=query,
enhanced_query=query,
search_patterns=tokenize_query_to_patterns(query),
timestamp=datetime.now().isoformat()
)
def tokenize_query_to_patterns(query: str) -> List[str]:
"""
Convert query into URL patterns for URLSeeder
Example: "AI startups funding" -> ["*ai*", "*startup*", "*funding*"]
"""
# Simple tokenization - split and create patterns
words = query.lower().split()
# Filter out common words
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'that'}
keywords = [w for w in words if w not in stop_words and len(w) > 2]
# Create patterns
patterns = [f"*{keyword}*" for keyword in keywords]
return patterns[:8] # Limit to 8 patterns
async def discover_urls(domain: str, query: str, config: ResearchConfig) -> List[Dict]:
"""
Use URLSeeder to discover and rank URLs:
1. Fetch all URLs from domain
2. Filter by patterns
3. Extract metadata (titles, descriptions)
4. Rank by BM25 relevance score
5. Return top K URLs
"""
# Check cache
cache_key = get_cache_key("discovered_urls", domain, query, config.top_k_urls)
cached_result = load_from_cache(cache_key)
if cached_result and not config.force_refresh:
console.print("[dim cyan]📦 Using cached URL discovery[/dim cyan]")
return cached_result
console.print(f"\n[cyan]🔍 Discovering URLs from {domain}...[/cyan]")
# Initialize URL seeder with context manager
async with AsyncUrlSeeder(logger=AsyncLogger(verbose=config.verbose)) as seeder:
# Configure seeding
seeding_config = SeedingConfig(
source="sitemap+cc", # Use both sitemap and Common Crawl
extract_head=config.extract_head_metadata,
query=query,
scoring_method=config.scoring_method,
score_threshold=config.score_threshold,
max_urls=config.max_urls_discovery,
live_check=config.live_check,
force=config.force_refresh
)
try:
# Discover URLs
urls = await seeder.urls(domain, seeding_config)
# Sort by relevance score (descending)
sorted_urls = sorted(
urls,
key=lambda x: x.get('relevance_score', 0),
reverse=True
)
# Take top K
top_urls = sorted_urls[:config.top_k_urls]
console.print(f"[green]✅ Discovered {len(urls)} URLs, selected top {len(top_urls)}[/green]")
# Cache the result
save_to_cache(cache_key, top_urls)
return top_urls
except Exception as e:
console.print(f"[red]❌ URL discovery failed: {e}[/red]")
return []
async def crawl_selected_urls(urls: List[str], query: str, config: ResearchConfig) -> List[Dict]:
"""
Crawl selected URLs with content filtering:
- Use AsyncWebCrawler.arun_many()
- Apply content filter
- Generate clean markdown
"""
# Extract just URLs from the discovery results
url_list = [u['url'] for u in urls if 'url' in u][:config.max_urls_to_crawl]
if not url_list:
console.print("[red]❌ No URLs to crawl[/red]")
return []
console.print(f"\n[cyan]🕷️ Crawling {len(url_list)} URLs...[/cyan]")
# Check cache for each URL
crawled_results = []
urls_to_crawl = []
for url in url_list:
cache_key = get_cache_key("crawled_content", url, query)
cached_content = load_from_cache(cache_key)
if cached_content and not config.force_refresh:
crawled_results.append(cached_content)
else:
urls_to_crawl.append(url)
if urls_to_crawl:
console.print(f"[cyan]📥 Crawling {len(urls_to_crawl)} new URLs (cached: {len(crawled_results)})[/cyan]")
# Configure markdown generator with content filter
md_generator = DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="dynamic",
min_word_threshold=10
),
)
# Configure crawler
crawler_config = CrawlerRunConfig(
markdown_generator=md_generator,
exclude_external_links=True,
excluded_tags=['nav', 'header', 'footer', 'aside'],
)
# Create crawler with browser config
async with AsyncWebCrawler(
config=BrowserConfig(
headless=config.headless,
verbose=config.verbose
)
) as crawler:
# Crawl URLs
results = await crawler.arun_many(
urls_to_crawl,
config=crawler_config,
max_concurrent=config.max_concurrent_crawls
)
# Process results
for url, result in zip(urls_to_crawl, results):
if result.success:
content_data = {
'url': url,
'title': result.metadata.get('title', ''),
'markdown': result.markdown.fit_markdown or result.markdown.raw_markdown,
'raw_length': len(result.markdown.raw_markdown),
'fit_length': len(result.markdown.fit_markdown) if result.markdown.fit_markdown else len(result.markdown.raw_markdown),
'metadata': result.metadata
}
crawled_results.append(content_data)
# Cache the result
cache_key = get_cache_key("crawled_content", url, query)
save_to_cache(cache_key, content_data)
else:
console.print(f" [red]❌ Failed: {url[:50]}... - {result.error}[/red]")
console.print(f"[green]✅ Successfully crawled {len(crawled_results)} URLs[/green]")
return crawled_results
async def generate_research_synthesis(
query: str,
crawled_content: List[Dict]
) -> Tuple[str, List[Dict]]:
"""
Use LLM to synthesize research findings:
- Analyze all crawled content
- Generate comprehensive answer
- Extract citations and references
"""
if not crawled_content:
return "No content available for synthesis.", []
console.print("\n[cyan]🤖 Generating research synthesis...[/cyan]")
# Prepare content for LLM
content_sections = []
for i, content in enumerate(crawled_content, 1):
section = f"""
SOURCE {i}:
Title: {content['title']}
URL: {content['url']}
Content Preview:
{content['markdown'][:1500]}...
"""
content_sections.append(section)
combined_content = "\n---\n".join(content_sections)
try:
response = await litellm.acompletion(
model="gemini/gemini-2.5-flash-preview-04-17",
messages=[{
"role": "user",
"content": f"""Research Query: "{query}"
Based on the following sources, provide a comprehensive research synthesis.
{combined_content}
Please provide:
1. An executive summary (2-3 sentences)
2. Key findings (3-5 bullet points)
3. Detailed analysis (2-3 paragraphs)
4. Future implications or trends
Format your response with clear sections and cite sources using [Source N] notation.
Keep the total response under 800 words."""
}],
# reasoning_effort="medium",
temperature=0.7
)
synthesis = response.choices[0].message.content
# Extract citations from the synthesis
citations = []
for i, content in enumerate(crawled_content, 1):
if f"[Source {i}]" in synthesis or f"Source {i}" in synthesis:
citations.append({
'source_id': i,
'title': content['title'],
'url': content['url']
})
return synthesis, citations
except Exception as e:
console.print(f"[red]❌ Synthesis generation failed: {e}[/red]")
# Fallback to simple summary
summary = f"Research on '{query}' found {len(crawled_content)} relevant articles:\n\n"
for content in crawled_content[:3]:
summary += f"- {content['title']}\n {content['url']}\n\n"
return summary, []
def format_research_output(result: ResearchResult) -> str:
"""
Format the final research output with:
- Executive summary
- Key findings
- Detailed analysis
- Citations and sources
"""
output = []
output.append("\n" + "=" * 60)
output.append("🔬 RESEARCH RESULTS")
output.append("=" * 60)
# Query info
output.append(f"\n📋 Query: {result.query.original_query}")
if result.query.enhanced_query != result.query.original_query:
output.append(f" Enhanced: {result.query.enhanced_query}")
# Discovery stats
output.append(f"\n📊 Statistics:")
output.append(f" - URLs discovered: {len(result.discovered_urls)}")
output.append(f" - URLs crawled: {len(result.crawled_content)}")
output.append(f" - Processing time: {result.metadata.get('duration', 'N/A')}")
# Synthesis
output.append(f"\n📝 SYNTHESIS")
output.append("-" * 60)
output.append(result.synthesis)
# Citations
if result.citations:
output.append(f"\n📚 SOURCES")
output.append("-" * 60)
for citation in result.citations:
output.append(f"[{citation['source_id']}] {citation['title']}")
output.append(f" {citation['url']}")
return "\n".join(output)
async def save_research_results(result: ResearchResult, config: ResearchConfig) -> Tuple[str, str]:
"""
Save research results in JSON and Markdown formats
Returns:
Tuple of (json_path, markdown_path)
"""
# Create output directory
output_dir = Path(config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Generate filename based on query and timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
query_slug = result.query.original_query[:50].replace(" ", "_").replace("/", "_")
base_filename = f"{timestamp}_{query_slug}"
json_path = None
md_path = None
# Save JSON
if config.save_json:
json_path = output_dir / f"{base_filename}.json"
with open(json_path, 'w') as f:
json.dump(asdict(result), f, indent=2, default=str)
console.print(f"\n[green]💾 JSON saved: {json_path}[/green]")
# Save Markdown
if config.save_markdown:
md_path = output_dir / f"{base_filename}.md"
# Create formatted markdown
md_content = [
f"# Research Report: {result.query.original_query}",
f"\n**Generated on:** {result.metadata.get('timestamp', 'N/A')}",
f"\n**Domain:** {result.metadata.get('domain', 'N/A')}",
f"\n**Processing time:** {result.metadata.get('duration', 'N/A')}",
"\n---\n",
"## Query Information",
f"- **Original Query:** {result.query.original_query}",
f"- **Enhanced Query:** {result.query.enhanced_query or 'N/A'}",
f"- **Search Patterns:** {', '.join(result.query.search_patterns or [])}",
"\n## Statistics",
f"- **URLs Discovered:** {len(result.discovered_urls)}",
f"- **URLs Crawled:** {len(result.crawled_content)}",
f"- **Sources Cited:** {len(result.citations)}",
"\n## Research Synthesis\n",
result.synthesis,
"\n## Sources\n"
]
# Add citations
for citation in result.citations:
md_content.append(f"### [{citation['source_id']}] {citation['title']}")
md_content.append(f"- **URL:** [{citation['url']}]({citation['url']})")
md_content.append("")
# Add discovered URLs summary
md_content.extend([
"\n## Discovered URLs (Top 10)\n",
"| Score | URL | Title |",
"|-------|-----|-------|"
])
for url_data in result.discovered_urls[:10]:
score = url_data.get('relevance_score', 0)
url = url_data.get('url', '')
title = 'N/A'
if 'head_data' in url_data and url_data['head_data']:
title = url_data['head_data'].get('title', 'N/A')[:60] + '...'
md_content.append(f"| {score:.3f} | {url[:50]}... | {title} |")
# Write markdown
with open(md_path, 'w') as f:
f.write('\n'.join(md_content))
console.print(f"[green]📄 Markdown saved: {md_path}[/green]")
return str(json_path) if json_path else None, str(md_path) if md_path else None
async def wait_for_user(message: str = "\nPress Enter to continue..."):
"""Wait for user input in interactive mode"""
input(message)
async def research_pipeline(
query: str,
config: ResearchConfig
) -> ResearchResult:
"""
Main research pipeline orchestrator with configurable settings
"""
start_time = datetime.now()
# Display pipeline header
header = Panel(
f"[bold cyan]Research Pipeline[/bold cyan]\n\n"
f"[dim]Domain:[/dim] {config.domain}\n"
f"[dim]Mode:[/dim] {'Test' if config.test_mode else 'Production'}\n"
f"[dim]Interactive:[/dim] {'Yes' if config.interactive_mode else 'No'}",
title="🚀 Starting",
border_style="cyan"
)
console.print(header)
# Step 1: Enhance query (optional)
console.print(f"\n[bold cyan]📝 Step 1: Query Processing[/bold cyan]")
if config.interactive_mode:
await wait_for_user()
if config.use_llm_enhancement:
research_query = await enhance_query_with_llm(query)
else:
research_query = ResearchQuery(
original_query=query,
enhanced_query=query,
search_patterns=tokenize_query_to_patterns(query),
timestamp=datetime.now().isoformat()
)
console.print(f" [green]✅ Query ready:[/green] {research_query.enhanced_query or query}")
# Step 2: Discover URLs
console.print(f"\n[bold cyan]🔍 Step 2: URL Discovery[/bold cyan]")
if config.interactive_mode:
await wait_for_user()
discovered_urls = await discover_urls(
domain=config.domain,
query=research_query.enhanced_query or query,
config=config
)
if not discovered_urls:
return ResearchResult(
query=research_query,
discovered_urls=[],
crawled_content=[],
synthesis="No relevant URLs found for the given query.",
citations=[],
metadata={'duration': str(datetime.now() - start_time)}
)
console.print(f" [green]✅ Found {len(discovered_urls)} relevant URLs[/green]")
# Step 3: Crawl selected URLs
console.print(f"\n[bold cyan]🕷️ Step 3: Content Crawling[/bold cyan]")
if config.interactive_mode:
await wait_for_user()
crawled_content = await crawl_selected_urls(
urls=discovered_urls,
query=research_query.enhanced_query or query,
config=config
)
console.print(f" [green]✅ Successfully crawled {len(crawled_content)} pages[/green]")
# Step 4: Generate synthesis
console.print(f"\n[bold cyan]🤖 Step 4: Synthesis Generation[/bold cyan]")
if config.interactive_mode:
await wait_for_user()
synthesis, citations = await generate_research_synthesis(
query=research_query.enhanced_query or query,
crawled_content=crawled_content
)
console.print(f" [green]✅ Generated synthesis with {len(citations)} citations[/green]")
# Step 5: Create result
result = ResearchResult(
query=research_query,
discovered_urls=discovered_urls,
crawled_content=crawled_content,
synthesis=synthesis,
citations=citations,
metadata={
'duration': str(datetime.now() - start_time),
'domain': config.domain,
'timestamp': datetime.now().isoformat(),
'config': asdict(config)
}
)
duration = datetime.now() - start_time
console.print(f"\n[bold green]✅ Research completed in {duration}[/bold green]")
return result
async def main():
"""
Main entry point for the BBC Sport Research Assistant
"""
# Example queries
example_queries = [
"Premier League transfer news and rumors",
"Champions League match results and analysis",
"World Cup qualifying updates",
"Football injury reports and return dates",
"Tennis grand slam tournament results"
]
# Display header
console.print(Panel.fit(
"[bold cyan]BBC Sport Research Assistant[/bold cyan]\n\n"
"This tool demonstrates efficient research using URLSeeder:\n"
"[dim]• Discover all URLs without crawling\n"
"• Filter and rank by relevance\n"
"• Crawl only the most relevant content\n"
"• Generate AI-powered insights with citations[/dim]\n\n"
f"[dim]📁 Working directory: {SCRIPT_DIR}[/dim]",
title="🔬 Welcome",
border_style="cyan"
))
# Configuration options table
config_table = Table(title="\n⚙️ Configuration Options", show_header=False, box=None)
config_table.add_column(style="bold cyan", width=3)
config_table.add_column()
config_table.add_row("1", "Quick Test Mode (3 URLs, fast)")
config_table.add_row("2", "Standard Mode (10 URLs, balanced)")
config_table.add_row("3", "Comprehensive Mode (20 URLs, thorough)")
config_table.add_row("4", "Custom Configuration")
console.print(config_table)
config_choice = input("\nSelect configuration (1-4): ").strip()
# Create config based on choice
if config_choice == "1":
config = ResearchConfig(test_mode=True, interactive_mode=False)
elif config_choice == "2":
config = ResearchConfig(max_urls_to_crawl=10, top_k_urls=10)
elif config_choice == "3":
config = ResearchConfig(max_urls_to_crawl=20, top_k_urls=20, max_urls_discovery=200)
else:
# Custom configuration
config = ResearchConfig()
config.test_mode = input("\nTest mode? (y/n): ").lower() == 'y'
config.interactive_mode = input("Interactive mode (pause between steps)? (y/n): ").lower() == 'y'
config.use_llm_enhancement = input("Use AI to enhance queries? (y/n): ").lower() == 'y'
if not config.test_mode:
try:
config.max_urls_to_crawl = int(input("Max URLs to crawl (default 10): ") or "10")
config.top_k_urls = int(input("Top K URLs to select (default 10): ") or "10")
except ValueError:
console.print("[yellow]Using default values[/yellow]")
# Display example queries
query_table = Table(title="\n📋 Example Queries", show_header=False, box=None)
query_table.add_column(style="bold cyan", width=3)
query_table.add_column()
for i, q in enumerate(example_queries, 1):
query_table.add_row(str(i), q)
console.print(query_table)
query_input = input("\nSelect a query (1-5) or enter your own: ").strip()
if query_input.isdigit() and 1 <= int(query_input) <= len(example_queries):
query = example_queries[int(query_input) - 1]
else:
query = query_input if query_input else example_queries[0]
console.print(f"\n[bold cyan]📝 Selected Query:[/bold cyan] {query}")
# Run the research pipeline
result = await research_pipeline(query=query, config=config)
# Display results
formatted_output = format_research_output(result)
# print(formatted_output)
console.print(Panel.fit(
formatted_output,
title="🔬 Research Results",
border_style="green"
))
# Save results
if config.save_json or config.save_markdown:
json_path, md_path = await save_research_results(result, config)
# print(f"\n✅ Results saved successfully!")
if json_path:
console.print(f"[green]JSON saved at:[/green] {json_path}")
if md_path:
console.print(f"[green]Markdown saved at:[/green] {md_path}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python3
"""
Convert Crawl4AI URL Seeder tutorial markdown to Colab notebook format
"""
import json
import re
from pathlib import Path
def parse_markdown_to_cells(markdown_content):
"""Parse markdown content and convert to notebook cells"""
cells = []
# Split content by cell markers
lines = markdown_content.split('\n')
# Extract the header content before first cell marker
header_lines = []
i = 0
while i < len(lines) and not lines[i].startswith('# cell'):
header_lines.append(lines[i])
i += 1
# Add header as markdown cell if it exists
if header_lines:
header_content = '\n'.join(header_lines).strip()
if header_content:
cells.append({
"cell_type": "markdown",
"metadata": {},
"source": header_content.split('\n')
})
# Process cells marked with # cell X type:Y
current_cell_content = []
current_cell_type = None
while i < len(lines):
line = lines[i]
# Check for cell marker
cell_match = re.match(r'^# cell (\d+) type:(markdown|code)$', line)
if cell_match:
# Save previous cell if exists
if current_cell_content and current_cell_type:
content = '\n'.join(current_cell_content).strip()
if content:
if current_cell_type == 'code':
cells.append({
"cell_type": "code",
"execution_count": None,
"metadata": {},
"outputs": [],
"source": content.split('\n')
})
else:
cells.append({
"cell_type": "markdown",
"metadata": {},
"source": content.split('\n')
})
# Start new cell
current_cell_type = cell_match.group(2)
current_cell_content = []
else:
# Add line to current cell
current_cell_content.append(line)
i += 1
# Add last cell if exists
if current_cell_content and current_cell_type:
content = '\n'.join(current_cell_content).strip()
if content:
if current_cell_type == 'code':
cells.append({
"cell_type": "code",
"execution_count": None,
"metadata": {},
"outputs": [],
"source": content.split('\n')
})
else:
cells.append({
"cell_type": "markdown",
"metadata": {},
"source": content.split('\n')
})
return cells
def create_colab_notebook(cells):
"""Create a Colab notebook structure"""
notebook = {
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "Crawl4AI_URL_Seeder_Tutorial.ipynb",
"provenance": [],
"collapsed_sections": [],
"toc_visible": True
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": cells
}
return notebook
def main():
# Read the markdown file
md_path = Path("tutorial_url_seeder.md")
if not md_path.exists():
print(f"Error: {md_path} not found!")
return
print(f"Reading {md_path}...")
with open(md_path, 'r', encoding='utf-8') as f:
markdown_content = f.read()
# Parse markdown to cells
print("Parsing markdown content...")
cells = parse_markdown_to_cells(markdown_content)
print(f"Created {len(cells)} cells")
# Create notebook
print("Creating Colab notebook...")
notebook = create_colab_notebook(cells)
# Save notebook
output_path = Path("Crawl4AI_URL_Seeder_Tutorial.ipynb")
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(notebook, f, indent=2, ensure_ascii=False)
print(f"✅ Successfully created {output_path}")
print(f" - Total cells: {len(cells)}")
print(f" - Markdown cells: {sum(1 for c in cells if c['cell_type'] == 'markdown')}")
print(f" - Code cells: {sum(1 for c in cells if c['cell_type'] == 'code')}")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -1,263 +0,0 @@
"""
URL Seeder Demo - Interactive showcase of Crawl4AI's URL discovery capabilities
This demo shows:
1. Basic URL discovery from sitemaps and Common Crawl
2. Cache management and forced refresh
3. Live URL validation and metadata extraction
4. BM25 relevance scoring for intelligent filtering
5. Integration with AsyncWebCrawler for the complete pipeline
6. Multi-domain discovery across multiple sites
Note: The AsyncUrlSeeder now supports context manager protocol for automatic cleanup.
"""
import asyncio
import time
from datetime import datetime
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, BarColumn, TimeElapsedColumn
from rich.prompt import Prompt, Confirm
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
AsyncUrlSeeder,
SeedingConfig
)
console = Console()
console.rule("[bold green]🌐 Crawl4AI URL Seeder: Interactive Demo")
DOMAIN = "crawl4ai.com"
# Utils
def print_head_info(head_data):
table = Table(title="<head> Metadata", expand=True)
table.add_column("Key", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
if not head_data:
console.print("[yellow]No head data found.")
return
if head_data.get("title"):
table.add_row("title", head_data["title"])
if head_data.get("charset"):
table.add_row("charset", head_data["charset"])
for k, v in head_data.get("meta", {}).items():
table.add_row(f"meta:{k}", v)
for rel, items in head_data.get("link", {}).items():
for item in items:
table.add_row(f"link:{rel}", item.get("href", ""))
console.print(table)
async def section_1_basic_exploration(seed: AsyncUrlSeeder):
console.rule("[bold cyan]1. Basic Seeding")
cfg = SeedingConfig(source="cc+sitemap", pattern="*", verbose=True)
start_time = time.time()
with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
p.add_task(description="Fetching from Common Crawl + Sitemap...", total=None)
urls = await seed.urls(DOMAIN, cfg)
elapsed = time.time() - start_time
console.print(f"[green]✓ Fetched {len(urls)} URLs in {elapsed:.2f} seconds")
console.print(f"[dim] Speed: {len(urls)/elapsed:.0f} URLs/second[/dim]\n")
console.print("[bold]Sample URLs:[/bold]")
for u in urls[:5]:
console.print(f"{u['url']}")
async def section_2_cache_demo(seed: AsyncUrlSeeder):
console.rule("[bold cyan]2. Caching Demonstration")
console.print("[yellow]Using `force=True` to bypass cache and fetch fresh data.[/yellow]")
cfg = SeedingConfig(source="cc", pattern="*crawl4ai.com/core/*", verbose=False, force = True)
await seed.urls(DOMAIN, cfg)
async def section_3_live_head(seed: AsyncUrlSeeder):
console.rule("[bold cyan]3. Live Check + Head Extraction")
cfg = SeedingConfig(
extract_head=True,
concurrency=10,
hits_per_sec=5,
pattern="*crawl4ai.com/*",
max_urls=10,
verbose=False,
)
urls = await seed.urls(DOMAIN, cfg)
valid = [u for u in urls if u["status"] == "valid"]
console.print(f"[green]Valid: {len(valid)} / {len(urls)}")
if valid:
print_head_info(valid[0]["head_data"])
async def section_4_bm25_scoring(seed: AsyncUrlSeeder):
console.rule("[bold cyan]4. BM25 Relevance Scoring")
console.print("[yellow]Using AI-powered relevance scoring to find the most relevant content[/yellow]")
query = "markdown generation extraction strategies"
cfg = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
score_threshold=0.3, # Only URLs with >30% relevance
max_urls=20,
verbose=False
)
with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
p.add_task(description=f"Searching for: '{query}'", total=None)
urls = await seed.urls(DOMAIN, cfg)
console.print(f"[green]Found {len(urls)} relevant URLs (score > 0.3)")
# Show top results with scores
table = Table(title="Top 5 Most Relevant Pages", expand=True)
table.add_column("Score", style="cyan", width=8)
table.add_column("Title", style="magenta")
table.add_column("URL", style="blue", overflow="fold")
for url in urls[:5]:
score = f"{url['relevance_score']:.2f}"
title = url['head_data'].get('title', 'No title')[:60] + "..."
table.add_row(score, title, url['url'])
console.print(table)
async def section_5_keyword_filter_to_agent(seed: AsyncUrlSeeder):
console.rule("[bold cyan]5. Complete Pipeline: Discover → Filter → Crawl")
cfg = SeedingConfig(
extract_head=True,
concurrency=20,
hits_per_sec=10,
max_urls=10,
pattern="*crawl4ai.com/*",
force=True,
)
urls = await seed.urls(DOMAIN, cfg)
keywords = ["deep crawling", "markdown", "llm"]
selected = [u for u in urls if any(k in str(u["head_data"]).lower() for k in keywords)]
console.print(f"[cyan]Selected {len(selected)} URLs with relevant keywords:")
for u in selected[:10]:
console.print("", u["url"])
console.print("\n[yellow]Passing above URLs to arun_many() LLM agent for crawling...")
async with AsyncWebCrawler(verbose=True) as crawler:
crawl_run_config = CrawlerRunConfig(
# Example crawl settings for these URLs:
only_text=True, # Just get text content
screenshot=False,
pdf=False,
word_count_threshold=50, # Only process pages with at least 50 words
stream=True,
verbose=False # Keep logs clean for arun_many in this demo
)
# Extract just the URLs from the selected results
urls_to_crawl = [u["url"] for u in selected]
# We'll stream results for large lists, but collect them here for demonstration
crawled_results_stream = await crawler.arun_many(urls_to_crawl, config=crawl_run_config)
final_crawled_data = []
async for result in crawled_results_stream:
final_crawled_data.append(result)
if len(final_crawled_data) % 5 == 0:
print(f" Processed {len(final_crawled_data)}/{len(urls_to_crawl)} URLs...")
print(f"\n Successfully crawled {len(final_crawled_data)} URLs.")
if final_crawled_data:
print("\n Example of a crawled result's URL and Markdown (first successful one):")
for result in final_crawled_data:
if result.success and result.markdown.raw_markdown:
print(f" URL: {result.url}")
print(f" Markdown snippet: {result.markdown.raw_markdown[:200]}...")
break
else:
print(" No successful crawls with markdown found.")
else:
print(" No successful crawls found.")
async def section_6_multi_domain(seed: AsyncUrlSeeder):
console.rule("[bold cyan]6. Multi-Domain Discovery")
console.print("[yellow]Discovering Python tutorials across multiple educational sites[/yellow]\n")
domains = ["docs.python.org", "realpython.com", "docs.crawl4ai.com"]
cfg = SeedingConfig(
source="sitemap",
extract_head=True,
query="python tutorial guide",
scoring_method="bm25",
score_threshold=0.2,
max_urls=5 # Per domain
)
start_time = time.time()
with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
task = p.add_task(description="Discovering across domains...", total=None)
results = await seed.many_urls(domains, cfg)
elapsed = time.time() - start_time
total_urls = sum(len(urls) for urls in results.values())
console.print(f"[green]✓ Found {total_urls} relevant URLs across {len(domains)} domains in {elapsed:.2f}s\n")
# Show results per domain
for domain, urls in results.items():
console.print(f"[bold]{domain}:[/bold] {len(urls)} relevant pages")
if urls:
top = urls[0]
console.print(f" Top result: [{top['relevance_score']:.2f}] {top['head_data'].get('title', 'No title')}")
async def main():
async with AsyncUrlSeeder() as seed:
# Interactive menu
sections = {
"1": ("Basic URL Discovery", section_1_basic_exploration),
"2": ("Cache Management Demo", section_2_cache_demo),
"3": ("Live Check & Metadata Extraction", section_3_live_head),
"4": ("BM25 Relevance Scoring", section_4_bm25_scoring),
"5": ("Complete Pipeline (Discover → Filter → Crawl)", section_5_keyword_filter_to_agent),
"6": ("Multi-Domain Discovery", section_6_multi_domain),
"7": ("Run All Demos", None)
}
console.print("\n[bold]Available Demos:[/bold]")
for key, (title, _) in sections.items():
console.print(f" {key}. {title}")
choice = Prompt.ask("\n[cyan]Which demo would you like to run?[/cyan]",
choices=list(sections.keys()),
default="7")
console.print()
if choice == "7":
# Run all demos
for key, (title, func) in sections.items():
if key != "7" and func:
await func(seed)
if key != "6": # Don't pause after the last demo
if not Confirm.ask("\n[yellow]Continue to next demo?[/yellow]", default=True):
break
console.print()
else:
# Run selected demo
_, func = sections[choice]
await func(seed)
console.rule("[bold green]Demo Complete ✔︎")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,128 +0,0 @@
"""
🚀 URL Seeder + AsyncWebCrawler = Magic!
Quick demo showing discovery → filter → crawl pipeline
Note: Uses context manager for automatic cleanup of resources.
"""
import asyncio, os
from crawl4ai import AsyncUrlSeeder, AsyncWebCrawler, SeedingConfig, CrawlerRunConfig, AsyncLogger, DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import PruningContentFilter
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# 🔍 Example 1: Discover ALL → Filter → Crawl
async def discover_and_crawl():
"""Find Python module tutorials & extract them all!"""
async with AsyncUrlSeeder(logger=AsyncLogger()) as seeder:
# Step 1: See how many URLs exist (spoiler: A LOT!)
print("📊 Let's see what RealPython has...")
all_urls = await seeder.urls("realpython.com",
SeedingConfig(source="sitemap"))
print(f"😱 Found {len(all_urls)} total URLs!")
# Step 2: Filter for Python modules (perfect size ~13)
print("\n🎯 Filtering for 'python-modules' tutorials...")
module_urls = await seeder.urls("realpython.com",
SeedingConfig(
source="sitemap",
pattern="*python-modules*",
live_check=True # Make sure they're alive!
))
print(f"✨ Found {len(module_urls)} module tutorials")
for url in module_urls[:3]: # Show first 3
status = "" if url["status"] == "valid" else ""
print(f"{status} {url['url']}")
# Step 3: Crawl them all with pruning (keep it lean!)
print("\n🕷️ Crawling all module tutorials...")
async with AsyncWebCrawler() as crawler:
config = CrawlerRunConfig(
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter( # Smart filtering!
threshold=0.48, # Remove fluff
threshold_type="fixed",
),
),
only_text=True,
stream=True,
)
# Extract just the URLs from the seeder results
urls_to_crawl = [u["url"] for u in module_urls[:5]]
results = await crawler.arun_many(urls_to_crawl, config=config)
# Process & save
saved = 0
async for result in results:
if result.success:
# Save each tutorial (name from URL)
name = result.url.split("/")[-2] + ".md"
name = os.path.join(CURRENT_DIR, name)
with open(name, "w") as f:
f.write(result.markdown.fit_markdown)
saved += 1
print(f"💾 Saved: {name}")
print(f"\n🎉 Successfully saved {saved} tutorials!")
# 🔍 Example 2: Beautiful Soup articles with metadata peek
async def explore_beautifulsoup():
"""Discover BeautifulSoup content & peek at metadata"""
async with AsyncUrlSeeder(logger=AsyncLogger()) as seeder:
print("🍲 Looking for Beautiful Soup articles...")
soup_urls = await seeder.urls("realpython.com",
SeedingConfig(
source="sitemap",
pattern="*beautiful-soup*",
extract_head=True # Get the metadata!
))
print(f"\n📚 Found {len(soup_urls)} Beautiful Soup articles:\n")
# Show what we discovered
for i, url in enumerate(soup_urls, 1):
meta = url["head_data"]["meta"]
print(f"{i}. {url['head_data']['title']}")
print(f" 📝 {meta.get('description', 'No description')[:60]}...")
print(f" 👤 By: {meta.get('author', 'Unknown')}")
print(f" 🔗 {url['url']}\n")
# 🔍 Example 3: Smart search with BM25 relevance scoring
async def smart_search_with_bm25():
"""Use AI-powered relevance scoring to find the best content"""
async with AsyncUrlSeeder(logger=AsyncLogger()) as seeder:
print("🧠 Smart search: 'web scraping tutorial quiz'")
# Search with BM25 scoring - AI finds the best matches!
results = await seeder.urls("realpython.com",
SeedingConfig(
source="sitemap",
pattern="*beautiful-soup*",
extract_head=True,
query="web scraping tutorial quiz", # Our search
scoring_method="bm25",
score_threshold=0.2 # Quality filter
))
print(f"\n🎯 Top {len(results)} most relevant results:\n")
# Show ranked results with relevance scores
for i, result in enumerate(results[:3], 1):
print(f"{i}. [{result['relevance_score']:.2f}] {result['head_data']['title']}")
print(f" 🔗 {result['url'][:60]}...")
print("\n✨ BM25 automatically ranked by relevance!")
# 🎬 Run the show!
async def main():
print("=" * 60)
await discover_and_crawl()
print("\n" + "=" * 60 + "\n")
await explore_beautifulsoup()
print("\n" + "=" * 60 + "\n")
await smart_search_with_bm25()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -172,7 +172,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `600.0`)
4.**`memory_wait_timeout`** (`float`, default: `300.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`)

View File

@@ -0,0 +1,204 @@
Okay, here is the Markdown documentation for `PDFCrawlerStrategy` and `PDFContentScrapingStrategy`, formatted for an MkDocs site.
# PDF Processing Strategies
Crawl4AI provides specialized strategies for handling and extracting content from PDF files. These strategies allow you to seamlessly integrate PDF processing into your crawling workflows, whether the PDFs are hosted online or stored locally.
## `PDFCrawlerStrategy`
### Overview
`PDFCrawlerStrategy` is an implementation of `AsyncCrawlerStrategy` designed specifically for PDF documents. Instead of interpreting the input URL as an HTML webpage, this strategy treats it as a pointer to a PDF file. It doesn't perform deep crawling or HTML parsing itself but rather prepares the PDF source for a dedicated PDF scraping strategy. Its primary role is to identify the PDF source (web URL or local file) and pass it along the processing pipeline in a way that `AsyncWebCrawler` can handle.
### When to Use
Use `PDFCrawlerStrategy` when you need to:
- Process PDF files using the `AsyncWebCrawler`.
- Handle PDFs from both web URLs (e.g., `https://example.com/document.pdf`) and local file paths (e.g., `file:///path/to/your/document.pdf`).
- Integrate PDF content extraction into a unified `CrawlResult` object, allowing consistent handling of PDF data alongside web page data.
### Key Methods and Their Behavior
- **`__init__(self, logger: AsyncLogger = None)`**:
- Initializes the strategy.
- `logger`: An optional `AsyncLogger` instance (from `crawl4ai.async_logger`) for logging purposes.
- **`async crawl(self, url: str, **kwargs) -> AsyncCrawlResponse`**:
- This method is called by the `AsyncWebCrawler` during the `arun` process.
- It takes the `url` (which should point to a PDF) and creates a minimal `AsyncCrawlResponse`.
- The `html` attribute of this response is typically empty or a placeholder, as the actual PDF content processing is deferred to the `PDFContentScrapingStrategy` (or a similar PDF-aware scraping strategy).
- It sets `response_headers` to indicate "application/pdf" and `status_code` to 200.
- **`async close(self)`**:
- A method for cleaning up any resources used by the strategy. For `PDFCrawlerStrategy`, this is usually minimal.
- **`async __aenter__(self)` / `async __aexit__(self, exc_type, exc_val, exc_tb)`**:
- Enables asynchronous context management for the strategy, allowing it to be used with `async with`.
### Example Usage
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.processors.pdf import PDFCrawlerStrategy, PDFContentScrapingStrategy
async def main():
# Initialize the PDF crawler strategy
pdf_crawler_strategy = PDFCrawlerStrategy()
# PDFCrawlerStrategy is typically used in conjunction with PDFContentScrapingStrategy
# The scraping strategy handles the actual PDF content extraction
pdf_scraping_strategy = PDFContentScrapingStrategy()
run_config = CrawlerRunConfig(scraping_strategy=pdf_scraping_strategy)
async with AsyncWebCrawler(crawler_strategy=pdf_crawler_strategy) as crawler:
# Example with a remote PDF URL
pdf_url = "https://arxiv.org/pdf/2310.06825.pdf" # A public PDF from arXiv
print(f"Attempting to process PDF: {pdf_url}")
result = await crawler.arun(url=pdf_url, config=run_config)
if result.success:
print(f"Successfully processed PDF: {result.url}")
print(f"Metadata Title: {result.metadata.get('title', 'N/A')}")
# Further processing of result.markdown, result.media, etc.
# would be done here, based on what PDFContentScrapingStrategy extracts.
if result.markdown and hasattr(result.markdown, 'raw_markdown'):
print(f"Extracted text (first 200 chars): {result.markdown.raw_markdown[:200]}...")
else:
print("No markdown (text) content extracted.")
else:
print(f"Failed to process PDF: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
```
### Pros and Cons
**Pros:**
- Enables `AsyncWebCrawler` to handle PDF sources directly using familiar `arun` calls.
- Provides a consistent interface for specifying PDF sources (URLs or local paths).
- Abstracts the source handling, allowing a separate scraping strategy to focus on PDF content parsing.
**Cons:**
- Does not perform any PDF data extraction itself; it strictly relies on a compatible scraping strategy (like `PDFContentScrapingStrategy`) to process the PDF.
- Has limited utility on its own; most of its value comes from being paired with a PDF-specific content scraping strategy.
---
## `PDFContentScrapingStrategy`
### Overview
`PDFContentScrapingStrategy` is an implementation of `ContentScrapingStrategy` designed to extract text, metadata, and optionally images from PDF documents. It is intended to be used in conjunction with a crawler strategy that can provide it with a PDF source, such as `PDFCrawlerStrategy`. This strategy uses the `NaivePDFProcessorStrategy` internally to perform the low-level PDF parsing.
### When to Use
Use `PDFContentScrapingStrategy` when your `AsyncWebCrawler` (often configured with `PDFCrawlerStrategy`) needs to:
- Extract textual content page by page from a PDF document.
- Retrieve standard metadata embedded within the PDF (e.g., title, author, subject, creation date, page count).
- Optionally, extract images contained within the PDF pages. These images can be saved to a local directory or made available for further processing.
- Produce a `ScrapingResult` that can be converted into a `CrawlResult`, making PDF content accessible in a manner similar to HTML web content (e.g., text in `result.markdown`, metadata in `result.metadata`).
### Key Configuration Attributes
When initializing `PDFContentScrapingStrategy`, you can configure its behavior using the following attributes:
- **`extract_images: bool = False`**: If `True`, the strategy will attempt to extract images from the PDF.
- **`save_images_locally: bool = False`**: If `True` (and `extract_images` is also `True`), extracted images will be saved to disk in the `image_save_dir`. If `False`, image data might be available in another form (e.g., base64, depending on the underlying processor) but not saved as separate files by this strategy.
- **`image_save_dir: str = None`**: Specifies the directory where extracted images should be saved if `save_images_locally` is `True`. If `None`, a default or temporary directory might be used.
- **`batch_size: int = 4`**: Defines how many PDF pages are processed in a single batch. This can be useful for managing memory when dealing with very large PDF documents.
- **`logger: AsyncLogger = None`**: An optional `AsyncLogger` instance for logging.
### Key Methods and Their Behavior
- **`__init__(self, save_images_locally: bool = False, extract_images: bool = False, image_save_dir: str = None, batch_size: int = 4, logger: AsyncLogger = None)`**:
- Initializes the strategy with configurations for image handling, batch processing, and logging. It sets up an internal `NaivePDFProcessorStrategy` instance which performs the actual PDF parsing.
- **`scrap(self, url: str, html: str, **params) -> ScrapingResult`**:
- This is the primary synchronous method called by the crawler (via `ascrap`) to process the PDF.
- `url`: The path or URL to the PDF file (provided by `PDFCrawlerStrategy` or similar).
- `html`: Typically an empty string when used with `PDFCrawlerStrategy`, as the content is a PDF, not HTML.
- It first ensures the PDF is accessible locally (downloads it to a temporary file if `url` is remote).
- It then uses its internal PDF processor to extract text, metadata, and images (if configured).
- The extracted information is compiled into a `ScrapingResult` object:
- `cleaned_html`: Contains an HTML-like representation of the PDF, where each page's content is often wrapped in a `<div>` with page number information.
- `media`: A dictionary where `media["images"]` will contain information about extracted images if `extract_images` was `True`.
- `links`: A dictionary where `links["urls"]` can contain URLs found within the PDF content.
- `metadata`: A dictionary holding PDF metadata (e.g., title, author, num_pages).
- **`async ascrap(self, url: str, html: str, **kwargs) -> ScrapingResult`**:
- The asynchronous version of `scrap`. Under the hood, it typically runs the synchronous `scrap` method in a separate thread using `asyncio.to_thread` to avoid blocking the event loop.
- **`_get_pdf_path(self, url: str) -> str`**:
- A private helper method to manage PDF file access. If the `url` is remote (http/https), it downloads the PDF to a temporary local file and returns its path. If `url` indicates a local file (`file://` or a direct path), it resolves and returns the local path.
### Example Usage
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.processors.pdf import PDFCrawlerStrategy, PDFContentScrapingStrategy
import os # For creating image directory
async def main():
# Define the directory for saving extracted images
image_output_dir = "./my_pdf_images"
os.makedirs(image_output_dir, exist_ok=True)
# Configure the PDF content scraping strategy
# Enable image extraction and specify where to save them
pdf_scraping_cfg = PDFContentScrapingStrategy(
extract_images=True,
save_images_locally=True,
image_save_dir=image_output_dir,
batch_size=2 # Process 2 pages at a time for demonstration
)
# The PDFCrawlerStrategy is needed to tell AsyncWebCrawler how to "crawl" a PDF
pdf_crawler_cfg = PDFCrawlerStrategy()
# Configure the overall crawl run
run_cfg = CrawlerRunConfig(
scraping_strategy=pdf_scraping_cfg # Use our PDF scraping strategy
)
# Initialize the crawler with the PDF-specific crawler strategy
async with AsyncWebCrawler(crawler_strategy=pdf_crawler_cfg) as crawler:
pdf_url = "https://arxiv.org/pdf/2310.06825.pdf" # Example PDF
print(f"Starting PDF processing for: {pdf_url}")
result = await crawler.arun(url=pdf_url, config=run_cfg)
if result.success:
print("\n--- PDF Processing Successful ---")
print(f"Processed URL: {result.url}")
print("\n--- Metadata ---")
for key, value in result.metadata.items():
print(f" {key.replace('_', ' ').title()}: {value}")
if result.markdown and hasattr(result.markdown, 'raw_markdown'):
print(f"\n--- Extracted Text (Markdown Snippet) ---")
print(result.markdown.raw_markdown[:500].strip() + "...")
else:
print("\nNo text (markdown) content extracted.")
if result.media and result.media.get("images"):
print(f"\n--- Image Extraction ---")
print(f"Extracted {len(result.media['images'])} image(s).")
for i, img_info in enumerate(result.media["images"][:2]): # Show info for first 2 images
print(f" Image {i+1}:")
print(f" Page: {img_info.get('page')}")
print(f" Format: {img_info.get('format', 'N/A')}")
if img_info.get('path'):
print(f" Saved at: {img_info.get('path')}")
else:
print("\nNo images were extracted (or extract_images was False).")
else:
print(f"\n--- PDF Processing Failed ---")
print(f"Error: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
```
### Pros and Cons
**Pros:**
- Provides a comprehensive way to extract text, metadata, and (optionally) images from PDF documents.
- Handles both remote PDFs (via URL) and local PDF files.
- Configurable image extraction allows saving images to disk or accessing their data.
- Integrates smoothly with the `CrawlResult` object structure, making PDF-derived data accessible in a way consistent with web-scraped data.
- The `batch_size` parameter can help in managing memory consumption when processing large or numerous PDF pages.
**Cons:**
- Extraction quality and performance can vary significantly depending on the PDF's complexity, encoding, and whether it's image-based (scanned) or text-based.
- Image extraction can be resource-intensive (both CPU and disk space if `save_images_locally` is true).
- Relies on `NaivePDFProcessorStrategy` internally, which might have limitations with very complex layouts, encrypted PDFs, or forms compared to more sophisticated PDF parsing libraries. Scanned PDFs will not yield text unless an OCR step is performed (which is not part of this strategy by default).
- Link extraction from PDFs can be basic and depends on how hyperlinks are embedded in the document.

View File

@@ -259,7 +259,7 @@ LLMConfig is useful to pass LLM provider config to strategies and functions that
## 3.1 Parameters
| **Parameter** | **Type / Default** | **What It Does** |
|-----------------------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| **`provider`** | `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)* | Which LLM provoder to use.
| **`provider`** | `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)* | Which LLM provider to use.
| **`api_token`** |1.Optional. When not provided explicitly, api_token will be read from environment variables based on provider. For example: If a gemini model is passed as provider then,`"GEMINI_API_KEY"` will be read from environment variables <br/> 2. API token of LLM provider <br/> eg: `api_token = "gsk_1ClHGGJ7Lpn4WGybR7vNWGdyb3FY7zXEw3SCiy0BAVM9lL8CQv"` <br/> 3. Environment variable - use with prefix "env:" <br/> eg:`api_token = "env: GROQ_API_KEY"` | API token to use for the given provider
| **`base_url`** |Optional. Custom API endpoint | If your provider has a custom endpoint

View File

@@ -6732,7 +6732,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `600.0`)
4.**`memory_wait_timeout`** (`float`, default: `300.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`)

View File

@@ -16,10 +16,22 @@
--mono-font-stack: Menlo, Monaco, Lucida Console, Liberation Mono, DejaVu Sans Mono, Bitstream Vera Sans Mono,
Courier New, monospace, serif;
--background-color: #151515; /* Dark background */
--font-color: #eaeaea; /* Light font color for contrast */
--invert-font-color: #151515; /* Dark color for inverted elements */
--primary-color: #1a95e0; /* Primary color can remain the same or be adjusted for better contrast */
--secondary-color: #727578; /* Secondary color for less important text */
--secondary-dimmed-color: #8b857a; /* Dimmed secondary color */
--error-color: #ff5555; /* Bright color for errors */
--progress-bar-background: #444; /* Darker background for progress bar */
--progress-bar-fill: #1a95e0; /* Bright color for progress bar fill */
--code-bg-color: #1e1e1e; /* Darker background for code blocks */
--input-style: solid; /* Keeping input style solid */
--block-background-color: #202020; /* Darker background for block elements */
--global-font-color: #eaeaea; /* Light font color for global elements */
--background-color: #222225;
--background-color: #070708;
--page-width: 70em;
--font-color: #e8e9ed;
@@ -28,7 +40,7 @@
--secondary-color: #d5cec0;
--tertiary-color: #a3abba;
--primary-dimmed-color: #09b5a5; /* Updated to the brand color */
--primary-color: #0fbbaa; /* Updated to the brand color */
--primary-color: #50ffff; /* Updated to the brand color */
--accent-color: rgb(243, 128, 245);
--error-color: #ff3c74;
--progress-bar-background: #3f3f44;

View File

@@ -252,7 +252,7 @@ The `clone()` method:
### Key fields to note
1. **`provider`**:
- Which LLM provoder to use.
- Which LLM provider to use.
- Possible values are `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)*
2. **`api_token`**:

View File

@@ -200,7 +200,7 @@ config = CrawlerRunConfig(markdown_generator=md_generator)
- **`user_query`**: The term you want to focus on. BM25 tries to keep only content blocks relevant to that query.
- **`bm25_threshold`**: Raise it to keep fewer blocks; lower it to keep more.
- **`use_stemming`** *(default `True`)*: If enabled, variations of words match (e.g., “learn,” “learning,” “learnt”).
- **`use_stemming`**: If `True`, variations of words match (e.g., “learn,” “learning,” “learnt”).
**No query provided?** BM25 tries to glean a context from page metadata, or you can simply treat it as a scorched-earth approach that discards text with low generic score. Realistically, you want to supply a query for best results.

File diff suppressed because it is too large Load Diff

View File

@@ -218,7 +218,7 @@ import json
import asyncio
from typing import List
from pydantic import BaseModel, Field
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, LLMConfig
from crawl4ai.extraction_strategy import LLMExtractionStrategy
class Entity(BaseModel):
@@ -238,8 +238,8 @@ class KnowledgeGraph(BaseModel):
async def main():
# LLM extraction strategy
llm_strat = LLMExtractionStrategy(
llmConfig = LlmConfig(provider="openai/gpt-4", api_token=os.getenv('OPENAI_API_KEY')),
schema=KnowledgeGraph.schema_json(),
llmConfig = LLMConfig(provider="openai/gpt-4", api_token=os.getenv('OPENAI_API_KEY')),
schema=KnowledgeGraph.model_json_schema(),
extraction_type="schema",
instruction="Extract entities and relationships from the content. Return valid JSON.",
chunk_token_threshold=1400,
@@ -258,6 +258,10 @@ async def main():
url = "https://www.nbcnews.com/business"
result = await crawler.arun(url=url, config=crawl_config)
print("--- LLM RAW RESPONSE ---")
print(result.extracted_content)
print("--- END LLM RAW RESPONSE ---")
if result.success:
with open("kb_result.json", "w", encoding="utf-8") as f:
f.write(result.extracted_content)

View File

@@ -43,6 +43,7 @@ nav:
- "Identity Based Crawling": "advanced/identity-based-crawling.md"
- "SSL Certificate": "advanced/ssl-certificate.md"
- "Network & Console Capture": "advanced/network-console-capture.md"
- "PDF Parsing": "advanced/pdf-parsing.md"
- Extraction:
- "LLM-Free Strategies": "extraction/no-llm-strategies.md"
- "LLM Strategies": "extraction/llm-strategies.md"

View File

@@ -0,0 +1,75 @@
# // File: tests/deep_crawling/test_filters.py
import pytest
from urllib.parse import urlparse
from crawl4ai import ContentTypeFilter, URLFilter
# Minimal URLFilter base class stub if not already importable directly for tests
# In a real scenario, this would be imported from the library
if not hasattr(URLFilter, '_update_stats'): # Check if it's a basic stub
class URLFilter: # Basic stub for testing if needed
def __init__(self, name=None): self.name = name
def apply(self, url: str) -> bool: raise NotImplementedError
def _update_stats(self, passed: bool): pass # Mock implementation
# Assume ContentTypeFilter is structured as discussed. If its definition is not fully
# available for direct import in the test environment, a more elaborate stub or direct
# instantiation of the real class (if possible) would be needed.
# For this example, we assume ContentTypeFilter can be imported and used.
class TestContentTypeFilter:
@pytest.mark.parametrize(
"url, allowed_types, expected",
[
# Existing tests (examples)
("http://example.com/page.html", ["text/html"], True),
("http://example.com/page.json", ["application/json"], True),
("http://example.com/image.png", ["text/html"], False),
("http://example.com/document.pdf", ["application/pdf"], True),
("http://example.com/page", ["text/html"], True), # No extension, allowed
("http://example.com/page", ["text/html"], False), # No extension, disallowed
("http://example.com/page.unknown", ["text/html"], False), # Unknown extension
# Tests for PHP extensions
("http://example.com/index.php", ["application/x-httpd-php"], True),
("http://example.com/script.php3", ["application/x-httpd-php"], True),
("http://example.com/legacy.php4", ["application/x-httpd-php"], True),
("http://example.com/main.php5", ["application/x-httpd-php"], True),
("http://example.com/api.php7", ["application/x-httpd-php"], True),
("http://example.com/index.phtml", ["application/x-httpd-php"], True),
("http://example.com/source.phps", ["application/x-httpd-php-source"], True),
# Test rejection of PHP extensions
("http://example.com/index.php", ["text/html"], False),
("http://example.com/script.php3", ["text/plain"], False),
("http://example.com/source.phps", ["application/x-httpd-php"], False), # Mismatch MIME
("http://example.com/source.php", ["application/x-httpd-php-source"], False), # Mismatch MIME for .php
# Test case-insensitivity of extensions in URL
("http://example.com/PAGE.HTML", ["text/html"], True),
("http://example.com/INDEX.PHP", ["application/x-httpd-php"], True),
("http://example.com/SOURCE.PHPS", ["application/x-httpd-php-source"], True),
# Test case-insensitivity of allowed_types
("http://example.com/index.php", ["APPLICATION/X-HTTPD-PHP"], True),
],
)
def test_apply(self, url, allowed_types, expected):
content_filter = ContentTypeFilter(
allowed_types=allowed_types
)
assert content_filter.apply(url) == expected
@pytest.mark.parametrize(
"url, expected_extension",
[
("http://example.com/file.html", "html"),
("http://example.com/file.tar.gz", "gz"),
("http://example.com/path/", ""),
("http://example.com/nodot", ""),
("http://example.com/.config", "config"), # hidden file with extension
("http://example.com/path/to/archive.BIG.zip", "zip"), # Case test
]
)
def test_extract_extension(self, url, expected_extension):
# Test the static method directly
assert ContentTypeFilter._extract_extension(url) == expected_extension

View File

@@ -15,6 +15,24 @@ CRAWL4AI_HOME_DIR = Path(os.path.expanduser("~")).joinpath(".crawl4ai")
if not CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").exists():
CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").mkdir(parents=True)
@pytest.fixture
def basic_html():
return """
<html lang="en">
<head>
<title>Basic HTML</title>
</head>
<body>
<h1>Main Heading</h1>
<main>
<div class="container">
<p>Basic HTML document for testing purposes.</p>
</div>
</main>
</body>
</html>
"""
# Test Config Files
@pytest.fixture
def basic_browser_config():
@@ -325,6 +343,13 @@ async def test_stealth_mode(crawler_strategy):
)
assert response.status_code == 200
@pytest.mark.asyncio
@pytest.mark.parametrize("prefix", ("raw:", "raw://"))
async def test_raw_urls(crawler_strategy, basic_html, prefix):
url = f"{prefix}{basic_html}"
response = await crawler_strategy.crawl(url, CrawlerRunConfig())
assert response.html == basic_html
# Error Handling Tests
@pytest.mark.asyncio
async def test_invalid_url():

View File

@@ -1,711 +0,0 @@
"""
Comprehensive test cases for AsyncUrlSeeder with BM25 scoring functionality.
Tests cover all features including query-based scoring, metadata extraction,
edge cases, and integration scenarios.
"""
import asyncio
import pytest
from typing import List, Dict, Any
from crawl4ai import AsyncUrlSeeder, SeedingConfig, AsyncLogger
import json
from datetime import datetime
# Test domain - using docs.crawl4ai.com as it has the actual documentation
TEST_DOMAIN = "kidocode.com"
TEST_DOMAIN = "docs.crawl4ai.com"
TEST_DOMAIN = "www.bbc.com/sport"
class TestAsyncUrlSeederBM25:
"""Comprehensive test suite for AsyncUrlSeeder with BM25 scoring."""
async def create_seeder(self):
"""Create an AsyncUrlSeeder instance for testing."""
logger = AsyncLogger()
return AsyncUrlSeeder(logger=logger)
# ============================================
# Basic BM25 Scoring Tests
# ============================================
@pytest.mark.asyncio
async def test_basic_bm25_scoring(self, seeder):
"""Test basic BM25 scoring with a simple query."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="premier league highlights",
scoring_method="bm25",
max_urls=200,
verbose=True,
force=True # Force fresh fetch
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify results have relevance scores
assert all("relevance_score" in r for r in results)
# Verify scores are normalized between 0 and 1
scores = [r["relevance_score"] for r in results]
assert all(0.0 <= s <= 1.0 for s in scores)
# Verify results are sorted by relevance (descending)
assert scores == sorted(scores, reverse=True)
# Print top 5 results for manual verification
print("\nTop 5 results for 'web crawling tutorial':")
for i, r in enumerate(results[:5]):
print(f"{i+1}. Score: {r['relevance_score']:.3f} - {r['url']}")
@pytest.mark.asyncio
async def test_query_variations(self, seeder):
"""Test BM25 scoring with different query variations."""
queries = [
"VAR controversy",
"player ratings",
"live score update",
"transfer rumours",
"post match analysis",
"injury news"
]
for query in queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=100,
# force=True
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify each query produces scored results
assert len(results) > 0
assert all("relevance_score" in r for r in results)
print(f"\nTop result for '{query}':")
if results:
top = results[0]
print(f" Score: {top['relevance_score']:.3f} - {top['url']}")
# ============================================
# Score Threshold Tests
# ============================================
@pytest.mark.asyncio
async def test_score_threshold_filtering(self, seeder):
"""Test filtering results by minimum relevance score."""
thresholds = [0.1, 0.3, 0.5, 0.7]
for threshold in thresholds:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="league standings",
score_threshold=threshold,
scoring_method="bm25",
max_urls=50
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify all results meet threshold
if results:
assert all(r["relevance_score"] >= threshold for r in results)
print(f"\nThreshold {threshold}: {len(results)} URLs passed")
@pytest.mark.asyncio
async def test_extreme_thresholds(self, seeder):
"""Test edge cases with extreme threshold values."""
# Very low threshold - should return many results
config_low = SeedingConfig(
source="sitemap",
extract_head=True,
query="match",
score_threshold=0.001,
scoring_method="bm25"
)
results_low = await seeder.urls(TEST_DOMAIN, config_low)
# Very high threshold - might return few or no results
config_high = SeedingConfig(
source="sitemap",
extract_head=True,
query="match",
score_threshold=0.99,
scoring_method="bm25"
)
results_high = await seeder.urls(TEST_DOMAIN, config_high)
# Low threshold should return more results than high
assert len(results_low) >= len(results_high)
print(f"\nLow threshold (0.001): {len(results_low)} results")
print(f"High threshold (0.99): {len(results_high)} results")
# ============================================
# Metadata Extraction Tests
# ============================================
@pytest.mark.asyncio
async def test_comprehensive_metadata_extraction(self, seeder):
"""Test extraction of all metadata types including JSON-LD."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="match report",
scoring_method="bm25",
max_urls=5,
verbose=True
)
results = await seeder.urls(TEST_DOMAIN, config)
for result in results:
head_data = result.get("head_data", {})
# Check for various metadata fields
print(f"\nMetadata for {result['url']}:")
print(f" Title: {head_data.get('title', 'N/A')}")
print(f" Charset: {head_data.get('charset', 'N/A')}")
print(f" Lang: {head_data.get('lang', 'N/A')}")
# Check meta tags
meta = head_data.get("meta", {})
if meta:
print(" Meta tags found:")
for key in ["description", "keywords", "author", "viewport"]:
if key in meta:
print(f" {key}: {meta[key][:50]}...")
# Check for Open Graph tags
og_tags = {k: v for k, v in meta.items() if k.startswith("og:")}
if og_tags:
print(" Open Graph tags found:")
for k, v in list(og_tags.items())[:3]:
print(f" {k}: {v[:50]}...")
# Check JSON-LD
if head_data.get("jsonld"):
print(f" JSON-LD schemas found: {len(head_data['jsonld'])}")
@pytest.mark.asyncio
async def test_jsonld_extraction_scoring(self, seeder):
"""Test that JSON-LD data contributes to BM25 scoring."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="Premier League match report highlights",
scoring_method="bm25",
max_urls=20
)
results = await seeder.urls(TEST_DOMAIN, config)
# Find results with JSON-LD data
jsonld_results = [r for r in results if r.get("head_data", {}).get("jsonld")]
if jsonld_results:
print(f"\nFound {len(jsonld_results)} URLs with JSON-LD data")
for r in jsonld_results[:3]:
print(f" Score: {r['relevance_score']:.3f} - {r['url']}")
jsonld_data = r["head_data"]["jsonld"]
print(f" JSON-LD types: {[item.get('@type', 'Unknown') for item in jsonld_data if isinstance(item, dict)]}")
# ============================================
# Edge Cases and Error Handling
# ============================================
@pytest.mark.asyncio
async def test_empty_query(self, seeder):
"""Test behavior with empty query string."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="",
scoring_method="bm25",
max_urls=10
)
results = await seeder.urls(TEST_DOMAIN, config)
# Should return results but all with zero scores
assert len(results) > 0
assert all(r.get("relevance_score", 0) == 0 for r in results)
@pytest.mark.asyncio
async def test_query_without_extract_head(self, seeder):
"""Test query scoring when extract_head is False."""
config = SeedingConfig(
source="sitemap",
extract_head=False, # This should trigger a warning
query="Premier League match report highlights",
scoring_method="bm25",
max_urls=10
)
results = await seeder.urls(TEST_DOMAIN, config)
# Results should not have relevance scores
assert all("relevance_score" not in r for r in results)
print("\nVerified: No scores added when extract_head=False")
@pytest.mark.asyncio
async def test_special_characters_in_query(self, seeder):
"""Test queries with special characters and symbols."""
special_queries = [
"premier league + analytics",
"injury/rehab routines",
"AI-powered scouting",
"match stats & xG",
"tactical@breakdown",
"transfer-window.yml"
]
for query in special_queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=5
)
try:
results = await seeder.urls(TEST_DOMAIN, config)
assert isinstance(results, list)
print(f"\n✓ Query '{query}' processed successfully")
except Exception as e:
pytest.fail(f"Failed on query '{query}': {str(e)}")
@pytest.mark.asyncio
async def test_unicode_query(self, seeder):
"""Test queries with Unicode characters."""
unicode_queries = [
"网页爬虫", # Chinese
"веб-краулер", # Russian
"🚀 crawl4ai", # Emoji
"naïve implementation", # Accented characters
]
for query in unicode_queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=5
)
try:
results = await seeder.urls(TEST_DOMAIN, config)
assert isinstance(results, list)
print(f"\n✓ Unicode query '{query}' processed successfully")
except Exception as e:
print(f"\n✗ Unicode query '{query}' failed: {str(e)}")
# ============================================
# Performance and Scalability Tests
# ============================================
@pytest.mark.asyncio
async def test_large_scale_scoring(self, seeder):
"""Test BM25 scoring with many URLs."""
config = SeedingConfig(
source="cc+sitemap", # Use both sources for more URLs
extract_head=True,
query="world cup group standings",
scoring_method="bm25",
max_urls=100,
concurrency=20,
hits_per_sec=10
)
start_time = asyncio.get_event_loop().time()
results = await seeder.urls(TEST_DOMAIN, config)
elapsed = asyncio.get_event_loop().time() - start_time
print(f"\nProcessed {len(results)} URLs in {elapsed:.2f} seconds")
print(f"Average time per URL: {elapsed/len(results)*1000:.1f}ms")
# Verify scoring worked at scale
assert all("relevance_score" in r for r in results)
# Check score distribution
scores = [r["relevance_score"] for r in results]
print(f"Score distribution:")
print(f" Min: {min(scores):.3f}")
print(f" Max: {max(scores):.3f}")
print(f" Avg: {sum(scores)/len(scores):.3f}")
@pytest.mark.asyncio
async def test_concurrent_scoring_consistency(self, seeder):
"""Test that concurrent requests produce consistent scores."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="live score update",
scoring_method="bm25",
max_urls=20,
concurrency=10
)
# Run the same query multiple times
results_list = []
for _ in range(3):
results = await seeder.urls(TEST_DOMAIN, config)
results_list.append(results)
# Compare scores across runs (they should be identical for same URLs)
url_scores = {}
for results in results_list:
for r in results:
url = r["url"]
score = r["relevance_score"]
if url in url_scores:
# Scores should be very close (allowing for tiny float differences)
assert abs(url_scores[url] - score) < 0.001
else:
url_scores[url] = score
print(f"\n✓ Consistent scores across {len(results_list)} runs")
# ============================================
# Multi-Domain Tests
# ============================================
@pytest.mark.asyncio
async def test_many_urls_with_scoring(self, seeder):
"""Test many_urls method with BM25 scoring."""
domains = [TEST_DOMAIN, "docs.crawl4ai.com", "example.com"]
config = SeedingConfig(
source="sitemap",
extract_head=True,
# live_check=True,
query="fixture list",
scoring_method="bm25",
score_threshold=0.2,
max_urls=10,
force=True, # Force fresh fetch
)
results_dict = await seeder.many_urls(domains, config)
for domain, results in results_dict.items():
print(f"\nDomain: {domain}")
print(f" Found {len(results)} URLs above threshold")
if results:
top = results[0]
print(f" Top result: {top['relevance_score']:.3f} - {top['url']}")
# ============================================
# Complex Query Tests
# ============================================
@pytest.mark.asyncio
async def test_multi_word_complex_queries(self, seeder):
"""Test complex multi-word queries."""
complex_queries = [
"how to follow live match commentary",
"extract expected goals stats from match data",
"premier league match report analysis",
"transfer rumours and confirmed signings tracker",
"tactical breakdown of high press strategy"
]
for query in complex_queries:
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=5
)
results = await seeder.urls(TEST_DOMAIN, config)
if results:
print(f"\nQuery: '{query}'")
print(f"Top match: {results[0]['relevance_score']:.3f} - {results[0]['url']}")
# Extract matched terms from metadata
head_data = results[0].get("head_data", {})
title = head_data.get("title", "")
description = head_data.get("meta", {}).get("description", "")
# Simple term matching for verification
query_terms = set(query.lower().split())
title_terms = set(title.lower().split())
desc_terms = set(description.lower().split())
matched_terms = query_terms & (title_terms | desc_terms)
if matched_terms:
print(f"Matched terms: {', '.join(matched_terms)}")
# ============================================
# Cache and Force Tests
# ============================================
@pytest.mark.asyncio
async def test_scoring_with_cache(self, seeder):
"""Test that scoring works correctly with cached results."""
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="injury update timeline",
scoring_method="bm25",
max_urls=10,
force=False # Use cache
)
# First run - populate cache
results1 = await seeder.urls(TEST_DOMAIN, config)
# Second run - should use cache
results2 = await seeder.urls(TEST_DOMAIN, config)
# Results should be identical
assert len(results1) == len(results2)
for r1, r2 in zip(results1, results2):
assert r1["url"] == r2["url"]
assert abs(r1["relevance_score"] - r2["relevance_score"]) < 0.001
print("\n✓ Cache produces consistent scores")
@pytest.mark.asyncio
async def test_force_refresh_scoring(self, seeder):
"""Test force=True bypasses cache for fresh scoring."""
config_cached = SeedingConfig(
source="sitemap",
extract_head=True,
query="transfer window",
scoring_method="bm25",
max_urls=5,
force=False
)
config_forced = SeedingConfig(
source="sitemap",
extract_head=True,
query="transfer window",
scoring_method="bm25",
max_urls=5,
force=True
)
# Run with cache
start1 = asyncio.get_event_loop().time()
results1 = await seeder.urls(TEST_DOMAIN, config_cached)
time1 = asyncio.get_event_loop().time() - start1
# Run with force (should be slower due to fresh fetch)
start2 = asyncio.get_event_loop().time()
results2 = await seeder.urls(TEST_DOMAIN, config_forced)
time2 = asyncio.get_event_loop().time() - start2
print(f"\nCached run: {time1:.2f}s")
print(f"Forced run: {time2:.2f}s")
# Both should produce scored results
assert all("relevance_score" in r for r in results1)
assert all("relevance_score" in r for r in results2)
# ============================================
# Source Combination Tests
# ============================================
@pytest.mark.asyncio
async def test_scoring_with_multiple_sources(self, seeder):
"""Test BM25 scoring with combined sources (cc+sitemap)."""
config = SeedingConfig(
source="cc+sitemap",
extract_head=True,
query="match highlights video",
scoring_method="bm25",
score_threshold=0.3,
max_urls=30,
concurrency=15
)
results = await seeder.urls(TEST_DOMAIN, config)
# Verify we got results from both sources
print(f"\nCombined sources returned {len(results)} URLs above threshold")
# Check URL diversity
unique_paths = set()
for r in results:
path = r["url"].replace("https://", "").replace("http://", "").split("/", 1)[-1]
unique_paths.add(path.split("?")[0]) # Remove query params
print(f"Unique paths found: {len(unique_paths)}")
# All should be scored and above threshold
assert all(r["relevance_score"] >= 0.3 for r in results)
# ============================================
# Integration Tests
# ============================================
@pytest.mark.asyncio
async def test_full_workflow_integration(self, seeder):
"""Test complete workflow: discover -> score -> filter -> use."""
# Step 1: Discover and score URLs
config = SeedingConfig(
source="sitemap",
extract_head=True,
query="premier league opening fixtures",
scoring_method="bm25",
score_threshold=0.4,
max_urls=10,
verbose=True
)
results = await seeder.urls(TEST_DOMAIN, config)
print(f"\nStep 1: Found {len(results)} relevant URLs")
# Step 2: Analyze top results
if results:
top_urls = results[:3]
print("\nStep 2: Top 3 URLs for crawling:")
for i, r in enumerate(top_urls):
print(f"{i+1}. Score: {r['relevance_score']:.3f}")
print(f" URL: {r['url']}")
print(f" Title: {r['head_data'].get('title', 'N/A')}")
# Check metadata quality
meta = r['head_data'].get('meta', {})
if 'description' in meta:
print(f" Description: {meta['description'][:80]}...")
# Step 3: Verify these URLs would be good for actual crawling
assert all(r["status"] == "valid" for r in results[:3])
print("\nStep 3: All top URLs are valid for crawling ✓")
# ============================================
# Report Generation
# ============================================
@pytest.mark.asyncio
async def test_generate_scoring_report(self, seeder):
"""Generate a comprehensive report of BM25 scoring effectiveness."""
queries = {
"beginner": "match schedule",
"advanced": "tactical analysis pressing",
"api": "VAR decision explanation",
"deployment": "fixture changes due to weather",
"extraction": "expected goals statistics"
}
report = {
"timestamp": datetime.now().isoformat(),
"domain": TEST_DOMAIN,
"results": {}
}
for category, query in queries.items():
config = SeedingConfig(
source="sitemap",
extract_head=True,
query=query,
scoring_method="bm25",
max_urls=10
)
results = await seeder.urls(TEST_DOMAIN, config)
report["results"][category] = {
"query": query,
"total_results": len(results),
"top_results": [
{
"url": r["url"],
"score": r["relevance_score"],
"title": r["head_data"].get("title", "")
}
for r in results[:3]
],
"score_distribution": {
"min": min(r["relevance_score"] for r in results) if results else 0,
"max": max(r["relevance_score"] for r in results) if results else 0,
"avg": sum(r["relevance_score"] for r in results) / len(results) if results else 0
}
}
# Print report
print("\n" + "="*60)
print("BM25 SCORING EFFECTIVENESS REPORT")
print("="*60)
print(f"Domain: {report['domain']}")
print(f"Timestamp: {report['timestamp']}")
print("\nResults by Category:")
for category, data in report["results"].items():
print(f"\n{category.upper()}: '{data['query']}'")
print(f" Total results: {data['total_results']}")
print(f" Score range: {data['score_distribution']['min']:.3f} - {data['score_distribution']['max']:.3f}")
print(f" Average score: {data['score_distribution']['avg']:.3f}")
print(" Top matches:")
for i, result in enumerate(data['top_results']):
print(f" {i+1}. [{result['score']:.3f}] {result['title']}")
# ============================================
# Standalone test runner
# ============================================
async def run_all_tests():
"""Run all tests standalone (without pytest)."""
print("Running AsyncUrlSeeder BM25 Tests...")
print("="*60)
test_instance = TestAsyncUrlSeederBM25()
seeder = await test_instance.create_seeder()
# Run each test method
test_methods = [
# test_instance.test_basic_bm25_scoring,
# test_instance.test_query_variations,
# test_instance.test_score_threshold_filtering,
# test_instance.test_extreme_thresholds,
# test_instance.test_comprehensive_metadata_extraction,
# test_instance.test_jsonld_extraction_scoring,
# test_instance.test_empty_query,
# test_instance.test_query_without_extract_head,
# test_instance.test_special_characters_in_query,
# test_instance.test_unicode_query,
# test_instance.test_large_scale_scoring,
# test_instance.test_concurrent_scoring_consistency,
# test_instance.test_many_urls_with_scoring,
test_instance.test_multi_word_complex_queries,
test_instance.test_scoring_with_cache,
test_instance.test_force_refresh_scoring,
test_instance.test_scoring_with_multiple_sources,
test_instance.test_full_workflow_integration,
test_instance.test_generate_scoring_report
]
for test_method in test_methods:
try:
print(f"\nRunning {test_method.__name__}...")
await test_method(seeder)
print(f"{test_method.__name__} passed")
except Exception as e:
import traceback
print(f"{test_method.__name__} failed: {str(e)}")
print(f" Error type: {type(e).__name__}")
traceback.print_exc()
print("\n" + "="*60)
print("Test suite completed!")
if __name__ == "__main__":
# Run tests directly
asyncio.run(run_all_tests())

View File

@@ -0,0 +1,34 @@
import asyncio
from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, BrowserConfig
from pathlib import Path
import os
async def test_basic_download():
# Custom folder (otherwise defaults to ~/.crawl4ai/downloads)
downloads_path = os.path.join(Path.home(), ".crawl4ai", "downloads")
os.makedirs(downloads_path, exist_ok=True)
browser_config = BrowserConfig(
accept_downloads=True,
downloads_path=downloads_path
)
async with AsyncWebCrawler(config=browser_config) as crawler:
run_config = CrawlerRunConfig(
js_code="""
const link = document.querySelector('a[href$=".exe"]');
if (link) { link.click(); }
""",
delay_before_return_html=5
)
result = await crawler.arun("https://www.python.org/downloads/", config=run_config)
if result.downloaded_files:
print("Downloaded files:")
for file_path in result.downloaded_files:
print("", file_path)
else:
print("No files downloaded.")
if __name__ == "__main__":
asyncio.run(test_basic_download())

View File

@@ -0,0 +1,115 @@
"""
Sample script to test the max_scroll_steps parameter implementation
"""
import asyncio
import os
import sys
# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig
async def test_max_scroll_steps():
"""
Test the max_scroll_steps parameter with different configurations
"""
print("🚀 Testing max_scroll_steps parameter implementation")
print("=" * 60)
async with AsyncWebCrawler(verbose=True) as crawler:
# Test 1: Without max_scroll_steps (unlimited scrolling)
print("\\n📋 Test 1: Unlimited scrolling (max_scroll_steps=None)")
config1 = CrawlerRunConfig(
scan_full_page=True,
scroll_delay=0.1,
max_scroll_steps=None, # Default behavior
verbose=True
)
print(f"Config: scan_full_page={config1.scan_full_page}, max_scroll_steps={config1.max_scroll_steps}")
try:
result1 = await crawler.arun(
url="https://example.com", # Simple page for testing
config=config1
)
print(f"✅ Test 1 Success: Crawled {len(result1.markdown)} characters")
except Exception as e:
print(f"❌ Test 1 Failed: {e}")
# Test 2: With limited scroll steps
print("\\n📋 Test 2: Limited scrolling (max_scroll_steps=3)")
config2 = CrawlerRunConfig(
scan_full_page=True,
scroll_delay=0.1,
max_scroll_steps=3, # Limit to 3 scroll steps
verbose=True
)
print(f"Config: scan_full_page={config2.scan_full_page}, max_scroll_steps={config2.max_scroll_steps}")
try:
result2 = await crawler.arun(
url="https://techcrunch.com/", # Another test page
config=config2
)
print(f"✅ Test 2 Success: Crawled {len(result2.markdown)} characters")
except Exception as e:
print(f"❌ Test 2 Failed: {e}")
# Test 3: Test serialization/deserialization
print("\\n📋 Test 3: Configuration serialization test")
config3 = CrawlerRunConfig(
scan_full_page=True,
max_scroll_steps=5,
scroll_delay=0.2
)
# Test to_dict
config_dict = config3.to_dict()
print(f"Serialized max_scroll_steps: {config_dict.get('max_scroll_steps')}")
# Test from_kwargs
config4 = CrawlerRunConfig.from_kwargs({
'scan_full_page': True,
'max_scroll_steps': 7,
'scroll_delay': 0.3
})
print(f"Deserialized max_scroll_steps: {config4.max_scroll_steps}")
print("✅ Test 3 Success: Serialization works correctly")
# Test 4: Edge case - max_scroll_steps = 0
print("\\n📋 Test 4: Edge case (max_scroll_steps=0)")
config5 = CrawlerRunConfig(
scan_full_page=True,
max_scroll_steps=0, # Should not scroll at all
verbose=True
)
try:
result5 = await crawler.arun(
url="https://techcrunch.com/",
config=config5
)
print(f"✅ Test 4 Success: No scrolling performed, crawled {len(result5.markdown)} characters")
except Exception as e:
print(f"❌ Test 4 Failed: {e}")
print("\\n" + "=" * 60)
print("🎉 All tests completed!")
print("\\nThe max_scroll_steps parameter is working correctly:")
print("- None: Unlimited scrolling (default behavior)")
print("- Positive integer: Limits scroll steps to that number")
print("- 0: No scrolling performed")
print("- Properly serializes/deserializes in config")
if __name__ == "__main__":
print("Starting max_scroll_steps test...")
asyncio.run(test_max_scroll_steps())

View File

@@ -0,0 +1,85 @@
import sys
import os
# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai.deep_crawling.filters import URLPatternFilter
def test_prefix_boundary_matching():
"""Test that prefix patterns respect path boundaries"""
print("=== Testing URLPatternFilter Prefix Boundary Fix ===")
filter_obj = URLPatternFilter(patterns=['https://langchain-ai.github.io/langgraph/*'])
test_cases = [
('https://langchain-ai.github.io/langgraph/', True),
('https://langchain-ai.github.io/langgraph/concepts/', True),
('https://langchain-ai.github.io/langgraph/tutorials/', True),
('https://langchain-ai.github.io/langgraph?param=1', True),
('https://langchain-ai.github.io/langgraph#section', True),
('https://langchain-ai.github.io/langgraphjs/', False),
('https://langchain-ai.github.io/langgraphjs/concepts/', False),
('https://other-site.com/langgraph/', False),
]
all_passed = True
for url, expected in test_cases:
result = filter_obj.apply(url)
status = "PASS" if result == expected else "FAIL"
if result != expected:
all_passed = False
print(f"{status:4} | Expected: {expected:5} | Got: {result:5} | {url}")
return all_passed
def test_edge_cases():
"""Test edge cases for path boundary matching"""
print("\n=== Testing Edge Cases ===")
test_patterns = [
('/api/*', [
('/api/', True),
('/api/v1', True),
('/api?param=1', True),
('/apiv2/', False),
('/api_old/', False),
]),
('*/docs/*', [
('example.com/docs/', True),
('example.com/docs/guide', True),
('example.com/documentation/', False),
('example.com/docs_old/', False),
]),
]
all_passed = True
for pattern, test_cases in test_patterns:
print(f"\nPattern: {pattern}")
filter_obj = URLPatternFilter(patterns=[pattern])
for url, expected in test_cases:
result = filter_obj.apply(url)
status = "PASS" if result == expected else "FAIL"
if result != expected:
all_passed = False
print(f" {status:4} | Expected: {expected:5} | Got: {result:5} | {url}")
return all_passed
if __name__ == "__main__":
test1_passed = test_prefix_boundary_matching()
test2_passed = test_edge_cases()
if test1_passed and test2_passed:
print("\n✅ All tests passed!")
sys.exit(0)
else:
print("\n❌ Some tests failed!")
sys.exit(1)