diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index d297dfca..972ca04e 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ -6,7 +6,8 @@ from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy, Cosi from .chunking_strategy import ChunkingStrategy, RegexChunking from .markdown_generation_strategy import DefaultMarkdownGenerator from .content_filter_strategy import PruningContentFilter, BM25ContentFilter -from .models import CrawlResult +from .models import CrawlResult, MarkdownGenerationResult +from .async_dispatcher import MemoryAdaptiveDispatcher, SemaphoreDispatcher, RateLimiter, CrawlerMonitor, DisplayMode from .__version__ import __version__ __all__ = [ @@ -24,6 +25,12 @@ __all__ = [ 'DefaultMarkdownGenerator', 'PruningContentFilter', 'BM25ContentFilter', + 'MemoryAdaptiveDispatcher', + 'SemaphoreDispatcher', + 'RateLimiter', + 'CrawlerMonitor', + 'DisplayMode', + 'MarkdownGenerationResult', ] def is_sync_version_installed(): diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index ee03551f..5094d610 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -11,8 +11,7 @@ from .user_agent_generator import UserAgentGenerator from .extraction_strategy import ExtractionStrategy from .chunking_strategy import ChunkingStrategy from .markdown_generation_strategy import MarkdownGenerationStrategy -from typing import Union, List, Tuple, Optional -from dataclasses import dataclass, field +from typing import Union, List class BrowserConfig: """ @@ -184,14 +183,6 @@ class BrowserConfig: ) -@dataclass -class RateLimitConfig: - base_delay: Tuple[float, float] = (1.0, 3.0) - max_delay: float = 60.0 - max_retries: int = 3 - rate_limit_codes: List[int] = field(default_factory=lambda: [429, 503]) - - class CrawlerRunConfig: """ Configuration class for controlling how the crawler runs each crawl operation. 
@@ -320,14 +311,8 @@ class CrawlerRunConfig: log_console (bool): If True, log console messages from the page. Default: False. - # Dispatcher configuration - memory_threshold_percent: float = 70.0 - check_interval: float = 1.0 - max_session_permit: int = 20 - enable_rate_limiting: bool = False - rate_limit_config: Optional[RateLimitConfig] = None - display_mode: Optional[str] = None - url: str = None + # Optional Parameters + url: str = None # This is not a compulsory parameter """ def __init__( @@ -400,13 +385,6 @@ class CrawlerRunConfig: verbose: bool = True, log_console: bool = False, - # Dispatcher configuration - memory_threshold_percent: float = 70.0, - check_interval: float = 1.0, - max_session_permit: int = 20, - enable_rate_limiting: bool = False, - rate_limit_config: Optional[RateLimitConfig] = None, - display_mode: Optional[str] = None, url: str = None, ): self.url = url @@ -479,14 +457,6 @@ class CrawlerRunConfig: self.verbose = verbose self.log_console = log_console - # Dispatcher configuration - self.memory_threshold_percent = memory_threshold_percent - self.check_interval = check_interval - self.max_session_permit = max_session_permit - self.enable_rate_limiting = enable_rate_limiting - self.rate_limit_config = rate_limit_config - self.display_mode = display_mode - # Validate type of extraction strategy and chunking strategy if they are provided if self.extraction_strategy is not None and not isinstance( self.extraction_strategy, ExtractionStrategy @@ -573,13 +543,6 @@ class CrawlerRunConfig: verbose=kwargs.get("verbose", True), log_console=kwargs.get("log_console", False), - # Dispatcher configuration - memory_threshold_percent=kwargs.get("memory_threshold_percent", 70.0), - check_interval=kwargs.get("check_interval", 1.0), - max_session_permit=kwargs.get("max_session_permit", 20), - enable_rate_limiting=kwargs.get("enable_rate_limiting", False), - rate_limit_config=kwargs.get("rate_limit_config"), - display_mode=kwargs.get("display_mode"), 
url=kwargs.get("url"), ) @@ -638,11 +601,5 @@ class CrawlerRunConfig: "exclude_domains": self.exclude_domains, "verbose": self.verbose, "log_console": self.log_console, - "memory_threshold_percent": self.memory_threshold_percent, - "check_interval": self.check_interval, - "max_session_permit": self.max_session_permit, - "enable_rate_limiting": self.enable_rate_limiting, - "rate_limit_config": self.rate_limit_config, - "display_mode": self.display_mode, "url": self.url, } diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py new file mode 100644 index 00000000..8f5fbe81 --- /dev/null +++ b/crawl4ai/async_dispatcher.py @@ -0,0 +1,560 @@ +from typing import Dict, Optional, List +from .async_configs import * +from .models import * + +from rich.live import Live +from rich.table import Table +from rich.console import Console +from rich.style import Style +from rich import box +from datetime import datetime, timedelta +from dataclasses import dataclass + +import time +import psutil +import asyncio +import uuid + +from urllib.parse import urlparse +import random +from abc import ABC, abstractmethod + + +class RateLimiter: + def __init__( + self, + base_delay: Tuple[float, float] = (1.0, 3.0), + max_delay: float = 60.0, + max_retries: int = 3, + rate_limit_codes: List[int] = None + ): + self.base_delay = base_delay + self.max_delay = max_delay + self.max_retries = max_retries + self.rate_limit_codes = rate_limit_codes or [429, 503] + self.domains: Dict[str, DomainState] = {} + + def get_domain(self, url: str) -> str: + return urlparse(url).netloc + + async def wait_if_needed(self, url: str) -> None: + domain = self.get_domain(url) + state = self.domains.get(domain) + + if not state: + self.domains[domain] = DomainState() + state = self.domains[domain] + + now = time.time() + if state.last_request_time: + wait_time = max(0, state.current_delay - (now - state.last_request_time)) + if wait_time > 0: + await asyncio.sleep(wait_time) + + # Random delay within 
base range if no current delay + if state.current_delay == 0: + state.current_delay = random.uniform(*self.base_delay) + + state.last_request_time = time.time() + + def update_delay(self, url: str, status_code: int) -> bool: + domain = self.get_domain(url) + state = self.domains[domain] + + if status_code in self.rate_limit_codes: + state.fail_count += 1 + if state.fail_count > self.max_retries: + return False + + # Exponential backoff with random jitter + state.current_delay = min( + state.current_delay * 2 * random.uniform(0.75, 1.25), + self.max_delay + ) + else: + # Gradually reduce delay on success + state.current_delay = max( + random.uniform(*self.base_delay), + state.current_delay * 0.75 + ) + state.fail_count = 0 + + return True + +class CrawlerMonitor: + def __init__(self, max_visible_rows: int = 15, display_mode: DisplayMode = DisplayMode.DETAILED): + self.console = Console() + self.max_visible_rows = max_visible_rows + self.display_mode = display_mode + self.stats: Dict[str, CrawlStats] = {} + self.process = psutil.Process() + self.start_time = datetime.now() + self.live = Live(self._create_table(), refresh_per_second=2) + + def start(self): + self.live.start() + + def stop(self): + self.live.stop() + + def add_task(self, task_id: str, url: str): + self.stats[task_id] = CrawlStats(task_id=task_id, url=url, status=CrawlStatus.QUEUED) + self.live.update(self._create_table()) + + def update_task(self, task_id: str, **kwargs): + if task_id in self.stats: + for key, value in kwargs.items(): + setattr(self.stats[task_id], key, value) + self.live.update(self._create_table()) + + def _create_aggregated_table(self) -> Table: + """Creates a compact table showing only aggregated statistics""" + table = Table( + box=box.ROUNDED, + title="Crawler Status Overview", + title_style="bold magenta", + header_style="bold blue", + show_lines=True + ) + + # Calculate statistics + total_tasks = len(self.stats) + queued = sum(1 for stat in self.stats.values() if stat.status == 
CrawlStatus.QUEUED) + in_progress = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS) + completed = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED) + failed = sum(1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED) + + # Memory statistics + current_memory = self.process.memory_info().rss / (1024 * 1024) + total_task_memory = sum(stat.memory_usage for stat in self.stats.values()) + peak_memory = max((stat.peak_memory for stat in self.stats.values()), default=0.0) + + # Duration + duration = datetime.now() - self.start_time + + # Create status row + table.add_column("Status", style="bold cyan") + table.add_column("Count", justify="right") + table.add_column("Percentage", justify="right") + + table.add_row( + "Total Tasks", + str(total_tasks), + "100%" + ) + table.add_row( + "[yellow]In Queue[/yellow]", + str(queued), + f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%" + ) + table.add_row( + "[blue]In Progress[/blue]", + str(in_progress), + f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%" + ) + table.add_row( + "[green]Completed[/green]", + str(completed), + f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%" + ) + table.add_row( + "[red]Failed[/red]", + str(failed), + f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%" + ) + + # Add memory information + table.add_section() + table.add_row( + "[magenta]Current Memory[/magenta]", + f"{current_memory:.1f} MB", + "" + ) + table.add_row( + "[magenta]Total Task Memory[/magenta]", + f"{total_task_memory:.1f} MB", + "" + ) + table.add_row( + "[magenta]Peak Task Memory[/magenta]", + f"{peak_memory:.1f} MB", + "" + ) + table.add_row( + "[yellow]Runtime[/yellow]", + str(timedelta(seconds=int(duration.total_seconds()))), + "" + ) + + return table + + def _create_detailed_table(self) -> Table: + table = Table( + box=box.ROUNDED, + title="Crawler Performance Monitor", + title_style="bold 
magenta", + header_style="bold blue" + ) + + # Add columns + table.add_column("Task ID", style="cyan", no_wrap=True) + table.add_column("URL", style="cyan", no_wrap=True) + table.add_column("Status", style="bold") + table.add_column("Memory (MB)", justify="right") + table.add_column("Peak (MB)", justify="right") + table.add_column("Duration", justify="right") + table.add_column("Info", style="italic") + + # Add summary row + total_memory = sum(stat.memory_usage for stat in self.stats.values()) + active_count = sum(1 for stat in self.stats.values() + if stat.status == CrawlStatus.IN_PROGRESS) + completed_count = sum(1 for stat in self.stats.values() + if stat.status == CrawlStatus.COMPLETED) + failed_count = sum(1 for stat in self.stats.values() + if stat.status == CrawlStatus.FAILED) + + table.add_row( + "[bold yellow]SUMMARY", + f"Total: {len(self.stats)}", + f"Active: {active_count}", + f"{total_memory:.1f}", + f"{self.process.memory_info().rss / (1024 * 1024):.1f}", + str(timedelta(seconds=int((datetime.now() - self.start_time).total_seconds()))), + f"✓{completed_count} ✗{failed_count}", + style="bold" + ) + + table.add_section() + + # Add rows for each task + visible_stats = sorted( + self.stats.values(), + key=lambda x: ( + x.status != CrawlStatus.IN_PROGRESS, + x.status != CrawlStatus.QUEUED, + x.end_time or datetime.max + ) + )[:self.max_visible_rows] + + for stat in visible_stats: + status_style = { + CrawlStatus.QUEUED: "white", + CrawlStatus.IN_PROGRESS: "yellow", + CrawlStatus.COMPLETED: "green", + CrawlStatus.FAILED: "red" + }[stat.status] + + table.add_row( + stat.task_id[:8], # Show first 8 chars of task ID + stat.url[:40] + "..." 
if len(stat.url) > 40 else stat.url, + f"[{status_style}]{stat.status.value}[/{status_style}]", + f"{stat.memory_usage:.1f}", + f"{stat.peak_memory:.1f}", + stat.duration, + stat.error_message[:40] if stat.error_message else "" + ) + + return table + + def _create_table(self) -> Table: + """Creates the appropriate table based on display mode""" + if self.display_mode == DisplayMode.AGGREGATED: + return self._create_aggregated_table() + return self._create_detailed_table() + + +class BaseDispatcher(ABC): + def __init__( + self, + rate_limiter: Optional[RateLimiter] = None, + monitor: Optional[CrawlerMonitor] = None + ): + self.crawler = None + self._domain_last_hit: Dict[str, float] = {} + self.concurrent_sessions = 0 + self.rate_limiter = rate_limiter + self.monitor = monitor + + @abstractmethod + async def crawl_url( + self, + url: str, + config: CrawlerRunConfig, + task_id: str, + monitor: Optional[CrawlerMonitor] = None + ) -> CrawlerTaskResult: + pass + + @abstractmethod + async def run_urls( + self, + urls: List[str], + crawler: "AsyncWebCrawler", + config: CrawlerRunConfig, + monitor: Optional[CrawlerMonitor] = None + ) -> List[CrawlerTaskResult]: + pass + +class MemoryAdaptiveDispatcher(BaseDispatcher): + def __init__( + self, + memory_threshold_percent: float = 70.0, + check_interval: float = 1.0, + max_session_permit: int = 20, + memory_wait_timeout: float = 300.0, # 5 minutes default timeout + rate_limiter: Optional[RateLimiter] = None, + monitor: Optional[CrawlerMonitor] = None + ): + super().__init__(rate_limiter, monitor) + self.memory_threshold_percent = memory_threshold_percent + self.check_interval = check_interval + self.max_session_permit = max_session_permit + self.memory_wait_timeout = memory_wait_timeout + + async def crawl_url( + self, + url: str, + config: CrawlerRunConfig, + task_id: str, + ) -> CrawlerTaskResult: + start_time = datetime.now() + error_message = "" + memory_usage = peak_memory = 0.0 + + try: + if self.monitor: + 
self.monitor.update_task(task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time) + self.concurrent_sessions += 1 + + if self.rate_limiter: + await self.rate_limiter.wait_if_needed(url) + + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) + result = await self.crawler.arun(url, config=config, session_id=task_id) + end_memory = process.memory_info().rss / (1024 * 1024) + + memory_usage = peak_memory = end_memory - start_memory + + if self.rate_limiter and result.status_code: + if not self.rate_limiter.update_delay(url, result.status_code): + error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=datetime.now(), + error_message=error_message + ) + + if not result.success: + error_message = result.error_message + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + elif self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED) + + except Exception as e: + error_message = str(e) + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + result = CrawlResult(url=url, html="", metadata={}, success=False, error_message=str(e)) + + finally: + end_time = datetime.now() + if self.monitor: + self.monitor.update_task( + task_id, + end_time=end_time, + memory_usage=memory_usage, + peak_memory=peak_memory, + error_message=error_message + ) + self.concurrent_sessions -= 1 + + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=end_time, + error_message=error_message + ) + + async def run_urls( + self, + urls: List[str], + crawler: "AsyncWebCrawler", + config: CrawlerRunConfig, 
+ ) -> List[CrawlerTaskResult]: + self.crawler = crawler + + if self.monitor: + self.monitor.start() + + try: + pending_tasks = [] + active_tasks = [] + task_queue = [] + + for url in urls: + task_id = str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + task_queue.append((url, task_id)) + + while task_queue or active_tasks: + wait_start_time = time.time() + while len(active_tasks) < self.max_session_permit and task_queue: + if psutil.virtual_memory().percent >= self.memory_threshold_percent: + # Check if we've exceeded the timeout + if time.time() - wait_start_time > self.memory_wait_timeout: + raise MemoryError(f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds") + await asyncio.sleep(self.check_interval) + continue + + url, task_id = task_queue.pop(0) + task = asyncio.create_task(self.crawl_url(url, config, task_id)) + active_tasks.append(task) + + if not active_tasks: + await asyncio.sleep(self.check_interval) + continue + + done, pending = await asyncio.wait( + active_tasks, + return_when=asyncio.FIRST_COMPLETED + ) + + pending_tasks.extend(done) + active_tasks = list(pending) + + return await asyncio.gather(*pending_tasks) + finally: + if self.monitor: + self.monitor.stop() + +class SemaphoreDispatcher(BaseDispatcher): + def __init__( + self, + semaphore_count: int = 5, + max_session_permit: int = 20, + rate_limiter: Optional[RateLimiter] = None, + monitor: Optional[CrawlerMonitor] = None + ): + super().__init__(rate_limiter, monitor) + self.semaphore_count = semaphore_count + self.max_session_permit = max_session_permit + + async def crawl_url( + self, + url: str, + config: CrawlerRunConfig, + task_id: str, + semaphore: asyncio.Semaphore = None + ) -> CrawlerTaskResult: + start_time = datetime.now() + error_message = "" + memory_usage = peak_memory = 0.0 + + try: + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time) 
+ + if self.rate_limiter: + await self.rate_limiter.wait_if_needed(url) + + async with semaphore: + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) + result = await self.crawler.arun(url, config=config, session_id=task_id) + end_memory = process.memory_info().rss / (1024 * 1024) + + memory_usage = peak_memory = end_memory - start_memory + + if self.rate_limiter and result.status_code: + if not self.rate_limiter.update_delay(url, result.status_code): + error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=datetime.now(), + error_message=error_message + ) + + if not result.success: + error_message = result.error_message + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + elif self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED) + + except Exception as e: + error_message = str(e) + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + result = CrawlResult(url=url, html="", metadata={}, success=False, error_message=str(e)) + + finally: + end_time = datetime.now() + if self.monitor: + self.monitor.update_task( + task_id, + end_time=end_time, + memory_usage=memory_usage, + peak_memory=peak_memory, + error_message=error_message + ) + + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=end_time, + error_message=error_message + ) + + async def run_urls( + self, + crawler: "AsyncWebCrawler", + urls: List[str], + config: CrawlerRunConfig, + ) -> List[CrawlerTaskResult]: + self.crawler = crawler + if self.monitor: + self.monitor.start() + + try: + semaphore = 
asyncio.Semaphore(self.semaphore_count) + tasks = [] + + for url in urls: + task_id = str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + task = asyncio.create_task( + self.crawl_url(url, config, task_id, semaphore) + ) + tasks.append(task) + + return await asyncio.gather(*tasks, return_exceptions=True) + finally: + if self.monitor: + self.monitor.stop() \ No newline at end of file diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py index 79531e38..82b96070 100644 --- a/crawl4ai/async_webcrawler.py +++ b/crawl4ai/async_webcrawler.py @@ -9,7 +9,7 @@ import json import asyncio # from contextlib import nullcontext, asynccontextmanager from contextlib import asynccontextmanager -from .models import CrawlResult, MarkdownGenerationResult +from .models import CrawlResult, MarkdownGenerationResult, CrawlerTaskResult from .async_database import async_db_manager from .chunking_strategy import * from .content_filter_strategy import * @@ -20,6 +20,8 @@ from .markdown_generation_strategy import DefaultMarkdownGenerator, MarkdownGene from .content_scraping_strategy import WebScrapingStrategy from .async_logger import AsyncLogger from .async_configs import BrowserConfig, CrawlerRunConfig +from .async_dispatcher import * + from .config import ( MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, @@ -675,6 +677,7 @@ class AsyncWebCrawler: self, urls: List[str], config: Optional[CrawlerRunConfig] = None, + dispatcher: Optional[BaseDispatcher] = None, # Legacy parameters maintained for backwards compatibility word_count_threshold=MIN_WORD_THRESHOLD, extraction_strategy: ExtractionStrategy = None, @@ -690,7 +693,7 @@ class AsyncWebCrawler: **kwargs, ) -> List[CrawlResult]: """ - Runs the crawler for multiple URLs concurrently using MemoryAdaptiveDispatcher. + Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy. 
Migration Guide: Old way (deprecated): @@ -705,84 +708,83 @@ class AsyncWebCrawler: config = CrawlerRunConfig( word_count_threshold=200, screenshot=True, - enable_rate_limiting=True, - rate_limit_config=RateLimitConfig(...), + dispatcher_config=DispatcherConfig( + enable_rate_limiting=True, + rate_limit_config=RateLimitConfig(...), + ), ... ) - results = await crawler.arun_many(urls, config=config) + results = await crawler.arun_many( + urls, + config=config, + dispatcher_strategy=MemoryAdaptiveDispatcher # Optional, this is the default + ) Args: urls: List of URLs to crawl config: Configuration object controlling crawl behavior for all URLs + dispatcher_strategy: The dispatcher strategy class to use. Defaults to MemoryAdaptiveDispatcher. [other parameters maintained for backwards compatibility] Returns: List[CrawlResult]: Results for each URL """ - # Handle configuration - if config is not None: - if any(param is not None for param in [ - word_count_threshold, extraction_strategy, chunking_strategy, - content_filter, cache_mode, css_selector, screenshot, pdf - ]): - self.logger.warning( - message="Both config and legacy parameters provided. 
config will take precedence.", - tag="WARNING" - ) - else: - # Merge all parameters into a single kwargs dict for config creation - config_kwargs = { - "word_count_threshold": word_count_threshold, - "extraction_strategy": extraction_strategy, - "chunking_strategy": chunking_strategy, - "content_filter": content_filter, - "cache_mode": cache_mode, - "bypass_cache": bypass_cache, - "css_selector": css_selector, - "screenshot": screenshot, - "pdf": pdf, - "verbose": verbose, + # Create config if not provided + if config is None: + config = CrawlerRunConfig( + word_count_threshold=word_count_threshold, + extraction_strategy=extraction_strategy, + chunking_strategy=chunking_strategy, + content_filter=content_filter, + cache_mode=cache_mode, + bypass_cache=bypass_cache, + css_selector=css_selector, + screenshot=screenshot, + pdf=pdf, + verbose=verbose, **kwargs - } - config = CrawlerRunConfig.from_kwargs(config_kwargs) - - if bypass_cache: - if kwargs.get("warning", True): - warnings.warn( - "'bypass_cache' is deprecated and will be removed in version 0.5.0. " - "Use 'cache_mode=CacheMode.BYPASS' instead. 
" - "Pass warning=False to suppress this warning.", - DeprecationWarning, - stacklevel=2 - ) - if config.cache_mode is None: - config.cache_mode = CacheMode.BYPASS - - from .dispatcher import MemoryAdaptiveDispatcher, CrawlerMonitor, DisplayMode - - # Create dispatcher with configuration from CrawlerRunConfig - dispatcher = MemoryAdaptiveDispatcher( - crawler=self, - memory_threshold_percent=config.memory_threshold_percent, - check_interval=config.check_interval, - max_session_permit=config.max_session_permit, - enable_rate_limiting=config.enable_rate_limiting, - rate_limit_config=vars(config.rate_limit_config) if config.rate_limit_config else None - ) - - # Create monitor if display mode is specified - monitor = None - if config.display_mode: - monitor = CrawlerMonitor( - max_visible_rows=15, - display_mode=DisplayMode(config.display_mode) ) - # Run URLs through dispatcher - task_results = await dispatcher.run_urls(urls, config, monitor=monitor) - - # Convert CrawlerTaskResult to CrawlResult - return [task_result.result for task_result in task_results] + # # Initialize the dispatcher with the selected strategy + # dispatcher = dispatcher_strategy(self, config.dispatcher_config) + + # memory_monitor: CrawlerMonitor = None + # if config.dispatcher_config.enable_monitor: + # memory_monitor = CrawlerMonitor(max_visible_rows=config.dispatcher_config.max_display_rows, display_mode=config.dispatcher_config.display_mode) + + # Create default dispatcher if none provided + if dispatcher is None: + dispatcher = MemoryAdaptiveDispatcher( + self, + rate_limiter=RateLimiter( + base_delay=(1.0, 3.0), + max_delay=60.0, + max_retries=3 + ) + ) + + # Run the URLs through the dispatcher + _results: List[CrawlerTaskResult] = await dispatcher.run_urls( + crawler=self, + urls=urls, + config=config + ) + + results: CrawlResult = [] + for res in _results: + _res : CrawlResult = res.result + dispatch_result: DispatchResult = DispatchResult( + task_id=res.task_id, + 
memory_usage=res.memory_usage, + peak_memory=res.peak_memory, + start_time=res.start_time, + end_time=res.end_time, + error_message=res.error_message + ) + _res.dispatch_result = dispatch_result + results.append(_res) + + return results async def aclear_cache(self): """Clear the cache database.""" diff --git a/crawl4ai/dispatcher.py b/crawl4ai/dispatcher copy.py similarity index 100% rename from crawl4ai/dispatcher.py rename to crawl4ai/dispatcher copy.py diff --git a/crawl4ai/models.py b/crawl4ai/models.py index 6fb362a3..1e2b4794 100644 --- a/crawl4ai/models.py +++ b/crawl4ai/models.py @@ -1,8 +1,70 @@ from pydantic import BaseModel, HttpUrl -from typing import List, Dict, Optional, Callable, Awaitable, Union, Any -from dataclasses import dataclass +from typing import List, Dict, Optional, Callable, Awaitable, Union, Tuple +from enum import Enum +from dataclasses import dataclass, field from .ssl_certificate import SSLCertificate +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import Optional + +from datetime import timedelta + + + +############################### +# Dispatcher Models +############################### +@dataclass +class DomainState: + last_request_time: float = 0 + current_delay: float = 0 + fail_count: int = 0 + +@dataclass +class CrawlerTaskResult: + task_id: str + url: str + result: "CrawlResult" + memory_usage: float + peak_memory: float + start_time: datetime + end_time: datetime + error_message: str = "" + +class CrawlStatus(Enum): + QUEUED = "QUEUED" + IN_PROGRESS = "IN_PROGRESS" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + +@dataclass +class CrawlStats: + task_id: str + url: str + status: CrawlStatus + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + memory_usage: float = 0.0 + peak_memory: float = 0.0 + error_message: str = "" + + @property + def duration(self) -> str: + if not self.start_time: + return "0:00" + end = self.end_time or datetime.now() + 
duration = end - self.start_time + return str(timedelta(seconds=int(duration.total_seconds()))) + +class DisplayMode(Enum): + DETAILED = "DETAILED" + AGGREGATED = "AGGREGATED" + +############################### +# Crawler Models +############################### @dataclass class TokenUsage: completion_tokens: int = 0 @@ -23,6 +85,13 @@ class MarkdownGenerationResult(BaseModel): fit_markdown: Optional[str] = None fit_html: Optional[str] = None +class DispatchResult(BaseModel): + task_id: str + memory_usage: float + peak_memory: float + start_time: datetime + end_time: datetime + error_message: str = "" class CrawlResult(BaseModel): url: str html: str @@ -44,6 +113,7 @@ class CrawlResult(BaseModel): response_headers: Optional[dict] = None status_code: Optional[int] = None ssl_certificate: Optional[SSLCertificate] = None + dispatch_result: Optional[DispatchResult] = None class Config: arbitrary_types_allowed = True diff --git a/docs/examples/dispatcher_example.py b/docs/examples/dispatcher_example.py index 72487797..796ee078 100644 --- a/docs/examples/dispatcher_example.py +++ b/docs/examples/dispatcher_example.py @@ -1,67 +1,121 @@ -import asyncio, time -from crawl4ai.async_webcrawler import AsyncWebCrawler -from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig, RateLimitConfig -from crawl4ai.dispatcher import DisplayMode +import asyncio +import time +from rich import print +from rich.table import Table +from crawl4ai import ( + AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, + MemoryAdaptiveDispatcher, SemaphoreDispatcher, + RateLimiter, CrawlerMonitor, DisplayMode, CacheMode +) -async def crawl_with_rate_limiting(urls): - """ - Example function demonstrating how to use AsyncWebCrawler with rate limiting and resource monitoring. 
- - Args: - urls (List[str]): List of URLs to crawl - - Returns: - List[CrawlResult]: List of crawl results for each URL - """ - # Configure browser settings - browser_config = BrowserConfig( - headless=True, # Run browser in headless mode - verbose=False # Minimize browser logging - ) - - # Configure crawler settings with rate limiting - run_config = CrawlerRunConfig( - # Enable rate limiting - enable_rate_limiting=True, - rate_limit_config=RateLimitConfig( - base_delay=(1.0, 2.0), # Random delay between 1-2 seconds between requests - max_delay=30.0, # Maximum delay after rate limit hits - max_retries=2, # Number of retries before giving up - rate_limit_codes=[429, 503] # HTTP status codes to trigger rate limiting - ), - # Resource monitoring settings - memory_threshold_percent=70.0, # Pause crawling if memory usage exceeds this - check_interval=0.5, # How often to check resource usage - max_session_permit=10, # Maximum concurrent crawls - display_mode=DisplayMode.DETAILED.value # Show detailed progress - ) - - # Create and use crawler with context manager - async with AsyncWebCrawler(config=browser_config) as crawler: - results = await crawler.arun_many(urls, config=run_config) - return results - -def main(): - # Example URLs (replace with real URLs) - urls = [ - f"https://example.com/page{i}" for i in range(1, 40) - ] - +async def memory_adaptive(urls, browser_config, run_config): + """Memory adaptive crawler with monitoring""" start = time.perf_counter() + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=70.0, + max_session_permit=10, + monitor=CrawlerMonitor( + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) + ) + results = await crawler.arun_many(urls, config=run_config, dispatcher=dispatcher) + duration = time.perf_counter() - start + return len(results), duration + +async def memory_adaptive_with_rate_limit(urls, browser_config, run_config): + """Memory adaptive 
crawler with rate limiting""" + start = time.perf_counter() + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=70.0, + max_session_permit=10, + rate_limiter=RateLimiter( + base_delay=(1.0, 2.0), + max_delay=30.0, + max_retries=2 + ), + monitor=CrawlerMonitor( + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) + ) + results = await crawler.arun_many(urls, config=run_config, dispatcher=dispatcher) + duration = time.perf_counter() - start + return len(results), duration + +async def semaphore(urls, browser_config, run_config): + """Basic semaphore crawler""" + start = time.perf_counter() + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = SemaphoreDispatcher( + semaphore_count=5, + monitor=CrawlerMonitor( + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) + ) + results = await crawler.arun_many(urls, config=run_config, dispatcher=dispatcher) + duration = time.perf_counter() - start + return len(results), duration + +async def semaphore_with_rate_limit(urls, browser_config, run_config): + """Semaphore crawler with rate limiting""" + start = time.perf_counter() + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = SemaphoreDispatcher( + semaphore_count=5, + rate_limiter=RateLimiter( + base_delay=(1.0, 2.0), + max_delay=30.0, + max_retries=2 + ), + monitor=CrawlerMonitor( + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) + ) + results = await crawler.arun_many(urls, config=run_config, dispatcher=dispatcher) + duration = time.perf_counter() - start + return len(results), duration + +def create_performance_table(results): + """Creates a rich table showing performance results""" + table = Table(title="Crawler Strategy Performance Comparison") + table.add_column("Strategy", style="cyan") + table.add_column("URLs Crawled", justify="right", style="green") + table.add_column("Time (seconds)", justify="right", 
style="yellow") + table.add_column("URLs/second", justify="right", style="magenta") + + sorted_results = sorted(results.items(), key=lambda x: x[1][1]) - # Run the crawler - results = asyncio.run(crawl_with_rate_limiting(urls)) + for strategy, (urls_crawled, duration) in sorted_results: + urls_per_second = urls_crawled / duration + table.add_row( + strategy, + str(urls_crawled), + f"{duration:.2f}", + f"{urls_per_second:.2f}" + ) - # Process results - successful_results = [result for result in results if result.success] - failed_results = [result for result in results if not result.success] - - end = time.perf_counter() - - # Print results - print(f"Successful crawls: {len(successful_results)}") - print(f"Failed crawls: {len(failed_results)}") - print(f"Time taken: {end - start:.2f} seconds") + return table + +async def main(): + urls = [f"https://example.com/page{i}" for i in range(1, 20)] + browser_config = BrowserConfig(headless=True, verbose=False) + run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + results = { + "Memory Adaptive": await memory_adaptive(urls, browser_config, run_config), + "Memory Adaptive + Rate Limit": await memory_adaptive_with_rate_limit(urls, browser_config, run_config), + "Semaphore": await semaphore(urls, browser_config, run_config), + "Semaphore + Rate Limit": await semaphore_with_rate_limit(urls, browser_config, run_config), + } + + table = create_performance_table(results) + print("\nPerformance Summary:") + print(table) if __name__ == "__main__": - main() + asyncio.run(main()) \ No newline at end of file diff --git a/docs/md_v2/advanced/multi-url-crawling copy.md b/docs/md_v2/advanced/multi-url-crawling copy.md new file mode 100644 index 00000000..a1d2b423 --- /dev/null +++ b/docs/md_v2/advanced/multi-url-crawling copy.md @@ -0,0 +1,264 @@ +# Optimized Multi-URL Crawling + +> **Note**: We’re developing a new **executor module** that uses a sophisticated algorithm to dynamically manage multi-URL crawling, optimizing for 
speed and memory usage. The approaches in this document remain fully valid, but keep an eye on **Crawl4AI**’s upcoming releases for this powerful feature! Follow [@unclecode](https://twitter.com/unclecode) on X and check the changelogs to stay updated. + + +Crawl4AI’s **AsyncWebCrawler** can handle multiple URLs in a single run, which can greatly reduce overhead and speed up crawling. This guide shows how to: + +1. **Sequentially** crawl a list of URLs using the **same** session, avoiding repeated browser creation. +2. **Parallel**-crawl subsets of URLs in batches, again reusing the same browser. + +When the entire process finishes, you close the browser once—**minimizing** memory and resource usage. + +--- + +## 1. Why Avoid Simple Loops per URL? + +If you naively do: + +```python +for url in urls: + async with AsyncWebCrawler() as crawler: + result = await crawler.arun(url) +``` + +You end up: + +1. Spinning up a **new** browser for each URL +2. Closing it immediately after the single crawl +3. Potentially using a lot of CPU/memory for short-living browsers +4. Missing out on session reusability if you have login or ongoing states + +**Better** approaches ensure you **create** the browser once, then crawl multiple URLs with minimal overhead. + +--- + +## 2. Sequential Crawling with Session Reuse + +### 2.1 Overview + +1. **One** `AsyncWebCrawler` instance for **all** URLs. +2. **One** session (via `session_id`) so we can preserve local storage or cookies across URLs if needed. +3. The crawler is only closed at the **end**. + +**This** is the simplest pattern if your workload is moderate (dozens to a few hundred URLs). 
+ +### 2.2 Example Code + +```python +import asyncio +from typing import List +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig +from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator + +async def crawl_sequential(urls: List[str]): + print("\n=== Sequential Crawling with Session Reuse ===") + + browser_config = BrowserConfig( + headless=True, + # For better performance in Docker or low-memory environments: + extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], + ) + + crawl_config = CrawlerRunConfig( + markdown_generator=DefaultMarkdownGenerator() + ) + + # Create the crawler (opens the browser) + crawler = AsyncWebCrawler(config=browser_config) + await crawler.start() + + try: + session_id = "session1" # Reuse the same session across all URLs + for url in urls: + result = await crawler.arun( + url=url, + config=crawl_config, + session_id=session_id + ) + if result.success: + print(f"Successfully crawled: {url}") + # E.g. check markdown length + print(f"Markdown length: {len(result.markdown_v2.raw_markdown)}") + else: + print(f"Failed: {url} - Error: {result.error_message}") + finally: + # After all URLs are done, close the crawler (and the browser) + await crawler.close() + +async def main(): + urls = [ + "https://example.com/page1", + "https://example.com/page2", + "https://example.com/page3" + ] + await crawl_sequential(urls) + +if __name__ == "__main__": + asyncio.run(main()) +``` + +**Why It’s Good**: + +- **One** browser launch. +- Minimal memory usage. +- If the site requires login, you can log in once in `session_id` context and preserve auth across all URLs. + +--- + +## 3. Parallel Crawling with Browser Reuse + +### 3.1 Overview + +To speed up crawling further, you can crawl multiple URLs in **parallel** (batches or a concurrency limit). The crawler still uses **one** browser, but spawns different sessions (or the same, depending on your logic) for each task. 
+ +### 3.2 Example Code + +For this example make sure to install the [psutil](https://pypi.org/project/psutil/) package. + +```bash +pip install psutil +``` + +Then you can run the following code: + +```python +import os +import sys +import psutil +import asyncio + +__location__ = os.path.dirname(os.path.abspath(__file__)) +__output__ = os.path.join(__location__, "output") + +# Append parent directory to system path +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) + +from typing import List +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode + +async def crawl_parallel(urls: List[str], max_concurrent: int = 3): + print("\n=== Parallel Crawling with Browser Reuse + Memory Check ===") + + # We'll keep track of peak memory usage across all tasks + peak_memory = 0 + process = psutil.Process(os.getpid()) + + def log_memory(prefix: str = ""): + nonlocal peak_memory + current_mem = process.memory_info().rss # in bytes + if current_mem > peak_memory: + peak_memory = current_mem + print(f"{prefix} Current Memory: {current_mem // (1024 * 1024)} MB, Peak: {peak_memory // (1024 * 1024)} MB") + + # Minimal browser config + browser_config = BrowserConfig( + headless=True, + verbose=False, # corrected from 'verbos=False' + extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], + ) + crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + # Create the crawler instance + crawler = AsyncWebCrawler(config=browser_config) + await crawler.start() + + try: + # We'll chunk the URLs in batches of 'max_concurrent' + success_count = 0 + fail_count = 0 + for i in range(0, len(urls), max_concurrent): + batch = urls[i : i + max_concurrent] + tasks = [] + + for j, url in enumerate(batch): + # Unique session_id per concurrent sub-task + session_id = f"parallel_session_{i + j}" + task = crawler.arun(url=url, config=crawl_config, session_id=session_id) + tasks.append(task) + + # Check 
memory usage prior to launching tasks + log_memory(prefix=f"Before batch {i//max_concurrent + 1}: ") + + # Gather results + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Check memory usage after tasks complete + log_memory(prefix=f"After batch {i//max_concurrent + 1}: ") + + # Evaluate results + for url, result in zip(batch, results): + if isinstance(result, Exception): + print(f"Error crawling {url}: {result}") + fail_count += 1 + elif result.success: + success_count += 1 + else: + fail_count += 1 + + print(f"\nSummary:") + print(f" - Successfully crawled: {success_count}") + print(f" - Failed: {fail_count}") + + finally: + print("\nClosing crawler...") + await crawler.close() + # Final memory log + log_memory(prefix="Final: ") + print(f"\nPeak memory usage (MB): {peak_memory // (1024 * 1024)}") + +async def main(): + urls = [ + "https://example.com/page1", + "https://example.com/page2", + "https://example.com/page3", + "https://example.com/page4" + ] + await crawl_parallel(urls, max_concurrent=2) + +if __name__ == "__main__": + asyncio.run(main()) + +``` + +**Notes**: + +- We **reuse** the same `AsyncWebCrawler` instance for all parallel tasks, launching **one** browser. +- Each parallel sub-task might get its own `session_id` so they don’t share cookies/localStorage (unless that’s desired). +- We limit concurrency to `max_concurrent=2` or 3 to avoid saturating CPU/memory. + +--- + +## 4. Performance Tips + +1. **Extra Browser Args** + - `--disable-gpu`, `--no-sandbox` can help in Docker or restricted environments. + - `--disable-dev-shm-usage` avoids using `/dev/shm` which can be small on some systems. + +2. **Session Reuse** + - If your site requires a login or you want to maintain local data across URLs, share the **same** `session_id`. + - If you want isolation (each URL fresh), create unique sessions. + +3. **Batching** + - If you have **many** URLs (like thousands), you can do parallel crawling in chunks (like `max_concurrent=5`). 
+ - Use `arun_many()` for a built-in approach if you prefer, but the example above is often more flexible. + +4. **Cache** + - If your pages share many resources or you’re re-crawling the same domain repeatedly, consider setting `cache_mode=CacheMode.ENABLED` in `CrawlerRunConfig`. + - If you need fresh data each time, keep `cache_mode=CacheMode.BYPASS`. + +5. **Hooks** + - You can set up global hooks for each crawler (like to block images) or per-run if you want. + - Keep them consistent if you’re reusing sessions. + +--- + +## 5. Summary + +- **One** `AsyncWebCrawler` + multiple calls to `.arun()` is far more efficient than launching a new crawler per URL. +- **Sequential** approach with a shared session is simple and memory-friendly for moderate sets of URLs. +- **Parallel** approach can speed up large crawls by concurrency, but keep concurrency balanced to avoid overhead. +- Close the crawler once at the end, ensuring the browser is only opened/closed once. + +For even more advanced memory optimizations or dynamic concurrency patterns, see future sections on hooking or distributed crawling. The patterns above suffice for the majority of multi-URL scenarios—**giving you speed, simplicity, and minimal resource usage**. Enjoy your optimized crawling! \ No newline at end of file diff --git a/docs/md_v2/advanced/multi-url-crawling.md b/docs/md_v2/advanced/multi-url-crawling.md index a1d2b423..b1bd26cf 100644 --- a/docs/md_v2/advanced/multi-url-crawling.md +++ b/docs/md_v2/advanced/multi-url-crawling.md @@ -1,264 +1,205 @@ -# Optimized Multi-URL Crawling +# Advanced Multi-URL Crawling with Dispatchers -> **Note**: We’re developing a new **executor module** that uses a sophisticated algorithm to dynamically manage multi-URL crawling, optimizing for speed and memory usage. The approaches in this document remain fully valid, but keep an eye on **Crawl4AI**’s upcoming releases for this powerful feature! 
Follow [@unclecode](https://twitter.com/unclecode) on X and check the changelogs to stay updated. +> **Heads Up**: Crawl4AI supports advanced dispatchers for **parallel** or **throttled** crawling, providing dynamic rate limiting and memory usage checks. The built-in `arun_many()` function uses these dispatchers to handle concurrency efficiently. +## 1. Introduction -Crawl4AI’s **AsyncWebCrawler** can handle multiple URLs in a single run, which can greatly reduce overhead and speed up crawling. This guide shows how to: +When crawling many URLs: +- **Basic**: Use `arun()` in a loop (simple but less efficient) +- **Better**: Use `arun_many()`, which efficiently handles multiple URLs with proper concurrency control +- **Best**: Customize dispatcher behavior for your specific needs (memory management, rate limits, etc.) -1. **Sequentially** crawl a list of URLs using the **same** session, avoiding repeated browser creation. -2. **Parallel**-crawl subsets of URLs in batches, again reusing the same browser. +**Why Dispatchers?** +- **Adaptive**: Memory-based dispatchers can pause or slow down based on system resources +- **Rate-limiting**: Built-in rate limiting with exponential backoff for 429/503 responses +- **Real-time Monitoring**: Live dashboard of ongoing tasks, memory usage, and performance +- **Flexibility**: Choose between memory-adaptive or semaphore-based concurrency -When the entire process finishes, you close the browser once—**minimizing** memory and resource usage. +## 2. Core Components ---- - -## 1. Why Avoid Simple Loops per URL? 
- -If you naively do: +### 2.1 Rate Limiter ```python -for url in urls: - async with AsyncWebCrawler() as crawler: - result = await crawler.arun(url) +class RateLimiter: + def __init__( + base_delay: Tuple[float, float] = (1.0, 3.0), # Random delay range between requests + max_delay: float = 60.0, # Maximum backoff delay + max_retries: int = 3, # Retries before giving up + rate_limit_codes: List[int] = [429, 503] # Status codes triggering backoff + ) ``` -You end up: +The RateLimiter provides: +- Random delays between requests +- Exponential backoff on rate limit responses +- Domain-specific rate limiting +- Automatic retry handling -1. Spinning up a **new** browser for each URL -2. Closing it immediately after the single crawl -3. Potentially using a lot of CPU/memory for short-living browsers -4. Missing out on session reusability if you have login or ongoing states +### 2.2 Crawler Monitor -**Better** approaches ensure you **create** the browser once, then crawl multiple URLs with minimal overhead. - ---- - -## 2. Sequential Crawling with Session Reuse - -### 2.1 Overview - -1. **One** `AsyncWebCrawler` instance for **all** URLs. -2. **One** session (via `session_id`) so we can preserve local storage or cookies across URLs if needed. -3. The crawler is only closed at the **end**. - -**This** is the simplest pattern if your workload is moderate (dozens to a few hundred URLs). 
- -### 2.2 Example Code +The CrawlerMonitor provides real-time visibility into crawling operations: ```python -import asyncio -from typing import List -from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig -from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator - -async def crawl_sequential(urls: List[str]): - print("\n=== Sequential Crawling with Session Reuse ===") - - browser_config = BrowserConfig( - headless=True, - # For better performance in Docker or low-memory environments: - extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], - ) - - crawl_config = CrawlerRunConfig( - markdown_generator=DefaultMarkdownGenerator() - ) - - # Create the crawler (opens the browser) - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - try: - session_id = "session1" # Reuse the same session across all URLs - for url in urls: - result = await crawler.arun( - url=url, - config=crawl_config, - session_id=session_id - ) - if result.success: - print(f"Successfully crawled: {url}") - # E.g. check markdown length - print(f"Markdown length: {len(result.markdown_v2.raw_markdown)}") - else: - print(f"Failed: {url} - Error: {result.error_message}") - finally: - # After all URLs are done, close the crawler (and the browser) - await crawler.close() - -async def main(): - urls = [ - "https://example.com/page1", - "https://example.com/page2", - "https://example.com/page3" - ] - await crawl_sequential(urls) - -if __name__ == "__main__": - asyncio.run(main()) +monitor = CrawlerMonitor( + max_visible_rows=15, # Maximum rows in live display + display_mode=DisplayMode.DETAILED # DETAILED or AGGREGATED view +) ``` -**Why It’s Good**: +**Display Modes**: +1. **DETAILED**: Shows individual task status, memory usage, and timing +2. **AGGREGATED**: Displays summary statistics and overall progress -- **One** browser launch. -- Minimal memory usage. 
-- If the site requires login, you can log in once in `session_id` context and preserve auth across all URLs. +## 3. Available Dispatchers ---- +### 3.1 MemoryAdaptiveDispatcher (Default) -## 3. Parallel Crawling with Browser Reuse - -### 3.1 Overview - -To speed up crawling further, you can crawl multiple URLs in **parallel** (batches or a concurrency limit). The crawler still uses **one** browser, but spawns different sessions (or the same, depending on your logic) for each task. - -### 3.2 Example Code - -For this example make sure to install the [psutil](https://pypi.org/project/psutil/) package. - -```bash -pip install psutil -``` - -Then you can run the following code: +Automatically manages concurrency based on system memory usage: ```python -import os -import sys -import psutil -import asyncio - -__location__ = os.path.dirname(os.path.abspath(__file__)) -__output__ = os.path.join(__location__, "output") - -# Append parent directory to system path -parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -sys.path.append(parent_dir) - -from typing import List -from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode - -async def crawl_parallel(urls: List[str], max_concurrent: int = 3): - print("\n=== Parallel Crawling with Browser Reuse + Memory Check ===") - - # We'll keep track of peak memory usage across all tasks - peak_memory = 0 - process = psutil.Process(os.getpid()) - - def log_memory(prefix: str = ""): - nonlocal peak_memory - current_mem = process.memory_info().rss # in bytes - if current_mem > peak_memory: - peak_memory = current_mem - print(f"{prefix} Current Memory: {current_mem // (1024 * 1024)} MB, Peak: {peak_memory // (1024 * 1024)} MB") - - # Minimal browser config - browser_config = BrowserConfig( - headless=True, - verbose=False, # corrected from 'verbos=False' - extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], +dispatcher = MemoryAdaptiveDispatcher( + 
memory_threshold_percent=70.0, # Pause if memory exceeds this + check_interval=1.0, # How often to check memory + max_session_permit=10, # Maximum concurrent tasks + rate_limiter=RateLimiter( # Optional rate limiting + base_delay=(1.0, 2.0), + max_delay=30.0, + max_retries=2 + ), + monitor=CrawlerMonitor( # Optional monitoring + max_visible_rows=15, + display_mode=DisplayMode.DETAILED ) - crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) - - # Create the crawler instance - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - try: - # We'll chunk the URLs in batches of 'max_concurrent' - success_count = 0 - fail_count = 0 - for i in range(0, len(urls), max_concurrent): - batch = urls[i : i + max_concurrent] - tasks = [] - - for j, url in enumerate(batch): - # Unique session_id per concurrent sub-task - session_id = f"parallel_session_{i + j}" - task = crawler.arun(url=url, config=crawl_config, session_id=session_id) - tasks.append(task) - - # Check memory usage prior to launching tasks - log_memory(prefix=f"Before batch {i//max_concurrent + 1}: ") - - # Gather results - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Check memory usage after tasks complete - log_memory(prefix=f"After batch {i//max_concurrent + 1}: ") - - # Evaluate results - for url, result in zip(batch, results): - if isinstance(result, Exception): - print(f"Error crawling {url}: {result}") - fail_count += 1 - elif result.success: - success_count += 1 - else: - fail_count += 1 - - print(f"\nSummary:") - print(f" - Successfully crawled: {success_count}") - print(f" - Failed: {fail_count}") - - finally: - print("\nClosing crawler...") - await crawler.close() - # Final memory log - log_memory(prefix="Final: ") - print(f"\nPeak memory usage (MB): {peak_memory // (1024 * 1024)}") - -async def main(): - urls = [ - "https://example.com/page1", - "https://example.com/page2", - "https://example.com/page3", - "https://example.com/page4" - ] - await 
crawl_parallel(urls, max_concurrent=2) - -if __name__ == "__main__": - asyncio.run(main()) - +) ``` -**Notes**: +### 3.2 SemaphoreDispatcher -- We **reuse** the same `AsyncWebCrawler` instance for all parallel tasks, launching **one** browser. -- Each parallel sub-task might get its own `session_id` so they don’t share cookies/localStorage (unless that’s desired). -- We limit concurrency to `max_concurrent=2` or 3 to avoid saturating CPU/memory. +Provides simple concurrency control with a fixed limit: ---- +```python +dispatcher = SemaphoreDispatcher( + semaphore_count=5, # Fixed concurrent tasks + rate_limiter=RateLimiter( # Optional rate limiting + base_delay=(0.5, 1.0), + max_delay=10.0 + ), + monitor=CrawlerMonitor( # Optional monitoring + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) +) +``` -## 4. Performance Tips +## 4. Usage Examples -1. **Extra Browser Args** - - `--disable-gpu`, `--no-sandbox` can help in Docker or restricted environments. - - `--disable-dev-shm-usage` avoids using `/dev/shm` which can be small on some systems. +### 4.1 Simple Usage (Default MemoryAdaptiveDispatcher) -2. **Session Reuse** - - If your site requires a login or you want to maintain local data across URLs, share the **same** `session_id`. - - If you want isolation (each URL fresh), create unique sessions. +```python +async with AsyncWebCrawler(config=browser_config) as crawler: + results = await crawler.arun_many(urls, config=run_config) +``` -3. **Batching** - - If you have **many** URLs (like thousands), you can do parallel crawling in chunks (like `max_concurrent=5`). - - Use `arun_many()` for a built-in approach if you prefer, but the example above is often more flexible. +### 4.2 Memory Adaptive with Rate Limiting -4. **Cache** - - If your pages share many resources or you’re re-crawling the same domain repeatedly, consider setting `cache_mode=CacheMode.ENABLED` in `CrawlerRunConfig`. 
- - If you need fresh data each time, keep `cache_mode=CacheMode.BYPASS`. +```python +async def crawl_with_memory_adaptive(urls): + browser_config = BrowserConfig(headless=True, verbose=False) + run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=70.0, + max_session_permit=10, + rate_limiter=RateLimiter( + base_delay=(1.0, 2.0), + max_delay=30.0, + max_retries=2 + ), + monitor=CrawlerMonitor( + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) + ) + + async with AsyncWebCrawler(config=browser_config) as crawler: + results = await crawler.arun_many( + urls, + config=run_config, + dispatcher=dispatcher + ) + return results +``` -5. **Hooks** - - You can set up global hooks for each crawler (like to block images) or per-run if you want. - - Keep them consistent if you’re reusing sessions. +### 4.3 Semaphore with Rate Limiting ---- +```python +async def crawl_with_semaphore(urls): + browser_config = BrowserConfig(headless=True, verbose=False) + run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + dispatcher = SemaphoreDispatcher( + semaphore_count=5, + rate_limiter=RateLimiter( + base_delay=(0.5, 1.0), + max_delay=10.0 + ), + monitor=CrawlerMonitor( + max_visible_rows=15, + display_mode=DisplayMode.DETAILED + ) + ) + + async with AsyncWebCrawler(config=browser_config) as crawler: + results = await crawler.arun_many( + urls, + config=run_config, + dispatcher=dispatcher + ) + return results +``` -## 5. Summary +## 5. Dispatch Results -- **One** `AsyncWebCrawler` + multiple calls to `.arun()` is far more efficient than launching a new crawler per URL. -- **Sequential** approach with a shared session is simple and memory-friendly for moderate sets of URLs. -- **Parallel** approach can speed up large crawls by concurrency, but keep concurrency balanced to avoid overhead. -- Close the crawler once at the end, ensuring the browser is only opened/closed once. 
+Each crawl result includes dispatch information: -For even more advanced memory optimizations or dynamic concurrency patterns, see future sections on hooking or distributed crawling. The patterns above suffice for the majority of multi-URL scenarios—**giving you speed, simplicity, and minimal resource usage**. Enjoy your optimized crawling! \ No newline at end of file +```python +@dataclass +class DispatchResult: + task_id: str + memory_usage: float + peak_memory: float + start_time: datetime + end_time: datetime + error_message: str = "" +``` + +Access via `result.dispatch_result`: + +```python +for result in results: + if result.success: + dr = result.dispatch_result + print(f"URL: {result.url}") + print(f"Memory: {dr.memory_usage:.1f}MB") + print(f"Duration: {dr.end_time - dr.start_time}") +``` + +## 6. Summary + +1. **Two Dispatcher Types**: + - MemoryAdaptiveDispatcher (default): Dynamic concurrency based on memory + - SemaphoreDispatcher: Fixed concurrency limit + +2. **Optional Components**: + - RateLimiter: Smart request pacing and backoff + - CrawlerMonitor: Real-time progress visualization + +3. **Key Benefits**: + - Automatic memory management + - Built-in rate limiting + - Live progress monitoring + - Flexible concurrency control + +Choose the dispatcher that best fits your needs: +- **MemoryAdaptiveDispatcher**: For large crawls or limited resources +- **SemaphoreDispatcher**: For simple, fixed-concurrency scenarios diff --git a/docs/md_v2/api/arun.md b/docs/md_v2/api/arun.md index d1d5eae9..5972f402 100644 --- a/docs/md_v2/api/arun.md +++ b/docs/md_v2/api/arun.md @@ -1,7 +1,3 @@ -Below is a **revised parameter guide** for **`arun()`** in **AsyncWebCrawler**, reflecting the **new** approach where all parameters are passed via a **`CrawlerRunConfig`** instead of directly to `arun()`. Each section includes example usage in the new style, ensuring a clear, modern approach. 
- ---- - # `arun()` Parameter Guide (New Approach) In Crawl4AI’s **latest** configuration model, nearly all parameters that once went directly to `arun()` are now part of **`CrawlerRunConfig`**. When calling `arun()`, you provide: diff --git a/docs/md_v2/api/arun_many.md b/docs/md_v2/api/arun_many.md new file mode 100644 index 00000000..b1e02d95 --- /dev/null +++ b/docs/md_v2/api/arun_many.md @@ -0,0 +1,100 @@ +# `arun_many(...)` Reference + +> **Note**: This function is very similar to [`arun()`](./arun.md) but focused on **concurrent** or **batch** crawling. If you’re unfamiliar with `arun()` usage, please read that doc first, then review this for differences. + +## Function Signature + +```python +async def arun_many( + urls: Union[List[str], List[Any]], + config: Optional[CrawlerRunConfig] = None, + dispatcher: Optional[BaseDispatcher] = None, + ... +) -> List[CrawlResult]: + """ + Crawl multiple URLs concurrently or in batches. + + :param urls: A list of URLs (or tasks) to crawl. + :param config: (Optional) A default `CrawlerRunConfig` applying to each crawl. + :param dispatcher: (Optional) A concurrency controller (e.g. MemoryAdaptiveDispatcher). + ... + :return: A list of `CrawlResult` objects, one per URL. + """ +``` + +## Differences from `arun()` + +1. **Multiple URLs**: + - Instead of crawling a single URL, you pass a list of them (strings or tasks). + - The function returns a **list** of `CrawlResult`, in the same order as `urls`. + +2. **Concurrency & Dispatchers**: + - **`dispatcher`** param allows advanced concurrency control. + - If omitted, a default dispatcher (like `MemoryAdaptiveDispatcher`) is used internally. + - Dispatchers handle concurrency, rate limiting, and memory-based adaptive throttling (see [Multi-URL Crawling](../advanced/multi-url-crawling.md)). + +3. **Parallel Execution**: + - `arun_many()` can run multiple requests concurrently under the hood.
+ - Each `CrawlResult` might also include a **`dispatch_result`** with concurrency details (like memory usage, start/end times). + +### Basic Example + +```python +# Minimal usage: The default dispatcher will be used +results = await crawler.arun_many( + urls=["https://site1.com", "https://site2.com"], + config=my_run_config +) + +for res in results: + if res.success: + print(res.url, "crawled OK!") + else: + print("Failed:", res.url, "-", res.error_message) +``` + +### With a Custom Dispatcher + +```python +dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=70.0, + max_session_permit=10 +) +results = await crawler.arun_many( + urls=["https://site1.com", "https://site2.com", "https://site3.com"], + config=my_run_config, + dispatcher=dispatcher +) +``` + +**Key Points**: +- Each URL is processed by the same or separate sessions, depending on the dispatcher’s strategy. +- `dispatch_result` in each `CrawlResult` (if using concurrency) can hold memory and timing info. +- If you need to handle authentication or session IDs, pass them in each individual task or within your run config. + +### Return Value + +A **list** of [`CrawlResult`](./crawl-result.md) objects, one per URL. You can iterate to check `result.success` or read each item’s `extracted_content`, `markdown`, or `dispatch_result`. + +--- + +## Dispatcher Reference + +- **`MemoryAdaptiveDispatcher`**: Dynamically manages concurrency based on system memory usage. +- **`SemaphoreDispatcher`**: Fixed concurrency limit, simpler but less adaptive. + +For advanced usage or custom settings, see [Multi-URL Crawling with Dispatchers](../advanced/multi-url-crawling.md). + +--- + +## Common Pitfalls + +1. **Large Lists**: If you pass thousands of URLs, be mindful of memory or rate-limits. A dispatcher can help. +2. **Session Reuse**: If you need specialized logins or persistent contexts, ensure your dispatcher or tasks handle sessions accordingly. +3. 
**Error Handling**: Each `CrawlResult` might fail for different reasons—always check `result.success` or the `error_message` before proceeding. + +--- + +## Conclusion + +Use `arun_many()` when you want to **crawl multiple URLs** simultaneously or in controlled parallel tasks. If you need advanced concurrency features (like memory-based adaptive throttling or complex rate-limiting), provide a **dispatcher**. Each result is a standard `CrawlResult`, possibly augmented with concurrency stats (`dispatch_result`) for deeper inspection. For more details on concurrency logic and dispatchers, see the [Advanced Multi-URL Crawling](../advanced/multi-url-crawling.md) docs. \ No newline at end of file diff --git a/docs/md_v2/api/async-webcrawler.md b/docs/md_v2/api/async-webcrawler.md index f37ad198..e9c6cc6b 100644 --- a/docs/md_v2/api/async-webcrawler.md +++ b/docs/md_v2/api/async-webcrawler.md @@ -130,51 +130,88 @@ For **backward** compatibility, `arun()` can still accept direct arguments like --- -## 4. Helper Methods - -### 4.1 `arun_many()` +## 4. Batch Processing: `arun_many()` ```python async def arun_many( self, urls: List[str], config: Optional[CrawlerRunConfig] = None, - # Legacy parameters... + # Legacy parameters maintained for backwards compatibility... ) -> List[CrawlResult]: - ... + """ + Process multiple URLs with intelligent rate limiting and resource monitoring. + """ ``` -Crawls multiple URLs in concurrency. Accepts the same style `CrawlerRunConfig`. 
Example: +### 4.1 Resource-Aware Crawling + +The `arun_many()` method now uses an intelligent dispatcher that: +- Monitors system memory usage +- Implements adaptive rate limiting +- Provides detailed progress monitoring +- Manages concurrent crawls efficiently + +### 4.2 Example Usage ```python +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, RateLimitConfig +from crawl4ai.dispatcher import DisplayMode + +# Configure browser +browser_cfg = BrowserConfig(headless=True) + +# Configure crawler with rate limiting run_cfg = CrawlerRunConfig( - # e.g., concurrency, wait_for, caching, extraction, etc. - semaphore_count=5 + # Enable rate limiting + enable_rate_limiting=True, + rate_limit_config=RateLimitConfig( + base_delay=(1.0, 2.0), # Random delay between 1-2 seconds + max_delay=30.0, # Maximum delay after rate limit hits + max_retries=2, # Number of retries before giving up + rate_limit_codes=[429, 503] # Status codes that trigger rate limiting + ), + # Resource monitoring + memory_threshold_percent=70.0, # Pause if memory exceeds this + check_interval=0.5, # How often to check resources + max_session_permit=3, # Maximum concurrent crawls + display_mode=DisplayMode.DETAILED.value # Show detailed progress ) +urls = [ + "https://example.com/page1", + "https://example.com/page2", + "https://example.com/page3" +] + async with AsyncWebCrawler(config=browser_cfg) as crawler: - results = await crawler.arun_many( - urls=["https://example.com", "https://another.com"], - config=run_cfg - ) - for r in results: - print(r.url, ":", len(r.cleaned_html)) + results = await crawler.arun_many(urls, config=run_cfg) + for result in results: + print(f"URL: {result.url}, Success: {result.success}") ``` -### 4.2 `start()` & `close()` +### 4.3 Key Features -Allows manual lifecycle usage instead of context manager: +1. 
**Rate Limiting** + - Automatic delay between requests + - Exponential backoff on rate limit detection + - Domain-specific rate limiting + - Configurable retry strategy -```python -crawler = AsyncWebCrawler(config=browser_cfg) -await crawler.start() +2. **Resource Monitoring** + - Memory usage tracking + - Adaptive concurrency based on system load + - Automatic pausing when resources are constrained -# Perform multiple operations -resultA = await crawler.arun("https://exampleA.com", config=run_cfg) -resultB = await crawler.arun("https://exampleB.com", config=run_cfg) +3. **Progress Monitoring** + - Detailed or aggregated progress display + - Real-time status updates + - Memory usage statistics -await crawler.close() -``` +4. **Error Handling** + - Graceful handling of rate limits + - Automatic retries with backoff + - Detailed error reporting --- diff --git a/docs/md_v2/api/crawl-result.md b/docs/md_v2/api/crawl-result.md index 929114c7..7ed6275a 100644 --- a/docs/md_v2/api/crawl-result.md +++ b/docs/md_v2/api/crawl-result.md @@ -26,6 +26,7 @@ class CrawlResult(BaseModel): response_headers: Optional[dict] = None status_code: Optional[int] = None ssl_certificate: Optional[SSLCertificate] = None + dispatch_result: Optional[DispatchResult] = None ... ``` @@ -262,7 +263,31 @@ if result.metadata: --- -## 6. Example: Accessing Everything +## 6. `dispatch_result` (optional) + +A `DispatchResult` object providing additional concurrency and resource usage information when crawling URLs in parallel (e.g., via `arun_many()` with custom dispatchers). It contains: + +- **`task_id`**: A unique identifier for the parallel task. +- **`memory_usage`** (float): The memory (in MB) used at the time of completion. +- **`peak_memory`** (float): The peak memory usage (in MB) recorded during the task’s execution. +- **`start_time`** / **`end_time`** (datetime): Time range for this crawling task. +- **`error_message`** (str): Any dispatcher- or concurrency-related error encountered. 
+ +```python +# Example usage: +for result in results: + if result.success and result.dispatch_result: + dr = result.dispatch_result + print(f"URL: {result.url}, Task ID: {dr.task_id}") + print(f"Memory: {dr.memory_usage:.1f} MB (Peak: {dr.peak_memory:.1f} MB)") + print(f"Duration: {dr.end_time - dr.start_time}") +``` + +> **Note**: This field is typically populated when using `arun_many(...)` alongside a **dispatcher** (e.g., `MemoryAdaptiveDispatcher` or `SemaphoreDispatcher`). If no concurrency or dispatcher is used, `dispatch_result` may remain `None`. + +--- + +## 7. Example: Accessing Everything ```python async def handle_result(result: CrawlResult): @@ -306,7 +331,7 @@ async def handle_result(result: CrawlResult): --- -## 7. Key Points & Future +## 8. Key Points & Future 1. **`markdown_v2` vs `markdown`** - Right now, `markdown_v2` is the more robust container (`MarkdownGenerationResult`), providing **raw_markdown**, **markdown_with_citations**, references, plus possible **fit_markdown**. 
diff --git a/docs/md_v2/api/parameters.md b/docs/md_v2/api/parameters.md index 7645084c..55cd39ab 100644 --- a/docs/md_v2/api/parameters.md +++ b/docs/md_v2/api/parameters.md @@ -157,7 +157,32 @@ Use these for link-level content filtering (often to keep crawls “internal” --- -### G) **Debug & Logging** +### G) **Rate Limiting & Resource Management** + +| **Parameter** | **Type / Default** | **What It Does** | +|------------------------------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------| +| **`enable_rate_limiting`** | `bool` (default: `False`) | Enable intelligent rate limiting for multiple URLs | +| **`rate_limit_config`** | `RateLimitConfig` (default: `None`) | Configuration for rate limiting behavior | + +The `RateLimitConfig` class has these fields: + +| **Field** | **Type / Default** | **What It Does** | +|--------------------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------| +| **`base_delay`** | `Tuple[float, float]` (1.0, 3.0) | Random delay range between requests to the same domain | +| **`max_delay`** | `float` (60.0) | Maximum delay after rate limit detection | +| **`max_retries`** | `int` (3) | Number of retries before giving up on rate-limited requests | +| **`rate_limit_codes`** | `List[int]` ([429, 503]) | HTTP status codes that trigger rate limiting behavior | + +| **Parameter** | **Type / Default** | **What It Does** | +|-------------------------------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------| +| **`memory_threshold_percent`** | `float` (70.0) | Maximum memory usage before pausing new crawls | +| **`check_interval`** | `float` (1.0) | How often to check system resources (in seconds) | +| 
**`max_session_permit`** | `int` (20) | Maximum number of concurrent crawl sessions | +| **`display_mode`** | `str` (`None`, "DETAILED", "AGGREGATED") | How to display progress information | + +--- + +### H) **Debug & Logging** | **Parameter** | **Type / Default** | **What It Does** | |----------------|--------------------|---------------------------------------------------------------------------| @@ -170,7 +195,7 @@ Use these for link-level content filtering (often to keep crawls “internal” ```python import asyncio -from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, RateLimitConfig async def main(): # Configure the browser @@ -190,7 +215,18 @@ async def main(): excluded_tags=["script", "style"], exclude_external_links=True, wait_for="css:.article-loaded", - screenshot=True + screenshot=True, + enable_rate_limiting=True, + rate_limit_config=RateLimitConfig( + base_delay=(1.0, 3.0), + max_delay=60.0, + max_retries=3, + rate_limit_codes=[429, 503] + ), + memory_threshold_percent=70.0, + check_interval=1.0, + max_session_permit=20, + display_mode="DETAILED" ) async with AsyncWebCrawler(config=browser_cfg) as crawler: @@ -223,4 +259,3 @@ if __name__ == "__main__": - **Use** `BrowserConfig` for **global** browser settings: engine, headless, proxy, user agent. - **Use** `CrawlerRunConfig` for each crawl’s **context**: how to filter content, handle caching, wait for dynamic elements, or run JS. - **Pass** both configs to `AsyncWebCrawler` (the `BrowserConfig`) and then to `arun()` (the `CrawlerRunConfig`). 
-
diff --git a/docs/md_v2/core/browser-crawler-config.md b/docs/md_v2/core/browser-crawler-config.md
index 11fa3493..288a35d7 100644
--- a/docs/md_v2/core/browser-crawler-config.md
+++ b/docs/md_v2/core/browser-crawler-config.md
@@ -116,6 +116,12 @@ class CrawlerRunConfig:
     wait_for=None,
     screenshot=False,
     pdf=False,
+    enable_rate_limiting=False,
+    rate_limit_config=None,
+    memory_threshold_percent=70.0,
+    check_interval=1.0,
+    max_session_permit=20,
+    display_mode=None,
     verbose=True,
     # ... other advanced parameters omitted
 ):
@@ -156,6 +162,58 @@ class CrawlerRunConfig:
 - Logs additional runtime details.
 - Overlaps with the browser’s verbosity if also set to `True` in `BrowserConfig`.
 
+9. **`enable_rate_limiting`**:
+   - If `True`, enables rate limiting for batch processing.
+   - Requires `rate_limit_config` to be set.
+
+10. **`rate_limit_config`**:
+    - A `RateLimitConfig` object controlling rate limiting behavior.
+    - See below for details.
+
+11. **`memory_threshold_percent`**:
+    - The memory threshold (as a percentage) to monitor.
+    - If exceeded, the crawler will pause or slow down.
+
+12. **`check_interval`**:
+    - The interval (in seconds) to check system resources.
+    - Affects how often memory and CPU usage are monitored.
+
+13. **`max_session_permit`**:
+    - The maximum number of concurrent crawl sessions.
+    - Helps prevent overwhelming the system.
+
+14. **`display_mode`**:
+    - The display mode for progress information (`DETAILED` or `AGGREGATED`; `None` disables the display).
+    - Affects how much information is printed during the crawl.
+ +### Rate Limiting & Resource Management + +For batch processing with `arun_many()`, you can enable intelligent rate limiting: + +```python +from crawl4ai import RateLimitConfig + +config = CrawlerRunConfig( + enable_rate_limiting=True, + rate_limit_config=RateLimitConfig( + base_delay=(1.0, 3.0), # Random delay range + max_delay=60.0, # Max delay after rate limits + max_retries=3, # Retries before giving up + rate_limit_codes=[429, 503] # Status codes to watch + ), + memory_threshold_percent=70.0, # Memory threshold + check_interval=1.0, # Resource check interval + max_session_permit=20, # Max concurrent crawls + display_mode="DETAILED" # Progress display mode +) +``` + +This configuration: +- Implements intelligent rate limiting per domain +- Monitors system resources +- Provides detailed progress information +- Manages concurrent crawls efficiently + **Minimal Example**: ```python @@ -164,7 +222,14 @@ from crawl4ai import AsyncWebCrawler, CrawlerRunConfig crawl_conf = CrawlerRunConfig( js_code="document.querySelector('button#loadMore')?.click()", wait_for="css:.loaded-content", - screenshot=True + screenshot=True, + enable_rate_limiting=True, + rate_limit_config=RateLimitConfig( + base_delay=(1.0, 3.0), + max_delay=60.0, + max_retries=3, + rate_limit_codes=[429, 503] + ) ) async with AsyncWebCrawler() as crawler: @@ -205,7 +270,14 @@ async def main(): # 3) Crawler run config: skip cache, use extraction run_conf = CrawlerRunConfig( extraction_strategy=extraction, - cache_mode=CacheMode.BYPASS + cache_mode=CacheMode.BYPASS, + enable_rate_limiting=True, + rate_limit_config=RateLimitConfig( + base_delay=(1.0, 3.0), + max_delay=60.0, + max_retries=3, + rate_limit_codes=[429, 503] + ) ) async with AsyncWebCrawler(config=browser_conf) as crawler: diff --git a/docs/md_v2/core/quickstart.md b/docs/md_v2/core/quickstart.md index c4e6561e..0d99bd3c 100644 --- a/docs/md_v2/core/quickstart.md +++ b/docs/md_v2/core/quickstart.md @@ -1,7 +1,3 @@ -Below is the **revised 
Quickstart** guide with the **Installation** section removed, plus an updated **dynamic content** crawl example that uses `BrowserConfig` and `CrawlerRunConfig` (instead of passing parameters directly to `arun()`). Everything else remains as before. - ---- - # Getting Started with Crawl4AI Welcome to **Crawl4AI**, an open-source LLM-friendly Web Crawler & Scraper. In this tutorial, you’ll: @@ -254,7 +250,39 @@ if __name__ == "__main__": --- -## 7. Dynamic Content Example +## 7. Multi-URL Concurrency (Preview) + +If you need to crawl multiple URLs in **parallel**, you can use `arun_many()`. By default, Crawl4AI employs a **MemoryAdaptiveDispatcher**, automatically adjusting concurrency based on system resources. Here’s a quick glimpse: + +```python +import asyncio +from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode + +async def quick_parallel_example(): + urls = [ + "https://example.com/page1", + "https://example.com/page2", + "https://example.com/page3" + ] + + run_conf = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + async with AsyncWebCrawler() as crawler: + results = await crawler.arun_many(urls, config=run_conf) + for res in results: + if res.success: + print(f"[OK] {res.url}, length: {len(res.markdown_v2.raw_markdown)}") + else: + print(f"[ERROR] {res.url} => {res.error_message}") + +if __name__ == "__main__": + asyncio.run(quick_parallel_example()) +``` + +For more advanced concurrency (e.g., a **semaphore-based** approach, **adaptive memory usage throttling**, or customized rate limiting), see [Advanced Multi-URL Crawling](../advanced/multi-url-crawling.md). + + +## 8. Dynamic Content Example Some sites require multiple “page clicks” or dynamic JavaScript updates. Below is an example showing how to **click** a “Next Page” button and wait for new commits to load on GitHub, using **`BrowserConfig`** and **`CrawlerRunConfig`**: @@ -343,7 +371,7 @@ if __name__ == "__main__": --- -## 8. Next Steps +## 9. Next Steps Congratulations! 
You have: diff --git a/mkdocs.yml b/mkdocs.yml index ccf2aff5..255492e3 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -44,6 +44,7 @@ nav: - API Reference: - "AsyncWebCrawler": "api/async-webcrawler.md" - "arun()": "api/arun.md" + - "arun_many()": "api/arun_many.md" - "Browser & Crawler Config": "api/parameters.md" - "CrawlResult": "api/crawl-result.md" - "Strategies": "api/strategies.md" diff --git a/tests/async/test_dispatchers.py b/tests/async/test_dispatchers.py new file mode 100644 index 00000000..5c3788e5 --- /dev/null +++ b/tests/async/test_dispatchers.py @@ -0,0 +1,147 @@ +import pytest +import asyncio, time +from crawl4ai import ( + AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, + MemoryAdaptiveDispatcher, SemaphoreDispatcher, + RateLimiter, CrawlerMonitor, DisplayMode, CacheMode +) + +@pytest.fixture +def browser_config(): + return BrowserConfig( + headless=True, + verbose=False + ) + +@pytest.fixture +def run_config(): + return CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + verbose=False + ) + +@pytest.fixture +def test_urls(): + return [ + "http://example.com", + "http://example.com/page1", + "http://example.com/page2" + ] + +@pytest.mark.asyncio +class TestDispatchStrategies: + + async def test_memory_adaptive_basic(self, browser_config, run_config, test_urls): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=70.0, + max_session_permit=2, + check_interval=0.1 + ) + results = await crawler.arun_many(test_urls, config=run_config, dispatcher=dispatcher) + assert len(results) == len(test_urls) + assert all(r.success for r in results) + + async def test_memory_adaptive_with_rate_limit(self, browser_config, run_config, test_urls): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=70.0, + max_session_permit=2, + check_interval=0.1, + rate_limiter=RateLimiter( + base_delay=(0.1, 0.2), + 
max_delay=1.0, + max_retries=2 + ) + ) + results = await crawler.arun_many(test_urls, config=run_config, dispatcher=dispatcher) + assert len(results) == len(test_urls) + assert all(r.success for r in results) + + async def test_semaphore_basic(self, browser_config, run_config, test_urls): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = SemaphoreDispatcher( + semaphore_count=2 + ) + results = await crawler.arun_many(test_urls, config=run_config, dispatcher=dispatcher) + assert len(results) == len(test_urls) + assert all(r.success for r in results) + + async def test_semaphore_with_rate_limit(self, browser_config, run_config, test_urls): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = SemaphoreDispatcher( + semaphore_count=2, + rate_limiter=RateLimiter( + base_delay=(0.1, 0.2), + max_delay=1.0, + max_retries=2 + ) + ) + results = await crawler.arun_many(test_urls, config=run_config, dispatcher=dispatcher) + assert len(results) == len(test_urls) + assert all(r.success for r in results) + + async def test_memory_adaptive_memory_error(self, browser_config, run_config, test_urls): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=1.0, # Set unrealistically low threshold + max_session_permit=2, + check_interval=0.1, + memory_wait_timeout=1.0 # Short timeout for testing + ) + with pytest.raises(MemoryError): + await crawler.arun_many(test_urls, config=run_config, dispatcher=dispatcher) + + async def test_empty_urls(self, browser_config, run_config): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher(max_session_permit=2) + results = await crawler.arun_many([], config=run_config, dispatcher=dispatcher) + assert len(results) == 0 + + async def test_single_url(self, browser_config, run_config): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = 
MemoryAdaptiveDispatcher(max_session_permit=2) + results = await crawler.arun_many(["http://example.com"], config=run_config, dispatcher=dispatcher) + assert len(results) == 1 + assert results[0].success + + async def test_invalid_urls(self, browser_config, run_config): + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher(max_session_permit=2) + results = await crawler.arun_many(["http://invalid.url.that.doesnt.exist"], config=run_config, dispatcher=dispatcher) + assert len(results) == 1 + assert not results[0].success + + async def test_rate_limit_backoff(self, browser_config, run_config): + urls = ["http://example.com"] * 5 # Multiple requests to same domain + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + max_session_permit=2, + rate_limiter=RateLimiter( + base_delay=(0.1, 0.2), + max_delay=1.0, + max_retries=2, + rate_limit_codes=[200] # Force rate limiting for testing + ) + ) + start_time = time.time() + results = await crawler.arun_many(urls, config=run_config, dispatcher=dispatcher) + duration = time.time() - start_time + assert len(results) == len(urls) + assert duration > 1.0 # Ensure rate limiting caused delays + + async def test_monitor_integration(self, browser_config, run_config, test_urls): + async with AsyncWebCrawler(config=browser_config) as crawler: + monitor = CrawlerMonitor(max_visible_rows=5, display_mode=DisplayMode.DETAILED) + dispatcher = MemoryAdaptiveDispatcher( + max_session_permit=2, + monitor=monitor + ) + results = await crawler.arun_many(test_urls, config=run_config, dispatcher=dispatcher) + assert len(results) == len(test_urls) + # Check monitor stats + assert len(monitor.stats) == len(test_urls) + assert all(stat.end_time is not None for stat in monitor.stats.values()) + +if __name__ == "__main__": + pytest.main([__file__, "-v", "--asyncio-mode=auto"]) \ No newline at end of file diff --git a/tests/test_dispatcher.py 
b/tests/test_dispatcher.py deleted file mode 100644 index b354504e..00000000 --- a/tests/test_dispatcher.py +++ /dev/null @@ -1,38 +0,0 @@ -import pytest -import asyncio -from crawl4ai.async_webcrawler import AsyncWebCrawler -from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig, RateLimitConfig -from crawl4ai.dispatcher import DisplayMode - -@pytest.mark.asyncio -async def test_crawler_with_dispatcher(): - # Create test URLs - urls = [f"https://example.com/page_{i}" for i in range(5)] - - # Configure browser - browser_config = BrowserConfig(headless=True, verbose=False) - - # Configure crawler with rate limiting - run_config = CrawlerRunConfig( - enable_rate_limiting=True, - rate_limit_config=RateLimitConfig( - base_delay=(1.0, 2.0), - max_delay=30.0, - max_retries=2, - rate_limit_codes=[429, 503] - ), - memory_threshold_percent=70.0, - check_interval=0.5, - max_session_permit=3, - display_mode=DisplayMode.DETAILED.value - ) - - async with AsyncWebCrawler(config=browser_config) as crawler: - results = await crawler.arun_many(urls, config=run_config) - - # Basic validation - assert len(results) == len(urls) - for result in results: - assert result is not None - # Note: example.com URLs will fail, which is expected for this test - assert not result.success # We expect these to fail since they're fake URLs