diff --git a/crawl4ai/docker_client.py b/crawl4ai/docker_client.py index 5fbdbc8d..ad2342a0 100644 --- a/crawl4ai/docker_client.py +++ b/crawl4ai/docker_client.py @@ -113,8 +113,12 @@ class Crawl4aiDockerClient: self.logger.info(f"Crawling {len(urls)} URLs {'(streaming)' if is_streaming else ''}", tag="CRAWL") if is_streaming: - # Create and return the async generator directly - return self._stream_crawl_results(data) + # For streaming, we need to return the async generator properly + # The caller should be able to do: async for result in await client.crawl(...) + async def streaming_wrapper(): + async for result in self._stream_crawl_results(data): + yield result + return streaming_wrapper() response = await self._request("POST", "/crawl", json=data) result_data = response.json() @@ -131,17 +135,27 @@ class Crawl4aiDockerClient: response.raise_for_status() async for line in response.aiter_lines(): if line.strip(): - result = json.loads(line) - if "error" in result: - self.logger.error_status(url=result.get("url", "unknown"), error=result["error"]) + try: + result = json.loads(line) + if "error" in result: + self.logger.error_status(url=result.get("url", "unknown"), error=result["error"]) + continue + + # Check if this is a crawl result (has required fields) + if "url" in result and "success" in result: + self.logger.url_status(url=result.get("url", "unknown"), success=result.get("success", False), timing=result.get("timing", 0.0)) + + # Create CrawlResult object properly + crawl_result = CrawlResult(**result) + yield crawl_result + # Skip status-only messages + elif result.get("status") == "completed": + continue + except json.JSONDecodeError as e: + self.logger.error(f"Failed to parse streaming response: {e}", tag="STREAM") continue - - # Check if this is a crawl result (has required fields) - if "url" in result and "success" in result: - self.logger.url_status(url=result.get("url", "unknown"), success=result.get("success", False), timing=result.get("timing", 0.0)) - yield CrawlResult(**result) - # Skip status-only messages - elif result.get("status") == "completed": + except Exception as e: + self.logger.error(f"Error processing streaming result: {e}", tag="STREAM") continue async def get_schema(self) -> Dict[str, Any]: diff --git a/deploy/docker/api.py b/deploy/docker/api.py index 8cad45f0..80553032 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -387,6 +387,7 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) """Stream results with heartbeats and completion markers.""" import orjson from datetime import datetime + import inspect def orjson_default(obj): # Handle datetime (if not already handled by orjson) @@ -399,23 +400,43 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) return str(obj) try: - async for result in results_gen: - try: - server_memory_mb = _get_memory_mb() - # Use ORJSON serialization to handle property objects properly - result_json = result.model_dump_json() - result_dict = orjson.loads(result_json) - result_dict['server_memory_mb'] = server_memory_mb - # If PDF exists, encode it to base64 - if result_dict.get('pdf') is not None: - result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8') - logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}") - data = orjson.dumps(result_dict, default=orjson_default).decode('utf-8') + "\n" - yield data.encode('utf-8') - except Exception as e: - logger.error(f"Serialization error: {e}") - error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')} - yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8') + logger.info(f"Starting streaming with results_gen type: {type(results_gen)}") + logger.info(f"Is results_gen async generator: {inspect.isasyncgen(results_gen)}") + + # Check if results_gen is actually an async generator vs another type + if inspect.isasyncgen(results_gen): + logger.info("Processing as async generator") + async for result in results_gen: + try: + logger.info(f"Processing streaming result of type: {type(result)}") + + # Check if this result is actually a CrawlResult + if hasattr(result, 'model_dump_json'): + server_memory_mb = _get_memory_mb() + result_json = result.model_dump_json() + result_dict = orjson.loads(result_json) + result_dict['server_memory_mb'] = server_memory_mb + + if result_dict.get('pdf') is not None: + result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8') + + logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}") + data = orjson.dumps(result_dict, default=orjson_default).decode('utf-8') + "\n" + yield data.encode('utf-8') + else: + logger.error(f"Result doesn't have model_dump_json method: {type(result)}") + error_response = {"error": f"Invalid result type: {type(result)}", "url": "unknown"} + yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8') + + except Exception as e: + logger.error(f"Serialization error: {e}") + logger.error(f"Result type was: {type(result)}") + error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')} + yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8') + else: + logger.error(f"results_gen is not an async generator: {type(results_gen)}") + error_response = {"error": f"Invalid results_gen type: {type(results_gen)}"} + yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8') yield orjson.dumps({"status": "completed"}).decode('utf-8').encode('utf-8') @@ -574,10 +595,28 @@ async def handle_stream_crawl_request( async def single_result_generator(): # Handle CrawlResultContainer - extract the actual results - if hasattr(single_result_container, '__iter__'): - # It's a CrawlResultContainer with multiple results (e.g., from deep crawl) - for result in single_result_container: + if hasattr(single_result_container, '_results'): + # It's a CrawlResultContainer - iterate over the internal results + for result in single_result_container._results: + # Check if the result is an async generator (from deep crawl) + if hasattr(result, '__aiter__'): + async for sub_result in result: + yield sub_result + else: + yield result + elif hasattr(single_result_container, '__aiter__'): + # It's an async generator (from streaming deep crawl) + async for result in single_result_container: yield result + elif hasattr(single_result_container, '__iter__') and not hasattr(single_result_container, 'url'): + # It's iterable but not a CrawlResult itself + for result in single_result_container: + # Check if each result is an async generator + if hasattr(result, '__aiter__'): + async for sub_result in result: + yield sub_result + else: + yield result else: # It's a single CrawlResult yield single_result_container diff --git a/tests/test_comprehensive_fixes.py b/tests/test_comprehensive_fixes.py index aa66e647..42339d21 100644 --- a/tests/test_comprehensive_fixes.py +++ b/tests/test_comprehensive_fixes.py @@ -400,6 +400,483 @@ class TestDockerClientFunctionality: test_result.finish(False, f"Request preparation failed: {str(e)}") +class TestSDKCrawling: + """Test SDK (AsyncWebCrawler) crawling in both streaming and non-streaming modes.""" + + def test_sdk_simple_non_streaming(self, test_runner: ComprehensiveTestRunner): + """Test SDK simple crawling without streaming.""" + test_result = test_runner.add_test("SDK Simple Non-Streaming") + test_result.start() + + try: + from crawl4ai import AsyncWebCrawler, CacheMode + + # Simple configuration without deep crawl + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + stream=False, + word_count_threshold=50 + ) + + browser_config = BrowserConfig(headless=True) + + # Test configuration serialization (server would do this) + config_data = crawler_config.dump() + loaded_config = CrawlerRunConfig.load(config_data) + + assert loaded_config.stream is False + assert loaded_config.word_count_threshold == 50 + + test_result.finish(True, "SDK simple non-streaming configuration working") + + except Exception as e: + test_result.finish(False, f"SDK simple non-streaming failed: {str(e)}") + + def test_sdk_simple_streaming(self, test_runner: ComprehensiveTestRunner): + """Test SDK simple crawling with streaming.""" + test_result = test_runner.add_test("SDK Simple Streaming") + test_result.start() + + try: + from crawl4ai import AsyncWebCrawler, CacheMode + + # Simple configuration with streaming + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + stream=True, + word_count_threshold=50 + ) + + browser_config = BrowserConfig(headless=True) + + # Test configuration serialization + config_data = crawler_config.dump() + loaded_config = CrawlerRunConfig.load(config_data) + + assert loaded_config.stream is True + assert loaded_config.word_count_threshold == 50 + + test_result.finish(True, "SDK simple streaming configuration working") + + except Exception as e: + test_result.finish(False, f"SDK simple streaming failed: {str(e)}") + + def test_sdk_complex_non_streaming(self, test_runner: ComprehensiveTestRunner): + """Test SDK complex crawling (with deep crawl) without streaming.""" + test_result = test_runner.add_test("SDK Complex Non-Streaming") + test_result.start() + + try: + from crawl4ai import AsyncWebCrawler, CacheMode + + # Complex configuration with deep crawl strategy + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + cache_mode=CacheMode.BYPASS, + stream=False, + word_count_threshold=100 + ) + + # Test configuration serialization/deserialization + config_data = crawler_config.dump() + loaded_config = CrawlerRunConfig.load(config_data) + + assert hasattr(loaded_config.deep_crawl_strategy, 'arun') + assert loaded_config.stream is False + assert loaded_config.deep_crawl_strategy.max_depth == 2 + assert loaded_config.deep_crawl_strategy.max_pages == 3 + + test_result.finish(True, "SDK complex non-streaming with deep crawl working") + + except Exception as e: + test_result.finish(False, f"SDK complex non-streaming failed: {str(e)}") + + def test_sdk_complex_streaming(self, test_runner: ComprehensiveTestRunner): + """Test SDK complex crawling (with deep crawl) with streaming.""" + test_result = test_runner.add_test("SDK Complex Streaming") + test_result.start() + + try: + from crawl4ai import AsyncWebCrawler, CacheMode + + # Complex configuration with deep crawl strategy and streaming + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + cache_mode=CacheMode.BYPASS, + stream=True, + word_count_threshold=100 + ) + + # Test configuration serialization/deserialization + config_data = crawler_config.dump() + loaded_config = CrawlerRunConfig.load(config_data) + + assert hasattr(loaded_config.deep_crawl_strategy, 'arun') + assert loaded_config.stream is True + assert loaded_config.deep_crawl_strategy.max_depth == 2 + assert loaded_config.deep_crawl_strategy.max_pages == 3 + + test_result.finish(True, "SDK complex streaming with deep crawl working") + + except Exception as e: + test_result.finish(False, f"SDK complex streaming failed: {str(e)}") + + +class TestDirectAPICrawling: + """Test Direct API crawling via HTTP requests.""" + + def test_direct_api_simple_non_streaming_preparation(self, test_runner: ComprehensiveTestRunner): + """Test Direct API simple non-streaming request preparation.""" + test_result = test_runner.add_test("Direct API Simple Non-Streaming Prep") + test_result.start() + + try: + import json + + browser_config = BrowserConfig(headless=True) + crawler_config = CrawlerRunConfig( + cache_mode="bypass", # Use string for API + stream=False, + word_count_threshold=50 + ) + + # Prepare request payload like client would + payload = { + "urls": ["https://example.com"], + "browser_config": browser_config.dump(), + "crawler_config": crawler_config.dump() + } + + # Test JSON serialization (what HTTP client would do) + json_payload = json.dumps(payload, default=str) + assert isinstance(json_payload, str) + + # Test deserialization (what server would do) + loaded_payload = json.loads(json_payload) + loaded_crawler = CrawlerRunConfig.load(loaded_payload["crawler_config"]) + + assert loaded_crawler.stream is False + assert loaded_crawler.word_count_threshold == 50 + + test_result.finish(True, "Direct API simple non-streaming prep working") + + except Exception as e: + test_result.finish(False, f"Direct API simple non-streaming prep failed: {str(e)}") + + def test_direct_api_simple_streaming_preparation(self, test_runner: ComprehensiveTestRunner): + """Test Direct API simple streaming request preparation.""" + test_result = test_runner.add_test("Direct API Simple Streaming Prep") + test_result.start() + + try: + import json + + browser_config = BrowserConfig(headless=True) + crawler_config = CrawlerRunConfig( + cache_mode="bypass", + stream=True, + word_count_threshold=50 + ) + + # Prepare request payload + payload = { + "urls": ["https://example.com"], + "browser_config": browser_config.dump(), + "crawler_config": crawler_config.dump() + } + + # Test JSON serialization + json_payload = json.dumps(payload, default=str) + assert isinstance(json_payload, str) + + # Test deserialization + loaded_payload = json.loads(json_payload) + loaded_crawler = CrawlerRunConfig.load(loaded_payload["crawler_config"]) + + assert loaded_crawler.stream is True + assert loaded_crawler.word_count_threshold == 50 + + test_result.finish(True, "Direct API simple streaming prep working") + + except Exception as e: + test_result.finish(False, f"Direct API simple streaming prep failed: {str(e)}") + + def test_direct_api_complex_non_streaming_preparation(self, test_runner: ComprehensiveTestRunner): + """Test Direct API complex non-streaming (with deep crawl) request preparation.""" + test_result = test_runner.add_test("Direct API Complex Non-Streaming Prep") + test_result.start() + + try: + import json + + browser_config = BrowserConfig(headless=True) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + cache_mode="bypass", + stream=False, + word_count_threshold=100 + ) + + # Prepare request payload + payload = { + "urls": ["https://example.com"], + "browser_config": browser_config.dump(), + "crawler_config": crawler_config.dump() + } + + # Test JSON serialization + json_payload = json.dumps(payload, default=str) + assert isinstance(json_payload, str) + + # Test deserialization (critical for deep crawl strategy) + loaded_payload = json.loads(json_payload) + loaded_crawler = CrawlerRunConfig.load(loaded_payload["crawler_config"]) + + assert hasattr(loaded_crawler.deep_crawl_strategy, 'arun') + assert loaded_crawler.stream is False + assert loaded_crawler.deep_crawl_strategy.max_depth == 2 + + test_result.finish(True, "Direct API complex non-streaming prep working") + + except Exception as e: + test_result.finish(False, f"Direct API complex non-streaming prep failed: {str(e)}") + + def test_direct_api_complex_streaming_preparation(self, test_runner: ComprehensiveTestRunner): + """Test Direct API complex streaming (with deep crawl) request preparation.""" + test_result = test_runner.add_test("Direct API Complex Streaming Prep") + test_result.start() + + try: + import json + + browser_config = BrowserConfig(headless=True) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + cache_mode="bypass", + stream=True, + word_count_threshold=100 + ) + + # Prepare request payload + payload = { + "urls": ["https://example.com"], + "browser_config": browser_config.dump(), + "crawler_config": crawler_config.dump() + } + + # Test JSON serialization + json_payload = json.dumps(payload, default=str) + assert isinstance(json_payload, str) + + # Test deserialization (critical for streaming deep crawl) + loaded_payload = json.loads(json_payload) + loaded_crawler = CrawlerRunConfig.load(loaded_payload["crawler_config"]) + + assert hasattr(loaded_crawler.deep_crawl_strategy, 'arun') + assert loaded_crawler.stream is True + assert loaded_crawler.deep_crawl_strategy.max_depth == 2 + + test_result.finish(True, "Direct API complex streaming prep working") + + except Exception as e: + test_result.finish(False, f"Direct API complex streaming prep failed: {str(e)}") + + +class TestDockerClientCrawling: + """Test Crawl4aiDockerClient crawling functionality.""" + + def test_docker_client_simple_non_streaming(self, test_runner: ComprehensiveTestRunner): + """Test Docker client simple non-streaming crawling preparation.""" + test_result = test_runner.add_test("Docker Client Simple Non-Streaming") + test_result.start() + + try: + client = Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=False) + + browser_config = BrowserConfig(headless=True) + crawler_config = CrawlerRunConfig( + cache_mode="bypass", + stream=False, + word_count_threshold=50 + ) + + # Test request preparation (what client does internally) + request_data = client._prepare_request( + urls=["https://example.com"], + browser_config=browser_config, + crawler_config=crawler_config + ) + + assert "urls" in request_data + assert "browser_config" in request_data + assert "crawler_config" in request_data + assert request_data["urls"] == ["https://example.com"] + + # Test that config can be deserialized on server side + loaded_crawler = CrawlerRunConfig.load(request_data["crawler_config"]) + assert loaded_crawler.stream is False + assert loaded_crawler.word_count_threshold == 50 + + test_result.finish(True, "Docker client simple non-streaming prep working") + + except Exception as e: + test_result.finish(False, f"Docker client simple non-streaming failed: {str(e)}") + + def test_docker_client_simple_streaming(self, test_runner: ComprehensiveTestRunner): + """Test Docker client simple streaming crawling preparation.""" + test_result = test_runner.add_test("Docker Client Simple Streaming") + test_result.start() + + try: + client = Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=False) + + browser_config = BrowserConfig(headless=True) + crawler_config = CrawlerRunConfig( + cache_mode="bypass", + stream=True, + word_count_threshold=50 + ) + + # Test request preparation + request_data = client._prepare_request( + urls=["https://example.com"], + browser_config=browser_config, + crawler_config=crawler_config + ) + + assert "urls" in request_data + assert "browser_config" in request_data + assert "crawler_config" in request_data + + # Test server-side deserialization + loaded_crawler = CrawlerRunConfig.load(request_data["crawler_config"]) + assert loaded_crawler.stream is True + assert loaded_crawler.word_count_threshold == 50 + + test_result.finish(True, "Docker client simple streaming prep working") + + except Exception as e: + test_result.finish(False, f"Docker client simple streaming failed: {str(e)}") + + def test_docker_client_complex_non_streaming(self, test_runner: ComprehensiveTestRunner): + """Test Docker client complex non-streaming (with deep crawl) crawling preparation.""" + test_result = test_runner.add_test("Docker Client Complex Non-Streaming") + test_result.start() + + try: + client = Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=False) + + browser_config = BrowserConfig(headless=True) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + cache_mode="bypass", + stream=False, + word_count_threshold=100 + ) + + # Test request preparation + request_data = client._prepare_request( + urls=["https://example.com"], + browser_config=browser_config, + crawler_config=crawler_config + ) + + assert "urls" in request_data + assert "browser_config" in request_data + assert "crawler_config" in request_data + + # Critical test: deep crawl strategy deserialization + loaded_crawler = CrawlerRunConfig.load(request_data["crawler_config"]) + assert hasattr(loaded_crawler.deep_crawl_strategy, 'arun') + assert loaded_crawler.stream is False + assert loaded_crawler.deep_crawl_strategy.max_depth == 2 + assert loaded_crawler.deep_crawl_strategy.max_pages == 3 + + test_result.finish(True, "Docker client complex non-streaming with deep crawl working") + + except Exception as e: + test_result.finish(False, f"Docker client complex non-streaming failed: {str(e)}") + + def test_docker_client_complex_streaming(self, test_runner: ComprehensiveTestRunner): + """Test Docker client complex streaming (with deep crawl) crawling preparation.""" + test_result = test_runner.add_test("Docker Client Complex Streaming") + test_result.start() + + try: + client = Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=False) + + browser_config = BrowserConfig(headless=True) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + cache_mode="bypass", + stream=True, + word_count_threshold=100 + ) + + # Test request preparation + request_data = client._prepare_request( + urls=["https://example.com"], + browser_config=browser_config, + crawler_config=crawler_config + ) + + assert "urls" in request_data + assert "browser_config" in request_data + assert "crawler_config" in request_data + + # Critical test: streaming deep crawl strategy deserialization + loaded_crawler = CrawlerRunConfig.load(request_data["crawler_config"]) + assert hasattr(loaded_crawler.deep_crawl_strategy, 'arun') + assert loaded_crawler.stream is True + assert loaded_crawler.deep_crawl_strategy.max_depth == 2 + assert loaded_crawler.deep_crawl_strategy.max_pages == 3 + + test_result.finish(True, "Docker client complex streaming with deep crawl working") + + except Exception as e: + test_result.finish(False, f"Docker client complex streaming failed: {str(e)}") + + class ComprehensiveTestSuite(unittest.TestCase): """Main test suite class.""" @@ -434,6 +911,9 @@ class ComprehensiveTestSuite(unittest.TestCase): strategy_tests = TestDeepCrawlStrategySerialization() config_tests = TestCrawlerConfigSerialization() docker_tests = TestDockerClientFunctionality() + sdk_tests = TestSDKCrawling() + api_tests = TestDirectAPICrawling() + client_tests = TestDockerClientCrawling() test_methods = [ # ORJSON Tests @@ -452,9 +932,27 @@ class ComprehensiveTestSuite(unittest.TestCase): # Config Tests (config_tests.test_config_with_strategy_serialization, "Config Serialization"), - # Docker Client Tests + # Basic Docker Client Tests (docker_tests.test_docker_client_initialization, "Docker Init"), (docker_tests.test_docker_client_request_preparation, "Docker Requests"), + + # SDK Crawling Tests (Simple & Complex, Streaming & Non-Streaming) + (sdk_tests.test_sdk_simple_non_streaming, "SDK Simple Non-Stream"), + (sdk_tests.test_sdk_simple_streaming, "SDK Simple Stream"), + (sdk_tests.test_sdk_complex_non_streaming, "SDK Complex Non-Stream"), + (sdk_tests.test_sdk_complex_streaming, "SDK Complex Stream"), + + # Direct API Tests (Simple & Complex, Streaming & Non-Streaming) + (api_tests.test_direct_api_simple_non_streaming_preparation, "API Simple Non-Stream"), + (api_tests.test_direct_api_simple_streaming_preparation, "API Simple Stream"), + (api_tests.test_direct_api_complex_non_streaming_preparation, "API Complex Non-Stream"), + (api_tests.test_direct_api_complex_streaming_preparation, "API Complex Stream"), + + # Docker Client Crawling Tests (Simple & Complex, Streaming & Non-Streaming) + (client_tests.test_docker_client_simple_non_streaming, "Client Simple Non-Stream"), + (client_tests.test_docker_client_simple_streaming, "Client Simple Stream"), + (client_tests.test_docker_client_complex_non_streaming, "Client Complex Non-Stream"), + (client_tests.test_docker_client_complex_streaming, "Client Complex Stream"), ] total_tests = len(test_methods) @@ -485,6 +983,9 @@ class ComprehensiveTestSuite(unittest.TestCase): if success: console.print("\nšŸŽ‰ All tests completed successfully!", style="bold green") console.print("āœ… Deep crawl streaming functionality is fully operational", style="green") + console.print("āœ… All crawling patterns (SDK, Direct API, Docker Client) validated", style="green") + console.print("āœ… Both simple and complex crawling scenarios tested", style="green") + console.print("āœ… Streaming and non-streaming modes validated", style="green") else: console.print("\nāš ļø Some tests failed - review results above", style="bold yellow") @@ -527,6 +1028,60 @@ class ComprehensiveTestSuite(unittest.TestCase): self.assertTrue(hasattr(loaded_crawler.deep_crawl_strategy, 'arun')) self.assertTrue(loaded_crawler.stream) self.assertTrue(loaded_browser.headless) + + def test_server_method_selection_logic(self): + """Test the critical server-side method selection logic (arun vs arun_many).""" + + browser_config = BrowserConfig(headless=True) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=3 + ) + + crawler_config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + stream=True, + word_count_threshold=100 + ) + + # Test single URL scenario (should use arun) + single_url_payload = { + "urls": ["https://example.com"], # Single URL + "browser_config": browser_config.dump(), + "crawler_config": crawler_config.dump() + } + + # Simulate server-side deserialization + loaded_crawler = CrawlerRunConfig.load(single_url_payload["crawler_config"]) + + # For single URL, server should use arun method + # This returns CrawlResultContainer which needs proper handling + self.assertEqual(len(single_url_payload["urls"]), 1, "Single URL test case") + self.assertTrue(hasattr(loaded_crawler.deep_crawl_strategy, 'arun'), "Strategy must have arun method") + + # Test multiple URL scenario (should use arun_many) + multiple_url_payload = { + "urls": ["https://example.com", "https://example.org"], # Multiple URLs + "browser_config": browser_config.dump(), + "crawler_config": crawler_config.dump() + } + + # Simulate server-side deserialization + loaded_crawler_multi = CrawlerRunConfig.load(multiple_url_payload["crawler_config"]) + + # For multiple URLs, server should use arun_many method + self.assertEqual(len(multiple_url_payload["urls"]), 2, "Multiple URL test case") + self.assertTrue(hasattr(loaded_crawler_multi.deep_crawl_strategy, 'arun'), "Strategy must have arun method for arun_many") + + # Test streaming configuration consistency + self.assertTrue(loaded_crawler.stream, "Single URL config must preserve streaming") + self.assertTrue(loaded_crawler_multi.stream, "Multiple URL config must preserve streaming") + + # Test deep crawl strategy consistency + self.assertEqual(loaded_crawler.deep_crawl_strategy.max_depth, 2) + self.assertEqual(loaded_crawler_multi.deep_crawl_strategy.max_depth, 2) if __name__ == "__main__":