From e36323417283e780c0615bb34d89d54686bbb57c Mon Sep 17 00:00:00 2001 From: UncleCode Date: Sun, 19 Jan 2025 14:03:34 +0800 Subject: [PATCH] feat(dispatcher): add streaming support for URL processing Add new streaming capability to the MemoryAdaptiveDispatcher and AsyncWebCrawler to allow processing URLs with real-time result streaming. This enables processing results as they become available rather than waiting for all URLs to complete. Key changes: - Add run_urls_stream method to MemoryAdaptiveDispatcher - Update AsyncWebCrawler.arun_many to support streaming mode - Add result queue for better result handling - Improve type hints and documentation BREAKING CHANGE: The return type of arun_many now depends on the 'stream' parameter, returning either List[CrawlResult] or AsyncGenerator[CrawlResult, None] --- crawl4ai/async_dispatcher.py | 105 ++++- crawl4ai/async_dispatcher_.py | 588 +++++++++++++++++++++++++ crawl4ai/async_webcrawler.py | 114 +++-- tests/20241401/test_stream.py | 54 +++ tests/20241401/test_stream_dispatch.py | 39 ++ 5 files changed, 817 insertions(+), 83 deletions(-) create mode 100644 crawl4ai/async_dispatcher_.py create mode 100644 tests/20241401/test_stream.py create mode 100644 tests/20241401/test_stream_dispatch.py diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py index 64578bf6..ed40b8b4 100644 --- a/crawl4ai/async_dispatcher.py +++ b/crawl4ai/async_dispatcher.py @@ -14,7 +14,7 @@ from rich.table import Table from rich.console import Console from rich import box from datetime import datetime, timedelta - +from collections.abc import AsyncGenerator import time import psutil import asyncio @@ -25,6 +25,7 @@ import random from abc import ABC, abstractmethod + class RateLimiter: def __init__( self, @@ -329,6 +330,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): self.check_interval = check_interval self.max_session_permit = max_session_permit self.memory_wait_timeout = memory_wait_timeout + self.result_queue = asyncio.Queue() # Queue for storing results async def crawl_url( self, @@ -362,7 +364,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" if self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.FAILED) - return CrawlerTaskResult( + result = CrawlerTaskResult( task_id=task_id, url=url, result=result, @@ -372,6 +374,8 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): end_time=datetime.now(), error_message=error_message, ) + await self.result_queue.put(result) + return result if not result.success: error_message = result.error_message @@ -416,32 +420,82 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): urls: List[str], crawler: "AsyncWebCrawler", # noqa: F821 config: CrawlerRunConfig, - ) -> List[CrawlerTaskResult]: - self.crawler = crawler + ) -> List[CrawlerTaskResult]: + self.crawler = crawler + if self.monitor: + self.monitor.start() + + try: + pending_tasks = [] + active_tasks = [] + task_queue = [] + + for url in urls: + task_id = str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + task_queue.append((url, task_id)) + + while task_queue or active_tasks: + wait_start_time = time.time() + while len(active_tasks) < self.max_session_permit and task_queue: + if psutil.virtual_memory().percent >= self.memory_threshold_percent: + # Check if we've exceeded the timeout + if time.time() - wait_start_time > self.memory_wait_timeout: + raise MemoryError( + f"Memory usage above threshold ({self.memory_threshold_percent}%) 
for more than {self.memory_wait_timeout} seconds" + ) + await asyncio.sleep(self.check_interval) + continue + + url, task_id = task_queue.pop(0) + task = asyncio.create_task(self.crawl_url(url, config, task_id)) + active_tasks.append(task) + + if not active_tasks: + await asyncio.sleep(self.check_interval) + continue + + done, pending = await asyncio.wait( + active_tasks, return_when=asyncio.FIRST_COMPLETED + ) + + pending_tasks.extend(done) + active_tasks = list(pending) + + return await asyncio.gather(*pending_tasks) + finally: + if self.monitor: + self.monitor.stop() + + async def run_urls_stream( + self, + urls: List[str], + crawler: "AsyncWebCrawler", + config: CrawlerRunConfig, + ) -> AsyncGenerator[CrawlerTaskResult, None]: + self.crawler = crawler if self.monitor: self.monitor.start() try: - pending_tasks = [] active_tasks = [] task_queue = [] + completed_count = 0 + total_urls = len(urls) + # Initialize task queue for url in urls: task_id = str(uuid.uuid4()) if self.monitor: self.monitor.add_task(task_id, url) task_queue.append((url, task_id)) - while task_queue or active_tasks: - wait_start_time = time.time() + while completed_count < total_urls: + # Start new tasks if memory permits while len(active_tasks) < self.max_session_permit and task_queue: if psutil.virtual_memory().percent >= self.memory_threshold_percent: - # Check if we've exceeded the timeout - if time.time() - wait_start_time > self.memory_wait_timeout: - raise MemoryError( - f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds" - ) await asyncio.sleep(self.check_interval) continue @@ -449,23 +503,28 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): task = asyncio.create_task(self.crawl_url(url, config, task_id)) active_tasks.append(task) - if not active_tasks: + if not active_tasks and not task_queue: + break + + # Wait for any task to complete and yield results + if active_tasks: + done, pending = await asyncio.wait( + active_tasks, + timeout=0.1, + return_when=asyncio.FIRST_COMPLETED + ) + for completed_task in done: + result = await completed_task + completed_count += 1 + yield result + active_tasks = list(pending) + else: await asyncio.sleep(self.check_interval) - continue - done, pending = await asyncio.wait( - active_tasks, return_when=asyncio.FIRST_COMPLETED - ) - - pending_tasks.extend(done) - active_tasks = list(pending) - - return await asyncio.gather(*pending_tasks) finally: if self.monitor: self.monitor.stop() - class SemaphoreDispatcher(BaseDispatcher): def __init__( self, diff --git a/crawl4ai/async_dispatcher_.py b/crawl4ai/async_dispatcher_.py new file mode 100644 index 00000000..64578bf6 --- /dev/null +++ b/crawl4ai/async_dispatcher_.py @@ -0,0 +1,588 @@ +from typing import Dict, Optional, List, Tuple +from .async_configs import CrawlerRunConfig +from .models import ( + CrawlResult, + CrawlerTaskResult, + CrawlStatus, + DisplayMode, + CrawlStats, + DomainState, +) + +from rich.live import Live +from rich.table import Table +from rich.console import Console +from rich import box +from datetime import datetime, timedelta + +import time +import psutil +import asyncio +import uuid + +from urllib.parse import urlparse +import random +from abc import ABC, abstractmethod + + +class RateLimiter: + def __init__( + self, + base_delay: Tuple[float, float] = (1.0, 3.0), + max_delay: float = 60.0, + max_retries: int = 3, + rate_limit_codes: List[int] = None, + ): + self.base_delay = base_delay + self.max_delay = max_delay + self.max_retries = 
max_retries + self.rate_limit_codes = rate_limit_codes or [429, 503] + self.domains: Dict[str, DomainState] = {} + + def get_domain(self, url: str) -> str: + return urlparse(url).netloc + + async def wait_if_needed(self, url: str) -> None: + domain = self.get_domain(url) + state = self.domains.get(domain) + + if not state: + self.domains[domain] = DomainState() + state = self.domains[domain] + + now = time.time() + if state.last_request_time: + wait_time = max(0, state.current_delay - (now - state.last_request_time)) + if wait_time > 0: + await asyncio.sleep(wait_time) + + # Random delay within base range if no current delay + if state.current_delay == 0: + state.current_delay = random.uniform(*self.base_delay) + + state.last_request_time = time.time() + + def update_delay(self, url: str, status_code: int) -> bool: + domain = self.get_domain(url) + state = self.domains[domain] + + if status_code in self.rate_limit_codes: + state.fail_count += 1 + if state.fail_count > self.max_retries: + return False + + # Exponential backoff with random jitter + state.current_delay = min( + state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay + ) + else: + # Gradually reduce delay on success + state.current_delay = max( + random.uniform(*self.base_delay), state.current_delay * 0.75 + ) + state.fail_count = 0 + + return True + + +class CrawlerMonitor: + def __init__( + self, + max_visible_rows: int = 15, + display_mode: DisplayMode = DisplayMode.DETAILED, + ): + self.console = Console() + self.max_visible_rows = max_visible_rows + self.display_mode = display_mode + self.stats: Dict[str, CrawlStats] = {} + self.process = psutil.Process() + self.start_time = datetime.now() + self.live = Live(self._create_table(), refresh_per_second=2) + + def start(self): + self.live.start() + + def stop(self): + self.live.stop() + + def add_task(self, task_id: str, url: str): + self.stats[task_id] = CrawlStats( + task_id=task_id, url=url, status=CrawlStatus.QUEUED + ) + self.live.update(self._create_table()) + + def update_task(self, task_id: str, **kwargs): + if task_id in self.stats: + for key, value in kwargs.items(): + setattr(self.stats[task_id], key, value) + self.live.update(self._create_table()) + + def _create_aggregated_table(self) -> Table: + """Creates a compact table showing only aggregated statistics""" + table = Table( + box=box.ROUNDED, + title="Crawler Status Overview", + title_style="bold magenta", + header_style="bold blue", + show_lines=True, + ) + + # Calculate statistics + total_tasks = len(self.stats) + queued = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED + ) + in_progress = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS + ) + completed = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED + ) + failed = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED + ) + + # Memory statistics + current_memory = self.process.memory_info().rss / (1024 * 1024) + total_task_memory = sum(stat.memory_usage for stat in self.stats.values()) + peak_memory = max( + (stat.peak_memory for stat in self.stats.values()), default=0.0 + ) + + # Duration + duration = datetime.now() - self.start_time + + # Create status row + table.add_column("Status", style="bold cyan") + table.add_column("Count", justify="right") + table.add_column("Percentage", justify="right") + + table.add_row("Total Tasks", str(total_tasks), "100%") + table.add_row( + "[yellow]In Queue[/yellow]", + str(queued), + 
f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%", + ) + table.add_row( + "[blue]In Progress[/blue]", + str(in_progress), + f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%", + ) + table.add_row( + "[green]Completed[/green]", + str(completed), + f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%", + ) + table.add_row( + "[red]Failed[/red]", + str(failed), + f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%", + ) + + # Add memory information + table.add_section() + table.add_row( + "[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", "" + ) + table.add_row( + "[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", "" + ) + table.add_row( + "[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", "" + ) + table.add_row( + "[yellow]Runtime[/yellow]", + str(timedelta(seconds=int(duration.total_seconds()))), + "", + ) + + return table + + def _create_detailed_table(self) -> Table: + table = Table( + box=box.ROUNDED, + title="Crawler Performance Monitor", + title_style="bold magenta", + header_style="bold blue", + ) + + # Add columns + table.add_column("Task ID", style="cyan", no_wrap=True) + table.add_column("URL", style="cyan", no_wrap=True) + table.add_column("Status", style="bold") + table.add_column("Memory (MB)", justify="right") + table.add_column("Peak (MB)", justify="right") + table.add_column("Duration", justify="right") + table.add_column("Info", style="italic") + + # Add summary row + total_memory = sum(stat.memory_usage for stat in self.stats.values()) + active_count = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS + ) + completed_count = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED + ) + failed_count = sum( + 1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED + ) + + table.add_row( + "[bold yellow]SUMMARY", + f"Total: {len(self.stats)}", + f"Active: {active_count}", + f"{total_memory:.1f}", + f"{self.process.memory_info().rss / (1024 * 1024):.1f}", + str( + timedelta( + seconds=int((datetime.now() - self.start_time).total_seconds()) + ) + ), + f"✓{completed_count} ✗{failed_count}", + style="bold", + ) + + table.add_section() + + # Add rows for each task + visible_stats = sorted( + self.stats.values(), + key=lambda x: ( + x.status != CrawlStatus.IN_PROGRESS, + x.status != CrawlStatus.QUEUED, + x.end_time or datetime.max, + ), + )[: self.max_visible_rows] + + for stat in visible_stats: + status_style = { + CrawlStatus.QUEUED: "white", + CrawlStatus.IN_PROGRESS: "yellow", + CrawlStatus.COMPLETED: "green", + CrawlStatus.FAILED: "red", + }[stat.status] + + table.add_row( + stat.task_id[:8], # Show first 8 chars of task ID + stat.url[:40] + "..." 
if len(stat.url) > 40 else stat.url, + f"[{status_style}]{stat.status.value}[/{status_style}]", + f"{stat.memory_usage:.1f}", + f"{stat.peak_memory:.1f}", + stat.duration, + stat.error_message[:40] if stat.error_message else "", + ) + + return table + + def _create_table(self) -> Table: + """Creates the appropriate table based on display mode""" + if self.display_mode == DisplayMode.AGGREGATED: + return self._create_aggregated_table() + return self._create_detailed_table() + + +class BaseDispatcher(ABC): + def __init__( + self, + rate_limiter: Optional[RateLimiter] = None, + monitor: Optional[CrawlerMonitor] = None, + ): + self.crawler = None + self._domain_last_hit: Dict[str, float] = {} + self.concurrent_sessions = 0 + self.rate_limiter = rate_limiter + self.monitor = monitor + + @abstractmethod + async def crawl_url( + self, + url: str, + config: CrawlerRunConfig, + task_id: str, + monitor: Optional[CrawlerMonitor] = None, + ) -> CrawlerTaskResult: + pass + + @abstractmethod + async def run_urls( + self, + urls: List[str], + crawler: "AsyncWebCrawler", # noqa: F821 + config: CrawlerRunConfig, + monitor: Optional[CrawlerMonitor] = None, + ) -> List[CrawlerTaskResult]: + pass + + +class MemoryAdaptiveDispatcher(BaseDispatcher): + def __init__( + self, + memory_threshold_percent: float = 90.0, + check_interval: float = 1.0, + max_session_permit: int = 20, + memory_wait_timeout: float = 300.0, # 5 minutes default timeout + rate_limiter: Optional[RateLimiter] = None, + monitor: Optional[CrawlerMonitor] = None, + ): + super().__init__(rate_limiter, monitor) + self.memory_threshold_percent = memory_threshold_percent + self.check_interval = check_interval + self.max_session_permit = max_session_permit + self.memory_wait_timeout = memory_wait_timeout + + async def crawl_url( + self, + url: str, + config: CrawlerRunConfig, + task_id: str, + ) -> CrawlerTaskResult: + start_time = datetime.now() + error_message = "" + memory_usage = peak_memory = 0.0 + + try: + if self.monitor: + self.monitor.update_task( + task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time + ) + self.concurrent_sessions += 1 + + if self.rate_limiter: + await self.rate_limiter.wait_if_needed(url) + + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) + result = await self.crawler.arun(url, config=config, session_id=task_id) + end_memory = process.memory_info().rss / (1024 * 1024) + + memory_usage = peak_memory = end_memory - start_memory + + if self.rate_limiter and result.status_code: + if not self.rate_limiter.update_delay(url, result.status_code): + error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=datetime.now(), + error_message=error_message, + ) + + if not result.success: + error_message = result.error_message + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + elif self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED) + + except Exception as e: + error_message = str(e) + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + result = CrawlResult( + url=url, html="", metadata={}, success=False, error_message=str(e) + ) + + finally: + end_time = datetime.now() + if self.monitor: + self.monitor.update_task( + task_id, + 
end_time=end_time, + memory_usage=memory_usage, + peak_memory=peak_memory, + error_message=error_message, + ) + self.concurrent_sessions -= 1 + + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=end_time, + error_message=error_message, + ) + + async def run_urls( + self, + urls: List[str], + crawler: "AsyncWebCrawler", # noqa: F821 + config: CrawlerRunConfig, + ) -> List[CrawlerTaskResult]: + self.crawler = crawler + + if self.monitor: + self.monitor.start() + + try: + pending_tasks = [] + active_tasks = [] + task_queue = [] + + for url in urls: + task_id = str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + task_queue.append((url, task_id)) + + while task_queue or active_tasks: + wait_start_time = time.time() + while len(active_tasks) < self.max_session_permit and task_queue: + if psutil.virtual_memory().percent >= self.memory_threshold_percent: + # Check if we've exceeded the timeout + if time.time() - wait_start_time > self.memory_wait_timeout: + raise MemoryError( + f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds" + ) + await asyncio.sleep(self.check_interval) + continue + + url, task_id = task_queue.pop(0) + task = asyncio.create_task(self.crawl_url(url, config, task_id)) + active_tasks.append(task) + + if not active_tasks: + await asyncio.sleep(self.check_interval) + continue + + done, pending = await asyncio.wait( + active_tasks, return_when=asyncio.FIRST_COMPLETED + ) + + pending_tasks.extend(done) + active_tasks = list(pending) + + return await asyncio.gather(*pending_tasks) + finally: + if self.monitor: + self.monitor.stop() + + +class SemaphoreDispatcher(BaseDispatcher): + def __init__( + self, + semaphore_count: int = 5, + max_session_permit: int = 20, + rate_limiter: Optional[RateLimiter] = None, + monitor: Optional[CrawlerMonitor] = None, + ): + super().__init__(rate_limiter, monitor) + self.semaphore_count = semaphore_count + self.max_session_permit = max_session_permit + + async def crawl_url( + self, + url: str, + config: CrawlerRunConfig, + task_id: str, + semaphore: asyncio.Semaphore = None, + ) -> CrawlerTaskResult: + start_time = datetime.now() + error_message = "" + memory_usage = peak_memory = 0.0 + + try: + if self.monitor: + self.monitor.update_task( + task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time + ) + + if self.rate_limiter: + await self.rate_limiter.wait_if_needed(url) + + async with semaphore: + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) + result = await self.crawler.arun(url, config=config, session_id=task_id) + end_memory = process.memory_info().rss / (1024 * 1024) + + memory_usage = peak_memory = end_memory - start_memory + + if self.rate_limiter and result.status_code: + if not self.rate_limiter.update_delay(url, result.status_code): + error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=datetime.now(), + error_message=error_message, + ) + + if not result.success: + error_message = result.error_message + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + elif self.monitor: + 
self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED) + + except Exception as e: + error_message = str(e) + if self.monitor: + self.monitor.update_task(task_id, status=CrawlStatus.FAILED) + result = CrawlResult( + url=url, html="", metadata={}, success=False, error_message=str(e) + ) + + finally: + end_time = datetime.now() + if self.monitor: + self.monitor.update_task( + task_id, + end_time=end_time, + memory_usage=memory_usage, + peak_memory=peak_memory, + error_message=error_message, + ) + + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=result, + memory_usage=memory_usage, + peak_memory=peak_memory, + start_time=start_time, + end_time=end_time, + error_message=error_message, + ) + + async def run_urls( + self, + crawler: "AsyncWebCrawler", # noqa: F821 + urls: List[str], + config: CrawlerRunConfig, + ) -> List[CrawlerTaskResult]: + self.crawler = crawler + if self.monitor: + self.monitor.start() + + try: + semaphore = asyncio.Semaphore(self.semaphore_count) + tasks = [] + + for url in urls: + task_id = str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + task = asyncio.create_task( + self.crawl_url(url, config, task_id, semaphore) + ) + tasks.append(task) + + return await asyncio.gather(*tasks, return_exceptions=True) + finally: + if self.monitor: + self.monitor.stop() diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py index 929fa924..99f6b9b8 100644 --- a/crawl4ai/async_webcrawler.py +++ b/crawl4ai/async_webcrawler.py @@ -42,6 +42,12 @@ from .utils import ( get_error_context, ) +from typing import Union, AsyncGenerator, List, TypeVar +from collections.abc import AsyncGenerator + +CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult) +RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]] + from .__version__ import __version__ as crawl4ai_version @@ -693,8 +699,9 @@ class AsyncWebCrawler: async def arun_many( self, urls: List[str], - config: Optional[CrawlerRunConfig] = None, + config: Optional[CrawlerRunConfig] = None, dispatcher: Optional[BaseDispatcher] = None, + stream: bool = False, # Legacy parameters maintained for backwards compatibility word_count_threshold=MIN_WORD_THRESHOLD, extraction_strategy: ExtractionStrategy = None, @@ -707,46 +714,40 @@ class AsyncWebCrawler: pdf: bool = False, user_agent: str = None, verbose=True, - **kwargs, - ) -> List[CrawlResult]: + **kwargs + ) -> RunManyReturn: """ Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy. - Migration Guide: - Old way (deprecated): - results = await crawler.arun_many( - urls, - word_count_threshold=200, - screenshot=True, - ... - ) - - New way (recommended): - config = CrawlerRunConfig( - word_count_threshold=200, - screenshot=True, - dispatcher_config=DispatcherConfig( - enable_rate_limiting=True, - rate_limit_config=RateLimitConfig(...), - ), - ... - ) - results = await crawler.arun_many( - urls, - config=config, - dispatcher_strategy=MemoryAdaptiveDispatcher # Optional, this is the default - ) - Args: - urls: List of URLs to crawl - config: Configuration object controlling crawl behavior for all URLs - dispatcher_strategy: The dispatcher strategy class to use. Defaults to MemoryAdaptiveDispatcher. - [other parameters maintained for backwards compatibility] + urls: List of URLs to crawl + config: Configuration object controlling crawl behavior for all URLs + dispatcher: The dispatcher strategy instance to use. 
Defaults to MemoryAdaptiveDispatcher + stream: If True, returns an AsyncGenerator yielding results as they complete + [other parameters maintained for backwards compatibility] Returns: - List[CrawlResult]: Results for each URL + Union[List[CrawlResult], AsyncGenerator[CrawlResult, None]]: + Either a list of all results or an async generator yielding results + + Examples: + + # Batch processing (default) + results = await crawler.arun_many( + urls=["https://example1.com", "https://example2.com"], + config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + ) + for result in results: + print(f"Processed {result.url}: {len(result.markdown)} chars") + + # Streaming results + async for result in await crawler.arun_many( + urls=["https://example1.com", "https://example2.com"], + config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS), + stream=True + ): + print(f"Processed {result.url}: {len(result.markdown)} chars") """ - # Create config if not provided if config is None: config = CrawlerRunConfig( word_count_threshold=word_count_threshold, @@ -762,14 +763,6 @@ class AsyncWebCrawler: **kwargs, ) - # # Initialize the dispatcher with the selected strategy - # dispatcher = dispatcher_strategy(self, config.dispatcher_config) - - # memory_monitor: CrawlerMonitor = None - # if config.dispatcher_config.enable_monitor: - # memory_monitor = CrawlerMonitor(max_visible_rows=config.dispatcher_config.max_display_rows, display_mode=config.dispatcher_config.display_mode) - - # Create default dispatcher if none provided if dispatcher is None: dispatcher = MemoryAdaptiveDispatcher( rate_limiter=RateLimiter( @@ -777,26 +770,27 @@ class AsyncWebCrawler: ), ) - # Run the URLs through the dispatcher - _results: List[CrawlerTaskResult] = await dispatcher.run_urls( - crawler=self, urls=urls, config=config + transform_result = lambda task_result: ( + setattr(task_result.result, 'dispatch_result', + DispatchResult( + task_id=task_result.task_id, + memory_usage=task_result.memory_usage, + peak_memory=task_result.peak_memory, + start_time=task_result.start_time, + end_time=task_result.end_time, + error_message=task_result.error_message, + ) + ) or task_result.result ) - results: List[CrawlResult] = [] - for res in _results: - _res: CrawlResult = res.result - dispatch_result: DispatchResult = DispatchResult( - task_id=res.task_id, - memory_usage=res.memory_usage, - peak_memory=res.peak_memory, - start_time=res.start_time, - end_time=res.end_time, - error_message=res.error_message, - ) - _res.dispatch_result = dispatch_result - results.append(_res) - - return results + if stream: + async def result_transformer(): + async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config): + yield transform_result(task_result) + return result_transformer() + else: + _results = await dispatcher.run_urls(crawler=self, urls=urls, config=config) + return [transform_result(res) for res in _results] async def aclear_cache(self): """Clear the cache database.""" diff --git a/tests/20241401/test_stream.py b/tests/20241401/test_stream.py new file mode 100644 index 00000000..4baaa10a --- /dev/null +++ b/tests/20241401/test_stream.py @@ -0,0 +1,54 @@ +import os, sys +# append 2 parent directories to sys.path to import crawl4ai +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) +parent_parent_dir = os.path.dirname(parent_dir) +sys.path.append(parent_parent_dir) + +import asyncio +from crawl4ai import * + +async def test_crawler(): + # Setup configurations + browser_config = 
BrowserConfig(headless=True, verbose=False) + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + markdown_generator=DefaultMarkdownGenerator( + content_filter=PruningContentFilter( + threshold=0.48, + threshold_type="fixed", + min_word_threshold=0 + ) + ), + ) + + # Test URLs - mix of different sites + urls = [ + "http://example.com", + "http://example.org", + "http://example.net", + ] * 10 # 15 total URLs + + async with AsyncWebCrawler(config=browser_config) as crawler: + print("\n=== Testing Streaming Mode ===") + async for result in await crawler.arun_many( + urls=urls, + config=crawler_config, + stream=True, + verbose=True + ): + print(f"Received result for: {result.url} - Success: {result.success}") + + print("\n=== Testing Batch Mode ===") + results = await crawler.arun_many( + urls=urls, + config=crawler_config, + stream=False, + verbose=True + ) + print(f"Received all {len(results)} results at once") + for result in results: + print(f"Batch result for: {result.url} - Success: {result.success}") + +if __name__ == "__main__": + asyncio.run(test_crawler()) \ No newline at end of file diff --git a/tests/20241401/test_stream_dispatch.py b/tests/20241401/test_stream_dispatch.py new file mode 100644 index 00000000..0b5d004c --- /dev/null +++ b/tests/20241401/test_stream_dispatch.py @@ -0,0 +1,39 @@ +import os, sys +# append 2 parent directories to sys.path to import crawl4ai +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) +parent_parent_dir = os.path.dirname(parent_dir) +sys.path.append(parent_parent_dir) + + +import asyncio +from typing import List +from crawl4ai import * +from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher + +async def test_streaming(): + browser_config = BrowserConfig(headless=True, verbose=True) + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + markdown_generator=DefaultMarkdownGenerator( + # content_filter=PruningContentFilter( + # threshold=0.48, + # threshold_type="fixed", + # min_word_threshold=0 + # ) + ), + ) + + urls = ["http://example.com"] * 10 + + async with AsyncWebCrawler(config=browser_config) as crawler: + dispatcher = MemoryAdaptiveDispatcher( + max_session_permit=5, + check_interval=0.5 + ) + + async for result in dispatcher.run_urls_stream(urls, crawler, crawler_config): + print(f"Got result for {result.url} - Success: {result.result.success}") + +if __name__ == "__main__": + asyncio.run(test_streaming()) \ No newline at end of file
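
A minimal usage sketch of the two call paths this patch introduces. The URLs and config values below are illustrative; the calls follow the docstring example added to arun_many and the wildcard imports used in tests/20241401/test_stream.py. The dispatcher-level equivalent, MemoryAdaptiveDispatcher.run_urls_stream, is exercised directly in tests/20241401/test_stream_dispatch.py.

import asyncio

from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode


async def main():
    # Illustrative settings; mirror the configs used in the new tests.
    browser_config = BrowserConfig(headless=True)
    run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
    urls = ["https://example.com", "https://example.org", "https://example.net"]

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # stream=True: arun_many returns an AsyncGenerator, so each result can
        # be consumed as soon as its URL finishes crawling.
        async for result in await crawler.arun_many(urls, config=run_config, stream=True):
            print(f"streamed: {result.url} success={result.success}")

        # stream=False (default): arun_many returns a List[CrawlResult] only
        # after every URL has completed, preserving the pre-patch behaviour.
        results = await crawler.arun_many(urls, config=run_config)
        print(f"batch: received {len(results)} results")


if __name__ == "__main__":
    asyncio.run(main())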