feat(config): add streaming support and config cloning

Add streaming capability to crawler configurations and introduce clone() methods
for both BrowserConfig and CrawlerRunConfig to support immutable config updates.
Move stream parameter from arun_many() method to CrawlerRunConfig.

BREAKING CHANGE: Removed stream parameter from AsyncWebCrawler.arun_many() method.
Use config.stream=True instead.
This commit is contained in:
UncleCode
2025-01-19 17:51:47 +08:00
parent 1221be30a3
commit 91463e34f1
3 changed files with 87 additions and 10 deletions

View File

@@ -186,6 +186,50 @@ class BrowserConfig:
extra_args=kwargs.get("extra_args", []),
)
def to_dict(self):
return {
"browser_type": self.browser_type,
"headless": self.headless,
"use_managed_browser": self.use_managed_browser,
"use_persistent_context": self.use_persistent_context,
"user_data_dir": self.user_data_dir,
"chrome_channel": self.chrome_channel,
"channel": self.channel,
"proxy": self.proxy,
"proxy_config": self.proxy_config,
"viewport_width": self.viewport_width,
"viewport_height": self.viewport_height,
"accept_downloads": self.accept_downloads,
"downloads_path": self.downloads_path,
"storage_state": self.storage_state,
"ignore_https_errors": self.ignore_https_errors,
"java_script_enabled": self.java_script_enabled,
"cookies": self.cookies,
"headers": self.headers,
"user_agent": self.user_agent,
"user_agent_mode": self.user_agent_mode,
"user_agent_generator_config": self.user_agent_generator_config,
"text_mode": self.text_mode,
"light_mode": self.light_mode,
"extra_args": self.extra_args,
"sleep_on_close": self.sleep_on_close,
"verbose": self.verbose,
"debugging_port": self.debugging_port,
}
def clone(self, **kwargs):
"""Create a copy of this configuration with updated values.
Args:
**kwargs: Key-value pairs of configuration options to update
Returns:
BrowserConfig: A new instance with the specified updates
"""
config_dict = self.to_dict()
config_dict.update(kwargs)
return BrowserConfig.from_kwargs(config_dict)
class CrawlerRunConfig:
"""
@@ -319,7 +363,12 @@ class CrawlerRunConfig:
log_console (bool): If True, log console messages from the page.
Default: False.
# Streaming Parameters
stream (bool): If True, enables streaming of crawled URLs as they are processed when used with arun_many.
Default: False.
# Optional Parameters
stream (bool): If True, stream the page content as it is being loaded.
url: str = None # This is not a compulsory parameter
"""
@@ -387,6 +436,8 @@ class CrawlerRunConfig:
# Debugging and Logging Parameters
verbose: bool = True,
log_console: bool = False,
# Streaming Parameters
stream: bool = False,
url: str = None,
):
self.url = url
@@ -463,6 +514,9 @@ class CrawlerRunConfig:
self.verbose = verbose
self.log_console = log_console
# Streaming Parameters
self.stream = stream
# Validate type of extraction strategy and chunking strategy if they are provided
if self.extraction_strategy is not None and not isinstance(
self.extraction_strategy, ExtractionStrategy
@@ -555,6 +609,8 @@ class CrawlerRunConfig:
# Debugging and Logging Parameters
verbose=kwargs.get("verbose", True),
log_console=kwargs.get("log_console", False),
# Streaming Parameters
stream=kwargs.get("stream", False),
url=kwargs.get("url"),
)
@@ -583,7 +639,6 @@ class CrawlerRunConfig:
"no_cache_read": self.no_cache_read,
"no_cache_write": self.no_cache_write,
"shared_data": self.shared_data,
# Page Navigation and Timing Parameters
"wait_until": self.wait_until,
"page_timeout": self.page_timeout,
"wait_for": self.wait_for,
@@ -616,5 +671,32 @@ class CrawlerRunConfig:
"exclude_domains": self.exclude_domains,
"verbose": self.verbose,
"log_console": self.log_console,
"stream": self.stream,
"url": self.url,
}
def clone(self, **kwargs):
"""Create a copy of this configuration with updated values.
Args:
**kwargs: Key-value pairs of configuration options to update
Returns:
CrawlerRunConfig: A new instance with the specified updates
Example:
```python
# Create a new config with streaming enabled
stream_config = config.clone(stream=True)
# Create a new config with multiple updates
new_config = config.clone(
stream=True,
cache_mode=CacheMode.BYPASS,
verbose=True
)
```
"""
config_dict = self.to_dict()
config_dict.update(kwargs)
return CrawlerRunConfig.from_kwargs(config_dict)

View File

@@ -701,7 +701,6 @@ class AsyncWebCrawler:
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
dispatcher: Optional[BaseDispatcher] = None,
stream: bool = False,
# Legacy parameters maintained for backwards compatibility
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
@@ -723,7 +722,6 @@ class AsyncWebCrawler:
urls: List of URLs to crawl
config: Configuration object controlling crawl behavior for all URLs
dispatcher: The dispatcher strategy instance to use. Defaults to MemoryAdaptiveDispatcher
stream: If True, returns an AsyncGenerator yielding results as they complete
[other parameters maintained for backwards compatibility]
Returns:
@@ -743,8 +741,7 @@ class AsyncWebCrawler:
# Streaming results
async for result in await crawler.arun_many(
urls=["https://example1.com", "https://example2.com"],
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS),
stream=True
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=True),
):
print(f"Processed {result.url}: {len(result.markdown)} chars")
"""
@@ -783,6 +780,8 @@ class AsyncWebCrawler:
) or task_result.result
)
stream = config.stream
if stream:
async def result_transformer():
async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config):

View File

@@ -33,9 +33,7 @@ async def test_crawler():
print("\n=== Testing Streaming Mode ===")
async for result in await crawler.arun_many(
urls=urls,
config=crawler_config,
stream=True,
verbose=True
config=crawler_config.clone(stream=True),
):
print(f"Received result for: {result.url} - Success: {result.success}")
@@ -43,8 +41,6 @@ async def test_crawler():
results = await crawler.arun_many(
urls=urls,
config=crawler_config,
stream=False,
verbose=True
)
print(f"Received all {len(results)} results at once")
for result in results: