From 91463e34f1f9e78f71b6d2cba8049de227c69cd8 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Sun, 19 Jan 2025 17:51:47 +0800 Subject: [PATCH] feat(config): add streaming support and config cloning Add streaming capability to crawler configurations and introduce clone() methods for both BrowserConfig and CrawlerRunConfig to support immutable config updates. Move stream parameter from arun_many() method to CrawlerRunConfig. BREAKING CHANGE: Removed stream parameter from AsyncWebCrawler.arun_many() method. Use config.stream=True instead. --- crawl4ai/async_configs.py | 84 ++++++++++++++++++++++++++++++++++- crawl4ai/async_webcrawler.py | 7 ++- tests/20241401/test_stream.py | 6 +-- 3 files changed, 87 insertions(+), 10 deletions(-) diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 4677e1d5..f4914726 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -186,6 +186,50 @@ class BrowserConfig: extra_args=kwargs.get("extra_args", []), ) + def to_dict(self): + return { + "browser_type": self.browser_type, + "headless": self.headless, + "use_managed_browser": self.use_managed_browser, + "use_persistent_context": self.use_persistent_context, + "user_data_dir": self.user_data_dir, + "chrome_channel": self.chrome_channel, + "channel": self.channel, + "proxy": self.proxy, + "proxy_config": self.proxy_config, + "viewport_width": self.viewport_width, + "viewport_height": self.viewport_height, + "accept_downloads": self.accept_downloads, + "downloads_path": self.downloads_path, + "storage_state": self.storage_state, + "ignore_https_errors": self.ignore_https_errors, + "java_script_enabled": self.java_script_enabled, + "cookies": self.cookies, + "headers": self.headers, + "user_agent": self.user_agent, + "user_agent_mode": self.user_agent_mode, + "user_agent_generator_config": self.user_agent_generator_config, + "text_mode": self.text_mode, + "light_mode": self.light_mode, + "extra_args": self.extra_args, + "sleep_on_close": self.sleep_on_close, + "verbose": self.verbose, + "debugging_port": self.debugging_port, + } + + def clone(self, **kwargs): + """Create a copy of this configuration with updated values. + + Args: + **kwargs: Key-value pairs of configuration options to update + + Returns: + BrowserConfig: A new instance with the specified updates + """ + config_dict = self.to_dict() + config_dict.update(kwargs) + return BrowserConfig.from_kwargs(config_dict) + class CrawlerRunConfig: """ @@ -319,7 +363,12 @@ class CrawlerRunConfig: log_console (bool): If True, log console messages from the page. Default: False. + # Streaming Parameters + stream (bool): If True, enables streaming of crawled URLs as they are processed when used with arun_many. + Default: False. + # Optional Parameters + stream (bool): If True, stream the page content as it is being loaded. url: str = None # This is not a compulsory parameter """ @@ -387,6 +436,8 @@ class CrawlerRunConfig: # Debugging and Logging Parameters verbose: bool = True, log_console: bool = False, + # Streaming Parameters + stream: bool = False, url: str = None, ): self.url = url @@ -463,6 +514,9 @@ class CrawlerRunConfig: self.verbose = verbose self.log_console = log_console + # Streaming Parameters + self.stream = stream + # Validate type of extraction strategy and chunking strategy if they are provided if self.extraction_strategy is not None and not isinstance( self.extraction_strategy, ExtractionStrategy @@ -555,6 +609,8 @@ class CrawlerRunConfig: # Debugging and Logging Parameters verbose=kwargs.get("verbose", True), log_console=kwargs.get("log_console", False), + # Streaming Parameters + stream=kwargs.get("stream", False), url=kwargs.get("url"), ) @@ -583,7 +639,6 @@ class CrawlerRunConfig: "no_cache_read": self.no_cache_read, "no_cache_write": self.no_cache_write, "shared_data": self.shared_data, - # Page Navigation and Timing Parameters "wait_until": self.wait_until, "page_timeout": self.page_timeout, "wait_for": self.wait_for, @@ -616,5 +671,32 @@ class CrawlerRunConfig: "exclude_domains": self.exclude_domains, "verbose": self.verbose, "log_console": self.log_console, + "stream": self.stream, "url": self.url, } + + def clone(self, **kwargs): + """Create a copy of this configuration with updated values. + + Args: + **kwargs: Key-value pairs of configuration options to update + + Returns: + CrawlerRunConfig: A new instance with the specified updates + + Example: + ```python + # Create a new config with streaming enabled + stream_config = config.clone(stream=True) + + # Create a new config with multiple updates + new_config = config.clone( + stream=True, + cache_mode=CacheMode.BYPASS, + verbose=True + ) + ``` + """ + config_dict = self.to_dict() + config_dict.update(kwargs) + return CrawlerRunConfig.from_kwargs(config_dict) diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py index 99f6b9b8..61dc4a51 100644 --- a/crawl4ai/async_webcrawler.py +++ b/crawl4ai/async_webcrawler.py @@ -701,7 +701,6 @@ class AsyncWebCrawler: urls: List[str], config: Optional[CrawlerRunConfig] = None, dispatcher: Optional[BaseDispatcher] = None, - stream: bool = False, # Legacy parameters maintained for backwards compatibility word_count_threshold=MIN_WORD_THRESHOLD, extraction_strategy: ExtractionStrategy = None, @@ -723,7 +722,6 @@ class AsyncWebCrawler: urls: List of URLs to crawl config: Configuration object controlling crawl behavior for all URLs dispatcher: The dispatcher strategy instance to use. Defaults to MemoryAdaptiveDispatcher - stream: If True, returns an AsyncGenerator yielding results as they complete [other parameters maintained for backwards compatibility] Returns: @@ -743,8 +741,7 @@ class AsyncWebCrawler: # Streaming results async for result in await crawler.arun_many( urls=["https://example1.com", "https://example2.com"], - config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS), - stream=True + config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=True), ): print(f"Processed {result.url}: {len(result.markdown)} chars") """ @@ -783,6 +780,8 @@ class AsyncWebCrawler: ) or task_result.result ) + stream = config.stream + if stream: async def result_transformer(): async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config): diff --git a/tests/20241401/test_stream.py b/tests/20241401/test_stream.py index 4baaa10a..5614eb72 100644 --- a/tests/20241401/test_stream.py +++ b/tests/20241401/test_stream.py @@ -33,9 +33,7 @@ async def test_crawler(): print("\n=== Testing Streaming Mode ===") async for result in await crawler.arun_many( urls=urls, - config=crawler_config, - stream=True, - verbose=True + config=crawler_config.clone(stream=True), ): print(f"Received result for: {result.url} - Success: {result.success}") @@ -43,8 +41,6 @@ async def test_crawler(): results = await crawler.arun_many( urls=urls, config=crawler_config, - stream=False, - verbose=True ) print(f"Received all {len(results)} results at once") for result in results: