diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 0606c656..2306a0a6 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -156,41 +156,6 @@ def is_empty_value(value: Any) -> bool: return False -class PagePoolConfig: - """Configuration for browser page pooling. - - This class configures the page pooling mechanism that maintains pre-warmed - browser pages ready for immediate use, improving performance for scenarios - where multiple URLs need to be processed in sequence. - - Attributes: - mode (str): Pooling mode - "static" or "adaptive". - "static" uses a fixed pool size defined by static_size. - "adaptive" calculates optimal size based on available system memory. - Default: "static". - static_size (int): Number of pages to maintain in the pool when mode is "static". - Default: 10. - memory_per_page (int): Estimated memory used by a single page in MB. - Used for "adaptive" mode calculations. - Default: 200. - memory_threshold (float): Maximum percentage of system memory to use in "adaptive" mode. - Default: 0.7 (70% of available memory). - timeout (float): Seconds to wait for a page from the pool before creating a new one. - Default: 5.0. - """ - - def __init__(self, - mode="static", - static_size=10, - memory_per_page=200, - memory_threshold=0.7, - timeout=5.0): - self.mode = mode - self.static_size = static_size - self.memory_per_page = memory_per_page - self.memory_threshold = memory_threshold - self.timeout = timeout - class BrowserConfig: """ Configuration class for setting up a browser instance and its context in AsyncPlaywrightCrawlerStrategy. @@ -235,7 +200,7 @@ class BrowserConfig: Default: False. downloads_path (str or None): Directory to store downloaded files. If None and accept_downloads is True, a default path will be created. Default: None. - storage_state (str or dict or None): Path or object describing storage state (cookies, localStorage). 
+ storage_state (str or dict or None): An in-memory storage state (cookies, localStorage). Default: None. ignore_https_errors (bool): Ignore HTTPS certificate errors. Default: True. java_script_enabled (bool): Enable JavaScript execution in pages. Default: True. @@ -255,9 +220,6 @@ class BrowserConfig: light_mode (bool): Disables certain background features for performance gains. Default: False. extra_args (list): Additional command-line arguments passed to the browser. Default: []. - page_pool_config (PagePoolConfig or None): Configuration for page pooling mechanism. - If None, page pooling is disabled. - Default: None. """ def __init__( @@ -298,7 +260,6 @@ class BrowserConfig: extra_args: list = None, debugging_port: int = 9222, host: str = "localhost", - page_pool_config: Optional[PagePoolConfig] = None, ): self.browser_type = browser_type self.headless = headless @@ -337,7 +298,6 @@ class BrowserConfig: self.verbose = verbose self.debugging_port = debugging_port self.host = host - self.page_pool_config = page_pool_config fa_user_agenr_generator = ValidUAGenerator() if self.user_agent_mode == "random": @@ -368,12 +328,6 @@ class BrowserConfig: @staticmethod def from_kwargs(kwargs: dict) -> "BrowserConfig": - # Handle page_pool_config - page_pool_config = kwargs.get("page_pool_config") - if isinstance(page_pool_config, dict): - # If it's a dict, convert to PagePoolConfig - page_pool_config = PagePoolConfig(**page_pool_config) - return BrowserConfig( browser_type=kwargs.get("browser_type", "chromium"), headless=kwargs.get("headless", True), @@ -407,7 +361,6 @@ class BrowserConfig: extra_args=kwargs.get("extra_args", []), debugging_port=kwargs.get("debugging_port", 9222), host=kwargs.get("host", "localhost"), - page_pool_config=page_pool_config, ) def to_dict(self): @@ -442,7 +395,6 @@ class BrowserConfig: "verbose": self.verbose, "debugging_port": self.debugging_port, "host": self.host, - "page_pool_config": self.page_pool_config, } def clone(self, **kwargs): diff 
--git a/crawl4ai/browser/manager.py b/crawl4ai/browser/manager.py index 4ebee637..3a37efcb 100644 --- a/crawl4ai/browser/manager.py +++ b/crawl4ai/browser/manager.py @@ -2,11 +2,14 @@ This module provides a central browser management class that uses the strategy pattern internally while maintaining the existing API. +It also implements a page pooling mechanism for improved performance. """ import asyncio import time -from typing import Optional, Tuple, Dict, Any +import os +import psutil +from typing import Optional, Tuple, Dict, Any, List, Set from playwright.async_api import Page, BrowserContext @@ -117,6 +120,28 @@ class BrowserManager: self.sessions = self._strategy.sessions return page, context + + async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]: + """Get multiple pages with the same configuration. + + This method efficiently creates multiple browser pages using the same configuration, + which is useful for parallel crawling of multiple URLs. + + Args: + crawlerRunConfig: Configuration for the pages + count: Number of pages to create + + Returns: + List of (Page, Context) tuples + """ + # Delegate to strategy + pages = await self._strategy.get_pages(crawlerRunConfig, count) + + # Sync sessions if needed + if hasattr(self._strategy, 'sessions'): + self.sessions = self._strategy.sessions + + return pages async def kill_session(self, session_id: str): """Kill a browser session and clean up resources. 
diff --git a/crawl4ai/browser/strategies.py b/crawl4ai/browser/strategies.py index fd47f30e..85feef36 100644 --- a/crawl4ai/browser/strategies.py +++ b/crawl4ai/browser/strategies.py @@ -23,7 +23,7 @@ from ..async_configs import BrowserConfig, CrawlerRunConfig from ..config import DOWNLOAD_PAGE_TIMEOUT from ..js_snippet import load_js_script from ..utils import get_home_folder -from .utils import get_playwright, get_browser_executable, get_browser_disable_options, create_temp_directory, is_windows +from .utils import get_playwright, get_browser_executable, get_browser_disable_options, create_temp_directory, is_windows, is_browser_running from playwright_stealth import StealthConfig @@ -85,6 +85,22 @@ class BaseBrowserStrategy(ABC): """ pass + async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]: + """Get multiple pages with the same configuration. + + Args: + crawlerRunConfig: Configuration for the pages + count: Number of pages to create + + Returns: + List of (Page, Context) tuples + """ + pages = [] + for _ in range(count): + page, context = await self.get_page(crawlerRunConfig) + pages.append((page, context)) + return pages + @abstractmethod async def close(self): """Close the browser and clean up resources.""" @@ -136,9 +152,6 @@ class BaseBrowserStrategy(ABC): if self.config.cookies: await context.add_cookies(self.config.cookies) - if self.config.storage_state: - await context.storage_state(path=None) - if self.config.accept_downloads: context.set_default_timeout(DOWNLOAD_PAGE_TIMEOUT) context.set_default_navigation_timeout(DOWNLOAD_PAGE_TIMEOUT) @@ -161,7 +174,7 @@ class BaseBrowserStrategy(ABC): { "name": "cookiesEnabled", "value": "true", - "url": crawlerRunConfig.url if crawlerRunConfig else "https://crawl4ai.com/", + "url": crawlerRunConfig and crawlerRunConfig.url or "https://crawl4ai.com/", } ] ) @@ -324,12 +337,31 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy): "viewport": 
viewport_settings, "proxy": proxy_settings, "accept_downloads": self.config.accept_downloads, - "storage_state": self.config.storage_state, "ignore_https_errors": self.config.ignore_https_errors, "device_scale_factor": 1.0, "java_script_enabled": self.config.java_script_enabled, } + # Handle storage state properly - this is key for persistence + if self.config.storage_state: + context_settings["storage_state"] = self.config.storage_state + if self.logger: + if isinstance(self.config.storage_state, str): + self.logger.debug(f"Using storage state from file: {self.config.storage_state}", tag="BROWSER") + else: + self.logger.debug("Using storage state from config object", tag="BROWSER") + + if self.config.user_data_dir: + context_settings["storage_state"] = os.path.join( + self.config.user_data_dir, "Default", "storage_state.json" + ) + # Create the file if it doesn't exist + if not os.path.exists(context_settings["storage_state"]): + os.makedirs(os.path.dirname(context_settings["storage_state"]), exist_ok=True) + with open(context_settings["storage_state"], "w") as f: + json.dump({}, f) + + if crawlerRunConfig: # Check if there is value for crawlerRunConfig.proxy_config set add that to context if crawlerRunConfig.proxy_config: @@ -428,6 +460,21 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy): if self.config.sleep_on_close: await asyncio.sleep(0.5) + # If we have a user_data_dir configured, ensure persistence of storage state + if self.config.user_data_dir and self.browser and self.default_context: + for context in self.browser.contexts: + try: + await context.storage_state(path=os.path.join(self.config.user_data_dir, "Default", "storage_state.json")) + if self.logger: + self.logger.debug("Ensuring storage state is persisted before closing browser", tag="BROWSER") + except Exception as e: + if self.logger: + self.logger.warning( + message="Failed to ensure storage persistence: {error}", + tag="BROWSER", + params={"error": str(e)} + ) + # Close all sessions 
session_ids = list(self.sessions.keys()) for session_id in session_ids: @@ -582,7 +629,7 @@ class CDPBrowserStrategy(BaseBrowserStrategy): Returns: List of command-line arguments for the browser """ - browser_path = get_browser_executable(self.config.browser_type) + browser_path = await get_browser_executable(self.config.browser_type) base_args = [browser_path] if self.config.browser_type == "chromium": @@ -727,6 +774,22 @@ class CDPBrowserStrategy(BaseBrowserStrategy): if self.config.sleep_on_close: await asyncio.sleep(0.5) + # If we have a user_data_dir configured, ensure persistence of storage state + if self.config.user_data_dir and self.browser: + try: + # Create a brief sleep to allow the browser to flush any pending operations + # This helps ensure all storage state (localStorage, cookies, etc.) gets saved + await asyncio.sleep(0.3) + if self.logger: + self.logger.debug("Ensuring storage state is persisted before closing CDP browser", tag="BROWSER") + except Exception as e: + if self.logger: + self.logger.warning( + message="Failed to ensure storage persistence: {error}", + tag="BROWSER", + params={"error": str(e)} + ) + # Close all sessions session_ids = list(self.sessions.keys()) for session_id in session_ids: @@ -775,19 +838,46 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): logger: Logger for recording events and errors """ super().__init__(config, logger) - self.builtin_browser_dir = os.path.join(get_home_folder(), "builtin-browser") + self.builtin_browser_dir = os.path.join(get_home_folder(), "builtin-browser") if not self.config.user_data_dir else self.config.user_data_dir self.builtin_config_file = os.path.join(self.builtin_browser_dir, "browser_config.json") + + # Raise error if user data dir is already engaged + if self._check_user_dir_is_engaged(self.builtin_browser_dir): + raise Exception(f"User data directory {self.builtin_browser_dir} is already engaged by another browser instance.") + os.makedirs(self.builtin_browser_dir, exist_ok=True) + 
def _check_user_dir_is_engaged(self, user_data_dir: str) -> bool: + """Check if the user data directory is already in use. + + Returns: + bool: True if the directory is engaged, False otherwise + """ + # Load browser config file, then iterate in port_map values, check "user_data_dir" key if it matches + # the current user data directory + if os.path.exists(self.builtin_config_file): + try: + with open(self.builtin_config_file, 'r') as f: + browser_info_dict = json.load(f) + + # Check if user data dir is already engaged + for port_str, browser_info in browser_info_dict.get("port_map", {}).items(): + if browser_info.get("user_data_dir") == user_data_dir: + return True + except Exception as e: + if self.logger: + self.logger.error(f"Error reading built-in browser config: {str(e)}", tag="BUILTIN") + return False + async def start(self): """Start or connect to the built-in browser. Returns: self: For method chaining """ - # Check for existing built-in browser - browser_info = self.get_builtin_browser_info() - if browser_info and self._is_browser_running(browser_info.get('pid')): + # Check for existing built-in browser (get_browser_info already checks if running) + browser_info = self.get_browser_info() + if browser_info: if self.logger: self.logger.info(f"Using existing built-in browser at {browser_info.get('cdp_url')}", tag="BROWSER") self.config.cdp_url = browser_info.get('cdp_url') @@ -797,7 +887,7 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): cdp_url = await self.launch_builtin_browser( browser_type=self.config.browser_type, debugging_port=self.config.debugging_port, - headless=self.config.headless + headless=self.config.headless, ) if not cdp_url: if self.logger: @@ -808,55 +898,62 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): # Call parent class implementation with updated CDP URL return await super().start() - def get_builtin_browser_info(self) -> Optional[Dict[str, Any]]: - """Get information about the built-in browser. 
- - Returns: - dict: Browser information or None if no built-in browser is configured - """ - if not os.path.exists(self.builtin_config_file): - return None - - try: - with open(self.builtin_config_file, 'r') as f: - browser_info = json.load(f) - - # Check if the browser is still running - if not self._is_browser_running(browser_info.get('pid')): - if self.logger: - self.logger.warning("Built-in browser is not running", tag="BUILTIN") - return None - - return browser_info - except Exception as e: - if self.logger: - self.logger.error(f"Error reading built-in browser config: {str(e)}", tag="BUILTIN") - return None - - def _is_browser_running(self, pid: Optional[int]) -> bool: - """Check if a process with the given PID is running. + @classmethod + def get_builtin_browser_info(cls, debugging_port: int, config_file: str, logger: Optional[AsyncLogger] = None) -> Optional[Dict[str, Any]]: + """Get information about the built-in browser for a specific debugging port. Args: - pid: Process ID to check + debugging_port: The debugging port to look for + config_file: Path to the config file + logger: Optional logger for recording events Returns: - bool: True if the process is running, False otherwise + dict: Browser information or None if no running browser is configured for this port """ - if not pid: - return False + if not os.path.exists(config_file): + return None try: - # Check if the process exists - if is_windows(): - process = subprocess.run(["tasklist", "/FI", f"PID eq {pid}"], - capture_output=True, text=True) - return str(pid) in process.stdout - else: - # Unix-like systems - os.kill(pid, 0) # This doesn't actually kill the process, just checks if it exists - return True - except (ProcessLookupError, PermissionError, OSError): - return False + with open(config_file, 'r') as f: + browser_info_dict = json.load(f) + + # Get browser info from port map + if isinstance(browser_info_dict, dict) and "port_map" in browser_info_dict: + port_str = str(debugging_port) + if 
port_str in browser_info_dict["port_map"]: + browser_info = browser_info_dict["port_map"][port_str] + + # Check if the browser is still running + if not is_browser_running(browser_info.get('pid')): + if logger: + logger.warning(f"Built-in browser on port {debugging_port} is not running", tag="BUILTIN") + # Remove this port from the dictionary + del browser_info_dict["port_map"][port_str] + with open(config_file, 'w') as f: + json.dump(browser_info_dict, f, indent=2) + return None + + return browser_info + + return None + + except Exception as e: + if logger: + logger.error(f"Error reading built-in browser config: {str(e)}", tag="BUILTIN") + return None + + def get_browser_info(self) -> Optional[Dict[str, Any]]: + """Get information about the current built-in browser instance. + + Returns: + dict: Browser information or None if no running browser is configured + """ + return self.get_builtin_browser_info( + debugging_port=self.config.debugging_port, + config_file=self.builtin_config_file, + logger=self.logger + ) + async def launch_builtin_browser(self, browser_type: str = "chromium", @@ -873,18 +970,27 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): str: CDP URL for the browser, or None if launch failed """ # Check if there's an existing browser still running - browser_info = self.get_builtin_browser_info() - if browser_info and self._is_browser_running(browser_info.get('pid')): + browser_info = self.get_builtin_browser_info( + debugging_port=debugging_port, + config_file=self.builtin_config_file, + logger=self.logger + ) + if browser_info: if self.logger: - self.logger.info("Built-in browser is already running", tag="BUILTIN") + self.logger.info(f"Built-in browser is already running on port {debugging_port}", tag="BUILTIN") return browser_info.get('cdp_url') # Create a user data directory for the built-in browser user_data_dir = os.path.join(self.builtin_browser_dir, "user_data") + # Raise error if user data dir is already engaged + if 
self._check_user_dir_is_engaged(user_data_dir): + raise Exception(f"User data directory {user_data_dir} is already engaged by another browser instance.") + + # Create the user data directory if it doesn't exist os.makedirs(user_data_dir, exist_ok=True) # Prepare browser launch arguments - browser_path = get_browser_executable(browser_type) + browser_path = await get_browser_executable(browser_type) if browser_type == "chromium": args = [ browser_path, @@ -957,7 +1063,7 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): if self.logger: self.logger.warning(f"Could not verify browser: {str(e)}", tag="BUILTIN") - # Save browser info + # Create browser info browser_info = { 'pid': process.pid, 'cdp_url': cdp_url, @@ -968,8 +1074,31 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): 'config': config_json } + # Read existing config file if it exists + port_map = {} + if os.path.exists(self.builtin_config_file): + try: + with open(self.builtin_config_file, 'r') as f: + existing_data = json.load(f) + + # Check if it already uses port mapping + if isinstance(existing_data, dict) and "port_map" in existing_data: + port_map = existing_data["port_map"] + # Convert legacy format to port mapping + elif isinstance(existing_data, dict) and "debugging_port" in existing_data: + old_port = str(existing_data.get("debugging_port")) + if self._is_browser_running(existing_data.get("pid")): + port_map[old_port] = existing_data + except Exception as e: + if self.logger: + self.logger.warning(f"Could not read existing config: {str(e)}", tag="BUILTIN") + + # Add/update this browser in the port map + port_map[str(debugging_port)] = browser_info + + # Write updated config with open(self.builtin_config_file, 'w') as f: - json.dump(browser_info, f, indent=2) + json.dump({"port_map": port_map}, f, indent=2) # Detach from the browser process - don't keep any references # This is important to allow the Python script to exit while the browser continues running @@ -990,10 +1119,10 @@ class 
BuiltinBrowserStrategy(CDPBrowserStrategy): Returns: bool: True if the browser was killed, False otherwise """ - browser_info = self.get_builtin_browser_info() + browser_info = self.get_browser_info() if not browser_info: if self.logger: - self.logger.warning("No built-in browser found", tag="BUILTIN") + self.logger.warning(f"No built-in browser found on port {self.config.debugging_port}", tag="BUILTIN") return False pid = browser_info.get('pid') @@ -1007,16 +1136,29 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): os.kill(pid, signal.SIGTERM) # Wait for termination for _ in range(5): - if not self._is_browser_running(pid): + if not is_browser_running(pid): break await asyncio.sleep(0.5) else: # Force kill if still running os.kill(pid, signal.SIGKILL) - # Remove config file - if os.path.exists(self.builtin_config_file): - os.unlink(self.builtin_config_file) + # Update config file to remove this browser + with open(self.builtin_config_file, 'r') as f: + browser_info_dict = json.load(f) + # Remove this port from the dictionary + port_str = str(self.config.debugging_port) + if port_str in browser_info_dict.get("port_map", {}): + del browser_info_dict["port_map"][port_str] + with open(self.builtin_config_file, 'w') as f: + json.dump(browser_info_dict, f, indent=2) + # Remove user data directory if it exists + if os.path.exists(self.builtin_browser_dir): + shutil.rmtree(self.builtin_browser_dir) + # Clear the browser info cache + self.browser = None + self.temp_dir = None + self.shutting_down = True if self.logger: self.logger.success("Built-in browser terminated", tag="BUILTIN") @@ -1032,17 +1174,29 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy): Returns: dict: Status information with running, cdp_url, and info fields """ - browser_info = self.get_builtin_browser_info() + browser_info = self.get_browser_info() if not browser_info: return { 'running': False, 'cdp_url': None, - 'info': None + 'info': None, + 'port': self.config.debugging_port } return { 
'running': True, 'cdp_url': browser_info.get('cdp_url'), - 'info': browser_info + 'info': browser_info, + 'port': self.config.debugging_port } + + # Override the close method to handle built-in browser cleanup + async def close(self): + """Close the built-in browser and clean up resources.""" + # Call parent class close method + await super().close() + + # Clean up built-in browser if we created it + if self.shutting_down: + await self.kill_builtin_browser() diff --git a/crawl4ai/browser/utils.py b/crawl4ai/browser/utils.py index 2dff0924..74d2ea12 100644 --- a/crawl4ai/browser/utils.py +++ b/crawl4ai/browser/utils.py @@ -8,14 +8,18 @@ and Playwright instance management. import asyncio import os import sys -import platform +import time import tempfile -from typing import Optional, Any +import subprocess +from typing import Optional from playwright.async_api import async_playwright -from ..async_logger import AsyncLogger from ..utils import get_chromium_path +from ..async_configs import BrowserConfig, CrawlerRunConfig + +from ..async_logger import AsyncLogger + _playwright_instance = None @@ -30,7 +34,7 @@ async def get_playwright(): _playwright_instance = await async_playwright().start() return _playwright_instance -def get_browser_executable(browser_type: str) -> str: +async def get_browser_executable(browser_type: str) -> str: """Get the path to browser executable, with platform-specific handling. Args: @@ -39,7 +43,7 @@ def get_browser_executable(browser_type: str) -> str: Returns: Path to browser executable """ - return get_chromium_path(browser_type) + return await get_chromium_path(browser_type) def create_temp_directory(prefix="browser-profile-") -> str: """Create a temporary directory for browser data. @@ -75,6 +79,31 @@ def is_linux() -> bool: True if Linux, False otherwise """ return not (is_windows() or is_macos()) + +def is_browser_running(pid: Optional[int]) -> bool: + """Check if a process with the given PID is running. 
+ + Args: + pid: Process ID to check + + Returns: + bool: True if the process is running, False otherwise + """ + if not pid: + return False + + try: + # Check if the process exists + if is_windows(): + process = subprocess.run(["tasklist", "/FI", f"PID eq {pid}"], + capture_output=True, text=True) + return str(pid) in process.stdout + else: + # Unix-like systems + os.kill(pid, 0) # This doesn't actually kill the process, just checks if it exists + return True + except (ProcessLookupError, PermissionError, OSError): + return False def get_browser_disable_options() -> list: """Get standard list of browser disable options for performance. @@ -103,3 +132,197 @@ def get_browser_disable_options() -> list: "--password-store=basic", "--use-mock-keychain", ] + + +async def find_optimal_browser_config(total_urls=50, verbose=True, rate_limit_delay=0.2): + """Find optimal browser configuration for crawling a specific number of URLs. + + Args: + total_urls: Number of URLs to crawl + verbose: Whether to print progress + rate_limit_delay: Delay between page loads to avoid rate limiting + + Returns: + dict: Contains fastest, lowest_memory, and optimal configurations + """ + from .manager import BrowserManager + if verbose: + print(f"\n=== Finding optimal configuration for crawling {total_urls} URLs ===\n") + + # Generate test URLs with timestamp to avoid caching + timestamp = int(time.time()) + urls = [f"https://example.com/page_{i}?t={timestamp}" for i in range(total_urls)] + + # Limit browser configurations to test (1 browser to max 10) + max_browsers = min(10, total_urls) + configs_to_test = [] + + # Generate configurations (browser count, pages distribution) + for num_browsers in range(1, max_browsers + 1): + base_pages = total_urls // num_browsers + remainder = total_urls % num_browsers + + # Create distribution array like [3, 3, 2, 2] (some browsers get one more page) + if remainder > 0: + distribution = [base_pages + 1] * remainder + [base_pages] * (num_browsers - 
remainder) + else: + distribution = [base_pages] * num_browsers + + configs_to_test.append((num_browsers, distribution)) + + results = [] + + # Test each configuration + for browser_count, page_distribution in configs_to_test: + if verbose: + print(f"Testing {browser_count} browsers with distribution {tuple(page_distribution)}") + + try: + # Track memory if possible + try: + import psutil + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) # MB + except ImportError: + if verbose: + print("Memory tracking not available (psutil not installed)") + start_memory = 0 + + # Start browsers in parallel + managers = [] + start_tasks = [] + start_time = time.time() + + logger = AsyncLogger(verbose=True, log_file=None) + + for i in range(browser_count): + config = BrowserConfig(headless=True) + manager = BrowserManager(browser_config=config, logger=logger) + start_tasks.append(manager.start()) + managers.append(manager) + + await asyncio.gather(*start_tasks) + + # Distribute URLs among browsers + urls_per_manager = {} + url_index = 0 + + for i, manager in enumerate(managers): + pages_for_this_browser = page_distribution[i] + end_index = url_index + pages_for_this_browser + urls_per_manager[manager] = urls[url_index:end_index] + url_index = end_index + + # Create pages for each browser + all_pages = [] + for manager, manager_urls in urls_per_manager.items(): + if not manager_urls: + continue + pages = await manager.get_pages(CrawlerRunConfig(), count=len(manager_urls)) + all_pages.extend(zip(pages, manager_urls)) + + # Crawl pages with delay to avoid rate limiting + async def crawl_page(page_ctx, url): + page, _ = page_ctx + try: + await page.goto(url) + if rate_limit_delay > 0: + await asyncio.sleep(rate_limit_delay) + title = await page.title() + return title + finally: + await page.close() + + crawl_start = time.time() + crawl_tasks = [crawl_page(page_ctx, url) for page_ctx, url in all_pages] + await asyncio.gather(*crawl_tasks) + 
crawl_time = time.time() - crawl_start + total_time = time.time() - start_time + + # Measure final memory usage + if start_memory > 0: + end_memory = process.memory_info().rss / (1024 * 1024) + memory_used = end_memory - start_memory + else: + memory_used = 0 + + # Close all browsers + for manager in managers: + await manager.close() + + # Calculate metrics + pages_per_second = total_urls / crawl_time + + # Calculate efficiency score (higher is better) + # This balances speed vs memory + if memory_used > 0: + efficiency = pages_per_second / (memory_used + 1) + else: + efficiency = pages_per_second + + # Store result + result = { + "browser_count": browser_count, + "distribution": tuple(page_distribution), + "crawl_time": crawl_time, + "total_time": total_time, + "memory_used": memory_used, + "pages_per_second": pages_per_second, + "efficiency": efficiency + } + + results.append(result) + + if verbose: + print(f" ✓ Crawled {total_urls} pages in {crawl_time:.2f}s ({pages_per_second:.1f} pages/sec)") + if memory_used > 0: + print(f" ✓ Memory used: {memory_used:.1f}MB ({memory_used/total_urls:.1f}MB per page)") + print(f" ✓ Efficiency score: {efficiency:.4f}") + + except Exception as e: + if verbose: + print(f" ✗ Error: {str(e)}") + + # Clean up + for manager in managers: + try: + await manager.close() + except: + pass + + # If no successful results, return None + if not results: + return None + + # Find best configurations + fastest = sorted(results, key=lambda x: x["crawl_time"])[0] + + # Only consider memory if available + memory_results = [r for r in results if r["memory_used"] > 0] + if memory_results: + lowest_memory = sorted(memory_results, key=lambda x: x["memory_used"])[0] + else: + lowest_memory = fastest + + # Find most efficient (balanced speed vs memory) + optimal = sorted(results, key=lambda x: x["efficiency"], reverse=True)[0] + + # Print summary + if verbose: + print("\n=== OPTIMAL CONFIGURATIONS ===") + print(f"⚡ Fastest: {fastest['browser_count']} 
browsers {fastest['distribution']}") + print(f" {fastest['crawl_time']:.2f}s, {fastest['pages_per_second']:.1f} pages/sec") + + print(f"💾 Memory-efficient: {lowest_memory['browser_count']} browsers {lowest_memory['distribution']}") + if lowest_memory["memory_used"] > 0: + print(f" {lowest_memory['memory_used']:.1f}MB, {lowest_memory['memory_used']/total_urls:.2f}MB per page") + + print(f"🌟 Balanced optimal: {optimal['browser_count']} browsers {optimal['distribution']}") + print(f" {optimal['crawl_time']:.2f}s, {optimal['pages_per_second']:.1f} pages/sec, score: {optimal['efficiency']:.4f}") + + return { + "fastest": fastest, + "lowest_memory": lowest_memory, + "optimal": optimal, + "all_configs": results + } diff --git a/tests/browser/test_browser_manager.py b/tests/browser/test_browser_manager.py index 2293b90d..d8f9376d 100644 --- a/tests/browser/test_browser_manager.py +++ b/tests/browser/test_browser_manager.py @@ -171,9 +171,9 @@ async def run_tests(): """Run all tests sequentially.""" results = [] - # results.append(await test_basic_browser_manager()) - # results.append(await test_custom_browser_config()) - # results.append(await test_multiple_pages()) + results.append(await test_basic_browser_manager()) + results.append(await test_custom_browser_config()) + results.append(await test_multiple_pages()) results.append(await test_session_management()) # Print summary diff --git a/tests/browser/test_builtin_browser.py b/tests/browser/test_builtin_browser.py index 9a273ef7..013da637 100644 --- a/tests/browser/test_builtin_browser.py +++ b/tests/browser/test_builtin_browser.py @@ -1,12 +1,12 @@ """ -Test script for browser_profiler and builtin browser functionality. +Test script for builtin browser functionality in the browser module. This script tests: 1. Creating a builtin browser 2. Getting browser information 3. Killing the browser 4. Restarting the browser -5. Testing crawling with different browser modes +5. 
Testing operations with different browser strategies 6. Testing edge cases """ @@ -14,13 +14,20 @@ import asyncio import os import sys import time -from colorama import Fore, init +from typing import List, Dict, Any +from colorama import Fore, Style, init # Add the project root to the path for imports -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) -from crawl4ai.browser_profiler import BrowserProfiler -from crawl4ai.async_webcrawler import AsyncWebCrawler +from rich.console import Console +from rich.table import Table +from rich.panel import Panel +from rich.text import Text +from rich.box import Box, SIMPLE + +from crawl4ai.browser import BrowserManager +from crawl4ai.browser.strategies import BuiltinBrowserStrategy from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig from crawl4ai.async_logger import AsyncLogger @@ -37,264 +44,765 @@ RESET = Fore.RESET # Create logger logger = AsyncLogger(verbose=True) -async def test_browser_profiler(): - """Test the BrowserProfiler class functionality""" - print(f"\n{INFO}========== Testing BrowserProfiler =========={RESET}") - - # Initialize browser profiler - profiler = BrowserProfiler(logger=logger) - - # Step 1: Check if builtin browser exists and kill it if it does - print(f"\n{INFO}1. 
Checking if builtin browser exists{RESET}") - browser_info = profiler.get_builtin_browser_info() - if browser_info: - print(f"{SUCCESS}Builtin browser found: {browser_info['cdp_url']}{RESET}") - # Kill it to start with a clean state - print(f"{INFO}Killing existing browser...{RESET}") - await profiler.kill_builtin_browser() - browser_info = profiler.get_builtin_browser_info() - if not browser_info: - print(f"{SUCCESS}Browser successfully killed{RESET}") - else: - print(f"{ERROR}Failed to kill browser{RESET}") + +async def test_builtin_browser_creation(): + """Test creating a builtin browser using the BrowserManager with BuiltinBrowserStrategy""" + print(f"\n{INFO}========== Testing Builtin Browser Creation =========={RESET}") + + # Step 1: Create a BrowserManager with builtin mode + print(f"\n{INFO}1. Creating BrowserManager with builtin mode{RESET}") + browser_config = BrowserConfig(browser_mode="builtin", headless=True, verbose=True) + manager = BrowserManager(browser_config=browser_config, logger=logger) + + # Step 2: Check if we have a BuiltinBrowserStrategy + print(f"\n{INFO}2. Checking if we have a BuiltinBrowserStrategy{RESET}") + if isinstance(manager._strategy, BuiltinBrowserStrategy): + print( + f"{SUCCESS}Correct strategy type: {manager._strategy.__class__.__name__}{RESET}" + ) else: - print(f"{WARNING}No builtin browser found{RESET}") - - # Step 2: Launch a new builtin browser - print(f"\n{INFO}2. Launching new builtin browser{RESET}") - cdp_url = await profiler.launch_builtin_browser(headless=True) - if cdp_url: - print(f"{SUCCESS}Builtin browser launched at: {cdp_url}{RESET}") - else: - print(f"{ERROR}Failed to launch builtin browser{RESET}") - return - - # Step 3: Get and display browser information - print(f"\n{INFO}3. 
Getting browser information{RESET}") - browser_info = profiler.get_builtin_browser_info() + print( + f"{ERROR}Wrong strategy type: {manager._strategy.__class__.__name__}{RESET}" + ) + return None + + # Step 3: Start the manager to launch or connect to builtin browser + print(f"\n{INFO}3. Starting the browser manager{RESET}") + try: + await manager.start() + print(f"{SUCCESS}Browser manager started successfully{RESET}") + except Exception as e: + print(f"{ERROR}Failed to start browser manager: {str(e)}{RESET}") + return None + + # Step 4: Get browser info from the strategy + print(f"\n{INFO}4. Getting browser information{RESET}") + browser_info = manager._strategy.get_builtin_browser_info() if browser_info: print(f"{SUCCESS}Browser info retrieved:{RESET}") for key, value in browser_info.items(): - if key != 'config': # Skip the verbose config section + if key != "config": # Skip the verbose config section print(f" {key}: {value}") + + cdp_url = browser_info.get("cdp_url") + print(f"{SUCCESS}CDP URL: {cdp_url}{RESET}") else: print(f"{ERROR}Failed to get browser information{RESET}") - - # Step 4: Get browser status - print(f"\n{INFO}4. Getting browser status{RESET}") - status = await profiler.get_builtin_browser_status() - print(f"Running: {status['running']}") - print(f"CDP URL: {status['cdp_url']}") - - # Pause to let the browser run for a moment - print(f"\n{INFO}Waiting for 2 seconds...{RESET}") - await asyncio.sleep(2) - - return cdp_url # Return the CDP URL for the crawling tests + cdp_url = None -async def test_crawling_with_builtin_browser(cdp_url): - """Test crawling with the builtin browser""" - print(f"\n{INFO}========== Testing Crawling with Builtin Browser =========={RESET}") - - # Step 1: Create a crawler with 'builtin' browser mode - print(f"\n{INFO}1. 
Creating crawler with 'builtin' browser mode{RESET}") - browser_config = BrowserConfig( - browser_mode="builtin", - headless=True + # Save manager for later tests + return manager, cdp_url + + +async def test_page_operations(manager: BrowserManager): + """Test page operations with the builtin browser""" + print( + f"\n{INFO}========== Testing Page Operations with Builtin Browser =========={RESET}" ) - crawler = AsyncWebCrawler(config=browser_config) - - # Step 2: Test crawling without explicitly starting (should auto-start) - print(f"\n{INFO}2. Testing auto-start with arun{RESET}") + + # Step 1: Get a single page + print(f"\n{INFO}1. Getting a single page{RESET}") try: - result = await crawler.arun("https://crawl4ai.com") - print(f"{SUCCESS}Auto-start crawling successful!{RESET}") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") + crawler_config = CrawlerRunConfig() + page, context = await manager.get_page(crawler_config) + print(f"{SUCCESS}Got page successfully{RESET}") + + # Navigate to a test URL + await page.goto("https://example.com") + title = await page.title() + print(f"{SUCCESS}Page title: {title}{RESET}") + + # Close the page + await page.close() + print(f"{SUCCESS}Page closed successfully{RESET}") except Exception as e: - print(f"{ERROR}Auto-start crawling failed: {str(e)}{RESET}") - - # Close the crawler - await crawler.close() - - # Step 3: Test with explicit start - print(f"\n{INFO}3. Testing with explicit start{RESET}") - crawler = AsyncWebCrawler(config=browser_config) + print(f"{ERROR}Page operation failed: {str(e)}{RESET}") + return False + + # Step 2: Get multiple pages + print(f"\n{INFO}2. 
Getting multiple pages with get_pages(){RESET}") try: - await crawler.start() - print(f"{SUCCESS}Explicit start successful!{RESET}") - result = await crawler.arun("https://example.com") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") - # Try second time, no start needed - print(f"{INFO}Testing second arun call without start{RESET}") - result = await crawler.arun("https://example.com") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") + # Request 3 pages + crawler_config = CrawlerRunConfig() + pages = await manager.get_pages(crawler_config, count=3) + print(f"{SUCCESS}Got {len(pages)} pages{RESET}") + + # Test each page + for i, (page, context) in enumerate(pages): + await page.goto(f"https://example.com?test={i}") + title = await page.title() + print(f"{SUCCESS}Page {i + 1} title: {title}{RESET}") + await page.close() + + print(f"{SUCCESS}All pages tested and closed successfully{RESET}") except Exception as e: - print(f"{ERROR}Explicit start crawling failed: {str(e)}{RESET}") - - # Close the crawler - await crawler.close() - - # Step 4: Test with context manager - print(f"\n{INFO}4. Testing with context manager{RESET}") - try: - async with AsyncWebCrawler(config=browser_config) as crawler: - result = await crawler.arun("https://httpbin.org/html") - print(f"{SUCCESS}Context manager crawling successful!{RESET}") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") - except Exception as e: - print(f"{ERROR}Context manager crawling failed: {str(e)}{RESET}") - + print(f"{ERROR}Multiple page operation failed: {str(e)}{RESET}") + return False + return True -async def test_crawling_without_builtin_browser(): - """Test crawling after killing the builtin browser""" - print(f"\n{INFO}========== Testing Crawling Without Builtin Browser =========={RESET}") - - # Step 1: Kill the builtin browser - print(f"\n{INFO}1. 
Killing the builtin browser{RESET}") - profiler = BrowserProfiler(logger=logger) - await profiler.kill_builtin_browser() - - # Step 2: Create a crawler with 'builtin' mode (should fall back to dedicated) - print(f"\n{INFO}2. Creating crawler with 'builtin' mode (should fall back){RESET}") - browser_config = BrowserConfig( - browser_mode="builtin", - headless=True - ) - + +async def test_browser_status_management(manager: BrowserManager): + """Test browser status and management operations""" + print(f"\n{INFO}========== Testing Browser Status and Management =========={RESET}") + + # Step 1: Get browser status + print(f"\n{INFO}1. Getting browser status{RESET}") try: - async with AsyncWebCrawler(config=browser_config) as crawler: - result = await crawler.arun("https://httpbin.org/get") - print(f"{SUCCESS}Fallback to dedicated browser successful!{RESET}") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") + status = await manager._strategy.get_builtin_browser_status() + print(f"{SUCCESS}Browser status:{RESET}") + print(f" Running: {status['running']}") + print(f" CDP URL: {status['cdp_url']}") except Exception as e: - print(f"{ERROR}Fallback crawler failed: {str(e)}{RESET}") - - # Step 3: Test with direct CDP URL - print(f"\n{INFO}3. Testing with direct CDP URL connection{RESET}") - - # Launch a standalone browser to get a CDP URL - print(f"{INFO}Launching standalone browser...{RESET}") - cdp_url = await profiler.launch_standalone_browser(headless=True) - if not cdp_url: - print(f"{ERROR}Failed to launch standalone browser{RESET}") - return - - print(f"{SUCCESS}Got CDP URL: {cdp_url}{RESET}") - - # Create a crawler with the CDP URL - browser_config = BrowserConfig( - browser_mode="dedicated", - cdp_url=cdp_url, - use_managed_browser=True, - headless=True - ) - + print(f"{ERROR}Failed to get browser status: {str(e)}{RESET}") + return False + + # Step 2: Test killing the browser + print(f"\n{INFO}2. 
Testing killing the browser{RESET}") try: - async with AsyncWebCrawler(config=browser_config) as crawler: - result = await crawler.arun("https://httpbin.org/ip") - print(f"{SUCCESS}Direct CDP URL crawling successful!{RESET}") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") + result = await manager._strategy.kill_builtin_browser() + if result: + print(f"{SUCCESS}Browser killed successfully{RESET}") + else: + print(f"{ERROR}Failed to kill browser{RESET}") except Exception as e: - print(f"{ERROR}Direct CDP URL crawling failed: {str(e)}{RESET}") - + print(f"{ERROR}Browser kill operation failed: {str(e)}{RESET}") + return False + + # Step 3: Check status after kill + print(f"\n{INFO}3. Checking status after kill{RESET}") + try: + status = await manager._strategy.get_builtin_browser_status() + if not status["running"]: + print(f"{SUCCESS}Browser is correctly reported as not running{RESET}") + else: + print(f"{ERROR}Browser is incorrectly reported as still running{RESET}") + except Exception as e: + print(f"{ERROR}Failed to get browser status: {str(e)}{RESET}") + return False + + # Step 4: Launch a new browser + print(f"\n{INFO}4. Launching a new browser{RESET}") + try: + cdp_url = await manager._strategy.launch_builtin_browser( + browser_type="chromium", headless=True + ) + if cdp_url: + print(f"{SUCCESS}New browser launched at: {cdp_url}{RESET}") + else: + print(f"{ERROR}Failed to launch new browser{RESET}") + return False + except Exception as e: + print(f"{ERROR}Browser launch failed: {str(e)}{RESET}") + return False + return True + +async def test_multiple_managers(): + """Test creating multiple BrowserManagers that use the same builtin browser""" + print(f"\n{INFO}========== Testing Multiple Browser Managers =========={RESET}") + + # Step 1: Create first manager + print(f"\n{INFO}1. 
Creating first browser manager{RESET}") + browser_config1 = (BrowserConfig(browser_mode="builtin", headless=True),) + manager1 = BrowserManager(browser_config=browser_config1, logger=logger) + + # Step 2: Create second manager + print(f"\n{INFO}2. Creating second browser manager{RESET}") + browser_config2 = BrowserConfig(browser_mode="builtin", headless=True) + manager2 = BrowserManager(browser_config=browser_config2, logger=logger) + + # Step 3: Start both managers (should connect to the same builtin browser) + print(f"\n{INFO}3. Starting both managers{RESET}") + try: + await manager1.start() + print(f"{SUCCESS}First manager started{RESET}") + + await manager2.start() + print(f"{SUCCESS}Second manager started{RESET}") + + # Check if they got the same CDP URL + cdp_url1 = manager1._strategy.config.cdp_url + cdp_url2 = manager2._strategy.config.cdp_url + + if cdp_url1 == cdp_url2: + print( + f"{SUCCESS}Both managers connected to the same browser: {cdp_url1}{RESET}" + ) + else: + print( + f"{WARNING}Managers connected to different browsers: {cdp_url1} and {cdp_url2}{RESET}" + ) + except Exception as e: + print(f"{ERROR}Failed to start managers: {str(e)}{RESET}") + return False + + # Step 4: Test using both managers + print(f"\n{INFO}4. Testing operations with both managers{RESET}") + try: + # First manager creates a page + page1, ctx1 = await manager1.get_page(CrawlerRunConfig()) + await page1.goto("https://example.com") + title1 = await page1.title() + print(f"{SUCCESS}Manager 1 page title: {title1}{RESET}") + + # Second manager creates a page + page2, ctx2 = await manager2.get_page(CrawlerRunConfig()) + await page2.goto("https://example.org") + title2 = await page2.title() + print(f"{SUCCESS}Manager 2 page title: {title2}{RESET}") + + # Clean up + await page1.close() + await page2.close() + except Exception as e: + print(f"{ERROR}Failed to use both managers: {str(e)}{RESET}") + return False + + # Step 5: Close both managers + print(f"\n{INFO}5. 
Closing both managers{RESET}") + try: + await manager1.close() + print(f"{SUCCESS}First manager closed{RESET}") + + await manager2.close() + print(f"{SUCCESS}Second manager closed{RESET}") + except Exception as e: + print(f"{ERROR}Failed to close managers: {str(e)}{RESET}") + return False + + return True + + async def test_edge_cases(): - """Test edge cases like multiple starts, killing browser during crawl, etc.""" + """Test edge cases like multiple starts, killing browser during operations, etc.""" print(f"\n{INFO}========== Testing Edge Cases =========={RESET}") - - # Step 1: Launch the builtin browser if it doesn't exist - print(f"\n{INFO}1. Ensuring builtin browser exists{RESET}") - profiler = BrowserProfiler(logger=logger) - browser_info = profiler.get_builtin_browser_info() - if not browser_info: - cdp_url = await profiler.launch_builtin_browser(headless=True) - if cdp_url: - print(f"{SUCCESS}Builtin browser launched at: {cdp_url}{RESET}") - else: - print(f"{ERROR}Failed to launch builtin browser{RESET}") - return - else: - print(f"{SUCCESS}Using existing builtin browser: {browser_info['cdp_url']}{RESET}") - - # Step 2: Test multiple starts with the same crawler - print(f"\n{INFO}2. Testing multiple starts with the same crawler{RESET}") + + # Step 1: Test multiple starts with the same manager + print(f"\n{INFO}1. 
Testing multiple starts with the same manager{RESET}") browser_config = BrowserConfig(browser_mode="builtin", headless=True) - crawler = AsyncWebCrawler(config=browser_config) - - await crawler.start() - print(f"{SUCCESS}First start successful!{RESET}") - + manager = BrowserManager(browser_config=browser_config, logger=logger) + try: - await crawler.start() - print(f"{SUCCESS}Second start didn't cause errors!{RESET}") + await manager.start() + print(f"{SUCCESS}First start successful{RESET}") + + # Try to start again + await manager.start() + print(f"{SUCCESS}Second start completed without errors{RESET}") + + # Test if it's still functional + page, context = await manager.get_page(CrawlerRunConfig()) + await page.goto("https://example.com") + title = await page.title() + print( + f"{SUCCESS}Page operations work after multiple starts. Title: {title}{RESET}" + ) + await page.close() except Exception as e: - print(f"{ERROR}Second start failed: {str(e)}{RESET}") - - # Run a crawl to verify functionality + print(f"{ERROR}Multiple starts test failed: {str(e)}{RESET}") + return False + finally: + await manager.close() + + # Step 2: Test killing the browser while manager is active + print(f"\n{INFO}2. 
Testing killing the browser while manager is active{RESET}") + manager = BrowserManager(browser_config=browser_config, logger=logger) + try: - result = await crawler.arun("https://httpbin.org/user-agent") - print(f"{SUCCESS}Crawling after multiple starts successful!{RESET}") - print(f" Got {len(result.markdown.raw_markdown)} chars of markdown content") + await manager.start() + print(f"{SUCCESS}Manager started{RESET}") + + # Kill the browser directly + print(f"{INFO}Killing the browser...{RESET}") + await manager._strategy.kill_builtin_browser() + print(f"{SUCCESS}Browser killed{RESET}") + + # Try to get a page (should fail or launch a new browser) + try: + page, context = await manager.get_page(CrawlerRunConfig()) + print( + f"{WARNING}Page request succeeded despite killed browser (might have auto-restarted){RESET}" + ) + title = await page.title() + print(f"{SUCCESS}Got page title: {title}{RESET}") + await page.close() + except Exception as e: + print( + f"{SUCCESS}Page request failed as expected after browser was killed: {str(e)}{RESET}" + ) except Exception as e: - print(f"{ERROR}Crawling after multiple starts failed: {str(e)}{RESET}") - - await crawler.close() - - # Step 3: Test killing browser while crawler is active - print(f"\n{INFO}3. 
Testing killing browser while crawler is active{RESET}") - - # Create and start a crawler - browser_config = BrowserConfig(browser_mode="builtin", headless=True) - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - # Kill the browser - print(f"{INFO}Killing the browser...{RESET}") - await profiler.kill_builtin_browser() - - # Try to crawl (should fail) - try: - result = await crawler.arun("https://httpbin.org/get") - print(f"{WARNING}Crawling succeeded despite killed browser!{RESET}") - except Exception as e: - print(f"{SUCCESS}Crawling failed as expected: {str(e)}{RESET}") - - await crawler.close() - + print(f"{ERROR}Kill during operation test failed: {str(e)}{RESET}") + return False + finally: + await manager.close() + return True + +async def cleanup_browsers(): + """Clean up any remaining builtin browsers""" + print(f"\n{INFO}========== Cleaning Up Builtin Browsers =========={RESET}") + + browser_config = BrowserConfig(browser_mode="builtin", headless=True) + manager = BrowserManager(browser_config=browser_config, logger=logger) + + try: + # No need to start, just access the strategy directly + strategy = manager._strategy + if isinstance(strategy, BuiltinBrowserStrategy): + result = await strategy.kill_builtin_browser() + if result: + print(f"{SUCCESS}Successfully killed all builtin browsers{RESET}") + else: + print(f"{WARNING}No builtin browsers found to kill{RESET}") + else: + print(f"{ERROR}Wrong strategy type: {strategy.__class__.__name__}{RESET}") + except Exception as e: + print(f"{ERROR}Cleanup failed: {str(e)}{RESET}") + finally: + # Just to be safe + try: + await manager.close() + except: + pass + + +async def test_performance_scaling(): + """Test performance with multiple browsers and pages. + + This test creates multiple browsers on different ports, + spawns multiple pages per browser, and measures performance metrics. 
+ """ + print(f"\n{INFO}========== Testing Performance Scaling =========={RESET}") + + # Configuration parameters + num_browsers = 10 + pages_per_browser = 10 + total_pages = num_browsers * pages_per_browser + base_port = 9222 + + # Set up a measuring mechanism for memory + import psutil + import gc + + # Force garbage collection before starting + gc.collect() + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 # in MB + peak_memory = initial_memory + + # Report initial configuration + print( + f"{INFO}Test configuration: {num_browsers} browsers × {pages_per_browser} pages = {total_pages} total crawls{RESET}" + ) + + # List to track managers + managers: List[BrowserManager] = [] + all_pages = [] + + + + # Get crawl4ai home directory + crawl4ai_home = os.path.expanduser("~/.crawl4ai") + temp_dir = os.path.join(crawl4ai_home, "temp") + os.makedirs(temp_dir, exist_ok=True) + + # Create all managers but don't start them yet + manager_configs = [] + for i in range(num_browsers): + port = base_port + i + browser_config = BrowserConfig( + browser_mode="builtin", + headless=True, + debugging_port=port, + user_data_dir=os.path.join(temp_dir, f"browser_profile_{i}"), + ) + manager = BrowserManager(browser_config=browser_config, logger=logger) + manager._strategy.shutting_down = True + manager_configs.append((manager, i, port)) + + # Define async function to start a single manager + async def start_manager(manager, index, port): + try: + await manager.start() + return manager + except Exception as e: + print( + f"{ERROR}Failed to start browser {index + 1} on port {port}: {str(e)}{RESET}" + ) + return None + + # Start all managers in parallel + start_tasks = [ + start_manager(manager, i, port) for manager, i, port in manager_configs + ] + started_managers = await asyncio.gather(*start_tasks) + + # Filter out None values (failed starts) and add to managers list + managers = [m for m in started_managers if m is not None] + + if len(managers) 
== 0: + print(f"{ERROR}All browser managers failed to start. Aborting test.{RESET}") + return False + + if len(managers) < num_browsers: + print( + f"{WARNING}Only {len(managers)} out of {num_browsers} browser managers started successfully{RESET}" + ) + + # Create pages for each browser + for i, manager in enumerate(managers): + try: + pages = await manager.get_pages(CrawlerRunConfig(), count=pages_per_browser) + all_pages.extend(pages) + except Exception as e: + print(f"{ERROR}Failed to create pages for browser {i + 1}: {str(e)}{RESET}") + + # Check memory after page creation + gc.collect() + current_memory = process.memory_info().rss / 1024 / 1024 + peak_memory = max(peak_memory, current_memory) + + # Ask for confirmation before loading + confirmation = input( + f"{WARNING}Do you want to proceed with loading pages? (y/n): {RESET}" + ) + # Step 1: Create and start multiple browser managers in parallel + start_time = time.time() + + if confirmation.lower() == "y": + load_start_time = time.time() + + # Function to load a single page + async def load_page(page_ctx, index): + page, _ = page_ctx + try: + await page.goto(f"https://example.com/page{index}", timeout=30000) + title = await page.title() + return title + except Exception as e: + return f"Error: {str(e)}" + + # Load all pages concurrently + load_tasks = [load_page(page_ctx, i) for i, page_ctx in enumerate(all_pages)] + load_results = await asyncio.gather(*load_tasks, return_exceptions=True) + + # Count successes and failures + successes = sum( + 1 for r in load_results if isinstance(r, str) and not r.startswith("Error") + ) + failures = len(load_results) - successes + + load_time = time.time() - load_start_time + total_test_time = time.time() - start_time + + # Check memory after loading (peak memory) + gc.collect() + current_memory = process.memory_info().rss / 1024 / 1024 + peak_memory = max(peak_memory, current_memory) + + # Calculate key metrics + memory_per_page = peak_memory / successes if successes > 0 
else 0 + time_per_crawl = total_test_time / successes if successes > 0 else 0 + crawls_per_second = successes / total_test_time if total_test_time > 0 else 0 + crawls_per_minute = crawls_per_second * 60 + crawls_per_hour = crawls_per_minute * 60 + + # Print simplified performance summary + from rich.console import Console + from rich.table import Table + + console = Console() + + # Create a simple summary table + table = Table(title="CRAWL4AI PERFORMANCE SUMMARY") + + table.add_column("Metric", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Total Crawls Completed", f"{successes}") + table.add_row("Total Time", f"{total_test_time:.2f} seconds") + table.add_row("Time Per Crawl", f"{time_per_crawl:.2f} seconds") + table.add_row("Crawling Speed", f"{crawls_per_second:.2f} crawls/second") + table.add_row("Projected Rate (1 minute)", f"{crawls_per_minute:.0f} crawls") + table.add_row("Projected Rate (1 hour)", f"{crawls_per_hour:.0f} crawls") + table.add_row("Peak Memory Usage", f"{peak_memory:.2f} MB") + table.add_row("Memory Per Crawl", f"{memory_per_page:.2f} MB") + + # Display the table + console.print(table) + + # Ask confirmation before cleanup + confirmation = input( + f"{WARNING}Do you want to proceed with cleanup? (y/n): {RESET}" + ) + if confirmation.lower() != "y": + print(f"{WARNING}Cleanup aborted by user{RESET}") + return False + + # Close all pages + for page, _ in all_pages: + try: + await page.close() + except: + pass + + # Close all managers + for manager in managers: + try: + await manager.close() + except: + pass + + # Remove the temp directory + import shutil + + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + return True + + +async def test_performance_scaling_lab( num_browsers: int = 10, pages_per_browser: int = 10): + """Test performance with multiple browsers and pages. + + This test creates multiple browsers on different ports, + spawns multiple pages per browser, and measures performance metrics. 
+ """ + print(f"\n{INFO}========== Testing Performance Scaling =========={RESET}") + + # Configuration parameters + num_browsers = num_browsers + pages_per_browser = pages_per_browser + total_pages = num_browsers * pages_per_browser + base_port = 9222 + + # Set up a measuring mechanism for memory + import psutil + import gc + + # Force garbage collection before starting + gc.collect() + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 # in MB + peak_memory = initial_memory + + # Report initial configuration + print( + f"{INFO}Test configuration: {num_browsers} browsers × {pages_per_browser} pages = {total_pages} total crawls{RESET}" + ) + + # List to track managers + managers: List[BrowserManager] = [] + all_pages = [] + + # Get crawl4ai home directory + crawl4ai_home = os.path.expanduser("~/.crawl4ai") + temp_dir = os.path.join(crawl4ai_home, "temp") + os.makedirs(temp_dir, exist_ok=True) + + # Create all managers but don't start them yet + manager_configs = [] + for i in range(num_browsers): + port = base_port + i + browser_config = BrowserConfig( + browser_mode="builtin", + headless=True, + debugging_port=port, + user_data_dir=os.path.join(temp_dir, f"browser_profile_{i}"), + ) + manager = BrowserManager(browser_config=browser_config, logger=logger) + manager._strategy.shutting_down = True + manager_configs.append((manager, i, port)) + + # Define async function to start a single manager + async def start_manager(manager, index, port): + try: + await manager.start() + return manager + except Exception as e: + print( + f"{ERROR}Failed to start browser {index + 1} on port {port}: {str(e)}{RESET}" + ) + return None + + # Start all managers in parallel + start_tasks = [ + start_manager(manager, i, port) for manager, i, port in manager_configs + ] + started_managers = await asyncio.gather(*start_tasks) + + # Filter out None values (failed starts) and add to managers list + managers = [m for m in started_managers if m is not None] + 
+ if len(managers) == 0: + print(f"{ERROR}All browser managers failed to start. Aborting test.{RESET}") + return False + + if len(managers) < num_browsers: + print( + f"{WARNING}Only {len(managers)} out of {num_browsers} browser managers started successfully{RESET}" + ) + + # Create pages for each browser + for i, manager in enumerate(managers): + try: + pages = await manager.get_pages(CrawlerRunConfig(), count=pages_per_browser) + all_pages.extend(pages) + except Exception as e: + print(f"{ERROR}Failed to create pages for browser {i + 1}: {str(e)}{RESET}") + + # Check memory after page creation + gc.collect() + current_memory = process.memory_info().rss / 1024 / 1024 + peak_memory = max(peak_memory, current_memory) + + # Ask for confirmation before loading + confirmation = input( + f"{WARNING}Do you want to proceed with loading pages? (y/n): {RESET}" + ) + # Step 1: Create and start multiple browser managers in parallel + start_time = time.time() + + if confirmation.lower() == "y": + load_start_time = time.time() + + # Function to load a single page + async def load_page(page_ctx, index): + page, _ = page_ctx + try: + await page.goto(f"https://example.com/page{index}", timeout=30000) + title = await page.title() + return title + except Exception as e: + return f"Error: {str(e)}" + + # Load all pages concurrently + load_tasks = [load_page(page_ctx, i) for i, page_ctx in enumerate(all_pages)] + load_results = await asyncio.gather(*load_tasks, return_exceptions=True) + + # Count successes and failures + successes = sum( + 1 for r in load_results if isinstance(r, str) and not r.startswith("Error") + ) + failures = len(load_results) - successes + + load_time = time.time() - load_start_time + total_test_time = time.time() - start_time + + # Check memory after loading (peak memory) + gc.collect() + current_memory = process.memory_info().rss / 1024 / 1024 + peak_memory = max(peak_memory, current_memory) + + # Calculate key metrics + memory_per_page = peak_memory / 
successes if successes > 0 else 0 + time_per_crawl = total_test_time / successes if successes > 0 else 0 + crawls_per_second = successes / total_test_time if total_test_time > 0 else 0 + crawls_per_minute = crawls_per_second * 60 + crawls_per_hour = crawls_per_minute * 60 + + # Print simplified performance summary + from rich.console import Console + from rich.table import Table + + console = Console() + + # Create a simple summary table + table = Table(title="CRAWL4AI PERFORMANCE SUMMARY") + + table.add_column("Metric", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Total Crawls Completed", f"{successes}") + table.add_row("Total Time", f"{total_test_time:.2f} seconds") + table.add_row("Time Per Crawl", f"{time_per_crawl:.2f} seconds") + table.add_row("Crawling Speed", f"{crawls_per_second:.2f} crawls/second") + table.add_row("Projected Rate (1 minute)", f"{crawls_per_minute:.0f} crawls") + table.add_row("Projected Rate (1 hour)", f"{crawls_per_hour:.0f} crawls") + table.add_row("Peak Memory Usage", f"{peak_memory:.2f} MB") + table.add_row("Memory Per Crawl", f"{memory_per_page:.2f} MB") + + # Display the table + console.print(table) + + # Ask confirmation before cleanup + confirmation = input( + f"{WARNING}Do you want to proceed with cleanup? 
(y/n): {RESET}" + ) + if confirmation.lower() != "y": + print(f"{WARNING}Cleanup aborted by user{RESET}") + return False + + # Close all pages + for page, _ in all_pages: + try: + await page.close() + except: + pass + + # Close all managers + for manager in managers: + try: + await manager.close() + except: + pass + + # Remove the temp directory + import shutil + + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + return True + + + async def main(): """Run all tests""" try: - print(f"{INFO}Starting browser_profiler and builtin browser tests{RESET}") - - # Run browser profiler tests - cdp_url = await test_browser_profiler() - - # Run crawling tests with builtin browser - if cdp_url: - await test_crawling_with_builtin_browser(cdp_url) - - # Run tests without builtin browser - # await test_crawling_without_builtin_browser() - - # Run edge case tests + print(f"{INFO}Starting builtin browser tests with browser module{RESET}") + + # # Run browser creation test + # manager, cdp_url = await test_builtin_browser_creation() + # if not manager: + # print(f"{ERROR}Browser creation failed, cannot continue tests{RESET}") + # return + + # # Run page operations test + # await test_page_operations(manager) + + # # Run browser status and management test + # await test_browser_status_management(manager) + + # # Close manager before multiple manager test + # await manager.close() + + # Run multiple managers test + # await test_multiple_managers() + + # Run performance scaling test + await test_performance_scaling() + # Run cleanup test + # await cleanup_browsers() + + # Run edge cases test # await test_edge_cases() - + print(f"\n{SUCCESS}All tests completed!{RESET}") - + except Exception as e: print(f"\n{ERROR}Test failed with error: {str(e)}{RESET}") import traceback + traceback.print_exc() finally: - # Clean up: kill any remaining builtin browser - print(f"\n{INFO}Cleaning up: killing any remaining builtin browser{RESET}") - profiler = BrowserProfiler(logger=logger) - await 
# NOTE: the next comment line is residue of the surrounding diff (tail of the previous
# file's hunk and this file's diff header), preserved verbatim from the original line.
# profiler.kill_builtin_browser() + # Clean up: kill any remaining builtin browsers + await cleanup_browsers() print(f"{SUCCESS}Test cleanup complete{RESET}") + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/tests/browser/test_parallel_crawling.py b/tests/browser/test_parallel_crawling.py new file mode 100644 index 00000000..9e72f06e --- /dev/null +++ b/tests/browser/test_parallel_crawling.py @@ -0,0 +1,902 @@
"""
Test examples for parallel crawling with the browser module.

These examples demonstrate the functionality of parallel page creation
and serve as functional tests for multi-page crawling performance.
"""

import asyncio
import os
import sys
import time
from typing import List

# Add the project root to Python path if running directly
if __name__ == "__main__":
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

from crawl4ai.browser import BrowserManager
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger

# Create a logger for clear terminal output
logger = AsyncLogger(verbose=True, log_file=None)


# ---------------------------------------------------------------------------
# Private helpers shared by the tests below
# ---------------------------------------------------------------------------

def _split_evenly(total: int, parts: int) -> List[int]:
    """Split ``total`` pages across ``parts`` browsers.

    The first ``total % parts`` browsers receive one extra page, e.g.
    ``_split_evenly(10, 4) == [3, 3, 2, 2]``.
    """
    base, extra = divmod(total, parts)
    return [base + 1] * extra + [base] * (parts - extra)


def _assign_urls(managers, urls, page_distribution):
    """Pair each manager with its contiguous slice of ``urls``.

    Returns a list of ``(manager, urls_for_manager)`` tuples in manager order.
    A list of pairs is used instead of a dict so managers need not be hashable.
    """
    assignments = []
    offset = 0
    for manager, page_count in zip(managers, page_distribution):
        assignments.append((manager, urls[offset:offset + page_count]))
        offset += page_count
    return assignments


async def _open_pages(assignments):
    """Pre-create one page per assigned URL via ``manager.get_pages``.

    Returns a list of ``((page, context), url)`` pairs. Managers with no
    URLs are skipped.
    """
    page_url_pairs = []
    for manager, manager_urls in assignments:
        if not manager_urls:
            continue
        pages = await manager.get_pages(CrawlerRunConfig(), count=len(manager_urls))
        page_url_pairs.extend(zip(pages, manager_urls))
    return page_url_pairs


async def _fetch_title(page_ctx, url, delay=0.0):
    """Navigate a pre-created ``(page, context)`` pair to ``url`` and return the title.

    ``delay`` (seconds) is slept after navigation when positive. The page is
    always closed, even if navigation fails.
    """
    page, _ = page_ctx
    try:
        await page.goto(url)
        if delay > 0:
            await asyncio.sleep(delay)
        return await page.title()
    finally:
        await page.close()


async def _fetch_title_on_demand(manager, url):
    """Create a fresh page with ``manager.get_page`` for ``url`` and return its title."""
    page_ctx = await manager.get_page(CrawlerRunConfig(url=url))
    return await _fetch_title(page_ctx, url)


async def _close_quietly(managers):
    """Best-effort shutdown for failure paths.

    Errors from ``close()`` are logged and swallowed so cleanup of the
    remaining managers continues. Only ``Exception`` is caught, so task
    cancellation and KeyboardInterrupt still propagate.
    """
    for manager in managers:
        try:
            await manager.close()
        except Exception as exc:
            logger.warning(f"Ignoring error while closing browser manager: {exc}", tag="TEST")


def _log_speed_comparison(approach1_time, approach2_time, prefix=""):
    """Log which approach was faster, guarding against zero timings."""
    if approach1_time <= 0 or approach2_time <= 0:
        # A zero timing would otherwise cause a ZeroDivisionError below.
        logger.info(f"{prefix}Timings too small to compare the two approaches", tag="TEST")
        return
    speedup = approach1_time / approach2_time
    if speedup > 1:
        logger.success(f"{prefix}Approach 2 (get_pages upfront) was {speedup:.2f}x faster", tag="TEST")
    else:
        logger.info(f"{prefix}Approach 1 (get_page + gather) was {1/speedup:.2f}x faster", tag="TEST")


def _current_process():
    """Return a ``psutil.Process`` handle, or ``None`` when psutil is not installed.

    NOTE(review): ``psutil.Process()`` without a pid refers to this Python
    process, so memory held by browser child processes is presumably not
    included in the figures derived from it - confirm before relying on them.
    """
    try:
        import psutil
    except ImportError:
        return None
    return psutil.Process()


def _rss_mb(process):
    """Resident set size of ``process`` in megabytes."""
    return process.memory_info().rss / (1024 * 1024)


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

async def test_get_pages_basic():
    """Test basic functionality of get_pages method.

    Requests three pages, loads https://example.com in each and checks that
    every page reports a title.

    Returns:
        bool: True on success, False if any step raised (including a failed
        assertion); the error is logged.
    """
    logger.info("Testing basic get_pages functionality", tag="TEST")

    manager = BrowserManager(browser_config=BrowserConfig(headless=True), logger=logger)

    try:
        await manager.start()

        # Request 3 pages
        pages = await manager.get_pages(CrawlerRunConfig(), count=3)

        # Verify we got the correct number of pages
        assert len(pages) == 3, f"Expected 3 pages, got {len(pages)}"

        # Verify each page is valid
        for index, (page, _context) in enumerate(pages, start=1):
            await page.goto("https://example.com")
            title = await page.title()
            logger.info(f"Page {index} title: {title}", tag="TEST")
            assert title, f"Page {index} has no title"

        await manager.close()
        logger.success("Basic get_pages test completed successfully", tag="TEST")
        return True
    except Exception as e:
        logger.error(f"Test failed: {str(e)}", tag="TEST")
        await _close_quietly([manager])
        return False


async def test_parallel_approaches_comparison():
    """Compare two parallel crawling approaches:
    1. Create a page for each URL on-demand (get_page + gather)
    2. Get all pages upfront with get_pages, then use them (get_pages + gather)

    Returns:
        bool: True when both approaches ran and produced the same set of
        titles, False if anything raised (the error is logged).
    """
    logger.info("Comparing different parallel crawling approaches", tag="TEST")

    urls = [
        "https://example.com/page1",
        "https://crawl4ai.com",
        "https://kidocode.com",
        "https://bbc.com",
    ]

    # This comparison runs with a visible browser window (headless=False).
    manager = BrowserManager(browser_config=BrowserConfig(headless=False), logger=logger)

    try:
        await manager.start()

        # Approach 1: Create a page for each URL on-demand and run in parallel
        logger.info("Testing approach 1: get_page for each URL + gather", tag="TEST")
        start_time = time.time()
        approach1_results = await asyncio.gather(
            *(_fetch_title_on_demand(manager, url) for url in urls)
        )
        approach1_time = time.time() - start_time
        logger.info(f"Approach 1 time (get_page + gather): {approach1_time:.2f}s", tag="TEST")

        # Approach 2: Get all pages upfront with get_pages, then use them in parallel
        logger.info("Testing approach 2: get_pages upfront + gather", tag="TEST")
        start_time = time.time()
        pages = await manager.get_pages(CrawlerRunConfig(), count=len(urls))
        approach2_results = await asyncio.gather(
            *(_fetch_title(page_ctx, url) for page_ctx, url in zip(pages, urls))
        )
        approach2_time = time.time() - start_time
        logger.info(f"Approach 2 time (get_pages + gather): {approach2_time:.2f}s", tag="TEST")

        # Compare results and performance
        _log_speed_comparison(approach1_time, approach2_time)

        # Verify same content was retrieved in both approaches
        assert len(approach1_results) == len(approach2_results), "Result count mismatch"

        # Sort results for comparison since parallel execution might complete in different order
        assert sorted(approach1_results) == sorted(approach2_results), "Results content mismatch"

        await manager.close()
        return True

    except Exception as e:
        logger.error(f"Test failed: {str(e)}", tag="TEST")
        await _close_quietly([manager])
        return False


async def test_multi_browser_scaling(num_browsers=3, pages_per_browser=5):
    """Test performance with multiple browsers and pages per browser.
    Compares two approaches:
    1. On-demand page creation (get_page + gather)
    2. Pre-created pages (get_pages + gather)

    Args:
        num_browsers: Number of browser managers to start in parallel.
        pages_per_browser: Number of URLs crawled by each manager.

    Returns:
        bool: True when both approaches completed, False if anything raised
        (the error is logged).
    """
    logger.info(f"Testing multi-browser scaling with {num_browsers} browsers × {pages_per_browser} pages", tag="TEST")

    # Generate test URLs
    total_pages = num_browsers * pages_per_browser
    urls = [f"https://example.com/page_{i}" for i in range(total_pages)]

    managers = []

    try:
        # Start all browsers in parallel
        for _ in range(num_browsers):
            managers.append(
                BrowserManager(browser_config=BrowserConfig(headless=True), logger=logger)
            )
        await asyncio.gather(*(manager.start() for manager in managers))

        # Distribute URLs among managers (same page count for every browser)
        assignments = _assign_urls(managers, urls, [pages_per_browser] * num_browsers)

        # Approach 1: Create a page for each URL on-demand and run in parallel
        logger.info("Testing approach 1: get_page for each URL + gather", tag="TEST")
        start_time = time.time()
        await asyncio.gather(*(
            _fetch_title_on_demand(manager, url)
            for manager, manager_urls in assignments
            for url in manager_urls
        ))
        approach1_time = time.time() - start_time
        logger.info(f"Approach 1 time (get_page + gather): {approach1_time:.2f}s", tag="TEST")

        # Approach 2: Get all pages upfront with get_pages, then use them in parallel
        logger.info("Testing approach 2: get_pages upfront + gather", tag="TEST")
        start_time = time.time()
        all_pages = await _open_pages(assignments)
        await asyncio.gather(*(_fetch_title(page_ctx, url) for page_ctx, url in all_pages))
        approach2_time = time.time() - start_time
        logger.info(f"Approach 2 time (get_pages + gather): {approach2_time:.2f}s", tag="TEST")

        # Guard against a zero elapsed time instead of raising ZeroDivisionError
        pages_per_second = total_pages / approach2_time if approach2_time > 0 else 0.0

        # Show a simple summary
        logger.info(f"📊 Summary: {num_browsers} browsers × {pages_per_browser} pages = {total_pages} total crawls", tag="TEST")
        logger.info(f"⚡ Performance: {pages_per_second:.1f} pages/second ({pages_per_second*60:.0f} pages/minute)", tag="TEST")
        logger.info(f"🚀 Total crawl time: {approach2_time:.2f} seconds", tag="TEST")

        _log_speed_comparison(approach1_time, approach2_time, prefix="✅ ")

        # Close all managers
        for manager in managers:
            await manager.close()

        return True

    except Exception as e:
        logger.error(f"Test failed: {str(e)}", tag="TEST")
        await _close_quietly(managers)
        return False


def _save_grid_search_plot(results, fastest, lowest_memory, most_efficient, total_urls):
    """Render the grid-search charts to ``browser_scaling_grid_search.png`` next to this file.

    Does nothing except log a warning when matplotlib or numpy cannot be
    imported. Data is plotted in ascending browser-count order so the line
    plots read left to right.
    """
    try:
        import matplotlib.pyplot as plt
        import numpy as np
    except ImportError:
        logger.warning("matplotlib not available, skipping visualization", tag="TEST")
        return

    # Each configuration has a unique browser count, so sorting by it gives a
    # monotonic x-axis and makes ``list.index`` lookups below unambiguous.
    plot_results = sorted(results, key=lambda r: r["num_browsers"])
    browser_counts = [r["num_browsers"] for r in plot_results]
    efficiency_scores = [r["efficiency_score"] for r in plot_results]
    crawl_times = [r["crawl_time"] for r in plot_results]
    total_times = [r["total_time"] for r in plot_results]

    # Filter results with memory data
    memory_results = [r for r in plot_results if r["peak_memory"] > 0]
    memory_browser_counts = [r["num_browsers"] for r in memory_results]
    peak_memories = [r["peak_memory"] for r in memory_results]

    # Create figure with clean design
    fig = plt.figure(figsize=(14, 12), facecolor='white')
    try:
        plt.style.use('ggplot')

        # Create grid for subplots
        gs = plt.GridSpec(3, 1, height_ratios=[1, 1, 1], hspace=0.3)

        # Plot 1: Efficiency Score (higher is better)
        ax1 = plt.subplot(gs[0])
        bar_colors = ['#3498db'] * len(browser_counts)
        # Highlight the most efficient configuration in red
        bar_colors[browser_counts.index(most_efficient["num_browsers"])] = '#e74c3c'

        bars = ax1.bar(range(len(browser_counts)), efficiency_scores, color=bar_colors)
        ax1.set_xticks(range(len(browser_counts)))
        ax1.set_xticklabels([f"{bc}" for bc in browser_counts], rotation=45)
        ax1.set_xlabel('Number of Browsers')
        ax1.set_ylabel('Efficiency Score (higher is better)')
        ax1.set_title('Browser Configuration Efficiency (higher is better)')

        # Add value labels on top of bars
        label_offset = 0.02 * max(efficiency_scores)
        for bar, score in zip(bars, efficiency_scores):
            ax1.text(bar.get_x() + bar.get_width()/2., bar.get_height() + label_offset,
                     f'{score:.3f}', ha='center', va='bottom', rotation=90, fontsize=8)

        # Highlight best configuration
        ax1.text(0.02, 0.90, f"🌟 Most Efficient: {most_efficient['num_browsers']} browsers with ~{most_efficient['pages_per_browser']} pages",
                 transform=ax1.transAxes, fontsize=12, verticalalignment='top',
                 bbox=dict(boxstyle='round,pad=0.5', facecolor='yellow', alpha=0.3))

        # Plot 2: Time Performance (both total time and crawl time)
        ax2 = plt.subplot(gs[1])
        ax2.plot(browser_counts, crawl_times, 'bo-', label='Crawl Time (s)', linewidth=2)
        ax2.plot(browser_counts, total_times, 'go--', label='Total Time (s)', linewidth=2, alpha=0.6)

        # Mark the fastest configuration
        fastest_idx = browser_counts.index(fastest["num_browsers"])
        ax2.plot(browser_counts[fastest_idx], crawl_times[fastest_idx], 'ro', ms=10,
                 label=f'Fastest: {fastest["num_browsers"]} browsers')

        ax2.set_xlabel('Number of Browsers')
        ax2.set_ylabel('Time (seconds)')
        ax2.set_title(f'Time Performance for {total_urls} URLs by Browser Count')
        ax2.grid(True, linestyle='--', alpha=0.7)
        ax2.legend(loc='upper right')

        # Plot pages per second on second y-axis
        throughput = [total_urls/t for t in crawl_times]
        ax2_twin = ax2.twinx()
        ax2_twin.plot(browser_counts, throughput, 'r^--', label='Pages/second', alpha=0.5)
        ax2_twin.set_ylabel('Pages per second')

        # Add note about the fastest configuration
        ax2.text(0.02, 0.90, f"⚡ Fastest: {fastest['num_browsers']} browsers with ~{fastest['pages_per_browser']} pages" +
                 f"\n {fastest['crawl_time']:.2f}s ({fastest['pages_per_second']:.1f} pages/s)",
                 transform=ax2.transAxes, fontsize=12, verticalalignment='top',
                 bbox=dict(boxstyle='round,pad=0.5', facecolor='lightblue', alpha=0.3))

        # Plot 3: Memory Usage (if available)
        if memory_results:
            ax3 = plt.subplot(gs[2])

            memory_per_browser = [m/n for m, n in zip(peak_memories, memory_browser_counts)]
            # Every configuration crawls exactly total_urls pages; dividing by
            # browsers * floor(pages per browser) would undercount when the
            # pages do not divide evenly.
            memory_per_page = [m/total_urls for m in peak_memories]

            x = np.arange(len(memory_browser_counts))
            width = 0.35

            # Create grouped bars
            ax3.bar(x - width/2, peak_memories, width, label='Total Memory (MB)', color='#9b59b6')
            ax3.bar(x + width/2, memory_per_browser, width, label='Memory per Browser (MB)', color='#3498db')

            # Configure axis
            ax3.set_xticks(x)
            ax3.set_xticklabels([f"{bc}" for bc in memory_browser_counts], rotation=45)
            ax3.set_xlabel('Number of Browsers')
            ax3.set_ylabel('Memory (MB)')
            ax3.set_title('Memory Usage by Browser Configuration')
            ax3.legend(loc='upper left')
            ax3.grid(True, linestyle='--', alpha=0.7)

            # Add second y-axis for memory per page
            ax3_twin = ax3.twinx()
            ax3_twin.plot(x, memory_per_page, 'ro-', label='Memory per Page (MB)')
            ax3_twin.set_ylabel('Memory per Page (MB)')

            # Add note about lowest memory configuration
            ax3.text(0.02, 0.90, f"💾 Lowest Memory: {lowest_memory['num_browsers']} browsers with ~{lowest_memory['pages_per_browser']} pages" +
                     f"\n {lowest_memory['peak_memory']:.1f}MB ({lowest_memory['peak_memory']/total_urls:.2f}MB per page)",
                     transform=ax3.transAxes, fontsize=12, verticalalignment='top',
                     bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgreen', alpha=0.3))

        # Add overall title
        plt.suptitle(f'Browser Scaling Grid Search Results for {total_urls} URLs', fontsize=16, y=0.98)

        # Add timestamp and info at the bottom
        plt.figtext(0.5, 0.01, f"Generated by Crawl4AI at {time.strftime('%Y-%m-%d %H:%M:%S')}",
                    ha="center", fontsize=10, style='italic')

        # Save the figure in the directory that contains this file
        output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                   'browser_scaling_grid_search.png')

        # Adjust layout and save figure with high DPI
        plt.tight_layout(rect=[0, 0.03, 1, 0.97])
        plt.savefig(output_file, dpi=200, bbox_inches='tight')
    finally:
        # Release the figure so repeated runs do not accumulate open figures
        plt.close(fig)

    logger.success(f"Visualization saved to {output_file}", tag="TEST")


async def grid_search_optimal_configuration(total_urls=50):
    """Perform a grid search to find the optimal balance between number of browsers and pages per browser.

    This function tests different combinations of browser count and pages per browser,
    while keeping the total number of URLs constant. It measures performance metrics
    for each configuration to find the "sweet spot" that provides the best speed
    with reasonable memory usage.

    Args:
        total_urls: Total number of URLs to crawl (default: 50)

    Returns:
        tuple | None: ``(num_browsers, pages_per_browser)`` of the configuration
        with the highest efficiency score, where ``pages_per_browser`` is the
        floor of ``total_urls / num_browsers``. ``None`` when no configuration
        completed successfully (including ``total_urls <= 0``).
    """
    logger.info(f"=== GRID SEARCH FOR OPTIMAL CRAWLING CONFIGURATION ({total_urls} URLs) ===", tag="TEST")

    # Generate test URLs once
    urls = [f"https://example.com/page_{i}" for i in range(total_urls)]

    # Test all browser counts from 1 to min(20, total_urls); pages are spread
    # evenly (some browsers may have 1 more page than others)
    max_browsers_to_test = min(20, total_urls)

    # Track results
    results = []

    for num_browsers in range(1, max_browsers_to_test + 1):
        pages_per_browser = total_urls // num_browsers
        remainder = total_urls % num_browsers
        page_distribution = _split_evenly(total_urls, num_browsers)

        if remainder > 0:
            pages_distribution = f"{pages_per_browser+1} pages × {remainder} browsers, {pages_per_browser} pages × {num_browsers - remainder} browsers"
        else:
            pages_distribution = f"{pages_per_browser} pages × {num_browsers} browsers"

        # Format the distribution as a tuple string like (4, 4, 3, 3)
        distribution_str = str(tuple(page_distribution))

        logger.info("-" * 80, tag="TEST")
        logger.info(f"Testing configuration: {num_browsers} browsers with distribution: {distribution_str}", tag="TEST")
        logger.info(f"Details: {pages_distribution}", tag="TEST")
        # Short pause between configurations
        await asyncio.sleep(0.5)

        # Bound before the try so the except-branch cleanup never sees an
        # unbound name or the previous iteration's managers.
        managers = []

        try:
            process = _current_process()
            if process is None:
                logger.warning("psutil not available, memory metrics will not be tracked", tag="TEST")
                initial_memory = 0
            else:
                initial_memory = _rss_mb(process)

            # Create and start all browser managers in parallel
            start_time = time.time()
            for _ in range(num_browsers):
                managers.append(
                    BrowserManager(browser_config=BrowserConfig(headless=True), logger=logger)
                )
            await asyncio.gather(*(manager.start() for manager in managers))
            browser_startup_time = time.time() - start_time

            # Measure memory after browser startup
            browser_memory = _rss_mb(process) - initial_memory if process is not None else 0

            # Distribute URLs among managers using the exact page distribution
            assignments = _assign_urls(managers, urls, page_distribution)

            # Use the more efficient approach (pre-created pages)
            logger.info("Running page crawling test...", tag="TEST")
            crawl_start_time = time.time()
            all_pages = await _open_pages(assignments)

            # Measure memory after page creation
            if process is not None:
                pages_memory = _rss_mb(process) - browser_memory - initial_memory
            else:
                pages_memory = 0

            # Use the pre-created pages to fetch titles in parallel
            await asyncio.gather(*(_fetch_title(page_ctx, url) for page_ctx, url in all_pages))

            crawl_time = time.time() - crawl_start_time
            total_time = time.time() - start_time

            # Final memory measurement
            if process is not None:
                peak_memory = max(browser_memory + pages_memory, _rss_mb(process) - initial_memory)
            else:
                peak_memory = 0

            # Close all managers
            for manager in managers:
                await manager.close()

            # Calculate metrics
            pages_per_second = total_urls / crawl_time

            # Store result metrics
            result = {
                "num_browsers": num_browsers,
                "pages_per_browser": pages_per_browser,
                "page_distribution": page_distribution,
                "distribution_str": distribution_str,
                "total_urls": total_urls,
                "browser_startup_time": browser_startup_time,
                "crawl_time": crawl_time,
                "total_time": total_time,
                "browser_memory": browser_memory,
                "pages_memory": pages_memory,
                "peak_memory": peak_memory,
                "pages_per_second": pages_per_second,
                # Efficiency score (higher is better) balances speed vs memory usage
                "efficiency_score": pages_per_second / (peak_memory + 1) if peak_memory > 0 else pages_per_second,
            }
            results.append(result)

            # Log the results
            logger.info(f"Browser startup: {browser_startup_time:.2f}s", tag="TEST")
            logger.info(f"Crawl time: {crawl_time:.2f}s", tag="TEST")
            logger.info(f"Total time: {total_time:.2f}s", tag="TEST")
            logger.info(f"Performance: {pages_per_second:.1f} pages/second", tag="TEST")

            if peak_memory > 0:
                logger.info(f"Browser memory: {browser_memory:.1f}MB", tag="TEST")
                logger.info(f"Pages memory: {pages_memory:.1f}MB", tag="TEST")
                logger.info(f"Peak memory: {peak_memory:.1f}MB", tag="TEST")
                logger.info(f"Efficiency score: {result['efficiency_score']:.6f}", tag="TEST")

        except Exception as e:
            logger.error(f"Error testing configuration: {str(e)}", tag="TEST")
            import traceback
            traceback.print_exc()

            # Clean up
            await _close_quietly(managers)

    # Print summary of all configurations
    logger.info("=" * 100, tag="TEST")
    logger.info("GRID SEARCH RESULTS SUMMARY", tag="TEST")
    logger.info("=" * 100, tag="TEST")

    if not results:
        # Every configuration failed (or none was generated): nothing to rank
        logger.error("No configuration completed successfully; nothing to rank", tag="TEST")
        return None

    # Rank configurations by efficiency score
    ranked_results = sorted(results, key=lambda x: x["efficiency_score"], reverse=True)

    # Also determine rankings by different metrics
    fastest = min(results, key=lambda x: x["crawl_time"])
    lowest_memory = min(results, key=lambda x: x["peak_memory"] if x["peak_memory"] > 0 else float('inf'))
    most_efficient = ranked_results[0]

    # Print top performers by category
    logger.info("🏆 TOP PERFORMERS BY CATEGORY:", tag="TEST")
    logger.info(f"⚡ Fastest: {fastest['num_browsers']} browsers × ~{fastest['pages_per_browser']} pages " +
                f"({fastest['crawl_time']:.2f}s, {fastest['pages_per_second']:.1f} pages/s)", tag="TEST")

    if lowest_memory["peak_memory"] > 0:
        logger.info(f"💾 Lowest memory: {lowest_memory['num_browsers']} browsers × ~{lowest_memory['pages_per_browser']} pages " +
                    f"({lowest_memory['peak_memory']:.1f}MB)", tag="TEST")

    logger.info(f"🌟 Most efficient: {most_efficient['num_browsers']} browsers × ~{most_efficient['pages_per_browser']} pages " +
                f"(score: {most_efficient['efficiency_score']:.6f})", tag="TEST")

    # Print result table header
    logger.info("\n📊 COMPLETE RANKING TABLE (SORTED BY EFFICIENCY SCORE):", tag="TEST")
    logger.info("-" * 120, tag="TEST")

    header = f"{'Rank':<5} | {'Browsers':<8} | {'Distribution':<55} | {'Total Time(s)':<12} | {'Speed(p/s)':<12} | {'Memory(MB)':<12} | {'Efficiency':<10} | {'Notes'}"
    logger.info(header, tag="TEST")
    logger.info("-" * 120, tag="TEST")

    # Print each configuration in ranked order
    for rank, result in enumerate(ranked_results, 1):
        # Add special notes for top performers
        notes = []
        if result is fastest:
            notes.append("⚡ Fastest")
        if result is lowest_memory:
            notes.append("💾 Lowest Memory")
        if result is most_efficient:
            notes.append("🌟 Most Efficient")
        notes_str = " | ".join(notes)

        # Format memory if available
        memory_str = f"{result['peak_memory']:.1f}" if result['peak_memory'] > 0 else "N/A"
        dist_str = result['distribution_str']

        # Build the row
        row = f"{rank:<5} | {result['num_browsers']:<8} | {dist_str:<55} | {result['total_time']:.2f}s{' ':<7} | "
        row += f"{result['pages_per_second']:.2f}{' ':<6} | {memory_str}{' ':<6} | {result['efficiency_score']:.4f}{' ':<4} | {notes_str}"

        logger.info(row, tag="TEST")

    logger.info("-" * 120, tag="TEST")

    # Generate visualization if matplotlib is available
    _save_grid_search_plot(results, fastest, lowest_memory, most_efficient, total_urls)

    return most_efficient["num_browsers"], most_efficient["pages_per_browser"]


async def find_optimal_browser_config(total_urls=50, verbose=True, rate_limit_delay=0.2):
    """Find optimal browser configuration for crawling a specific number of URLs.

    Tries 1 to ``min(10, total_urls)`` browsers, spreading the URLs evenly,
    and records timing and (when psutil is installed) memory for each run.

    Args:
        total_urls: Number of URLs to crawl
        verbose: Whether to print progress
        rate_limit_delay: Seconds slept after each page load (0 disables it)

    Returns:
        dict | None: ``{"fastest", "lowest_memory", "optimal", "all_configs"}``
        where the first three are result dicts and ``all_configs`` is the list
        of all successful results; ``None`` if no configuration succeeded.
    """
    if verbose:
        print(f"\n=== Finding optimal configuration for crawling {total_urls} URLs ===\n")

    # Generate test URLs with timestamp to avoid caching
    timestamp = int(time.time())
    urls = [f"https://example.com/page_{i}?t={timestamp}" for i in range(total_urls)]

    # Limit browser configurations to test (1 browser to max 10)
    max_browsers = min(10, total_urls)

    results = []

    for browser_count in range(1, max_browsers + 1):
        # Distribution like [3, 3, 2, 2] (some browsers get one more page)
        page_distribution = _split_evenly(total_urls, browser_count)

        if verbose:
            print(f"Testing {browser_count} browsers with distribution {tuple(page_distribution)}")

        # Bound before the try so the except-branch cleanup never sees an
        # unbound name or the previous iteration's managers.
        managers = []

        try:
            # Track memory if possible
            process = _current_process()
            if process is None:
                if verbose:
                    print("Memory tracking not available (psutil not installed)")
                start_memory = 0
            else:
                start_memory = _rss_mb(process)

            # Start browsers in parallel
            start_time = time.time()
            for _ in range(browser_count):
                managers.append(
                    BrowserManager(browser_config=BrowserConfig(headless=True), logger=logger)
                )
            await asyncio.gather(*(manager.start() for manager in managers))

            # Distribute URLs among browsers and create their pages
            all_pages = await _open_pages(_assign_urls(managers, urls, page_distribution))

            # Crawl pages with delay to avoid rate limiting; page creation
            # above is deliberately excluded from crawl_time.
            crawl_start = time.time()
            await asyncio.gather(*(
                _fetch_title(page_ctx, url, delay=rate_limit_delay)
                for page_ctx, url in all_pages
            ))
            crawl_time = time.time() - crawl_start
            total_time = time.time() - start_time

            # Measure final memory usage
            memory_used = _rss_mb(process) - start_memory if process is not None else 0

            # Close all browsers
            for manager in managers:
                await manager.close()

            # Calculate metrics
            pages_per_second = total_urls / crawl_time

            # Efficiency score (higher is better) balances speed vs memory
            if memory_used > 0:
                efficiency = pages_per_second / (memory_used + 1)
            else:
                efficiency = pages_per_second

            # Store result
            results.append({
                "browser_count": browser_count,
                "distribution": tuple(page_distribution),
                "crawl_time": crawl_time,
                "total_time": total_time,
                "memory_used": memory_used,
                "pages_per_second": pages_per_second,
                "efficiency": efficiency
            })

            if verbose:
                print(f" ✓ Crawled {total_urls} pages in {crawl_time:.2f}s ({pages_per_second:.1f} pages/sec)")
                if memory_used > 0:
                    print(f" ✓ Memory used: {memory_used:.1f}MB ({memory_used/total_urls:.1f}MB per page)")
                print(f" ✓ Efficiency score: {efficiency:.4f}")

        except Exception as e:
            if verbose:
                print(f" ✗ Error: {str(e)}")

            # Clean up
            await _close_quietly(managers)

    # If no successful results, return None
    if not results:
        return None

    # Find best configurations
    fastest = min(results, key=lambda x: x["crawl_time"])

    # Only consider memory if available
    memory_results = [r for r in results if r["memory_used"] > 0]
    if memory_results:
        lowest_memory = min(memory_results, key=lambda x: x["memory_used"])
    else:
        lowest_memory = fastest

    # Find most efficient (balanced speed vs memory)
    optimal = max(results, key=lambda x: x["efficiency"])

    # Print summary
    if verbose:
        print("\n=== OPTIMAL CONFIGURATIONS ===")
        print(f"⚡ Fastest: {fastest['browser_count']} browsers {fastest['distribution']}")
        print(f" {fastest['crawl_time']:.2f}s, {fastest['pages_per_second']:.1f} pages/sec")

        print(f"💾 Memory-efficient: {lowest_memory['browser_count']} browsers {lowest_memory['distribution']}")
        if lowest_memory["memory_used"] > 0:
            print(f" {lowest_memory['memory_used']:.1f}MB, {lowest_memory['memory_used']/total_urls:.2f}MB per page")

        print(f"🌟 Balanced optimal: {optimal['browser_count']} browsers {optimal['distribution']}")
        print(f" {optimal['crawl_time']:.2f}s, {optimal['pages_per_second']:.1f} pages/sec, score: {optimal['efficiency']:.4f}")

    return {
        "fastest": fastest,
        "lowest_memory": lowest_memory,
        "optimal": optimal,
        "all_configs": results
    }


async def run_tests():
    """Run the configuration search and print a pass/fail summary.

    Only ``find_optimal_browser_config`` is executed here; the other test
    coroutines in this module are not invoked by this entry point.
    """
    results = []

    # Find optimal configuration using our utility function
    configs = await find_optimal_browser_config(
        total_urls=20,  # Use a small number for faster testing
        verbose=True,
        rate_limit_delay=0.2  # 200ms delay after each page load to avoid rate limiting
    )

    if configs:
        # Show the optimal configuration
        optimal = configs["optimal"]
        print("\n🎯 Recommended configuration for production use:")
        print(f" {optimal['browser_count']} browsers with distribution {optimal['distribution']}")
        print(f" Estimated performance: {optimal['pages_per_second']:.1f} pages/second")
        results.append(True)
    else:
        print("\n❌ Failed to find optimal configuration")
        results.append(False)

    # Print summary
    total = len(results)
    passed = sum(results)
    print(f"\nTests complete: {passed}/{total} passed")

    if passed == total:
        print("All tests passed!")
    else:
        print(f"{total - passed} tests failed")


if __name__ == "__main__":
    asyncio.run(run_tests())