feat(browser): implement browser pooling and page pre-warming

Adds a new BrowserManager implementation with browser pooling and page pre-warming capabilities:
- Adds support for managing multiple browser instances per configuration
- Implements page pre-warming for improved performance
- Adds configurable behavior for when no browsers are available
- Includes comprehensive status reporting and monitoring
- Maintains backward compatibility with existing API
- Adds demo script showcasing new features

BREAKING CHANGE: BrowserManager API now returns a strategy instance along with page and context
This commit is contained in:
UncleCode
2025-03-31 21:55:07 +08:00
parent bb02398086
commit 555455d710
6 changed files with 1484 additions and 98 deletions

View File

@@ -270,7 +270,7 @@ class BrowserConfig:
host: str = "localhost",
):
self.browser_type = browser_type
self.headless = headless and "new" or False
self.headless = headless or True
self.browser_mode = browser_mode
self.use_managed_browser = use_managed_browser
self.cdp_url = cdp_url

View File

@@ -0,0 +1,177 @@
"""Browser manager module for Crawl4AI.
This module provides a central browser management class that uses the
strategy pattern internally while maintaining the existing API.
It also implements a page pooling mechanism for improved performance.
"""
from typing import Optional, Tuple, List
from playwright.async_api import Page, BrowserContext
from ..async_logger import AsyncLogger
from ..async_configs import BrowserConfig, CrawlerRunConfig
from .strategies import (
BaseBrowserStrategy,
PlaywrightBrowserStrategy,
CDPBrowserStrategy,
BuiltinBrowserStrategy,
DockerBrowserStrategy
)
class BrowserManager:
"""Main interface for browser management in Crawl4AI.
This class maintains backward compatibility with the existing implementation
while using the strategy pattern internally for different browser types.
Attributes:
config (BrowserConfig): Configuration object containing all browser settings
logger: Logger instance for recording events and errors
browser: The browser instance
default_context: The default browser context
managed_browser: The managed browser instance
playwright: The Playwright instance
sessions: Dictionary to store session information
session_ttl: Session timeout in seconds
"""
def __init__(self, browser_config: Optional[BrowserConfig] = None, logger: Optional[AsyncLogger] = None):
"""Initialize the BrowserManager with a browser configuration.
Args:
browser_config: Configuration object containing all browser settings
logger: Logger instance for recording events and errors
"""
self.config = browser_config or BrowserConfig()
self.logger = logger
# Create strategy based on configuration
self.strategy = self._create_strategy()
# Initialize state variables for compatibility with existing code
self.browser = None
self.default_context = None
self.managed_browser = None
self.playwright = None
# For session management (from existing implementation)
self.sessions = {}
self.session_ttl = 1800 # 30 minutes
def _create_strategy(self) -> BaseBrowserStrategy:
"""Create appropriate browser strategy based on configuration.
Returns:
BaseBrowserStrategy: The selected browser strategy
"""
if self.config.browser_mode == "builtin":
return BuiltinBrowserStrategy(self.config, self.logger)
elif self.config.browser_mode == "docker":
if DockerBrowserStrategy is None:
if self.logger:
self.logger.error(
"Docker browser strategy requested but not available. "
"Falling back to PlaywrightBrowserStrategy.",
tag="BROWSER"
)
return PlaywrightBrowserStrategy(self.config, self.logger)
return DockerBrowserStrategy(self.config, self.logger)
elif self.config.browser_mode == "cdp" or self.config.cdp_url or self.config.use_managed_browser:
return CDPBrowserStrategy(self.config, self.logger)
else:
return PlaywrightBrowserStrategy(self.config, self.logger)
async def start(self):
"""Start the browser instance and set up the default context.
Returns:
self: For method chaining
"""
# Start the strategy
await self.strategy.start()
# Update legacy references
self.browser = self.strategy.browser
self.default_context = self.strategy.default_context
# Set browser process reference (for CDP strategy)
if hasattr(self.strategy, 'browser_process'):
self.managed_browser = self.strategy
# Set Playwright reference
self.playwright = self.strategy.playwright
# Sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
self.session_ttl = self.strategy.session_ttl
return self
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
Args:
crawlerRunConfig: Configuration object for the crawler run
Returns:
Tuple of (Page, BrowserContext)
"""
# Delegate to strategy
page, context = await self.strategy.get_page(crawlerRunConfig)
# Sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
return page, context
async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]:
"""Get multiple pages with the same configuration.
This method efficiently creates multiple browser pages using the same configuration,
which is useful for parallel crawling of multiple URLs.
Args:
crawlerRunConfig: Configuration for the pages
count: Number of pages to create
Returns:
List of (Page, Context) tuples
"""
# Delegate to strategy
pages = await self.strategy.get_pages(crawlerRunConfig, count)
# Sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
return pages
# Just for legacy compatibility
async def kill_session(self, session_id: str):
"""Kill a browser session and clean up resources.
Args:
session_id: The session ID to kill
"""
# Handle kill_session via our strategy if it supports it
await self.strategy.kill_session(session_id)
# sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
async def close(self):
"""Close the browser and clean up resources."""
# Delegate to strategy
await self.strategy.close()
# Reset legacy references
self.browser = None
self.default_context = None
self.managed_browser = None
self.playwright = None
self.sessions = {}

View File

@@ -2,12 +2,15 @@
This module provides a central browser management class that uses the
strategy pattern internally while maintaining the existing API.
It also implements a page pooling mechanism for improved performance.
It also implements browser pooling for improved performance.
"""
import asyncio
import time
from typing import Optional, Tuple, List
import hashlib
import json
import math
from enum import Enum
from typing import Dict, List, Optional, Tuple, Any
from playwright.async_api import Page, BrowserContext
@@ -22,55 +25,111 @@ from .strategies import (
DockerBrowserStrategy
)
class UnavailableBehavior(Enum):
"""Behavior when no browser is available."""
ON_DEMAND = "on_demand" # Create new browser on demand
PENDING = "pending" # Wait until a browser is available
EXCEPTION = "exception" # Raise an exception
class BrowserManager:
"""Main interface for browser management in Crawl4AI.
"""Main interface for browser management and pooling in Crawl4AI.
This class maintains backward compatibility with the existing implementation
while using the strategy pattern internally for different browser types.
It also implements browser pooling for improved performance.
Attributes:
config (BrowserConfig): Configuration object containing all browser settings
logger: Logger instance for recording events and errors
browser: The browser instance
default_context: The default browser context
managed_browser: The managed browser instance
playwright: The Playwright instance
sessions: Dictionary to store session information
session_ttl: Session timeout in seconds
config (BrowserConfig): Default configuration object for browsers
logger (AsyncLogger): Logger instance for recording events and errors
browser_pool (Dict): Dictionary to store browser instances by configuration
browser_in_use (Dict): Dictionary to track which browsers are in use
request_queues (Dict): Queues for pending requests by configuration
unavailable_behavior (UnavailableBehavior): Behavior when no browser is available
"""
def __init__(self, browser_config: Optional[BrowserConfig] = None, logger: Optional[AsyncLogger] = None):
def __init__(
self,
browser_config: Optional[BrowserConfig] = None,
logger: Optional[AsyncLogger] = None,
unavailable_behavior: UnavailableBehavior = UnavailableBehavior.EXCEPTION,
max_browsers_per_config: int = 10,
max_pages_per_browser: int = 5
):
"""Initialize the BrowserManager with a browser configuration.
Args:
browser_config: Configuration object containing all browser settings
logger: Logger instance for recording events and errors
unavailable_behavior: Behavior when no browser is available
max_browsers_per_config: Maximum number of browsers per configuration
max_pages_per_browser: Maximum number of pages per browser
"""
self.config = browser_config or BrowserConfig()
self.logger = logger
self.unavailable_behavior = unavailable_behavior
self.max_browsers_per_config = max_browsers_per_config
self.max_pages_per_browser = max_pages_per_browser
# Create strategy based on configuration
self.strategy = self._create_strategy()
# Browser pool management
self.browser_pool = {} # config_hash -> list of browser strategies
self.browser_in_use = {} # strategy instance -> Boolean
self.request_queues = {} # config_hash -> asyncio.Queue()
self._browser_locks = {} # config_hash -> asyncio.Lock()
self._browser_pool_lock = asyncio.Lock() # Global lock for pool modifications
# Initialize state variables for compatibility with existing code
# Page pool management
self.page_pool = {} # (browser_config_hash, crawler_config_hash) -> list of (page, context, strategy)
self._page_pool_lock = asyncio.Lock()
self.browser_page_counts = {} # strategy instance -> current page count
self._page_count_lock = asyncio.Lock() # Lock for thread-safe access to page counts
# For session management (from existing implementation)
self.sessions = {}
self.session_ttl = 1800 # 30 minutes
# For legacy compatibility
self.browser = None
self.default_context = None
self.managed_browser = None
self.playwright = None
# For session management (from existing implementation)
self.sessions = {}
self.session_ttl = 1800 # 30 minutes
self.strategy = None
def _create_strategy(self) -> BaseBrowserStrategy:
def _create_browser_config_hash(self, browser_config: BrowserConfig) -> str:
"""Create a hash of the browser configuration for browser pooling.
Args:
browser_config: Browser configuration
Returns:
str: Hash of the browser configuration
"""
# Convert config to dictionary, excluding any callable objects
config_dict = browser_config.__dict__.copy()
for key in list(config_dict.keys()):
if callable(config_dict[key]):
del config_dict[key]
# Convert to canonical JSON string
config_json = json.dumps(config_dict, sort_keys=True, default=str)
# Hash the JSON
config_hash = hashlib.sha256(config_json.encode()).hexdigest()
return config_hash
def _create_strategy(self, browser_config: BrowserConfig) -> BaseBrowserStrategy:
"""Create appropriate browser strategy based on configuration.
Args:
browser_config: Browser configuration
Returns:
BaseBrowserStrategy: The selected browser strategy
"""
if self.config.browser_mode == "builtin":
return BuiltinBrowserStrategy(self.config, self.logger)
elif self.config.browser_mode == "docker":
if browser_config.browser_mode == "builtin":
return BuiltinBrowserStrategy(browser_config, self.logger)
elif browser_config.browser_mode == "docker":
if DockerBrowserStrategy is None:
if self.logger:
self.logger.error(
@@ -78,102 +137,718 @@ class BrowserManager:
"Falling back to PlaywrightBrowserStrategy.",
tag="BROWSER"
)
return PlaywrightBrowserStrategy(self.config, self.logger)
return DockerBrowserStrategy(self.config, self.logger)
elif self.config.browser_mode == "cdp" or self.config.cdp_url or self.config.use_managed_browser:
return CDPBrowserStrategy(self.config, self.logger)
return PlaywrightBrowserStrategy(browser_config, self.logger)
return DockerBrowserStrategy(browser_config, self.logger)
elif browser_config.browser_mode == "cdp" or browser_config.cdp_url or browser_config.use_managed_browser:
return CDPBrowserStrategy(browser_config, self.logger)
else:
return PlaywrightBrowserStrategy(self.config, self.logger)
return PlaywrightBrowserStrategy(browser_config, self.logger)
async def initialize_pool(
self,
browser_configs: List[BrowserConfig] = None,
browsers_per_config: int = 1,
page_configs: Optional[List[Tuple[BrowserConfig, CrawlerRunConfig, int]]] = None
):
"""Initialize the browser pool with multiple browser configurations.
Args:
browser_configs: List of browser configurations to initialize
browsers_per_config: Number of browser instances per configuration
page_configs: Optional list of (browser_config, crawler_run_config, count) tuples
for pre-warming pages
Returns:
self: For method chaining
"""
if not browser_configs:
browser_configs = [self.config]
# Calculate how many browsers we'll need based on page_configs
browsers_needed = {}
if page_configs:
for browser_config, _, page_count in page_configs:
config_hash = self._create_browser_config_hash(browser_config)
# Calculate browsers based on max_pages_per_browser
browsers_needed_for_config = math.ceil(page_count / self.max_pages_per_browser)
browsers_needed[config_hash] = max(
browsers_needed.get(config_hash, 0),
browsers_needed_for_config
)
# Adjust browsers_per_config if needed to ensure enough capacity
config_browsers_needed = {}
for browser_config in browser_configs:
config_hash = self._create_browser_config_hash(browser_config)
# Estimate browsers needed based on page requirements
browsers_for_config = browsers_per_config
if config_hash in browsers_needed:
browsers_for_config = max(browsers_for_config, browsers_needed[config_hash])
config_browsers_needed[config_hash] = browsers_for_config
# Update max_browsers_per_config if needed
if browsers_for_config > self.max_browsers_per_config:
self.max_browsers_per_config = browsers_for_config
if self.logger:
self.logger.info(
f"Increased max_browsers_per_config to {browsers_for_config} to accommodate page requirements",
tag="POOL"
)
# Initialize locks and queues for each config
async with self._browser_pool_lock:
for browser_config in browser_configs:
config_hash = self._create_browser_config_hash(browser_config)
# Initialize lock for this config if needed
if config_hash not in self._browser_locks:
self._browser_locks[config_hash] = asyncio.Lock()
# Initialize queue for this config if needed
if config_hash not in self.request_queues:
self.request_queues[config_hash] = asyncio.Queue()
# Initialize pool for this config if needed
if config_hash not in self.browser_pool:
self.browser_pool[config_hash] = []
# Create browser instances for each configuration in parallel
browser_tasks = []
for browser_config in browser_configs:
config_hash = self._create_browser_config_hash(browser_config)
browsers_to_create = config_browsers_needed.get(
config_hash,
browsers_per_config
) - len(self.browser_pool.get(config_hash, []))
if browsers_to_create <= 0:
continue
for _ in range(browsers_to_create):
# Create a task for each browser initialization
task = self._create_and_add_browser(browser_config, config_hash)
browser_tasks.append(task)
# Wait for all browser initializations to complete
if browser_tasks:
if self.logger:
self.logger.info(f"Initializing {len(browser_tasks)} browsers in parallel...", tag="POOL")
await asyncio.gather(*browser_tasks)
# Pre-warm pages if requested
if page_configs:
page_tasks = []
for browser_config, crawler_run_config, count in page_configs:
task = self._prewarm_pages(browser_config, crawler_run_config, count)
page_tasks.append(task)
if page_tasks:
if self.logger:
self.logger.info(f"Pre-warming pages with {len(page_tasks)} configurations...", tag="POOL")
await asyncio.gather(*page_tasks)
# Update legacy references
if self.browser_pool and next(iter(self.browser_pool.values()), []):
strategy = next(iter(self.browser_pool.values()))[0]
self.strategy = strategy
self.browser = strategy.browser
self.default_context = strategy.default_context
self.playwright = strategy.playwright
return self
async def _create_and_add_browser(self, browser_config: BrowserConfig, config_hash: str):
"""Create and add a browser to the pool.
Args:
browser_config: Browser configuration
config_hash: Hash of the configuration
"""
try:
strategy = self._create_strategy(browser_config)
await strategy.start()
async with self._browser_pool_lock:
if config_hash not in self.browser_pool:
self.browser_pool[config_hash] = []
self.browser_pool[config_hash].append(strategy)
self.browser_in_use[strategy] = False
if self.logger:
self.logger.debug(
f"Added browser to pool: {browser_config.browser_type} "
f"({browser_config.browser_mode})",
tag="POOL"
)
except Exception as e:
if self.logger:
self.logger.error(
f"Failed to create browser: {str(e)}",
tag="POOL"
)
raise
def _make_config_signature(self, crawlerRunConfig: CrawlerRunConfig) -> str:
"""Create a signature hash from crawler configuration.
Args:
crawlerRunConfig: Crawler run configuration
Returns:
str: Hash of the crawler configuration
"""
config_dict = crawlerRunConfig.__dict__.copy()
# Exclude items that do not affect page creation
ephemeral_keys = [
"session_id",
"js_code",
"scraping_strategy",
"extraction_strategy",
"chunking_strategy",
"cache_mode",
"content_filter",
"semaphore_count",
"url"
]
for key in ephemeral_keys:
if key in config_dict:
del config_dict[key]
# Convert to canonical JSON string
config_json = json.dumps(config_dict, sort_keys=True, default=str)
# Hash the JSON
config_hash = hashlib.sha256(config_json.encode("utf-8")).hexdigest()
return config_hash
async def _prewarm_pages(
self,
browser_config: BrowserConfig,
crawler_run_config: CrawlerRunConfig,
count: int
):
"""Pre-warm pages for a specific configuration.
Args:
browser_config: Browser configuration
crawler_run_config: Crawler run configuration
count: Number of pages to pre-warm
"""
try:
# Create individual page tasks and run them in parallel
browser_config_hash = self._create_browser_config_hash(browser_config)
crawler_config_hash = self._make_config_signature(crawler_run_config)
async def get_single_page():
strategy = await self.get_available_browser(browser_config)
try:
page, context = await strategy.get_page(crawler_run_config)
# Store config hashes on the page object for later retrieval
setattr(page, "_browser_config_hash", browser_config_hash)
setattr(page, "_crawler_config_hash", crawler_config_hash)
return page, context, strategy
except Exception as e:
# Release the browser back to the pool
await self.release_browser(strategy, browser_config)
raise e
# Create tasks for parallel execution
page_tasks = [get_single_page() for _ in range(count)]
# Execute all page creation tasks in parallel
pages_contexts_strategies = await asyncio.gather(*page_tasks)
# Add pages to the page pool
browser_config_hash = self._create_browser_config_hash(browser_config)
crawler_config_hash = self._make_config_signature(crawler_run_config)
pool_key = (browser_config_hash, crawler_config_hash)
async with self._page_pool_lock:
if pool_key not in self.page_pool:
self.page_pool[pool_key] = []
# Add all pages to the pool
self.page_pool[pool_key].extend(pages_contexts_strategies)
if self.logger:
self.logger.debug(
f"Pre-warmed {count} pages in parallel with config {crawler_run_config}",
tag="POOL"
)
except Exception as e:
if self.logger:
self.logger.error(
f"Failed to pre-warm pages: {str(e)}",
tag="POOL"
)
raise
async def get_available_browser(
self,
browser_config: Optional[BrowserConfig] = None
) -> BaseBrowserStrategy:
"""Get an available browser from the pool for the given configuration.
Args:
browser_config: Browser configuration to match
Returns:
BaseBrowserStrategy: An available browser strategy
Raises:
Exception: If no browser is available and behavior is EXCEPTION
"""
browser_config = browser_config or self.config
config_hash = self._create_browser_config_hash(browser_config)
async with self._browser_locks.get(config_hash, asyncio.Lock()):
# Check if we have browsers for this config
if config_hash not in self.browser_pool or not self.browser_pool[config_hash]:
if self.unavailable_behavior == UnavailableBehavior.ON_DEMAND:
# Create a new browser on demand
if self.logger:
self.logger.info(
f"1> Creating new browser on demand for config {config_hash[:8]}",
tag="POOL"
)
# Initialize pool for this config if needed
async with self._browser_pool_lock:
if config_hash not in self.browser_pool:
self.browser_pool[config_hash] = []
strategy = self._create_strategy(browser_config)
await strategy.start()
self.browser_pool[config_hash].append(strategy)
self.browser_in_use[strategy] = False
elif self.unavailable_behavior == UnavailableBehavior.EXCEPTION:
raise Exception(f"No browsers available for configuration {config_hash[:8]}")
# Check for an available browser with capacity in the pool
for strategy in self.browser_pool[config_hash]:
# Check if this browser has capacity for more pages
async with self._page_count_lock:
current_pages = self.browser_page_counts.get(strategy, 0)
if current_pages < self.max_pages_per_browser:
# Increment the page count
self.browser_page_counts[strategy] = current_pages + 1
self.browser_in_use[strategy] = True
# Get browser information for better logging
browser_type = getattr(strategy.config, 'browser_type', 'unknown')
browser_mode = getattr(strategy.config, 'browser_mode', 'unknown')
strategy_id = id(strategy) # Use object ID as a unique identifier
if self.logger:
self.logger.debug(
f"Selected browser #{strategy_id} ({browser_type}/{browser_mode}) - "
f"pages: {current_pages+1}/{self.max_pages_per_browser}",
tag="POOL"
)
return strategy
# All browsers are at capacity or in use
if self.unavailable_behavior == UnavailableBehavior.ON_DEMAND:
# Check if we've reached the maximum number of browsers
if len(self.browser_pool[config_hash]) >= self.max_browsers_per_config:
if self.logger:
self.logger.warning(
f"Maximum browsers reached for config {config_hash[:8]} and all at page capacity",
tag="POOL"
)
if self.unavailable_behavior == UnavailableBehavior.EXCEPTION:
raise Exception("Maximum browsers reached and all at page capacity")
# Create a new browser on demand
if self.logger:
self.logger.info(
f"2> Creating new browser on demand for config {config_hash[:8]}",
tag="POOL"
)
strategy = self._create_strategy(browser_config)
await strategy.start()
async with self._browser_pool_lock:
self.browser_pool[config_hash].append(strategy)
self.browser_in_use[strategy] = True
return strategy
# If we get here, either behavior is EXCEPTION or PENDING
if self.unavailable_behavior == UnavailableBehavior.EXCEPTION:
raise Exception(f"All browsers in use or at page capacity for configuration {config_hash[:8]}")
# For PENDING behavior, set up waiting mechanism
if config_hash not in self.request_queues:
self.request_queues[config_hash] = asyncio.Queue()
# Create a future to wait on
future = asyncio.Future()
await self.request_queues[config_hash].put(future)
if self.logger:
self.logger.debug(
f"Waiting for available browser for config {config_hash[:8]}",
tag="POOL"
)
# Wait for a browser to become available
strategy = await future
return strategy
async def get_page(
self,
crawlerRunConfig: CrawlerRunConfig,
browser_config: Optional[BrowserConfig] = None
) -> Tuple[Page, BrowserContext, BaseBrowserStrategy]:
"""Get a page from the browser pool."""
browser_config = browser_config or self.config
# Check if we have a pre-warmed page available
browser_config_hash = self._create_browser_config_hash(browser_config)
crawler_config_hash = self._make_config_signature(crawlerRunConfig)
pool_key = (browser_config_hash, crawler_config_hash)
# Try to get a page from the pool
async with self._page_pool_lock:
if pool_key in self.page_pool and self.page_pool[pool_key]:
# Get a page from the pool
page, context, strategy = self.page_pool[pool_key].pop()
# Mark browser as in use (it already is, but ensure consistency)
self.browser_in_use[strategy] = True
if self.logger:
self.logger.debug(
f"Using pre-warmed page for config {crawler_config_hash[:8]}",
tag="POOL"
)
# Note: We don't increment page count since it was already counted when created
return page, context, strategy
# No pre-warmed page available, create a new one
# get_available_browser already increments the page count
strategy = await self.get_available_browser(browser_config)
try:
# Get a page from the browser
page, context = await strategy.get_page(crawlerRunConfig)
# Store config hashes on the page object for later retrieval
setattr(page, "_browser_config_hash", browser_config_hash)
setattr(page, "_crawler_config_hash", crawler_config_hash)
return page, context, strategy
except Exception as e:
# Release the browser back to the pool and decrement the page count
await self.release_browser(strategy, browser_config, decrement_page_count=True)
raise e
async def release_page(
self,
page: Page,
strategy: BaseBrowserStrategy,
browser_config: Optional[BrowserConfig] = None,
keep_alive: bool = True,
return_to_pool: bool = True
):
"""Release a page back to the pool."""
browser_config = browser_config or self.config
page_url = page.url if page else None
# If not keeping the page alive, close it and decrement count
if not keep_alive:
try:
await page.close()
except Exception as e:
if self.logger:
self.logger.error(
f"Error closing page: {str(e)}",
tag="POOL"
)
# Release the browser with page count decrement
await self.release_browser(strategy, browser_config, decrement_page_count=True)
return
# If returning to pool
if return_to_pool:
# Get the configuration hashes from the page object
browser_config_hash = getattr(page, "_browser_config_hash", None)
crawler_config_hash = getattr(page, "_crawler_config_hash", None)
if browser_config_hash and crawler_config_hash:
pool_key = (browser_config_hash, crawler_config_hash)
async with self._page_pool_lock:
if pool_key not in self.page_pool:
self.page_pool[pool_key] = []
# Add page back to the pool
self.page_pool[pool_key].append((page, page.context, strategy))
if self.logger:
self.logger.debug(
f"Returned page to pool for config {crawler_config_hash[:8]}, url: {page_url}",
tag="POOL"
)
# Note: We don't decrement the page count here since the page is still "in use"
# from the browser's perspective, just in our pool
return
else:
# If we can't identify the configuration, log a warning
if self.logger:
self.logger.warning(
"Cannot return page to pool - missing configuration hashes",
tag="POOL"
)
# If we got here, we couldn't return to pool, so just release the browser
await self.release_browser(strategy, browser_config, decrement_page_count=True)
async def release_browser(
self,
strategy: BaseBrowserStrategy,
browser_config: Optional[BrowserConfig] = None,
decrement_page_count: bool = True
):
"""Release a browser back to the pool."""
browser_config = browser_config or self.config
config_hash = self._create_browser_config_hash(browser_config)
# Decrement page count
if decrement_page_count:
async with self._page_count_lock:
current_count = self.browser_page_counts.get(strategy, 1)
self.browser_page_counts[strategy] = max(0, current_count - 1)
if self.logger:
self.logger.debug(
f"Decremented page count for browser (now: {self.browser_page_counts[strategy]})",
tag="POOL"
)
# Mark as not in use
self.browser_in_use[strategy] = False
# Process any waiting requests
if config_hash in self.request_queues and not self.request_queues[config_hash].empty():
future = await self.request_queues[config_hash].get()
if not future.done():
future.set_result(strategy)
async def get_pages(
self,
crawlerRunConfig: CrawlerRunConfig,
count: int = 1,
browser_config: Optional[BrowserConfig] = None
) -> List[Tuple[Page, BrowserContext, BaseBrowserStrategy]]:
"""Get multiple pages from the browser pool.
Args:
crawlerRunConfig: Configuration for the crawler run
count: Number of pages to get
browser_config: Browser configuration to use
Returns:
List of (Page, Context, Strategy) tuples
"""
results = []
for _ in range(count):
try:
result = await self.get_page(crawlerRunConfig, browser_config)
results.append(result)
except Exception as e:
# Release any pages we've already gotten
for page, _, strategy in results:
await self.release_page(page, strategy, browser_config)
raise e
return results
async def get_page_pool_status(self) -> Dict[str, Any]:
"""Get information about the page pool status.
Returns:
Dict with page pool status information
"""
status = {
"total_pooled_pages": 0,
"configs": {}
}
async with self._page_pool_lock:
for (browser_hash, crawler_hash), pages in self.page_pool.items():
config_key = f"{browser_hash[:8]}_{crawler_hash[:8]}"
status["configs"][config_key] = len(pages)
status["total_pooled_pages"] += len(pages)
if self.logger:
self.logger.debug(
f"Page pool status: {status['total_pooled_pages']} pages available",
tag="POOL"
)
return status
async def get_pool_status(self) -> Dict[str, Any]:
"""Get information about the browser pool status.
Returns:
Dict with pool status information
"""
status = {
"total_browsers": 0,
"browsers_in_use": 0,
"total_pages": 0,
"configs": {}
}
for config_hash, strategies in self.browser_pool.items():
config_pages = 0
in_use = 0
for strategy in strategies:
is_in_use = self.browser_in_use.get(strategy, False)
if is_in_use:
in_use += 1
# Get page count for this browser
try:
page_count = len(await strategy.get_opened_pages())
config_pages += page_count
except Exception as e:
if self.logger:
self.logger.error(f"Error getting page count: {str(e)}", tag="POOL")
config_status = {
"total_browsers": len(strategies),
"browsers_in_use": in_use,
"pages_open": config_pages,
"waiting_requests": self.request_queues.get(config_hash, asyncio.Queue()).qsize(),
"max_capacity": len(strategies) * self.max_pages_per_browser,
"utilization_pct": round((config_pages / (len(strategies) * self.max_pages_per_browser)) * 100, 1)
if strategies else 0
}
status["configs"][config_hash] = config_status
status["total_browsers"] += config_status["total_browsers"]
status["browsers_in_use"] += config_status["browsers_in_use"]
status["total_pages"] += config_pages
# Add overall utilization
if status["total_browsers"] > 0:
max_capacity = status["total_browsers"] * self.max_pages_per_browser
status["overall_utilization_pct"] = round((status["total_pages"] / max_capacity) * 100, 1)
else:
status["overall_utilization_pct"] = 0
return status
async def start(self):
"""Start the browser instance and set up the default context.
"""Start at least one browser instance in the pool.
This method is kept for backward compatibility.
Returns:
self: For method chaining
"""
# Start the strategy
await self.strategy.start()
# Update legacy references
self.browser = self.strategy.browser
self.default_context = self.strategy.default_context
# Set browser process reference (for CDP strategy)
if hasattr(self.strategy, 'browser_process'):
self.managed_browser = self.strategy
# Set Playwright reference
self.playwright = self.strategy.playwright
# Sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
self.session_ttl = self.strategy.session_ttl
await self.initialize_pool([self.config], 1)
return self
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
Args:
crawlerRunConfig: Configuration object for the crawler run
Returns:
Tuple of (Page, BrowserContext)
"""
# Delegate to strategy
page, context = await self.strategy.get_page(crawlerRunConfig)
# Sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
return page, context
async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]:
"""Get multiple pages with the same configuration.
This method efficiently creates multiple browser pages using the same configuration,
which is useful for parallel crawling of multiple URLs.
Args:
crawlerRunConfig: Configuration for the pages
count: Number of pages to create
Returns:
List of (Page, Context) tuples
"""
# Delegate to strategy
pages = await self.strategy.get_pages(crawlerRunConfig, count)
# Sync sessions if needed
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
return pages
# Just for legacy compatibility
async def kill_session(self, session_id: str):
"""Kill a browser session and clean up resources.
Delegated to the strategy. This method is kept for backward compatibility.
Args:
session_id: The session ID to kill
"""
# Handle kill_session via our strategy if it supports it
if not self.strategy:
return
await self.strategy.kill_session(session_id)
# sync sessions if needed
# Sync sessions
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
async def close(self):
"""Close the browser and clean up resources."""
# Delegate to strategy
await self.strategy.close()
"""Close all browsers in the pool and clean up resources."""
# Close all browsers in the pool
for strategies in self.browser_pool.values():
for strategy in strategies:
try:
await strategy.close()
except Exception as e:
if self.logger:
self.logger.error(
f"Error closing browser: {str(e)}",
tag="POOL"
)
# Clear pool data
self.browser_pool = {}
self.browser_in_use = {}
# Reset legacy references
self.browser = None
self.default_context = None
self.managed_browser = None
self.playwright = None
self.strategy = None
self.sessions = {}
async def create_browser_manager(
browser_config: Optional[BrowserConfig] = None,
logger: Optional[AsyncLogger] = None,
unavailable_behavior: UnavailableBehavior = UnavailableBehavior.EXCEPTION,
max_browsers_per_config: int = 10,
initial_pool_size: int = 1,
page_configs: Optional[List[Tuple[BrowserConfig, CrawlerRunConfig, int]]] = None
) -> BrowserManager:
"""Factory function to create and initialize a BrowserManager.
Args:
browser_config: Configuration for the browsers
logger: Logger for recording events
unavailable_behavior: Behavior when no browser is available
max_browsers_per_config: Maximum browsers per configuration
initial_pool_size: Initial number of browsers per configuration
page_configs: Optional configurations for pre-warming pages
Returns:
Initialized BrowserManager
"""
manager = BrowserManager(
browser_config=browser_config,
logger=logger,
unavailable_behavior=unavailable_behavior,
max_browsers_per_config=max_browsers_per_config
)
await manager.initialize_pool(
[browser_config] if browser_config else None,
initial_pool_size,
page_configs
)
return manager

View File

@@ -109,6 +109,9 @@ class BaseBrowserStrategy(ABC):
page, context = await self._generate_page(crawlerRunConfig)
import uuid
setattr(page, "guid", uuid.uuid4())
# If a session_id is specified, store this session so we can reuse later
if crawlerRunConfig.session_id:
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
@@ -132,6 +135,12 @@ class BaseBrowserStrategy(ABC):
pages.append((page, context))
return pages
async def get_opened_pages(self) -> List[Page]:
"""Get all opened pages in the
browser.
"""
return [page for context in self.contexts_by_config.values() for page in context.pages]
def _build_browser_args(self) -> dict:
"""Build browser launch arguments from config.

View File

@@ -122,7 +122,7 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
else:
raise NotImplementedError(f"Browser type {self.config.browser_type} not supported")
args = base_args + browser_args + args
args = base_args + browser_args['args'] + args
# Start browser process
try:

View File

@@ -0,0 +1,525 @@
"""Demo script for testing the enhanced BrowserManager.
This script demonstrates the browser pooling capabilities of the enhanced
BrowserManager with various configurations and usage patterns.
"""
import asyncio
import time
import random
from crawl4ai.browser.manager import BrowserManager, UnavailableBehavior
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
import playwright
SAFE_URLS = [
"https://example.com",
"https://example.com/page1",
"https://httpbin.org/get",
"https://httpbin.org/html",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
"https://httpbin.org/headers",
"https://httpbin.org/cookies",
"https://httpstat.us/200",
"https://httpstat.us/301",
"https://httpstat.us/404",
"https://httpstat.us/500",
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
"https://jsonplaceholder.typicode.com/posts/4",
"https://jsonplaceholder.typicode.com/posts/5",
"https://jsonplaceholder.typicode.com/comments/1",
"https://jsonplaceholder.typicode.com/comments/2",
"https://jsonplaceholder.typicode.com/users/1",
"https://jsonplaceholder.typicode.com/users/2",
"https://jsonplaceholder.typicode.com/albums/1",
"https://jsonplaceholder.typicode.com/albums/2",
"https://jsonplaceholder.typicode.com/photos/1",
"https://jsonplaceholder.typicode.com/photos/2",
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://www.iana.org",
"https://www.iana.org/domains",
"https://www.iana.org/numbers",
"https://www.iana.org/protocols",
"https://www.iana.org/about",
"https://www.iana.org/time-zones",
"https://www.data.gov",
"https://catalog.data.gov/dataset",
"https://www.archives.gov",
"https://www.usa.gov",
"https://www.loc.gov",
"https://www.irs.gov",
"https://www.census.gov",
"https://www.bls.gov",
"https://www.gpo.gov",
"https://www.w3.org",
"https://www.w3.org/standards",
"https://www.w3.org/WAI",
"https://www.rfc-editor.org",
"https://www.ietf.org",
"https://www.icann.org",
"https://www.internetsociety.org",
"https://www.python.org"
]
async def basic_pooling_demo():
"""Demonstrate basic browser pooling functionality."""
print("\n=== Basic Browser Pooling Demo ===")
# Create logger
logger = AsyncLogger(verbose=True)
# Create browser configurations
config1 = BrowserConfig(
browser_type="chromium",
headless=True,
browser_mode="playwright"
)
config2 = BrowserConfig(
browser_type="chromium",
headless=True,
browser_mode="cdp"
)
# Create browser manager with on-demand behavior
manager = BrowserManager(
browser_config=config1,
logger=logger,
unavailable_behavior=UnavailableBehavior.ON_DEMAND,
max_browsers_per_config=3
)
try:
# Initialize pool with both configurations
print("Initializing browser pool...")
await manager.initialize_pool(
browser_configs=[config1, config2],
browsers_per_config=2
)
# Display initial pool status
status = await manager.get_pool_status()
print(f"Initial pool status: {status}")
# Create crawler run configurations
run_config1 = CrawlerRunConfig()
run_config2 = CrawlerRunConfig()
# Simulate concurrent page requests
print("\nGetting pages for parallel crawling...")
# Function to simulate crawling
async def simulate_crawl(index: int, config: BrowserConfig, run_config: CrawlerRunConfig):
print(f"Crawler {index}: Requesting page...")
page, context, strategy = await manager.get_page(run_config, config)
print(f"Crawler {index}: Got page, navigating to example.com...")
try:
await page.goto("https://example.com")
title = await page.title()
print(f"Crawler {index}: Page title: {title}")
# Simulate work
await asyncio.sleep(random.uniform(1, 3))
print(f"Crawler {index}: Work completed, releasing page...")
# Check dynamic page content
content = await page.content()
content_length = len(content)
print(f"Crawler {index}: Page content length: {content_length}")
except Exception as e:
print(f"Crawler {index}: Error: {str(e)}")
finally:
# Release the page
await manager.release_page(page, strategy, config)
print(f"Crawler {index}: Page released")
# Create 5 parallel crawls
crawl_tasks = []
for i in range(5):
# Alternate between configurations
config = config1 if i % 2 == 0 else config2
run_config = run_config1 if i % 2 == 0 else run_config2
task = asyncio.create_task(simulate_crawl(i+1, config, run_config))
crawl_tasks.append(task)
# Wait for all crawls to complete
await asyncio.gather(*crawl_tasks)
# Display final pool status
status = await manager.get_pool_status()
print(f"\nFinal pool status: {status}")
finally:
# Clean up
print("\nClosing browser manager...")
await manager.close()
print("Browser manager closed")
async def prewarm_pages_demo():
"""Demonstrate page pre-warming functionality."""
print("\n=== Page Pre-warming Demo ===")
# Create logger
logger = AsyncLogger(verbose=True)
# Create browser configuration
config = BrowserConfig(
browser_type="chromium",
headless=True,
browser_mode="playwright"
)
# Create crawler run configurations for pre-warming
run_config1 = CrawlerRunConfig(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
run_config2 = CrawlerRunConfig(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15"
)
# Create page pre-warm configurations
page_configs = [
(config, run_config1, 2), # 2 pages with run_config1
(config, run_config2, 3) # 3 pages with run_config2
]
# Create browser manager
manager = BrowserManager(
browser_config=config,
logger=logger,
unavailable_behavior=UnavailableBehavior.EXCEPTION
)
try:
# Initialize pool with pre-warmed pages
print("Initializing browser pool with pre-warmed pages...")
await manager.initialize_pool(
browser_configs=[config],
browsers_per_config=2,
page_configs=page_configs
)
# Display pool status
status = await manager.get_pool_status()
print(f"Pool status after pre-warming: {status}")
# Simulate using pre-warmed pages
print("\nUsing pre-warmed pages...")
async def use_prewarm_page(index: int, run_config: CrawlerRunConfig):
print(f"Task {index}: Requesting pre-warmed page...")
page, context, strategy = await manager.get_page(run_config, config)
try:
print(f"Task {index}: Got page, navigating to example.com...")
await page.goto("https://example.com")
# Verify user agent was applied correctly
user_agent = await page.evaluate("() => navigator.userAgent")
print(f"Task {index}: User agent: {user_agent}")
# Get page title
title = await page.title()
print(f"Task {index}: Page title: {title}")
# Simulate work
await asyncio.sleep(1)
finally:
# Release the page
print(f"Task {index}: Releasing page...")
await manager.release_page(page, strategy, config)
# Create tasks to use pre-warmed pages
tasks = []
# Use run_config1 pages
for i in range(2):
tasks.append(asyncio.create_task(use_prewarm_page(i+1, run_config1)))
# Use run_config2 pages
for i in range(3):
tasks.append(asyncio.create_task(use_prewarm_page(i+3, run_config2)))
# Wait for all tasks to complete
await asyncio.gather(*tasks)
# Try to use more pages than we pre-warmed (should raise exception)
print("\nTrying to use more pages than pre-warmed...")
try:
page, context, strategy = await manager.get_page(run_config1, config)
try:
print("Got extra page (unexpected)")
await page.goto("https://example.com")
finally:
await manager.release_page(page, strategy, config)
except Exception as e:
print(f"Expected exception when requesting more pages: {str(e)}")
finally:
# Clean up
print("\nClosing browser manager...")
await manager.close()
print("Browser manager closed")
async def prewarm_on_demand_demo():
"""Demonstrate pre-warming with on-demand browser creation."""
print("\n=== Pre-warming with On-Demand Browser Creation Demo ===")
# Create logger
logger = AsyncLogger(verbose=True)
# Create browser configuration
config = BrowserConfig(
browser_type="chromium",
headless=True,
browser_mode="playwright"
)
# Create crawler run configurations
run_config = CrawlerRunConfig(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
# Create page pre-warm configurations - just pre-warm 2 pages
page_configs = [
(config, run_config, 2)
]
# Create browser manager with ON_DEMAND behavior
manager = BrowserManager(
browser_config=config,
logger=logger,
unavailable_behavior=UnavailableBehavior.ON_DEMAND,
max_browsers_per_config=5 # Allow up to 5 browsers
)
try:
# Initialize pool with pre-warmed pages
print("Initializing browser pool with pre-warmed pages...")
await manager.initialize_pool(
browser_configs=[config],
browsers_per_config=1, # Start with just 1 browser
page_configs=page_configs
)
# Display initial pool status
status = await manager.get_pool_status()
print(f"Initial pool status: {status}")
# Simulate using more pages than pre-warmed - should create browsers on demand
print("\nUsing more pages than pre-warmed (should create on demand)...")
async def use_page(index: int):
print(f"Task {index}: Requesting page...")
page, context, strategy = await manager.get_page(run_config, config)
try:
print(f"Task {index}: Got page, navigating to example.com...")
await page.goto("https://example.com")
# Get page title
title = await page.title()
print(f"Task {index}: Page title: {title}")
# Simulate work for a varying amount of time
work_time = 1 + (index * 0.5) # Stagger completion times
print(f"Task {index}: Working for {work_time} seconds...")
await asyncio.sleep(work_time)
print(f"Task {index}: Work completed")
finally:
# Release the page
print(f"Task {index}: Releasing page...")
await manager.release_page(page, strategy, config)
# Create more tasks than pre-warmed pages
tasks = []
for i in range(5): # Try to use 5 pages when only 2 are pre-warmed
tasks.append(asyncio.create_task(use_page(i+1)))
# Wait for all tasks to complete
await asyncio.gather(*tasks)
# Display final pool status - should show on-demand created browsers
status = await manager.get_pool_status()
print(f"\nFinal pool status: {status}")
finally:
# Clean up
print("\nClosing browser manager...")
await manager.close()
print("Browser manager closed")
async def high_volume_demo():
"""Demonstrate high-volume access to pre-warmed pages."""
print("\n=== High Volume Pre-warmed Pages Demo ===")
# Create logger
logger = AsyncLogger(verbose=True)
# Create browser configuration
config = BrowserConfig(
browser_type="chromium",
headless=True,
browser_mode="playwright"
)
# Create crawler run configuration
run_config = CrawlerRunConfig(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
# Set up dimensions
browser_count = 10
pages_per_browser = 5
total_pages = browser_count * pages_per_browser
# Create page pre-warm configuration
page_configs = [
(config, run_config, total_pages)
]
print(f"Preparing {browser_count} browsers with {pages_per_browser} pages each ({total_pages} total pages)")
# Create browser manager with ON_DEMAND behavior as fallback
# No need to specify max_browsers_per_config as it will be calculated automatically
manager = BrowserManager(
browser_config=config,
logger=logger,
unavailable_behavior=UnavailableBehavior.ON_DEMAND
)
try:
# Initialize pool with browsers and pre-warmed pages
print(f"Pre-warming {total_pages} pages...")
start_time = time.time()
await manager.initialize_pool(
browser_configs=[config],
browsers_per_config=browser_count,
page_configs=page_configs
)
warmup_time = time.time() - start_time
print(f"Pre-warming completed in {warmup_time:.2f} seconds")
# Display pool status
status = await manager.get_pool_status()
print(f"Pool status after pre-warming: {status}")
# Simulate using all pre-warmed pages simultaneously
print(f"\nSending {total_pages} crawl requests simultaneously...")
async def crawl_page(index: int):
# url = f"https://example.com/page{index}"
url = SAFE_URLS[index % len(SAFE_URLS)]
print(f"Page {index}: Requesting page...")
# Measure time to acquire page
page_start = time.time()
page, context, strategy = await manager.get_page(run_config, config)
page_acquisition_time = time.time() - page_start
try:
# Navigate to the URL
nav_start = time.time()
await page.goto(url, timeout=5000)
navigation_time = time.time() - nav_start
# Get the page title
title = await page.title()
return {
"index": index,
"url": url,
"title": title,
"page_acquisition_time": page_acquisition_time,
"navigation_time": navigation_time
}
except playwright._impl._errors.TimeoutError as e:
# print(f"Page {index}: Navigation timed out - {e}")
return {
"index": index,
"url": url,
"title": "Navigation timed out",
"page_acquisition_time": page_acquisition_time,
"navigation_time": 0
}
finally:
# Release the page
await manager.release_page(page, strategy, config)
# Create and execute all tasks simultaneously
start_time = time.time()
# Non-parallel way
# for i in range(total_pages):
# await crawl_page(i+1)
tasks = [crawl_page(i+1) for i in range(total_pages)]
results = await asyncio.gather(*tasks)
total_time = time.time() - start_time
# # Print all titles
# for result in results:
# print(f"Page {result['index']} ({result['url']}): Title: {result['title']}")
# print(f" Page acquisition time: {result['page_acquisition_time']:.4f}s")
# print(f" Navigation time: {result['navigation_time']:.4f}s")
# print(f" Total time: {result['page_acquisition_time'] + result['navigation_time']:.4f}s")
# print("-" * 40)
# Report results
print(f"\nAll {total_pages} crawls completed in {total_time:.2f} seconds")
# Calculate statistics
acquisition_times = [r["page_acquisition_time"] for r in results]
navigation_times = [r["navigation_time"] for r in results]
avg_acquisition = sum(acquisition_times) / len(acquisition_times)
max_acquisition = max(acquisition_times)
min_acquisition = min(acquisition_times)
avg_navigation = sum(navigation_times) / len(navigation_times)
max_navigation = max(navigation_times)
min_navigation = min(navigation_times)
print("\nPage acquisition times:")
print(f" Average: {avg_acquisition:.4f}s")
print(f" Min: {min_acquisition:.4f}s")
print(f" Max: {max_acquisition:.4f}s")
print("\nPage navigation times:")
print(f" Average: {avg_navigation:.4f}s")
print(f" Min: {min_navigation:.4f}s")
print(f" Max: {max_navigation:.4f}s")
# Display final pool status
status = await manager.get_pool_status()
print(f"\nFinal pool status: {status}")
finally:
# Clean up
print("\nClosing browser manager...")
await manager.close()
print("Browser manager closed")
async def main():
"""Run all demos."""
# await basic_pooling_demo()
# await prewarm_pages_demo()
# await prewarm_on_demand_demo()
await high_volume_demo()
# Additional demo functions can be added here
if __name__ == "__main__":
asyncio.run(main())