From 72d8e679ad6399bf35bef2b6e800f858b587813e Mon Sep 17 00:00:00 2001 From: UncleCode Date: Mon, 7 Apr 2025 22:50:44 +0800 Subject: [PATCH] feat(pipeline): add high-level Crawler utility class for simplified web crawling Add new Crawler class that provides a simplified interface for both single and batch URL crawling operations. Key features include: - Simple single URL crawling with configurable options - Parallel batch crawling with concurrency control - Shared browser hub support for resource efficiency - Progress tracking and custom retry strategies - Comprehensive error handling and retry logic Remove demo and extended test files in favor of new focused test suite. --- crawl4ai/__init__.py | 13 + crawl4ai/async_configs.py | 2 +- crawl4ai/async_crawler_strategy.py | 2 +- crawl4ai/browser/__init__.py | 3 +- crawl4ai/extraction_strategy.py | 2 +- crawl4ai/models.py | 12 +- crawl4ai/pipeline/__init__.py | 6 + crawl4ai/pipeline/crawler.py | 406 ++++++++++++++ .../pipeline/demo_browser_hub_pipeline.py | 222 -------- .../pipeline/extended_browser_hub_tests.py | 505 ----------------- crawl4ai/pipeline/pipeline.py | 22 +- tests/pipeline/test_batch_crawl.py | 514 +++++------------- tests/pipeline/test_crawler.py | 447 +++++++++++++++ 13 files changed, 1039 insertions(+), 1117 deletions(-) create mode 100644 crawl4ai/pipeline/__init__.py create mode 100644 crawl4ai/pipeline/crawler.py delete mode 100644 crawl4ai/pipeline/demo_browser_hub_pipeline.py delete mode 100644 crawl4ai/pipeline/extended_browser_hub_tests.py create mode 100644 tests/pipeline/test_crawler.py diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index 0ab808f3..31da045f 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ -4,6 +4,12 @@ import warnings from .async_webcrawler import AsyncWebCrawler, CacheMode from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig +from .pipeline.pipeline import ( + Pipeline, + create_pipeline, +) +from .pipeline.crawler 
import Crawler + from .content_scraping_strategy import ( ContentScrapingStrategy, WebScrapingStrategy, @@ -65,7 +71,14 @@ from .deep_crawling import ( DeepCrawlDecorator, ) +from .async_crawler_strategy import AsyncPlaywrightCrawlerStrategy, AsyncHTTPCrawlerStrategy + __all__ = [ + "Pipeline", + "AsyncPlaywrightCrawlerStrategy", + "AsyncHTTPCrawlerStrategy", + "create_pipeline", + "Crawler", "AsyncLoggerBase", "AsyncLogger", "AsyncWebCrawler", diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 2f0efe90..7cd40292 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -270,7 +270,7 @@ class BrowserConfig: host: str = "localhost", ): self.browser_type = browser_type - self.headless = headless or True + self.headless = headless self.browser_mode = browser_mode self.use_managed_browser = use_managed_browser self.cdp_url = cdp_url diff --git a/crawl4ai/async_crawler_strategy.py b/crawl4ai/async_crawler_strategy.py index 89b4df84..630dfd95 100644 --- a/crawl4ai/async_crawler_strategy.py +++ b/crawl4ai/async_crawler_strategy.py @@ -625,7 +625,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): except Error: visibility_info = await self.check_visibility(page) - if self.config.verbose: + if config.verbose: self.logger.debug( message="Body visibility info: {info}", tag="DEBUG", diff --git a/crawl4ai/browser/__init__.py b/crawl4ai/browser/__init__.py index af4d74c7..a57cdb0b 100644 --- a/crawl4ai/browser/__init__.py +++ b/crawl4ai/browser/__init__.py @@ -9,6 +9,7 @@ from .profiles import BrowserProfileManager from .models import DockerConfig from .docker_registry import DockerRegistry from .docker_utils import DockerUtils +from .browser_hub import BrowserHub from .strategies import ( BaseBrowserStrategy, PlaywrightBrowserStrategy, @@ -19,4 +20,4 @@ from .strategies import ( __all__ = ['BrowserManager', 'BrowserProfileManager', 'DockerConfig', 'DockerRegistry', 'DockerUtils', 'BaseBrowserStrategy', 
'PlaywrightBrowserStrategy', 'CDPBrowserStrategy', 'BuiltinBrowserStrategy', - 'DockerBrowserStrategy'] \ No newline at end of file + 'DockerBrowserStrategy', 'BrowserHub'] \ No newline at end of file diff --git a/crawl4ai/extraction_strategy.py b/crawl4ai/extraction_strategy.py index bf4825cc..a144b599 100644 --- a/crawl4ai/extraction_strategy.py +++ b/crawl4ai/extraction_strategy.py @@ -672,7 +672,7 @@ class LLMExtractionStrategy(ExtractionStrategy): block["error"] = False except Exception: parsed, unparsed = split_and_parse_json_objects( - response.choices[0].message.content + response ) blocks = parsed if unparsed: diff --git a/crawl4ai/models.py b/crawl4ai/models.py index aad14a1d..8f4fdf16 100644 --- a/crawl4ai/models.py +++ b/crawl4ai/models.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, HttpUrl, PrivateAttr +from pydantic import BaseModel, HttpUrl, PrivateAttr, ConfigDict from typing import List, Dict, Optional, Callable, Awaitable, Union, Any from typing import AsyncGenerator from typing import Generic, TypeVar @@ -146,8 +146,9 @@ class CrawlResult(BaseModel): dispatch_result: Optional[DispatchResult] = None redirected_url: Optional[str] = None - class Config: - arbitrary_types_allowed = True + model_config = ConfigDict(arbitrary_types_allowed=True) + # class Config: + # arbitrary_types_allowed = True # NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters, # and model_dump override all exist to support a smooth transition from markdown as a string @@ -312,8 +313,9 @@ class AsyncCrawlResponse(BaseModel): ssl_certificate: Optional[SSLCertificate] = None redirected_url: Optional[str] = None - class Config: - arbitrary_types_allowed = True + model_config = ConfigDict(arbitrary_types_allowed=True) + # class Config: + # arbitrary_types_allowed = True ############################### # Scraping Models diff --git a/crawl4ai/pipeline/__init__.py b/crawl4ai/pipeline/__init__.py new file mode 100644 index 00000000..a65169fe 
--- /dev/null +++ b/crawl4ai/pipeline/__init__.py @@ -0,0 +1,6 @@ +"""Pipeline module providing high-level crawling functionality.""" + +from .pipeline import Pipeline, create_pipeline +from .crawler import Crawler + +__all__ = ["Pipeline", "create_pipeline", "Crawler"] \ No newline at end of file diff --git a/crawl4ai/pipeline/crawler.py b/crawl4ai/pipeline/crawler.py new file mode 100644 index 00000000..bd7f42bf --- /dev/null +++ b/crawl4ai/pipeline/crawler.py @@ -0,0 +1,406 @@ +"""Crawler utility class for simplified crawling operations. + +This module provides a high-level utility class for crawling web pages +with support for both single and multiple URL processing. +""" + +import asyncio +from typing import Dict, List, Optional, Tuple, Union, Callable + +from crawl4ai.models import CrawlResultContainer, CrawlResult +from crawl4ai.pipeline.pipeline import create_pipeline +from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig +from crawl4ai.async_logger import AsyncLogger +from crawl4ai.browser.browser_hub import BrowserHub + +# Type definitions +UrlList = List[str] +UrlBatch = Tuple[List[str], CrawlerRunConfig] +UrlFullBatch = Tuple[List[str], BrowserConfig, CrawlerRunConfig] +BatchType = Union[UrlList, UrlBatch, UrlFullBatch] +ProgressCallback = Callable[[str, str, Optional[CrawlResultContainer]], None] +RetryStrategy = Callable[[str, int, Exception], Tuple[bool, float]] + +class Crawler: + """High-level utility class for crawling web pages. + + This class provides simplified methods for crawling both single URLs + and batches of URLs, with parallel processing capabilities. 
+ """ + + @classmethod + async def crawl( + cls, + urls: Union[str, List[str]], + browser_config: Optional[BrowserConfig] = None, + crawler_config: Optional[CrawlerRunConfig] = None, + browser_hub: Optional[BrowserHub] = None, + logger: Optional[AsyncLogger] = None, + max_retries: int = 0, + retry_delay: float = 1.0, + use_new_loop: bool = True # By default use a new loop for safety + ) -> Union[CrawlResultContainer, Dict[str, CrawlResultContainer]]: + """Crawl one or more URLs with the specified configurations. + + Args: + urls: Single URL or list of URLs to crawl + browser_config: Optional browser configuration + crawler_config: Optional crawler run configuration + browser_hub: Optional shared browser hub + logger: Optional logger instance + max_retries: Maximum number of retries for failed requests + retry_delay: Delay between retries in seconds + + Returns: + For a single URL: CrawlResultContainer with crawl results + For multiple URLs: Dict mapping URLs to their CrawlResultContainer results + """ + # Handle single URL case + if isinstance(urls, str): + return await cls._crawl_single_url( + urls, + browser_config, + crawler_config, + browser_hub, + logger, + max_retries, + retry_delay, + use_new_loop + ) + + # Handle multiple URLs case (sequential processing) + results = {} + for url in urls: + results[url] = await cls._crawl_single_url( + url, + browser_config, + crawler_config, + browser_hub, + logger, + max_retries, + retry_delay, + use_new_loop + ) + + return results + + @classmethod + async def _crawl_single_url( + cls, + url: str, + browser_config: Optional[BrowserConfig] = None, + crawler_config: Optional[CrawlerRunConfig] = None, + browser_hub: Optional[BrowserHub] = None, + logger: Optional[AsyncLogger] = None, + max_retries: int = 0, + retry_delay: float = 1.0, + use_new_loop: bool = False + ) -> CrawlResultContainer: + """Internal method to crawl a single URL with retry logic.""" + # Create a logger if none provided + if logger is None: + logger = 
AsyncLogger(verbose=True) + + # Create or use the provided crawler config + if crawler_config is None: + crawler_config = CrawlerRunConfig() + + attempts = 0 + last_error = None + + # For testing purposes, each crawler gets a new event loop to avoid conflicts + # This is especially important in test suites where multiple tests run in sequence + if use_new_loop: + old_loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + while attempts <= max_retries: + try: + # Create a pipeline + pipeline_args = {} + if browser_config: + pipeline_args["browser_config"] = browser_config + if browser_hub: + pipeline_args["browser_hub"] = browser_hub + if logger: + pipeline_args["logger"] = logger + + pipeline = await create_pipeline(**pipeline_args) + + # Perform the crawl + result = await pipeline.crawl(url=url, config=crawler_config) + + # Close the pipeline if we created it (not using a shared hub) + if not browser_hub: + await pipeline.close() + + # Restore the original event loop if we created a new one + if use_new_loop: + asyncio.set_event_loop(old_loop) + loop.close() + + return result + + except Exception as e: + last_error = e + attempts += 1 + + if attempts <= max_retries: + logger.warning( + message="Crawl attempt {attempt} failed for {url}: {error}. 
Retrying in {delay}s...", + tag="RETRY", + params={ + "attempt": attempts, + "url": url, + "error": str(e), + "delay": retry_delay + } + ) + await asyncio.sleep(retry_delay) + else: + logger.error( + message="All {attempts} crawl attempts failed for {url}: {error}", + tag="FAILED", + params={ + "attempts": attempts, + "url": url, + "error": str(e) + } + ) + + # If we get here, all attempts failed + result = CrawlResultContainer( + CrawlResult( + url=url, + html="", + success=False, + error_message=f"All {attempts} crawl attempts failed: {str(last_error)}" + ) + ) + + # Restore the original event loop if we created a new one + if use_new_loop: + asyncio.set_event_loop(old_loop) + loop.close() + + return result + + @classmethod + async def parallel_crawl( + cls, + url_batches: Union[List[str], List[Union[UrlBatch, UrlFullBatch]]], + browser_config: Optional[BrowserConfig] = None, + crawler_config: Optional[CrawlerRunConfig] = None, + browser_hub: Optional[BrowserHub] = None, + logger: Optional[AsyncLogger] = None, + concurrency: int = 5, + max_retries: int = 0, + retry_delay: float = 1.0, + retry_strategy: Optional[RetryStrategy] = None, + progress_callback: Optional[ProgressCallback] = None, + use_new_loop: bool = True # By default use a new loop for safety + ) -> Dict[str, CrawlResultContainer]: + """Crawl multiple URLs in parallel with concurrency control. 
+ + Args: + url_batches: List of URLs or list of URL batches with configurations + browser_config: Default browser configuration (used if not in batch) + crawler_config: Default crawler configuration (used if not in batch) + browser_hub: Optional shared browser hub for resource efficiency + logger: Optional logger instance + concurrency: Maximum number of concurrent crawls + max_retries: Maximum number of retries for failed requests + retry_delay: Delay between retries in seconds + retry_strategy: Optional custom retry strategy function + progress_callback: Optional callback for progress reporting + + Returns: + Dict mapping URLs to their CrawlResultContainer results + """ + # Create a logger if none provided + if logger is None: + logger = AsyncLogger(verbose=True) + + # For testing purposes, each crawler gets a new event loop to avoid conflicts + # This is especially important in test suites where multiple tests run in sequence + if use_new_loop: + old_loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Process batches to consistent format + processed_batches = cls._process_url_batches( + url_batches, browser_config, crawler_config + ) + + # Initialize results dictionary + results = {} + + # Create semaphore for concurrency control + semaphore = asyncio.Semaphore(concurrency) + + # Create shared browser hub if not provided + shared_hub = browser_hub + if not shared_hub: + shared_hub = await BrowserHub.get_browser_manager( + config=browser_config or BrowserConfig(), + logger=logger, + max_browsers_per_config=concurrency, + max_pages_per_browser=1, + initial_pool_size=min(concurrency, 3) # Start with a reasonable number + ) + + try: + # Create worker function for each URL + async def process_url(url, b_config, c_config): + async with semaphore: + # Report start if callback provided + if progress_callback: + await progress_callback("started", url) + + attempts = 0 + last_error = None + + while attempts <= 
max_retries: + try: + # Create a pipeline using the shared hub + pipeline = await create_pipeline( + browser_config=b_config, + browser_hub=shared_hub, + logger=logger + ) + + # Perform the crawl + result = await pipeline.crawl(url=url, config=c_config) + + # Report completion if callback provided + if progress_callback: + await progress_callback("completed", url, result) + + return url, result + + except Exception as e: + last_error = e + attempts += 1 + + # Determine if we should retry and with what delay + should_retry = attempts <= max_retries + delay = retry_delay + + # Use custom retry strategy if provided + if retry_strategy and should_retry: + try: + should_retry, delay = await retry_strategy(url, attempts, e) + except Exception as strategy_error: + logger.error( + message="Error in retry strategy: {error}", + tag="RETRY", + params={"error": str(strategy_error)} + ) + + if should_retry: + logger.warning( + message="Crawl attempt {attempt} failed for {url}: {error}. Retrying in {delay}s...", + tag="RETRY", + params={ + "attempt": attempts, + "url": url, + "error": str(e), + "delay": delay + } + ) + await asyncio.sleep(delay) + else: + logger.error( + message="All {attempts} crawl attempts failed for {url}: {error}", + tag="FAILED", + params={ + "attempts": attempts, + "url": url, + "error": str(e) + } + ) + break + + # If we get here, all attempts failed + error_result = CrawlResultContainer( + CrawlResult( + url=url, + html="", + success=False, + error_message=f"All {attempts} crawl attempts failed: {str(last_error)}" + ) + ) + + # Report completion with error if callback provided + if progress_callback: + await progress_callback("completed", url, error_result) + + return url, error_result + + # Create tasks for all URLs + tasks = [] + for urls, b_config, c_config in processed_batches: + for url in urls: + tasks.append(process_url(url, b_config, c_config)) + + # Run all tasks and collect results + for completed_task in asyncio.as_completed(tasks): + url, 
result = await completed_task + results[url] = result + + return results + + finally: + # Clean up the hub only if we created it + if not browser_hub and shared_hub: + await shared_hub.close() + + # Restore the original event loop if we created a new one + if use_new_loop: + asyncio.set_event_loop(old_loop) + loop.close() + + @classmethod + def _process_url_batches( + cls, + url_batches: Union[List[str], List[Union[UrlBatch, UrlFullBatch]]], + default_browser_config: Optional[BrowserConfig], + default_crawler_config: Optional[CrawlerRunConfig] + ) -> List[Tuple[List[str], BrowserConfig, CrawlerRunConfig]]: + """Process URL batches into a consistent format. + + Converts various input formats into a consistent list of + (urls, browser_config, crawler_config) tuples. + """ + processed_batches = [] + + # Handle case where input is just a list of URLs + if all(isinstance(item, str) for item in url_batches): + urls = url_batches + browser_config = default_browser_config or BrowserConfig() + crawler_config = default_crawler_config or CrawlerRunConfig() + processed_batches.append((urls, browser_config, crawler_config)) + return processed_batches + + # Process each batch + for batch in url_batches: + # Handle case: (urls, crawler_config) + if len(batch) == 2 and isinstance(batch[1], CrawlerRunConfig): + urls, c_config = batch + b_config = default_browser_config or BrowserConfig() + processed_batches.append((urls, b_config, c_config)) + + # Handle case: (urls, browser_config, crawler_config) + elif len(batch) == 3 and isinstance(batch[1], BrowserConfig) and isinstance(batch[2], CrawlerRunConfig): + processed_batches.append(batch) + + # Fallback for unknown formats - assume it's just a list of URLs + else: + urls = batch + browser_config = default_browser_config or BrowserConfig() + crawler_config = default_crawler_config or CrawlerRunConfig() + processed_batches.append((urls, browser_config, crawler_config)) + + return processed_batches \ No newline at end of file diff --git 
a/crawl4ai/pipeline/demo_browser_hub_pipeline.py b/crawl4ai/pipeline/demo_browser_hub_pipeline.py deleted file mode 100644 index 32298f68..00000000 --- a/crawl4ai/pipeline/demo_browser_hub_pipeline.py +++ /dev/null @@ -1,222 +0,0 @@ -# demo_browser_hub.py - -import asyncio -from typing import List - -from crawl4ai.browser.browser_hub import BrowserHub -from pipeline import create_pipeline -from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig -from crawl4ai.async_logger import AsyncLogger -from crawl4ai.models import CrawlResultContainer -from crawl4ai.cache_context import CacheMode -from crawl4ai import DefaultMarkdownGenerator -from crawl4ai import PruningContentFilter - -async def create_prewarmed_browser_hub(urls_to_crawl: List[str]): - """Create a pre-warmed browser hub with 10 browsers and 5 pages each.""" - # Set up logging - logger = AsyncLogger(verbose=True) - logger.info("Setting up pre-warmed browser hub", tag="DEMO") - - # Create browser configuration - browser_config = BrowserConfig( - browser_type="chromium", - headless=True, # Set to False to see the browsers in action - viewport_width=1280, - viewport_height=800, - light_mode=True, # Optimize for performance - java_script_enabled=True - ) - - # Create crawler configurations for pre-warming with different user agents - # This allows pages to be ready for different scenarios - crawler_configs = [ - CrawlerRunConfig( - user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", - wait_until="networkidle" - ), - # CrawlerRunConfig( - # user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15", - # wait_until="networkidle" - # ), - # CrawlerRunConfig( - # user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", - # wait_until="networkidle" - # ) - ] - - # Number of browsers and pages per 
browser - num_browsers = 1 - pages_per_browser = 1 - - # Distribute pages across configurations - # We'll create a total of 50 pages (10 browsers × 5 pages) - page_configs = [] - total_pages = num_browsers * pages_per_browser - pages_per_config = total_pages // len(crawler_configs) - - for i, config in enumerate(crawler_configs): - # For the last config, add any remaining pages - if i == len(crawler_configs) - 1: - remaining = total_pages - (pages_per_config * (len(crawler_configs) - 1)) - page_configs.append((browser_config, config, remaining)) - else: - page_configs.append((browser_config, config, pages_per_config)) - - # Create browser hub with pre-warmed pages - start_time = asyncio.get_event_loop().time() - logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO") - - hub = await BrowserHub.get_browser_manager( - config=browser_config, - hub_id="demo_hub", - logger=logger, - max_browsers_per_config=num_browsers, - max_pages_per_browser=pages_per_browser, - initial_pool_size=num_browsers, - page_configs=page_configs - ) - - end_time = asyncio.get_event_loop().time() - logger.success( - message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds", - tag="DEMO", - params={ - "total_pages": total_pages, - "duration": end_time - start_time - } - ) - - # Get and display pool status - status = await hub.get_pool_status() - logger.info( - message="Browser pool status: {status}", - tag="DEMO", - params={"status": status} - ) - - return hub - -async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]: - """Crawl a list of URLs using a pre-warmed browser hub.""" - logger = AsyncLogger(verbose=True) - - # Create crawler configuration - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - markdown_generator=DefaultMarkdownGenerator( - content_filter=PruningContentFilter( - threshold=0.48, - threshold_type="fixed", - min_word_threshold=0 - ) - ), - wait_until="networkidle", - 
screenshot=True - ) - - # Create pipeline with the browser hub - pipeline = await create_pipeline( - browser_hub=hub, - logger=logger - ) - - results = [] - - # Crawl all URLs in parallel - async def crawl_url(url): - logger.info(f"Crawling {url}...", tag="CRAWL") - result = await pipeline.crawl(url=url, config=crawler_config) - logger.success(f"Completed crawl of {url}", tag="CRAWL") - return result - - # Create tasks for all URLs - tasks = [crawl_url(url) for url in urls] - - # Execute all tasks in parallel and collect results - results = await asyncio.gather(*tasks) - - return results - -async def main(): - """Main demo function.""" - # List of URLs to crawl - urls_to_crawl = [ - "https://example.com", - # "https://www.python.org", - # "https://httpbin.org/html", - # "https://news.ycombinator.com", - # "https://github.com", - # "https://pypi.org", - # "https://docs.python.org/3/", - # "https://opensource.org", - # "https://whatismyipaddress.com", - # "https://en.wikipedia.org/wiki/Web_scraping" - ] - - # Set up logging - logger = AsyncLogger(verbose=True) - logger.info("Starting browser hub demo", tag="DEMO") - - try: - # Create pre-warmed browser hub - hub = await create_prewarmed_browser_hub(urls_to_crawl) - - # Use hub to crawl URLs - logger.info("Crawling URLs in parallel...", tag="DEMO") - start_time = asyncio.get_event_loop().time() - - results = await crawl_urls_with_hub(hub, urls_to_crawl) - - end_time = asyncio.get_event_loop().time() - - # Display results - logger.success( - message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)", - tag="DEMO", - params={ - "count": len(results), - "duration": end_time - start_time, - "avg": (end_time - start_time) / len(results) - } - ) - - # Print summary of results - logger.info("Crawl results summary:", tag="DEMO") - for i, result in enumerate(results): - logger.info( - message="{idx}. 
{url}: Success={success}, Content length={length}", - tag="RESULT", - params={ - "idx": i+1, - "url": result.url, - "success": result.success, - "length": len(result.html) if result.html else 0 - } - ) - - if result.success and result.markdown and result.markdown.raw_markdown: - # Print a snippet of the markdown - markdown_snippet = result.markdown.raw_markdown[:150] + "..." - logger.info( - message=" Markdown: {snippet}", - tag="RESULT", - params={"snippet": markdown_snippet} - ) - - # Display final browser pool status - status = await hub.get_pool_status() - logger.info( - message="Final browser pool status: {status}", - tag="DEMO", - params={"status": status} - ) - - finally: - # Clean up - logger.info("Shutting down browser hub...", tag="DEMO") - await BrowserHub.shutdown_all() - logger.success("Demo completed", tag="DEMO") - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/crawl4ai/pipeline/extended_browser_hub_tests.py b/crawl4ai/pipeline/extended_browser_hub_tests.py deleted file mode 100644 index 2f80d83b..00000000 --- a/crawl4ai/pipeline/extended_browser_hub_tests.py +++ /dev/null @@ -1,505 +0,0 @@ -# extended_browser_hub_tests.py - -import asyncio - -from crawl4ai.browser.browser_hub import BrowserHub -from pipeline import create_pipeline -from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig -from crawl4ai.async_logger import AsyncLogger -from crawl4ai.cache_context import CacheMode - -# Common test URLs -TEST_URLS = [ - "https://example.com", - "https://example.com/page1", - "https://httpbin.org/html", - "https://httpbin.org/headers", - "https://httpbin.org/ip", - "https://httpstat.us/200" -] - -class TestResults: - """Simple container for test results""" - def __init__(self, name: str): - self.name = name - self.results = [] - self.start_time = None - self.end_time = None - self.errors = [] - - @property - def duration(self) -> float: - if self.start_time and self.end_time: - return self.end_time - 
self.start_time - return 0 - - @property - def success_rate(self) -> float: - if not self.results: - return 0 - return sum(1 for r in self.results if r.success) / len(self.results) * 100 - - def log_summary(self, logger: AsyncLogger): - logger.info(f"=== Test: {self.name} ===", tag="SUMMARY") - logger.info( - message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: {count}", - tag="SUMMARY", - params={ - "duration": self.duration, - "success_rate": self.success_rate, - "count": len(self.results) - } - ) - - if self.errors: - logger.error( - message="Errors ({count}): {errors}", - tag="SUMMARY", - params={ - "count": len(self.errors), - "errors": "; ".join(str(e) for e in self.errors) - } - ) - -# ======== TEST SCENARIO 1: Simple default configuration ======== -async def test_default_configuration(): - """ - Test Scenario 1: Simple default configuration - - This tests the basic case where the user does not provide any specific - browser configuration, relying on default auto-setup. 
- """ - logger = AsyncLogger(verbose=True) - results = TestResults("Default Configuration") - - try: - # Create pipeline with no browser config - pipeline = await create_pipeline(logger=logger) - - # Start timing - results.start_time = asyncio.get_event_loop().time() - - # Create basic crawler config - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - wait_until="domcontentloaded" - ) - - # Process each URL sequentially - for url in TEST_URLS: - try: - logger.info(f"Crawling {url} with default configuration", tag="TEST") - result = await pipeline.crawl(url=url, config=crawler_config) - results.results.append(result) - - logger.success( - message="Result: url={url}, success={success}, content_length={length}", - tag="TEST", - params={ - "url": url, - "success": result.success, - "length": len(result.html) if result.html else 0 - } - ) - except Exception as e: - logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") - results.errors.append(e) - - # End timing - results.end_time = asyncio.get_event_loop().time() - - except Exception as e: - logger.error(f"Test failed with error: {str(e)}", tag="TEST") - results.errors.append(e) - - # Log summary - results.log_summary(logger) - - return results - -# ======== TEST SCENARIO 2: Detailed custom configuration ======== -async def test_custom_configuration(): - """ - Test Scenario 2: Detailed custom configuration - - This tests the case where the user provides detailed browser configuration - to customize the browser behavior. 
- """ - logger = AsyncLogger(verbose=True) - results = TestResults("Custom Configuration") - - try: - # Create custom browser config - browser_config = BrowserConfig( - browser_type="chromium", - headless=True, - viewport_width=1920, - viewport_height=1080, - user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36", - light_mode=True, - ignore_https_errors=True, - extra_args=["--disable-extensions"] - ) - - # Create custom crawler config - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - wait_until="networkidle", - page_timeout=30000, - screenshot=True, - pdf=False, - screenshot_wait_for=0.5, - wait_for_images=True, - scan_full_page=True, - scroll_delay=0.2, - process_iframes=True, - remove_overlay_elements=True - ) - - # Create pipeline with custom configuration - pipeline = await create_pipeline( - browser_config=browser_config, - logger=logger - ) - - # Start timing - results.start_time = asyncio.get_event_loop().time() - - # Process each URL sequentially - for url in TEST_URLS: - try: - logger.info(f"Crawling {url} with custom configuration", tag="TEST") - result = await pipeline.crawl(url=url, config=crawler_config) - results.results.append(result) - - has_screenshot = result.screenshot is not None - - logger.success( - message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}", - tag="TEST", - params={ - "url": url, - "success": result.success, - "screenshot": has_screenshot, - "length": len(result.html) if result.html else 0 - } - ) - except Exception as e: - logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") - results.errors.append(e) - - # End timing - results.end_time = asyncio.get_event_loop().time() - - # Get browser hub status from context - try: - # Run a dummy crawl to get the context with browser hub - context = await pipeline.process({"url": "about:blank", "config": crawler_config}) - browser_hub = 
context.get("browser_hub") - if browser_hub: - status = await browser_hub.get_pool_status() - logger.info( - message="Browser hub status: {status}", - tag="TEST", - params={"status": status} - ) - except Exception as e: - logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST") - - except Exception as e: - logger.error(f"Test failed with error: {str(e)}", tag="TEST") - results.errors.append(e) - - # Log summary - results.log_summary(logger) - - return results - -# ======== TEST SCENARIO 3: Using pre-initialized browser hub ======== -async def test_preinitalized_browser_hub(): - """ - Test Scenario 3: Using pre-initialized browser hub - - This tests the case where a browser hub is initialized separately - and then passed to the pipeline. - """ - logger = AsyncLogger(verbose=True) - results = TestResults("Pre-initialized Browser Hub") - - browser_hub = None - try: - # Create and initialize browser hub separately - logger.info("Initializing browser hub separately", tag="TEST") - - browser_config = BrowserConfig( - browser_type="chromium", - headless=True, - verbose=True - ) - - browser_hub = await BrowserHub.get_browser_manager( - config=browser_config, - hub_id="test_preinitalized", - logger=logger, - max_browsers_per_config=2, - max_pages_per_browser=3, - initial_pool_size=2 - ) - - # Display initial status - status = await browser_hub.get_pool_status() - logger.info( - message="Initial browser hub status: {status}", - tag="TEST", - params={"status": status} - ) - - # Create pipeline with pre-initialized browser hub - pipeline = await create_pipeline( - browser_hub=browser_hub, - logger=logger - ) - - # Create crawler config - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - wait_until="networkidle", - screenshot=True - ) - - # Start timing - results.start_time = asyncio.get_event_loop().time() - - # Process URLs in parallel - async def crawl_url(url): - try: - logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST") - 
result = await pipeline.crawl(url=url, config=crawler_config) - logger.success(f"Completed crawl of {url}", tag="TEST") - return result - except Exception as e: - logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") - results.errors.append(e) - return None - - # Create tasks for all URLs - tasks = [crawl_url(url) for url in TEST_URLS] - - # Execute all tasks in parallel and collect results - all_results = await asyncio.gather(*tasks) - results.results = [r for r in all_results if r is not None] - - # End timing - results.end_time = asyncio.get_event_loop().time() - - # Display final status - status = await browser_hub.get_pool_status() - logger.info( - message="Final browser hub status: {status}", - tag="TEST", - params={"status": status} - ) - - except Exception as e: - logger.error(f"Test failed with error: {str(e)}", tag="TEST") - results.errors.append(e) - - # Log summary - results.log_summary(logger) - - return results, browser_hub - -# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ======== -async def test_parallel_pipelines(): - """ - Test Scenario 4: Multiple parallel pipelines sharing browser hub - - This tests the case where multiple pipelines share the same browser hub, - demonstrating resource sharing and parallel operation. 
- """ - logger = AsyncLogger(verbose=True) - results = TestResults("Parallel Pipelines") - - # We'll reuse the browser hub from the previous test - _, browser_hub = await test_preinitalized_browser_hub() - - try: - # Create 3 pipelines that all share the same browser hub - pipelines = [] - for i in range(3): - pipeline = await create_pipeline( - browser_hub=browser_hub, - logger=logger - ) - pipelines.append(pipeline) - - logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST") - - # Create crawler configs with different settings - configs = [ - CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False), - CrawlerRunConfig(wait_until="networkidle", screenshot=True), - CrawlerRunConfig(wait_until="load", scan_full_page=True) - ] - - # Start timing - results.start_time = asyncio.get_event_loop().time() - - # Function to process URLs with a specific pipeline - async def process_with_pipeline(pipeline_idx, urls): - pipeline_results = [] - for url in urls: - try: - logger.info(f"Pipeline {pipeline_idx} crawling {url}", tag="TEST") - result = await pipelines[pipeline_idx].crawl( - url=url, - config=configs[pipeline_idx] - ) - pipeline_results.append(result) - logger.success( - message="Pipeline {idx} completed: url={url}, success={success}", - tag="TEST", - params={ - "idx": pipeline_idx, - "url": url, - "success": result.success - } - ) - except Exception as e: - logger.error( - message="Pipeline {idx} error: {error}", - tag="TEST", - params={ - "idx": pipeline_idx, - "error": str(e) - } - ) - results.errors.append(e) - return pipeline_results - - # Distribute URLs among pipelines - pipeline_urls = [ - TEST_URLS[:2], - TEST_URLS[2:4], - TEST_URLS[4:5] * 2 # Duplicate the last URL to have 2 for pipeline 3 - ] - - # Execute all pipelines in parallel - tasks = [ - process_with_pipeline(i, urls) - for i, urls in enumerate(pipeline_urls) - ] - - pipeline_results = await asyncio.gather(*tasks) - - # Flatten results - for res_list in 
pipeline_results: - results.results.extend(res_list) - - # End timing - results.end_time = asyncio.get_event_loop().time() - - # Display browser hub status - status = await browser_hub.get_pool_status() - logger.info( - message="Browser hub status after parallel pipelines: {status}", - tag="TEST", - params={"status": status} - ) - - except Exception as e: - logger.error(f"Test failed with error: {str(e)}", tag="TEST") - results.errors.append(e) - - # Log summary - results.log_summary(logger) - - return results - -# ======== TEST SCENARIO 5: Browser hub with connection string ======== -async def test_connection_string(): - """ - Test Scenario 5: Browser hub with connection string - - This tests the case where a browser hub is initialized from a connection string, - simulating connecting to a running browser hub service. - """ - logger = AsyncLogger(verbose=True) - results = TestResults("Connection String") - - try: - # Create pipeline with connection string - # Note: In a real implementation, this would connect to an existing service - # For this test, we're using a simulated connection - connection_string = "localhost:9222" # Simulated connection string - - pipeline = await create_pipeline( - browser_hub_connection=connection_string, - logger=logger - ) - - # Create crawler config - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - wait_until="networkidle" - ) - - # Start timing - results.start_time = asyncio.get_event_loop().time() - - # Test with a single URL - url = TEST_URLS[0] - try: - logger.info(f"Crawling {url} with connection string hub", tag="TEST") - result = await pipeline.crawl(url=url, config=crawler_config) - results.results.append(result) - - logger.success( - message="Result: url={url}, success={success}, content_length={length}", - tag="TEST", - params={ - "url": url, - "success": result.success, - "length": len(result.html) if result.html else 0 - } - ) - except Exception as e: - logger.error(f"Error crawling {url}: {str(e)}", 
tag="TEST") - results.errors.append(e) - - # End timing - results.end_time = asyncio.get_event_loop().time() - - except Exception as e: - logger.error(f"Test failed with error: {str(e)}", tag="TEST") - results.errors.append(e) - - # Log summary - results.log_summary(logger) - - return results - -# ======== RUN ALL TESTS ======== -async def run_all_tests(): - """Run all test scenarios""" - logger = AsyncLogger(verbose=True) - logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN") - - try: - # Run each test scenario - await test_default_configuration() - # await test_custom_configuration() - # await test_preinitalized_browser_hub() - # await test_parallel_pipelines() - # await test_connection_string() - - except Exception as e: - logger.error(f"Test suite failed: {str(e)}", tag="MAIN") - finally: - # Clean up all browser hubs - logger.info("Shutting down all browser hubs...", tag="MAIN") - await BrowserHub.shutdown_all() - logger.success("All tests completed", tag="MAIN") - -if __name__ == "__main__": - asyncio.run(run_all_tests()) \ No newline at end of file diff --git a/crawl4ai/pipeline/pipeline.py b/crawl4ai/pipeline/pipeline.py index d4fc1aeb..ca9c3a51 100644 --- a/crawl4ai/pipeline/pipeline.py +++ b/crawl4ai/pipeline/pipeline.py @@ -1,14 +1,30 @@ import time -from typing import Callable, Dict, List, Any, Optional, Awaitable +import asyncio +from typing import Callable, Dict, List, Any, Optional, Awaitable, Union, TypedDict, Tuple, Coroutine -from middlewares import create_default_middleware_list, handle_error_middleware -from crawl4ai.models import CrawlResultContainer +from .middlewares import create_default_middleware_list, handle_error_middleware +from crawl4ai.models import CrawlResultContainer, CrawlResult from crawl4ai.async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig from crawl4ai.async_logger import AsyncLogger +class CrawlSpec(TypedDict, 
total=False): + """Specification for a single crawl operation in batch_crawl.""" + url: str + config: Optional[CrawlerRunConfig] + browser_config: Optional[BrowserConfig] + +class BatchStatus(TypedDict, total=False): + """Status information for batch crawl operations.""" + total: int + processed: int + succeeded: int + failed: int + in_progress: int + duration: float + class Pipeline: """ A pipeline processor that executes a series of async middleware functions. diff --git a/tests/pipeline/test_batch_crawl.py b/tests/pipeline/test_batch_crawl.py index 84d7a63f..b149967e 100644 --- a/tests/pipeline/test_batch_crawl.py +++ b/tests/pipeline/test_batch_crawl.py @@ -1,405 +1,163 @@ -# test_batch_crawl.py +"""Test the Crawler class for batch crawling capabilities.""" import asyncio -import unittest -from unittest.mock import Mock, patch, AsyncMock import pytest +from typing import List, Dict, Any, Optional, Tuple -from crawl4ai.pipeline import Pipeline, create_pipeline, batch_crawl -from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig +from crawl4ai import Crawler +from crawl4ai import BrowserConfig, CrawlerRunConfig from crawl4ai.async_logger import AsyncLogger from crawl4ai.models import CrawlResult, CrawlResultContainer -from crawl4ai.browser_hub_manager import BrowserHubManager +from crawl4ai.browser import BrowserHub +from crawl4ai.cache_context import CacheMode +# Test URLs for crawling +SAFE_URLS = [ + "https://example.com", + "https://httpbin.org/html", + "https://httpbin.org/headers", + "https://httpbin.org/ip", + "https://httpbin.org/user-agent", + "https://httpstat.us/200", + "https://jsonplaceholder.typicode.com/posts/1", + "https://jsonplaceholder.typicode.com/comments/1", + "https://iana.org", + "https://www.python.org" +] -# Utility function for tests -async def create_mock_result(url, success=True, status_code=200, html=""): - """Create a mock crawl result for testing""" - result = CrawlResult( - url=url, - html=html, - success=success, - 
status_code=status_code, - error_message="" if success else f"Error crawling {url}" +# Simple test for batch crawling +@pytest.mark.asyncio +async def test_batch_crawl_simple(): + """Test simple batch crawling with multiple URLs.""" + # Use a few test URLs + urls = SAFE_URLS[:3] + + # Custom crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="domcontentloaded" ) - return CrawlResultContainer(result) + + # Crawl multiple URLs using batch crawl + results = await Crawler.crawl( + urls, + crawler_config=crawler_config + ) + + # Verify the results + assert isinstance(results, dict) + assert len(results) == len(urls) + + for url in urls: + assert url in results + assert results[url].success + assert results[url].html is not None +# Test parallel batch crawling +@pytest.mark.asyncio +async def test_parallel_batch_crawl(): + """Test parallel batch crawling with multiple URLs.""" + # Use several URLs for parallel crawling + urls = SAFE_URLS[:5] + + # Basic crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="domcontentloaded" + ) + + # Crawl in parallel + start_time = asyncio.get_event_loop().time() + results = await Crawler.parallel_crawl( + urls, + crawler_config=crawler_config + ) + end_time = asyncio.get_event_loop().time() + + # Verify results + assert len(results) == len(urls) + successful = sum(1 for r in results.values() if r.success) + + print(f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s") + print(f"Success rate: {successful}/{len(urls)}") + + # At least 80% should succeed + assert successful / len(urls) >= 0.8 -class TestBatchCrawl(unittest.IsolatedAsyncioTestCase): - """Test cases for the batch_crawl function""" +# Test batch crawling with different configurations +@pytest.mark.asyncio +async def test_batch_crawl_mixed_configs(): + """Test batch crawling with different configurations for different URLs.""" + # Create URL batches with 
different configurations + batch1 = (SAFE_URLS[:2], CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False)) + batch2 = (SAFE_URLS[2:4], CrawlerRunConfig(wait_until="networkidle", screenshot=True)) - async def asyncSetUp(self): - """Set up test environment""" - self.logger = AsyncLogger(verbose=False) - self.browser_config = BrowserConfig(headless=True) - self.crawler_config = CrawlerRunConfig() - - # URLs for testing - self.test_urls = [ - "https://example.com/1", - "https://example.com/2", - "https://example.com/3", - "https://example.com/4", - "https://example.com/5" - ] - - # Mock pipeline to avoid actual crawling - self.mock_pipeline = AsyncMock() - self.mock_pipeline.crawl = AsyncMock() - - # Set up pipeline to return success for most URLs, but failure for one - async def mock_crawl(url, config=None): - if url == "https://example.com/3": - return await create_mock_result(url, success=False, status_code=404) - return await create_mock_result(url, success=True) - - self.mock_pipeline.crawl.side_effect = mock_crawl - - # Patch the create_pipeline function - self.create_pipeline_patch = patch( - 'crawl4ai.pipeline.create_pipeline', - return_value=self.mock_pipeline - ) - self.mock_create_pipeline = self.create_pipeline_patch.start() + # Crawl with mixed configurations + start_time = asyncio.get_event_loop().time() + results = await Crawler.parallel_crawl([batch1, batch2]) + end_time = asyncio.get_event_loop().time() - async def asyncTearDown(self): - """Clean up after tests""" - self.create_pipeline_patch.stop() - await BrowserHubManager.shutdown_all() + # Extract all URLs + all_urls = batch1[0] + batch2[0] - # === Basic Functionality Tests === + # Verify results + assert len(results) == len(all_urls) - async def test_simple_batch_with_single_config(self): - """Test basic batch crawling with one configuration for all URLs""" - # Call the batch_crawl function with a list of URLs and single config - results = await batch_crawl( - urls=self.test_urls, - 
browser_config=self.browser_config, - crawler_config=self.crawler_config - ) - - # Verify we got results for all URLs - self.assertEqual(len(results), len(self.test_urls)) - - # Check that pipeline.crawl was called for each URL - self.assertEqual(self.mock_pipeline.crawl.call_count, len(self.test_urls)) - - # Check success/failure as expected - success_count = sum(1 for r in results if r.success) - self.assertEqual(success_count, len(self.test_urls) - 1) # All except URL 3 - - # Verify URLs in results match input URLs - result_urls = sorted([r.url for r in results]) - self.assertEqual(result_urls, sorted(self.test_urls)) + # Check that screenshots are present only for batch2 + for url in batch1[0]: + assert results[url].screenshot is None - async def test_batch_with_crawl_specs(self): - """Test batch crawling with different configurations per URL""" - # Create different configs for each URL - crawl_specs = [ - {"url": url, "crawler_config": CrawlerRunConfig(screenshot=i % 2 == 0)} - for i, url in enumerate(self.test_urls) - ] - - # Call batch_crawl with crawl specs - results = await batch_crawl( - crawl_specs=crawl_specs, - browser_config=self.browser_config - ) - - # Verify results - self.assertEqual(len(results), len(crawl_specs)) - - # Verify each URL was crawled with its specific config - for i, spec in enumerate(crawl_specs): - call_args = self.mock_pipeline.crawl.call_args_list[i] - self.assertEqual(call_args[1]['url'], spec['url']) - self.assertEqual( - call_args[1]['config'].screenshot, - spec['crawler_config'].screenshot - ) + for url in batch2[0]: + assert results[url].screenshot is not None - # === Advanced Configuration Tests === + print(f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s") + +# Test shared browser hub +@pytest.mark.asyncio +async def test_batch_crawl_shared_hub(): + """Test batch crawling with a shared browser hub.""" + # Create and initialize a browser hub + browser_config = BrowserConfig( + 
browser_type="chromium", + headless=True + ) - async def test_with_multiple_browser_configs(self): - """Test using different browser configurations for different URLs""" - # Create different browser configs - browser_config1 = BrowserConfig(headless=True, browser_type="chromium") - browser_config2 = BrowserConfig(headless=True, browser_type="firefox") - - # Create crawl specs with different browser configs - crawl_specs = [ - { - "url": self.test_urls[0], - "browser_config": browser_config1, - "crawler_config": self.crawler_config - }, - { - "url": self.test_urls[1], - "browser_config": browser_config2, - "crawler_config": self.crawler_config - } - ] - - # Call batch_crawl with mixed browser configs - results = await batch_crawl(crawl_specs=crawl_specs) - - # Verify results - self.assertEqual(len(results), len(crawl_specs)) - - # Verify create_pipeline was called with different browser configs - self.assertEqual(self.mock_create_pipeline.call_count, 2) - - # Check call arguments for create_pipeline - call_args_list = self.mock_create_pipeline.call_args_list - self.assertEqual(call_args_list[0][1]['browser_config'], browser_config1) - self.assertEqual(call_args_list[1][1]['browser_config'], browser_config2) + browser_hub = await BrowserHub.get_browser_manager( + config=browser_config, + max_browsers_per_config=3, + max_pages_per_browser=4, + initial_pool_size=1 + ) - async def test_with_existing_browser_hub(self): - """Test using a pre-initialized browser hub""" - # Create a mock browser hub - mock_hub = AsyncMock() + try: + # Use the hub for parallel crawling + urls = SAFE_URLS[:3] - # Call batch_crawl with browser hub - results = await batch_crawl( - urls=self.test_urls, - browser_hub=mock_hub, - crawler_config=self.crawler_config - ) - - # Verify create_pipeline was called with the browser hub - self.mock_create_pipeline.assert_called_with( - browser_hub=mock_hub, - logger=self.logger - ) - - # Verify results - self.assertEqual(len(results), len(self.test_urls)) 
- - # === Error Handling and Retry Tests === - - async def test_retry_on_failure(self): - """Test retrying failed URLs up to max_tries""" - # Modify mock to fail first 2 times for URL 3, then succeed - attempt_counts = {url: 0 for url in self.test_urls} - - async def mock_crawl_with_retries(url, config=None): - attempt_counts[url] += 1 - if url == "https://example.com/3" and attempt_counts[url] <= 2: - return await create_mock_result(url, success=False, status_code=500) - return await create_mock_result(url, success=True) - - self.mock_pipeline.crawl.side_effect = mock_crawl_with_retries - - # Call batch_crawl with retry configuration - results = await batch_crawl( - urls=self.test_urls, - browser_config=self.browser_config, - crawler_config=self.crawler_config, - max_tries=3 - ) - - # Verify all URLs succeeded after retries - self.assertTrue(all(r.success for r in results)) - - # Check retry count for URL 3 - self.assertEqual(attempt_counts["https://example.com/3"], 3) - - # Check other URLs were only tried once - for url in self.test_urls: - if url != "https://example.com/3": - self.assertEqual(attempt_counts[url], 1) - - async def test_give_up_after_max_tries(self): - """Test that crawling gives up after max_tries""" - # Modify mock to always fail for URL 3 - async def mock_crawl_always_fail(url, config=None): - if url == "https://example.com/3": - return await create_mock_result(url, success=False, status_code=500) - return await create_mock_result(url, success=True) - - self.mock_pipeline.crawl.side_effect = mock_crawl_always_fail - - # Call batch_crawl with retry configuration - results = await batch_crawl( - urls=self.test_urls, - browser_config=self.browser_config, - crawler_config=self.crawler_config, - max_tries=3 - ) - - # Find result for URL 3 - url3_result = next(r for r in results if r.url == "https://example.com/3") - - # Verify URL 3 still failed after max retries - self.assertFalse(url3_result.success) - - # Verify retry metadata is present 
(assuming we add this to the result) - self.assertEqual(url3_result.attempt_count, 3) - self.assertTrue(hasattr(url3_result, 'retry_error_messages')) - - async def test_exception_during_crawl(self): - """Test handling exceptions during crawling""" - # Modify mock to raise exception for URL 4 - async def mock_crawl_with_exception(url, config=None): - if url == "https://example.com/4": - raise RuntimeError("Simulated crawler exception") - return await create_mock_result(url, success=True) - - self.mock_pipeline.crawl.side_effect = mock_crawl_with_exception - - # Call batch_crawl - results = await batch_crawl( - urls=self.test_urls, - browser_config=self.browser_config, - crawler_config=self.crawler_config - ) - - # Verify we still get results for all URLs - self.assertEqual(len(results), len(self.test_urls)) - - # Find result for URL 4 - url4_result = next(r for r in results if r.url == "https://example.com/4") - - # Verify URL 4 is marked as failed - self.assertFalse(url4_result.success) - - # Verify exception info is captured - self.assertIn("Simulated crawler exception", url4_result.error_message) - - # === Performance and Control Tests === - - async def test_concurrency_limit(self): - """Test limiting concurrent crawls""" - # Create a slow mock crawl function to test concurrency - crawl_started = {url: asyncio.Event() for url in self.test_urls} - crawl_proceed = {url: asyncio.Event() for url in self.test_urls} - - async def slow_mock_crawl(url, config=None): - crawl_started[url].set() - await crawl_proceed[url].wait() - return await create_mock_result(url) - - self.mock_pipeline.crawl.side_effect = slow_mock_crawl - - # Start batch_crawl with concurrency limit of 2 - task = asyncio.create_task( - batch_crawl( - urls=self.test_urls, - browser_config=self.browser_config, - crawler_config=self.crawler_config, - concurrency=2 + start_time = asyncio.get_event_loop().time() + results = await Crawler.parallel_crawl( + urls, + browser_hub=browser_hub, + 
crawler_config=CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="domcontentloaded" ) ) + end_time = asyncio.get_event_loop().time() - # Wait for first 2 crawls to start - await asyncio.wait( - [crawl_started[self.test_urls[0]].wait(), - crawl_started[self.test_urls[1]].wait()], - timeout=1 - ) + # Verify results + assert len(results) == len(urls) + successful = sum(1 for r in results.values() if r.success) - # Verify only 2 crawls started - started_count = sum(1 for url in self.test_urls if crawl_started[url].is_set()) - self.assertEqual(started_count, 2) + print(f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s") + print(f"Success rate: {successful}/{len(urls)}") - # Allow first crawl to complete - crawl_proceed[self.test_urls[0]].set() + # Get browser hub statistics + hub_stats = await browser_hub.get_pool_status() + print(f"Browser hub stats: {hub_stats}") - # Wait for next crawl to start - await asyncio.wait([crawl_started[self.test_urls[2]].wait()], timeout=1) + # At least 80% should succeed + assert successful / len(urls) >= 0.8 - # Now 3 total should have started (2 running, 1 completed) - started_count = sum(1 for url in self.test_urls if crawl_started[url].is_set()) - self.assertEqual(started_count, 3) - - # Allow all remaining crawls to complete - for url in self.test_urls: - crawl_proceed[url].set() - - # Wait for batch_crawl to complete - results = await task - - # Verify all URLs were crawled - self.assertEqual(len(results), len(self.test_urls)) - - async def test_cancel_batch_crawl(self): - """Test cancelling a batch crawl operation""" - # Create a crawl function that won't complete unless signaled - crawl_started = {url: asyncio.Event() for url in self.test_urls} - - async def endless_mock_crawl(url, config=None): - crawl_started[url].set() - # This will wait forever unless cancelled - await asyncio.Future() - - self.mock_pipeline.crawl.side_effect = endless_mock_crawl - - # Start batch_crawl - 
task = asyncio.create_task( - batch_crawl( - urls=self.test_urls, - browser_config=self.browser_config, - crawler_config=self.crawler_config - ) - ) - - # Wait for at least one crawl to start - await asyncio.wait( - [crawl_started[self.test_urls[0]].wait()], - timeout=1 - ) - - # Cancel the task - task.cancel() - - # Verify task was cancelled - with self.assertRaises(asyncio.CancelledError): - await task - - # === Edge Cases Tests === - - async def test_empty_url_list(self): - """Test behavior with empty URL list""" - results = await batch_crawl( - urls=[], - browser_config=self.browser_config, - crawler_config=self.crawler_config - ) - - # Should return empty list - self.assertEqual(results, []) - - # Verify crawl wasn't called - self.mock_pipeline.crawl.assert_not_called() - - async def test_mix_of_valid_and_invalid_urls(self): - """Test with a mix of valid and invalid URLs""" - # Include some invalid URLs - mixed_urls = [ - "https://example.com/valid", - "invalid-url", - "http:/missing-slash", - "https://example.com/valid2" - ] - - # Call batch_crawl - results = await batch_crawl( - urls=mixed_urls, - browser_config=self.browser_config, - crawler_config=self.crawler_config - ) - - # Should have results for all URLs - self.assertEqual(len(results), len(mixed_urls)) - - # Check invalid URLs were marked as failed - for result in results: - if result.url in ["invalid-url", "http:/missing-slash"]: - self.assertFalse(result.success) - self.assertIn("Invalid URL", result.error_message) - else: - self.assertTrue(result.success) - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file + finally: + # Clean up the browser hub + await browser_hub.close() \ No newline at end of file diff --git a/tests/pipeline/test_crawler.py b/tests/pipeline/test_crawler.py new file mode 100644 index 00000000..f2a8d945 --- /dev/null +++ b/tests/pipeline/test_crawler.py @@ -0,0 +1,447 @@ +# test_crawler.py +import asyncio +import warnings +import pytest +import 
pytest_asyncio  # tail of the `import pytest_asyncio` statement that begins on the previous line
import warnings
from typing import Optional, Tuple

from pydantic import PydanticDeprecatedSince20

from crawl4ai import Crawler
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.browser import BrowserHub
from crawl4ai.cache_context import CacheMode


# Minimum fraction of URLs that must crawl successfully in the batch tests.
# The tests hit live third-party sites, so a small failure budget is allowed.
MIN_SUCCESS_RATE = 0.8

# Test URLs for crawling
SAFE_URLS = [
    "https://example.com",
    "https://httpbin.org/html",
    "https://httpbin.org/headers",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpstat.us/200",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://jsonplaceholder.typicode.com/comments/1",
    "https://iana.org",
    "https://www.python.org",
]


def _loop_time() -> float:
    """Return the running event loop's monotonic clock, in seconds.

    Must be called from inside a coroutine; ``get_running_loop`` raises
    ``RuntimeError`` otherwise, instead of silently creating a new loop.
    """
    return asyncio.get_running_loop().time()


def _count_successes(results) -> int:
    """Return how many values in the ``url -> result`` mapping have ``success`` set."""
    return sum(1 for r in results.values() if r.success)


def _assert_min_success_rate(successful: int, total: int) -> None:
    """Assert that at least ``MIN_SUCCESS_RATE`` of ``total`` crawls succeeded."""
    assert total > 0, "no URLs were crawled"
    assert successful / total >= MIN_SUCCESS_RATE


# Define test fixtures
@pytest_asyncio.fixture
async def clean_browser_hub():
    """Fixture to ensure clean browser hub state between tests."""
    # Yield control to the test
    yield

    # After the test, shut down every browser hub. Teardown is best-effort:
    # a cleanup failure is reported but must not mask the test's own outcome.
    try:
        await BrowserHub.shutdown_all()
    except Exception as e:
        print(f"Error during browser cleanup: {e}")


class TestCrawlerBasic:
    """Basic tests for the Crawler utility class"""

    @pytest.mark.asyncio
    async def test_simple_crawl_single_url(self, clean_browser_hub):
        """Test crawling a single URL with default configuration"""
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=Warning)

            # Basic single URL crawl with default configuration
            url = "https://example.com"
            result = await Crawler.crawl(url)

            # Verify the result
            assert isinstance(result, CrawlResultContainer)
            assert result.success
            assert result.url == url
            assert result.html is not None
            assert len(result.html) > 0

    @pytest.mark.asyncio
    async def test_crawl_with_custom_config(self, clean_browser_hub):
        """Test crawling with custom browser and crawler configuration"""
        # Custom browser config
        browser_config = BrowserConfig(
            browser_type="chromium",
            headless=True,
            viewport_width=1280,
            viewport_height=800,
        )

        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="networkidle", screenshot=True
        )

        # Crawl with custom configuration
        url = "https://httpbin.org/html"
        result = await Crawler.crawl(
            url, browser_config=browser_config, crawler_config=crawler_config
        )

        # Verify the result
        assert result.success
        assert result.url == url
        assert result.screenshot is not None

    @pytest.mark.asyncio
    async def test_crawl_multiple_urls_sequential(self, clean_browser_hub):
        """Test crawling multiple URLs sequentially"""
        # Use a few test URLs
        urls = SAFE_URLS[:3]

        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )

        # Crawl multiple URLs sequentially
        results = await Crawler.crawl(urls, crawler_config=crawler_config)

        # Verify the results
        assert isinstance(results, dict)
        assert len(results) == len(urls)

        for url in urls:
            assert url in results
            assert results[url].success
            assert results[url].html is not None

    @pytest.mark.asyncio
    async def test_crawl_with_error_handling(self, clean_browser_hub):
        """Test error handling during crawling"""
        # Include a valid URL and a non-existent URL
        urls = ["https://example.com", "https://non-existent-domain-123456789.com"]

        # Crawl with retries
        results = await Crawler.crawl(urls, max_retries=2, retry_delay=1.0)

        # Verify results for both URLs
        assert len(results) == 2

        # Valid URL should succeed
        assert results[urls[0]].success

        # Invalid URL should fail but be in results
        assert urls[1] in results
        assert not results[urls[1]].success
        assert results[urls[1]].error_message is not None


class TestCrawlerParallel:
    """Tests for the parallel crawling capabilities of Crawler"""

    @pytest.mark.asyncio
    async def test_parallel_crawl_simple(self, clean_browser_hub):
        """Test basic parallel crawling with same configuration"""
        # Use several URLs for parallel crawling
        urls = SAFE_URLS[:5]

        # Basic crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )

        # Crawl in parallel with default concurrency
        start_time = _loop_time()
        results = await Crawler.parallel_crawl(urls, crawler_config=crawler_config)
        end_time = _loop_time()

        # Verify results
        assert len(results) == len(urls)
        successful = _count_successes(results)

        print(
            f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(urls)}")

        # At least 80% should succeed
        _assert_min_success_rate(successful, len(urls))

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_concurrency_limit(self, clean_browser_hub):
        """Test parallel crawling with concurrency limit"""
        # Use more URLs to test concurrency control
        urls = SAFE_URLS[:8]

        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )

        # Limited concurrency
        concurrency = 2

        # Time the crawl
        start_time = _loop_time()
        results = await Crawler.parallel_crawl(
            urls, crawler_config=crawler_config, concurrency=concurrency
        )
        end_time = _loop_time()

        # Verify results
        assert len(results) == len(urls)
        successful = _count_successes(results)

        print(
            f"Parallel crawl with concurrency={concurrency} of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(urls)}")

        # At least 80% should succeed
        _assert_min_success_rate(successful, len(urls))

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_different_configs(self, clean_browser_hub):
        """Test parallel crawling with different configurations for different URLs"""
        # Create URL batches with different configurations
        batch1 = (
            SAFE_URLS[:2],
            CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
        )
        batch2 = (
            SAFE_URLS[2:4],
            CrawlerRunConfig(wait_until="networkidle", screenshot=True),
        )
        batch3 = (
            SAFE_URLS[4:6],
            CrawlerRunConfig(wait_until="load", scan_full_page=True),
        )

        # Crawl with mixed configurations
        start_time = _loop_time()
        results = await Crawler.parallel_crawl([batch1, batch2, batch3])
        end_time = _loop_time()

        # Extract all URLs
        all_urls = batch1[0] + batch2[0] + batch3[0]

        # Verify results
        assert len(results) == len(all_urls)

        # Check that screenshots are present only for batch2
        for url in batch1[0]:
            assert not results[url].screenshot

        for url in batch2[0]:
            assert results[url].screenshot

        print(
            f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s"
        )

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_shared_browser_hub(self, clean_browser_hub):
        """Test parallel crawling with a shared browser hub"""
        # Create and initialize a browser hub
        browser_config = BrowserConfig(browser_type="chromium", headless=True)

        browser_hub = await BrowserHub.get_browser_manager(
            config=browser_config,
            max_browsers_per_config=3,
            max_pages_per_browser=4,
            initial_pool_size=1,
        )

        try:
            # Use the hub for parallel crawling
            urls = SAFE_URLS[:6]

            start_time = _loop_time()
            results = await Crawler.parallel_crawl(
                urls,
                browser_hub=browser_hub,
                crawler_config=CrawlerRunConfig(
                    cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
                ),
            )
            end_time = _loop_time()

            # Verify results
            assert len(results) == len(urls)
            successful = _count_successes(results)

            print(
                f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
            )
            print(f"Success rate: {successful}/{len(urls)}")

            # Get browser hub statistics
            hub_stats = await browser_hub.get_pool_status()
            print(f"Browser hub stats: {hub_stats}")

            # At least 80% should succeed
            _assert_min_success_rate(successful, len(urls))

        finally:
            # Clean up the browser hub even if an assertion above failed
            await browser_hub.close()


class TestCrawlerAdvanced:
    """Advanced tests for the Crawler utility class"""

    @pytest.mark.asyncio
    async def test_crawl_with_customized_batch_config(self, clean_browser_hub):
        """Test crawling with fully customized batch configuration"""
        # Create URL batches with different browser and crawler configurations.
        # Both browsers stay headless so the test can run on machines without a
        # display server; the second config still differs by viewport width.
        browser_config1 = BrowserConfig(browser_type="chromium", headless=True)
        browser_config2 = BrowserConfig(
            browser_type="chromium", headless=True, viewport_width=1920
        )

        crawler_config1 = CrawlerRunConfig(wait_until="domcontentloaded")
        crawler_config2 = CrawlerRunConfig(wait_until="networkidle", screenshot=True)

        batch1 = (SAFE_URLS[:2], browser_config1, crawler_config1)
        batch2 = (SAFE_URLS[2:4], browser_config2, crawler_config2)

        # Crawl with mixed configurations
        results = await Crawler.parallel_crawl([batch1, batch2])

        # Extract all URLs
        all_urls = batch1[0] + batch2[0]

        # Verify results
        assert len(results) == len(all_urls)

        # Verify batch-specific processing
        for url in batch1[0]:
            assert results[url].screenshot is None  # No screenshots for batch1

        for url in batch2[0]:
            assert results[url].screenshot is not None  # Should have screenshots for batch2

    @pytest.mark.asyncio
    async def test_crawl_with_progress_callback(self, clean_browser_hub):
        """Test crawling with progress callback"""
        # Use several URLs
        urls = SAFE_URLS[:5]

        # Track progress
        progress_data = {"started": 0, "completed": 0, "failed": 0, "updates": []}

        # Progress callback
        async def on_progress(
            status: str, url: str, result: Optional[CrawlResultContainer] = None
        ):
            if status == "started":
                progress_data["started"] += 1
            elif status == "completed":
                progress_data["completed"] += 1
                # `result` is optional, so guard before reading `.success`
                if result is not None and not result.success:
                    progress_data["failed"] += 1

            progress_data["updates"].append((status, url))
            print(f"Progress: {status} - {url}")

        # Crawl with progress tracking
        results = await Crawler.parallel_crawl(
            urls,
            crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
            progress_callback=on_progress,
        )

        # Verify progress tracking
        assert progress_data["started"] == len(urls)
        assert progress_data["completed"] == len(urls)
        assert len(progress_data["updates"]) == len(urls) * 2  # start + complete events

    @pytest.mark.asyncio
    async def test_crawl_with_dynamic_retry_strategy(self, clean_browser_hub):
        """Test crawling with a dynamic retry strategy"""
        # Include URLs that might fail
        urls = [
            "https://example.com",
            "https://httpstat.us/500",
            "https://httpstat.us/404",
        ]

        # Custom retry strategy
        async def retry_strategy(
            url: str, attempt: int, error: Exception
        ) -> Tuple[bool, float]:
            # Only retry 500 errors, not 404s
            if "500" in url:
                return True, 1.0  # Retry with 1 second delay
            return False, 0.0  # Don't retry other errors

        # Crawl with custom retry strategy
        results = await Crawler.parallel_crawl(
            urls,
            crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
            retry_strategy=retry_strategy,
            max_retries=3,
        )

        # Verify results
        assert len(results) == len(urls)

        # Example.com should succeed
        assert results[urls[0]].success

        # httpstat.us pages return content even for error status codes
        # so our crawler marks them as successful since it got HTML content
        # Verify that we got the expected status code
        assert results[urls[1]].status_code == 500

        # 404 should have the correct status code
        assert results[urls[2]].status_code == 404

    @pytest.mark.asyncio
    async def test_crawl_with_very_large_batch(self, clean_browser_hub):
        """Test crawling with a very large batch of URLs"""
        # Note: In a real test, we'd use more URLs, but for simplicity we use
        # every safe URL once. Results are keyed by URL, so repeating a URL
        # would not add an entry; dict.fromkeys keeps the batch unique and in
        # order.
        large_batch = list(dict.fromkeys(SAFE_URLS))  # 10 unique URLs

        # Set a reasonable concurrency limit
        concurrency = 10

        # Time the crawl
        start_time = _loop_time()
        results = await Crawler.parallel_crawl(
            large_batch,
            crawler_config=CrawlerRunConfig(
                wait_until="domcontentloaded",
                page_timeout=10000,  # Shorter timeout for large batch
            ),
            concurrency=concurrency,
        )
        end_time = _loop_time()

        # Verify results
        assert len(results) == len(large_batch)
        successful = _count_successes(results)

        print(
            f"Large batch crawl of {len(large_batch)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(large_batch)}")
        print(
            f"Average time per URL: {(end_time - start_time) / len(large_batch):.2f}s"
        )

        # At least 80% should succeed (from our unique URLs)
        _assert_min_success_rate(successful, len(results))


if __name__ == "__main__":
    # Use pytest for async tests
    pytest.main(["-xvs", __file__])