feat(dispatcher): add streaming support for URL processing

Add a streaming capability to MemoryAdaptiveDispatcher and AsyncWebCrawler
so URL processing results can be consumed in real time, as each URL
completes, rather than only after the whole batch finishes.

Key changes:
- Add run_urls_stream method to MemoryAdaptiveDispatcher
- Update AsyncWebCrawler.arun_many to support streaming mode
- Add an internal result queue to hand completed results to the streaming consumer
- Improve type hints and documentation

BREAKING CHANGE: The return type of arun_many now depends on the 'stream'
parameter, returning either List[CrawlResult] or AsyncGenerator[CrawlResult, None].
Author: UncleCode
Date: 2025-01-19 14:03:34 +08:00
Commit: e363234172 (parent: 3d09b6a221)
5 changed files with 817 additions and 83 deletions


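From the caller's side, the new contract looks like this. The sketch below is a minimal consumer of both modes; it assumes crawl4ai's top-level exports (AsyncWebCrawler, CrawlerRunConfig, CacheMode) and reuses the example URLs from the docstring in the diff below.

    # Consumer-side sketch of both modes; assumes crawl4ai's top-level
    # exports and that AsyncWebCrawler works as an async context manager.
    import asyncio

    from crawl4ai import AsyncWebCrawler, CacheMode, CrawlerRunConfig

    async def main() -> None:
        urls = ["https://example1.com", "https://example2.com"]
        config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

        async with AsyncWebCrawler() as crawler:
            # Batch mode (default): the awaited call returns List[CrawlResult].
            results = await crawler.arun_many(urls=urls, config=config)
            for result in results:
                print(f"batch: {result.url}")

            # Streaming mode: the awaited call returns an async generator,
            # so each result is consumed with `async for` as it completes.
            async for result in await crawler.arun_many(
                urls=urls, config=config, stream=True
            ):
                print(f"stream: {result.url}")

    asyncio.run(main())

Note that streaming mode still awaits the call first: the await returns the async generator, and `async for` then drives it.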
@@ -42,6 +42,12 @@ from .utils import (
     get_error_context,
 )

+from typing import Union, AsyncGenerator, List, TypeVar
+from collections.abc import AsyncGenerator
+
+CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
+RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
+
 from .__version__ import __version__ as crawl4ai_version
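Two details are worth noting in this hunk. First, AsyncGenerator is imported twice; the later collections.abc import shadows the typing one at runtime, and both are subscriptable on Python 3.9+. Second, the RunManyReturn alias is what lets a single signature cover both modes. Below is a standalone sketch of the same Union-return pattern; process_many and its body are illustrative, not part of the commit.

    # Standalone sketch of the Union-return pattern behind RunManyReturn;
    # process_many and its body are illustrative, not from the commit.
    import asyncio
    from collections.abc import AsyncGenerator
    from typing import List, TypeVar, Union

    T = TypeVar("T")
    RunManyReturn = Union[List[T], AsyncGenerator[T, None]]

    async def process_many(items: List[int], stream: bool = False) -> RunManyReturn[int]:
        async def generate() -> AsyncGenerator[int, None]:
            for item in items:
                await asyncio.sleep(0)  # stand-in for real async work
                yield item * 2          # emit each result as it is ready

        if stream:
            return generate()  # caller iterates with `async for`
        return [result async for result in generate()]  # materialized list

    async def main() -> None:
        print(await process_many([1, 2, 3]))  # [2, 4, 6]
        async for r in await process_many([1, 2, 3], stream=True):
            print(r)  # 2, 4, 6 as they arrive

    asyncio.run(main())

A stricter alternative would declare typing.overload signatures keyed on Literal[True] and Literal[False] for stream, letting type checkers pin the exact return type at each call site; the Union alias trades that precision for brevity.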
@@ -693,8 +699,9 @@ class AsyncWebCrawler:
     async def arun_many(
         self,
         urls: List[str],
-        config: Optional[CrawlerRunConfig] = None,
+        config: Optional[CrawlerRunConfig] = None,
         dispatcher: Optional[BaseDispatcher] = None,
+        stream: bool = False,
         # Legacy parameters maintained for backwards compatibility
         word_count_threshold=MIN_WORD_THRESHOLD,
         extraction_strategy: ExtractionStrategy = None,
@@ -707,46 +714,40 @@ class AsyncWebCrawler:
         pdf: bool = False,
         user_agent: str = None,
         verbose=True,
-        **kwargs,
-    ) -> List[CrawlResult]:
+        **kwargs
+    ) -> RunManyReturn:
         """
         Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.

-        Migration Guide:
-
-        Old way (deprecated):
-            results = await crawler.arun_many(
-                urls,
-                word_count_threshold=200,
-                screenshot=True,
-                ...
-            )
-
-        New way (recommended):
-            config = CrawlerRunConfig(
-                word_count_threshold=200,
-                screenshot=True,
-                dispatcher_config=DispatcherConfig(
-                    enable_rate_limiting=True,
-                    rate_limit_config=RateLimitConfig(...),
-                ),
-                ...
-            )
-            results = await crawler.arun_many(
-                urls,
-                config=config,
-                dispatcher_strategy=MemoryAdaptiveDispatcher  # Optional, this is the default
-            )
-
         Args:
-            urls: List of URLs to crawl
-            config: Configuration object controlling crawl behavior for all URLs
-            dispatcher_strategy: The dispatcher strategy class to use. Defaults to MemoryAdaptiveDispatcher.
-            [other parameters maintained for backwards compatibility]
+            urls: List of URLs to crawl
+            config: Configuration object controlling crawl behavior for all URLs
+            dispatcher: The dispatcher strategy instance to use. Defaults to MemoryAdaptiveDispatcher
+            stream: If True, returns an AsyncGenerator yielding results as they complete
+            [other parameters maintained for backwards compatibility]

         Returns:
-            List[CrawlResult]: Results for each URL
+            Union[List[CrawlResult], AsyncGenerator[CrawlResult, None]]:
+                Either a list of all results or an async generator yielding results
+
+        Examples:
+            # Batch processing (default)
+            results = await crawler.arun_many(
+                urls=["https://example1.com", "https://example2.com"],
+                config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
+            )
+            for result in results:
+                print(f"Processed {result.url}: {len(result.markdown)} chars")
+
+            # Streaming results
+            async for result in await crawler.arun_many(
+                urls=["https://example1.com", "https://example2.com"],
+                config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS),
+                stream=True
+            ):
+                print(f"Processed {result.url}: {len(result.markdown)} chars")
         """
+        # Create config if not provided
         if config is None:
             config = CrawlerRunConfig(
                 word_count_threshold=word_count_threshold,
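The tail of this hunk also shows the backward-compatibility strategy: the legacy keyword arguments are folded into a CrawlerRunConfig only when the caller did not pass one, so an explicit config object always wins. The same pattern in miniature, with illustrative names (Options and run are not from the commit):

    # Miniature of the legacy-kwargs fold; Options and run are
    # illustrative names, not from the commit.
    from dataclasses import dataclass

    @dataclass
    class Options:
        word_count_threshold: int = 10
        screenshot: bool = False

    def run(urls, config=None, **legacy_kwargs):
        # Build a config from legacy kwargs only if none was provided,
        # so explicit config objects take precedence.
        if config is None:
            config = Options(**legacy_kwargs)
        return urls, config

    print(run(["https://example1.com"], screenshot=True))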
@@ -762,14 +763,6 @@ class AsyncWebCrawler:
             **kwargs,
         )
-
-
-        # # Initialize the dispatcher with the selected strategy
-        # dispatcher = dispatcher_strategy(self, config.dispatcher_config)
-        # memory_monitor: CrawlerMonitor = None
-        # if config.dispatcher_config.enable_monitor:
-        #     memory_monitor = CrawlerMonitor(max_visible_rows=config.dispatcher_config.max_display_rows, display_mode=config.dispatcher_config.display_mode)
-
-
+        # Create default dispatcher if none provided
         if dispatcher is None:
             dispatcher = MemoryAdaptiveDispatcher(
                 rate_limiter=RateLimiter(
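Because dispatcher is an ordinary parameter, callers can supply a configured instance instead of this default. A minimal sketch; the top-level import path and the zero-argument RateLimiter() call are assumptions here, since the hunk truncates the RateLimiter arguments:

    # Passing an explicit dispatcher instead of the default.
    import asyncio

    from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
    from crawl4ai import MemoryAdaptiveDispatcher, RateLimiter  # import path assumed

    async def main() -> None:
        # Zero-argument construction is assumed; this diff cuts off
        # the RateLimiter arguments used by the default dispatcher.
        dispatcher = MemoryAdaptiveDispatcher(rate_limiter=RateLimiter())
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun_many(
                urls=["https://example1.com"],
                config=CrawlerRunConfig(),
                dispatcher=dispatcher,
            )
            for result in results:
                print(result.url)

    asyncio.run(main())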
@@ -777,26 +770,27 @@ class AsyncWebCrawler:
             ),
         )

-        # Run the URLs through the dispatcher
-        _results: List[CrawlerTaskResult] = await dispatcher.run_urls(
-            crawler=self, urls=urls, config=config
+        transform_result = lambda task_result: (
+            setattr(task_result.result, 'dispatch_result',
+                DispatchResult(
+                    task_id=task_result.task_id,
+                    memory_usage=task_result.memory_usage,
+                    peak_memory=task_result.peak_memory,
+                    start_time=task_result.start_time,
+                    end_time=task_result.end_time,
+                    error_message=task_result.error_message,
+                )
+            ) or task_result.result
         )
-
-        results: List[CrawlResult] = []
-        for res in _results:
-            _res: CrawlResult = res.result
-            dispatch_result: DispatchResult = DispatchResult(
-                task_id=res.task_id,
-                memory_usage=res.memory_usage,
-                peak_memory=res.peak_memory,
-                start_time=res.start_time,
-                end_time=res.end_time,
-                error_message=res.error_message,
-            )
-            _res.dispatch_result = dispatch_result
-            results.append(_res)
-        return results
-
+
+        if stream:
+            async def result_transformer():
+                async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config):
+                    yield transform_result(task_result)
+            return result_transformer()
+        else:
+            _results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
+            return [transform_result(res) for res in _results]

     async def aclear_cache(self):
         """Clear the cache database."""