diff --git a/crawl4ai/browser/__init__.py b/crawl4ai/browser/__init__.py deleted file mode 100644 index af4d74c7..00000000 --- a/crawl4ai/browser/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Browser management module for Crawl4AI. - -This module provides browser management capabilities using different strategies -for browser creation and interaction. -""" - -from .manager import BrowserManager -from .profiles import BrowserProfileManager -from .models import DockerConfig -from .docker_registry import DockerRegistry -from .docker_utils import DockerUtils -from .strategies import ( - BaseBrowserStrategy, - PlaywrightBrowserStrategy, - CDPBrowserStrategy, - BuiltinBrowserStrategy, - DockerBrowserStrategy -) - -__all__ = ['BrowserManager', 'BrowserProfileManager', 'DockerConfig', 'DockerRegistry', 'DockerUtils', 'BaseBrowserStrategy', - 'PlaywrightBrowserStrategy', 'CDPBrowserStrategy', 'BuiltinBrowserStrategy', - 'DockerBrowserStrategy'] \ No newline at end of file diff --git a/crawl4ai/browser/browser_hub.py b/crawl4ai/browser/browser_hub.py deleted file mode 100644 index 47b742b5..00000000 --- a/crawl4ai/browser/browser_hub.py +++ /dev/null @@ -1,184 +0,0 @@ -# browser_hub_manager.py -import hashlib -import json -import asyncio -from typing import Dict, Optional, List, Tuple -from .manager import BrowserManager, UnavailableBehavior -from ..async_configs import BrowserConfig, CrawlerRunConfig -from ..async_logger import AsyncLogger - -class BrowserHub: - """ - Manages Browser-Hub instances for sharing across multiple pipelines. - - This class provides centralized management for browser resources, allowing - multiple pipelines to share browser instances efficiently, connect to - existing browser hubs, or create new ones with custom configurations. - """ - _instances: Dict[str, BrowserManager] = {} - _lock = asyncio.Lock() - - @classmethod - async def get_browser_manager( - cls, - config: Optional[BrowserConfig] = None, - hub_id: Optional[str] = None, - connection_info: Optional[str] = None, - logger: Optional[AsyncLogger] = None, - max_browsers_per_config: int = 10, - max_pages_per_browser: int = 5, - initial_pool_size: int = 1, - page_configs: Optional[List[Tuple[BrowserConfig, CrawlerRunConfig, int]]] = None - ) -> BrowserManager: - """ - Get an existing BrowserManager or create a new one based on parameters. - - Args: - config: Browser configuration for new hub - hub_id: Identifier for the hub instance - connection_info: Connection string for existing hub - logger: Logger for recording events and errors - max_browsers_per_config: Maximum browsers per configuration - max_pages_per_browser: Maximum pages per browser - initial_pool_size: Initial number of browsers to create - page_configs: Optional configurations for pre-warming pages - - Returns: - BrowserManager: The requested browser manager instance - """ - async with cls._lock: - # Scenario 3: Use existing hub via connection info - if connection_info: - instance_key = f"connection:{connection_info}" - if instance_key not in cls._instances: - cls._instances[instance_key] = await cls._connect_to_browser_hub( - connection_info, logger - ) - return cls._instances[instance_key] - - # Scenario 2: Custom configured hub - if config: - config_hash = cls._hash_config(config) - instance_key = hub_id or f"config:{config_hash}" - if instance_key not in cls._instances: - cls._instances[instance_key] = await cls._create_browser_manager( - config, - logger, - max_browsers_per_config, - max_pages_per_browser, - initial_pool_size, - page_configs - ) - return cls._instances[instance_key] - - # Scenario 1: Default hub - instance_key = "default" - if instance_key not in cls._instances: - cls._instances[instance_key] = await cls._create_default_browser_hub( - logger, - max_browsers_per_config, - max_pages_per_browser, - initial_pool_size - ) - return cls._instances[instance_key] - - @classmethod - async def _create_browser_manager( - cls, - config: BrowserConfig, - logger: Optional[AsyncLogger], - max_browsers_per_config: int, - max_pages_per_browser: int, - initial_pool_size: int, - page_configs: Optional[List[Tuple[BrowserConfig, CrawlerRunConfig, int]]] = None - ) -> BrowserManager: - """Create a new browser hub with the specified configuration.""" - manager = BrowserManager( - browser_config=config, - logger=logger, - unavailable_behavior=UnavailableBehavior.ON_DEMAND, - max_browsers_per_config=max_browsers_per_config, - max_pages_per_browser=max_pages_per_browser, - ) - - # Initialize the pool - await manager.initialize_pool( - browser_configs=[config] if config else None, - browsers_per_config=initial_pool_size, - page_configs=page_configs - ) - - return manager - - @classmethod - async def _create_default_browser_hub( - cls, - logger: Optional[AsyncLogger], - max_browsers_per_config: int, - max_pages_per_browser: int, - initial_pool_size: int - ) -> BrowserManager: - """Create a default browser hub with standard settings.""" - config = BrowserConfig(headless=True) - return await cls._create_browser_manager( - config, - logger, - max_browsers_per_config, - max_pages_per_browser, - initial_pool_size, - None - ) - - @classmethod - async def _connect_to_browser_hub( - cls, - connection_info: str, - logger: Optional[AsyncLogger] - ) -> BrowserManager: - """ - Connect to an existing browser hub. - - Note: This is a placeholder for future remote connection functionality. - Currently creates a local instance. - """ - if logger: - logger.info( - message="Remote browser hub connections not yet implemented. Creating local instance.", - tag="BROWSER_HUB" - ) - # For now, create a default local instance - return await cls._create_default_browser_hub( - logger, - max_browsers_per_config=10, - max_pages_per_browser=5, - initial_pool_size=1 - ) - - @classmethod - def _hash_config(cls, config: BrowserConfig) -> str: - """Create a hash of the browser configuration for identification.""" - # Convert config to dictionary, excluding any callable objects - config_dict = config.__dict__.copy() - for key in list(config_dict.keys()): - if callable(config_dict[key]): - del config_dict[key] - - # Convert to canonical JSON string - config_json = json.dumps(config_dict, sort_keys=True, default=str) - - # Hash the JSON - config_hash = hashlib.sha256(config_json.encode()).hexdigest() - return config_hash - - @classmethod - async def shutdown_all(cls): - """Close all browser hub instances and clear the registry.""" - async with cls._lock: - shutdown_tasks = [] - for hub in cls._instances.values(): - shutdown_tasks.append(hub.close()) - - if shutdown_tasks: - await asyncio.gather(*shutdown_tasks) - - cls._instances.clear() \ No newline at end of file diff --git a/crawl4ai/browser/docker/alpine/connect.Dockerfile b/crawl4ai/browser/docker/alpine/connect.Dockerfile deleted file mode 100644 index 96f77cef..00000000 --- a/crawl4ai/browser/docker/alpine/connect.Dockerfile +++ /dev/null @@ -1,34 +0,0 @@ -# ---------- Dockerfile ---------- - FROM alpine:latest - - # Combine everything in one RUN to keep layers minimal. - RUN apk update && apk upgrade && \ - apk add --no-cache \ - chromium \ - nss \ - freetype \ - harfbuzz \ - ca-certificates \ - ttf-freefont \ - socat \ - curl && \ - addgroup -S chromium && adduser -S chromium -G chromium && \ - mkdir -p /data && chown chromium:chromium /data && \ - rm -rf /var/cache/apk/* - - # Copy start script, then chown/chmod in one step - COPY start.sh /home/chromium/start.sh - RUN chown chromium:chromium /home/chromium/start.sh && \ - chmod +x /home/chromium/start.sh - - USER chromium - WORKDIR /home/chromium - - # Expose port used by socat (mapping 9222→9223 or whichever you prefer) - EXPOSE 9223 - - # Simple healthcheck: is the remote debug endpoint responding? - HEALTHCHECK --interval=30s --timeout=5s --retries=3 CMD curl -f http://localhost:9222/json/version || exit 1 - - CMD ["./start.sh"] - \ No newline at end of file diff --git a/crawl4ai/browser/docker/alpine/launch.Dockerfile b/crawl4ai/browser/docker/alpine/launch.Dockerfile deleted file mode 100644 index 17e3c660..00000000 --- a/crawl4ai/browser/docker/alpine/launch.Dockerfile +++ /dev/null @@ -1,27 +0,0 @@ -# ---------- Dockerfile (Idle Version) ---------- - FROM alpine:latest - - # Install only Chromium and its dependencies in a single layer - RUN apk update && apk upgrade && \ - apk add --no-cache \ - chromium \ - nss \ - freetype \ - harfbuzz \ - ca-certificates \ - ttf-freefont \ - socat \ - curl && \ - addgroup -S chromium && adduser -S chromium -G chromium && \ - mkdir -p /data && chown chromium:chromium /data && \ - rm -rf /var/cache/apk/* - - ENV PATH="/usr/bin:/bin:/usr/sbin:/sbin" - - # Switch to a non-root user for security - USER chromium - WORKDIR /home/chromium - - # Idle: container does nothing except stay alive - CMD ["tail", "-f", "/dev/null"] - \ No newline at end of file diff --git a/crawl4ai/browser/docker/debian/connect.Dockerfile b/crawl4ai/browser/docker/debian/connect.Dockerfile deleted file mode 100644 index ee0f25b4..00000000 --- a/crawl4ai/browser/docker/debian/connect.Dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -# Use Debian 12 (Bookworm) slim for a small, stable base image -FROM debian:bookworm-slim - -ENV DEBIAN_FRONTEND=noninteractive - -# Install Chromium, socat, and basic fonts -RUN apt-get update && apt-get install -y --no-install-recommends \ - chromium \ - wget \ - curl \ - socat \ - fonts-freefont-ttf \ - fonts-noto-color-emoji && \ - apt-get clean && rm -rf /var/lib/apt/lists/* - -# Copy start.sh and make it executable -COPY start.sh /start.sh -RUN chmod +x /start.sh - -# Expose socat port (use host mapping, e.g. -p 9225:9223) -EXPOSE 9223 - -ENTRYPOINT ["/start.sh"] diff --git a/crawl4ai/browser/docker_registry.py b/crawl4ai/browser/docker_registry.py deleted file mode 100644 index 03594e2e..00000000 --- a/crawl4ai/browser/docker_registry.py +++ /dev/null @@ -1,264 +0,0 @@ -"""Docker registry module for Crawl4AI. - -This module provides a registry system for tracking and reusing Docker containers -across browser sessions, improving performance and resource utilization. -""" - -import os -import json -import time -from typing import Dict, Optional - -from ..utils import get_home_folder - - -class DockerRegistry: - """Manages a registry of Docker containers used for browser automation. - - This registry tracks containers by configuration hash, allowing reuse of appropriately - configured containers instead of creating new ones for each session. - - Attributes: - registry_file (str): Path to the registry file - containers (dict): Dictionary of container information - port_map (dict): Map of host ports to container IDs - last_port (int): Last port assigned - """ - - def __init__(self, registry_file: Optional[str] = None): - """Initialize the registry with an optional path to the registry file. - - Args: - registry_file: Path to the registry file. If None, uses default path. - """ - # Use the same file path as BuiltinBrowserStrategy by default - self.registry_file = registry_file or os.path.join(get_home_folder(), "builtin-browser", "browser_config.json") - self.containers = {} # Still maintain this for backward compatibility - self.port_map = {} # Will be populated from the shared file - self.last_port = 9222 - self.load() - - def load(self): - """Load container registry from file.""" - if os.path.exists(self.registry_file): - try: - with open(self.registry_file, 'r') as f: - registry_data = json.load(f) - - # Initialize port_map if not present - if "port_map" not in registry_data: - registry_data["port_map"] = {} - - self.port_map = registry_data.get("port_map", {}) - - # Extract container information from port_map entries of type "docker" - self.containers = {} - for port_str, browser_info in self.port_map.items(): - if browser_info.get("browser_type") == "docker" and "container_id" in browser_info: - container_id = browser_info["container_id"] - self.containers[container_id] = { - "host_port": int(port_str), - "config_hash": browser_info.get("config_hash", ""), - "created_at": browser_info.get("created_at", time.time()) - } - - # Get last port if available - if "last_port" in registry_data: - self.last_port = registry_data["last_port"] - else: - # Find highest port in port_map - ports = [int(p) for p in self.port_map.keys() if p.isdigit()] - self.last_port = max(ports + [9222]) - - except Exception as e: - # Reset to defaults on error - print(f"Error loading registry: {e}") - self.containers = {} - self.port_map = {} - self.last_port = 9222 - else: - # Initialize with defaults if file doesn't exist - self.containers = {} - self.port_map = {} - self.last_port = 9222 - - def save(self): - """Save container registry to file.""" - # First load the current file to avoid overwriting other browser types - current_data = {"port_map": {}, "last_port": self.last_port} - if os.path.exists(self.registry_file): - try: - with open(self.registry_file, 'r') as f: - current_data = json.load(f) - except Exception: - pass - - # Create a new port_map dictionary - updated_port_map = {} - - # First, copy all non-docker entries from the existing port_map - for port_str, browser_info in current_data.get("port_map", {}).items(): - if browser_info.get("browser_type") != "docker": - updated_port_map[port_str] = browser_info - - # Then add all current docker container entries - for container_id, container_info in self.containers.items(): - port_str = str(container_info["host_port"]) - updated_port_map[port_str] = { - "browser_type": "docker", - "container_id": container_id, - "cdp_url": f"http://localhost:{port_str}", - "config_hash": container_info["config_hash"], - "created_at": container_info["created_at"] - } - - # Replace the port_map with our updated version - current_data["port_map"] = updated_port_map - - # Update last_port - current_data["last_port"] = self.last_port - - # Ensure directory exists - os.makedirs(os.path.dirname(self.registry_file), exist_ok=True) - - # Save the updated data - with open(self.registry_file, 'w') as f: - json.dump(current_data, f, indent=2) - - def register_container(self, container_id: str, host_port: int, config_hash: str, cdp_json_config: Optional[str] = None): - """Register a container with its configuration hash and port mapping. - - Args: - container_id: Docker container ID - host_port: Host port mapped to container - config_hash: Hash of configuration used to create container - cdp_json_config: CDP JSON configuration if available - """ - self.containers[container_id] = { - "host_port": host_port, - "config_hash": config_hash, - "created_at": time.time() - } - - # Update port_map to maintain compatibility with BuiltinBrowserStrategy - port_str = str(host_port) - self.port_map[port_str] = { - "browser_type": "docker", - "container_id": container_id, - "cdp_url": f"http://localhost:{port_str}", - "config_hash": config_hash, - "created_at": time.time() - } - - if cdp_json_config: - self.port_map[port_str]["cdp_json_config"] = cdp_json_config - - self.save() - - def unregister_container(self, container_id: str): - """Unregister a container. - - Args: - container_id: Docker container ID to unregister - """ - if container_id in self.containers: - host_port = self.containers[container_id]["host_port"] - port_str = str(host_port) - - # Remove from port_map - if port_str in self.port_map: - del self.port_map[port_str] - - # Remove from containers - del self.containers[container_id] - - self.save() - - async def find_container_by_config(self, config_hash: str, docker_utils) -> Optional[str]: - """Find a container that matches the given configuration hash. - - Args: - config_hash: Hash of configuration to match - docker_utils: DockerUtils instance to check running containers - - Returns: - Container ID if found, None otherwise - """ - # Search through port_map for entries with matching config_hash - for port_str, browser_info in self.port_map.items(): - if (browser_info.get("browser_type") == "docker" and - browser_info.get("config_hash") == config_hash and - "container_id" in browser_info): - - container_id = browser_info["container_id"] - if await docker_utils.is_container_running(container_id): - return container_id - - return None - - def get_container_host_port(self, container_id: str) -> Optional[int]: - """Get the host port mapped to the container. - - Args: - container_id: Docker container ID - - Returns: - Host port if container is registered, None otherwise - """ - if container_id in self.containers: - return self.containers[container_id]["host_port"] - return None - - def get_next_available_port(self, docker_utils) -> int: - """Get the next available host port for Docker mapping. - - Args: - docker_utils: DockerUtils instance to check port availability - - Returns: - Available port number - """ - # Start from last port + 1 - port = self.last_port + 1 - - # Check if port is in use (either in our registry or system-wide) - while str(port) in self.port_map or docker_utils.is_port_in_use(port): - port += 1 - - # Update last port - self.last_port = port - self.save() - - return port - - def get_container_config_hash(self, container_id: str) -> Optional[str]: - """Get the configuration hash for a container. - - Args: - container_id: Docker container ID - - Returns: - Configuration hash if container is registered, None otherwise - """ - if container_id in self.containers: - return self.containers[container_id]["config_hash"] - return None - - def cleanup_stale_containers(self, docker_utils): - """Clean up containers that are no longer running. - - Args: - docker_utils: DockerUtils instance to check container status - """ - to_remove = [] - - # Find containers that are no longer running - for port_str, browser_info in self.port_map.items(): - if browser_info.get("browser_type") == "docker" and "container_id" in browser_info: - container_id = browser_info["container_id"] - if not docker_utils.is_container_running(container_id): - to_remove.append(container_id) - - # Remove stale containers - for container_id in to_remove: - self.unregister_container(container_id) \ No newline at end of file diff --git a/crawl4ai/browser/docker_utils.py b/crawl4ai/browser/docker_utils.py deleted file mode 100644 index f93a51b9..00000000 --- a/crawl4ai/browser/docker_utils.py +++ /dev/null @@ -1,661 +0,0 @@ -import os -import json -import asyncio -import hashlib -import tempfile -import shutil -import socket -import subprocess -from typing import Dict, List, Optional, Tuple, Union - - -class DockerUtils: - """Utility class for Docker operations in browser automation. - - This class provides methods for managing Docker images, containers, - and related operations needed for browser automation. It handles - image building, container lifecycle, port management, and registry operations. - - Attributes: - DOCKER_FOLDER (str): Path to folder containing Docker files - DOCKER_CONNECT_FILE (str): Path to Dockerfile for connect mode - DOCKER_LAUNCH_FILE (str): Path to Dockerfile for launch mode - DOCKER_START_SCRIPT (str): Path to startup script for connect mode - DEFAULT_CONNECT_IMAGE (str): Default image name for connect mode - DEFAULT_LAUNCH_IMAGE (str): Default image name for launch mode - logger: Optional logger instance - """ - - # File paths for Docker resources - DOCKER_FOLDER = os.path.join(os.path.dirname(__file__), "docker") - DOCKER_CONNECT_FILE = os.path.join(DOCKER_FOLDER, "connect.Dockerfile") - DOCKER_LAUNCH_FILE = os.path.join(DOCKER_FOLDER, "launch.Dockerfile") - DOCKER_START_SCRIPT = os.path.join(DOCKER_FOLDER, "start.sh") - - # Default image names - DEFAULT_CONNECT_IMAGE = "crawl4ai/browser-connect:latest" - DEFAULT_LAUNCH_IMAGE = "crawl4ai/browser-launch:latest" - - def __init__(self, logger=None): - """Initialize Docker utilities. - - Args: - logger: Optional logger for recording operations - """ - self.logger = logger - - # Image Management Methods - - async def check_image_exists(self, image_name: str) -> bool: - """Check if a Docker image exists. - - Args: - image_name: Name of the Docker image to check - - Returns: - bool: True if the image exists, False otherwise - """ - cmd = ["docker", "image", "inspect", image_name] - - try: - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - _, _ = await process.communicate() - return process.returncode == 0 - except Exception as e: - if self.logger: - self.logger.debug( - f"Error checking if image exists: {str(e)}", tag="DOCKER" - ) - return False - - async def build_docker_image( - self, - image_name: str, - dockerfile_path: str, - files_to_copy: Dict[str, str] = None, - ) -> bool: - """Build a Docker image from a Dockerfile. - - Args: - image_name: Name to give the built image - dockerfile_path: Path to the Dockerfile - files_to_copy: Dict of {dest_name: source_path} for files to copy to build context - - Returns: - bool: True if image was built successfully, False otherwise - """ - # Create a temporary build context - with tempfile.TemporaryDirectory() as temp_dir: - # Copy the Dockerfile - shutil.copy(dockerfile_path, os.path.join(temp_dir, "Dockerfile")) - - # Copy any additional files needed - if files_to_copy: - for dest_name, source_path in files_to_copy.items(): - shutil.copy(source_path, os.path.join(temp_dir, dest_name)) - - # Build the image - cmd = ["docker", "build", "-t", image_name, temp_dir] - - if self.logger: - self.logger.debug( - f"Building Docker image with command: {' '.join(cmd)}", tag="DOCKER" - ) - - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - if process.returncode != 0: - if self.logger: - self.logger.error( - message="Failed to build Docker image: {error}", - tag="DOCKER", - params={"error": stderr.decode()}, - ) - return False - - if self.logger: - self.logger.success( - f"Successfully built Docker image: {image_name}", tag="DOCKER" - ) - return True - - async def ensure_docker_image_exists( - self, image_name: str, mode: str = "connect" - ) -> str: - """Ensure the required Docker image exists, creating it if necessary. - - Args: - image_name: Name of the Docker image - mode: Either "connect" or "launch" to determine which image to build - - Returns: - str: Name of the available Docker image - - Raises: - Exception: If image doesn't exist and can't be built - """ - # If image name is not specified, use default based on mode - if not image_name: - image_name = ( - self.DEFAULT_CONNECT_IMAGE - if mode == "connect" - else self.DEFAULT_LAUNCH_IMAGE - ) - - # Check if the image already exists - if await self.check_image_exists(image_name): - if self.logger: - self.logger.debug( - f"Docker image {image_name} already exists", tag="DOCKER" - ) - return image_name - - # If we're using a custom image that doesn't exist, warn and fail - if ( - image_name != self.DEFAULT_CONNECT_IMAGE - and image_name != self.DEFAULT_LAUNCH_IMAGE - ): - if self.logger: - self.logger.warning( - f"Custom Docker image {image_name} not found and cannot be automatically created", - tag="DOCKER", - ) - raise Exception(f"Docker image {image_name} not found") - - # Build the appropriate default image - if self.logger: - self.logger.info( - f"Docker image {image_name} not found, creating it now...", tag="DOCKER" - ) - - if mode == "connect": - success = await self.build_docker_image( - image_name, - self.DOCKER_CONNECT_FILE, - {"start.sh": self.DOCKER_START_SCRIPT}, - ) - else: - success = await self.build_docker_image(image_name, self.DOCKER_LAUNCH_FILE) - - if not success: - raise Exception(f"Failed to create Docker image {image_name}") - - return image_name - - # Container Management Methods - - async def create_container( - self, - image_name: str, - host_port: int, - container_name: Optional[str] = None, - volumes: List[str] = None, - network: Optional[str] = None, - env_vars: Dict[str, str] = None, - cpu_limit: float = 1.0, - memory_limit: str = "1.5g", - extra_args: List[str] = None, - ) -> Optional[str]: - """Create a new Docker container. - - Args: - image_name: Docker image to use - host_port: Port on host to map to container port 9223 - container_name: Optional name for the container - volumes: List of volume mappings (e.g., ["host_path:container_path"]) - network: Optional Docker network to use - env_vars: Dictionary of environment variables - cpu_limit: CPU limit for the container - memory_limit: Memory limit for the container - extra_args: Additional docker run arguments - - Returns: - str: Container ID if successful, None otherwise - """ - # Prepare container command - cmd = [ - "docker", - "run", - "--detach", - ] - - # Add container name if specified - if container_name: - cmd.extend(["--name", container_name]) - - # Add port mapping - cmd.extend(["-p", f"{host_port}:9223"]) - - # Add volumes - if volumes: - for volume in volumes: - cmd.extend(["-v", volume]) - - # Add network if specified - if network: - cmd.extend(["--network", network]) - - # Add environment variables - if env_vars: - for key, value in env_vars.items(): - cmd.extend(["-e", f"{key}={value}"]) - - # Add CPU and memory limits - if cpu_limit: - cmd.extend(["--cpus", str(cpu_limit)]) - if memory_limit: - cmd.extend(["--memory", memory_limit]) - cmd.extend(["--memory-swap", memory_limit]) - if self.logger: - self.logger.debug( - f"Setting CPU limit: {cpu_limit}, Memory limit: {memory_limit}", - tag="DOCKER", - ) - - # Add extra args - if extra_args: - cmd.extend(extra_args) - - # Add image - cmd.append(image_name) - - if self.logger: - self.logger.debug( - f"Creating Docker container with command: {' '.join(cmd)}", tag="DOCKER" - ) - - # Run docker command - try: - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - if process.returncode != 0: - if self.logger: - self.logger.error( - message="Failed to create Docker container: {error}", - tag="DOCKER", - params={"error": stderr.decode()}, - ) - return None - - # Get container ID - container_id = stdout.decode().strip() - - if self.logger: - self.logger.success( - f"Created Docker container: {container_id[:12]}", tag="DOCKER" - ) - - return container_id - - except Exception as e: - if self.logger: - self.logger.error( - message="Error creating Docker container: {error}", - tag="DOCKER", - params={"error": str(e)}, - ) - return None - - async def is_container_running(self, container_id: str) -> bool: - """Check if a container is running. - - Args: - container_id: ID of the container to check - - Returns: - bool: True if the container is running, False otherwise - """ - cmd = ["docker", "inspect", "--format", "{{.State.Running}}", container_id] - - try: - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - stdout, _ = await process.communicate() - - return process.returncode == 0 and stdout.decode().strip() == "true" - except Exception as e: - if self.logger: - self.logger.debug( - f"Error checking if container is running: {str(e)}", tag="DOCKER" - ) - return False - - async def wait_for_container_ready( - self, container_id: str, timeout: int = 30 - ) -> bool: - """Wait for the container to be in running state. - - Args: - container_id: ID of the container to wait for - timeout: Maximum time to wait in seconds - - Returns: - bool: True if container is ready, False if timeout occurred - """ - for _ in range(timeout): - if await self.is_container_running(container_id): - return True - await asyncio.sleep(1) - - if self.logger: - self.logger.warning( - f"Container {container_id[:12]} not ready after {timeout}s timeout", - tag="DOCKER", - ) - return False - - async def stop_container(self, container_id: str) -> bool: - """Stop a Docker container. - - Args: - container_id: ID of the container to stop - - Returns: - bool: True if stopped successfully, False otherwise - """ - cmd = ["docker", "stop", container_id] - - try: - process = await asyncio.create_subprocess_exec(*cmd) - await process.communicate() - - if self.logger: - self.logger.debug( - f"Stopped container: {container_id[:12]}", tag="DOCKER" - ) - - return process.returncode == 0 - except Exception as e: - if self.logger: - self.logger.warning( - message="Failed to stop container: {error}", - tag="DOCKER", - params={"error": str(e)}, - ) - return False - - async def remove_container(self, container_id: str, force: bool = True) -> bool: - """Remove a Docker container. - - Args: - container_id: ID of the container to remove - force: Whether to force removal - - Returns: - bool: True if removed successfully, False otherwise - """ - cmd = ["docker", "rm"] - if force: - cmd.append("-f") - cmd.append(container_id) - - try: - process = await asyncio.create_subprocess_exec(*cmd) - await process.communicate() - - if self.logger: - self.logger.debug( - f"Removed container: {container_id[:12]}", tag="DOCKER" - ) - - return process.returncode == 0 - except Exception as e: - if self.logger: - self.logger.warning( - message="Failed to remove container: {error}", - tag="DOCKER", - params={"error": str(e)}, - ) - return False - - # Container Command Execution Methods - - async def exec_in_container( - self, container_id: str, command: List[str], detach: bool = False - ) -> Tuple[int, str, str]: - """Execute a command in a running container. - - Args: - container_id: ID of the container - command: Command to execute as a list of strings - detach: Whether to run the command in detached mode - - Returns: - Tuple of (return_code, stdout, stderr) - """ - cmd = ["docker", "exec"] - if detach: - cmd.append("-d") - cmd.append(container_id) - cmd.extend(command) - - try: - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - return process.returncode, stdout.decode(), stderr.decode() - except Exception as e: - if self.logger: - self.logger.error( - message="Error executing command in container: {error}", - tag="DOCKER", - params={"error": str(e)}, - ) - return -1, "", str(e) - - async def start_socat_in_container(self, container_id: str) -> bool: - """Start socat in the container to map port 9222 to 9223. - - Args: - container_id: ID of the container - - Returns: - bool: True if socat started successfully, False otherwise - """ - # Command to run socat as a background process - cmd = ["socat", "TCP-LISTEN:9223,fork", "TCP:localhost:9222"] - - returncode, _, stderr = await self.exec_in_container( - container_id, cmd, detach=True - ) - - if returncode != 0: - if self.logger: - self.logger.error( - message="Failed to start socat in container: {error}", - tag="DOCKER", - params={"error": stderr}, - ) - return False - - if self.logger: - self.logger.debug( - f"Started socat in container: {container_id[:12]}", tag="DOCKER" - ) - - # Wait a moment for socat to start - await asyncio.sleep(1) - return True - - async def launch_chrome_in_container( - self, container_id: str, browser_args: List[str] - ) -> bool: - """Launch Chrome inside the container with specified arguments. - - Args: - container_id: ID of the container - browser_args: Chrome command line arguments - - Returns: - bool: True if Chrome started successfully, False otherwise - """ - # Build Chrome command - chrome_cmd = ["chromium"] - chrome_cmd.extend(browser_args) - - returncode, _, stderr = await self.exec_in_container( - container_id, chrome_cmd, detach=True - ) - - if returncode != 0: - if self.logger: - self.logger.error( - message="Failed to launch Chrome in container: {error}", - tag="DOCKER", - params={"error": stderr}, - ) - return False - - if self.logger: - self.logger.debug( - f"Launched Chrome in container: {container_id[:12]}", tag="DOCKER" - ) - - return True - - async def get_process_id_in_container( - self, container_id: str, process_name: str - ) -> Optional[int]: - """Get the process ID for a process in the container. - - Args: - container_id: ID of the container - process_name: Name pattern to search for - - Returns: - int: Process ID if found, None otherwise - """ - cmd = ["pgrep", "-f", process_name] - - returncode, stdout, _ = await self.exec_in_container(container_id, cmd) - - if returncode == 0 and stdout.strip(): - pid = int(stdout.strip().split("\n")[0]) - return pid - - return None - - async def stop_process_in_container(self, container_id: str, pid: int) -> bool: - """Stop a process in the container by PID. - - Args: - container_id: ID of the container - pid: Process ID to stop - - Returns: - bool: True if process was stopped, False otherwise - """ - cmd = ["kill", "-TERM", str(pid)] - - returncode, _, stderr = await self.exec_in_container(container_id, cmd) - - if returncode != 0: - if self.logger: - self.logger.warning( - message="Failed to stop process in container: {error}", - tag="DOCKER", - params={"error": stderr}, - ) - return False - - if self.logger: - self.logger.debug( - f"Stopped process {pid} in container: {container_id[:12]}", tag="DOCKER" - ) - - return True - - # Network and Port Methods - - async def wait_for_cdp_ready(self, host_port: int, timeout: int = 10) -> dict: - """Wait for the CDP endpoint to be ready. - - Args: - host_port: Port to check for CDP endpoint - timeout: Maximum time to wait in seconds - - Returns: - dict: CDP JSON config if ready, None if timeout occurred - """ - import aiohttp - - url = f"http://localhost:{host_port}/json/version" - - for _ in range(timeout): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, timeout=1) as response: - if response.status == 200: - if self.logger: - self.logger.debug( - f"CDP endpoint ready on port {host_port}", - tag="DOCKER", - ) - cdp_json_config = await response.json() - if self.logger: - self.logger.debug( - f"CDP JSON config: {cdp_json_config}", tag="DOCKER" - ) - return cdp_json_config - except Exception: - pass - await asyncio.sleep(1) - - if self.logger: - self.logger.warning( - f"CDP endpoint not ready on port {host_port} after {timeout}s timeout", - tag="DOCKER", - ) - return None - - def is_port_in_use(self, port: int) -> bool: - """Check if a port is already in use on the host. - - Args: - port: Port number to check - - Returns: - bool: True if port is in use, False otherwise - """ - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - return s.connect_ex(("localhost", port)) == 0 - - def get_next_available_port(self, start_port: int = 9223) -> int: - """Get the next available port starting from a given port. - - Args: - start_port: Port number to start checking from - - Returns: - int: First available port number - """ - port = start_port - while self.is_port_in_use(port): - port += 1 - return port - - # Configuration Hash Methods - - def generate_config_hash(self, config_dict: Dict) -> str: - """Generate a hash of the configuration for container matching. - - Args: - config_dict: Dictionary of configuration parameters - - Returns: - str: Hash string uniquely identifying this configuration - """ - # Convert to canonical JSON string and hash - config_json = json.dumps(config_dict, sort_keys=True) - return hashlib.sha256(config_json.encode()).hexdigest() diff --git a/crawl4ai/browser/manager copy.py b/crawl4ai/browser/manager copy.py deleted file mode 100644 index 97aaf587..00000000 --- a/crawl4ai/browser/manager copy.py +++ /dev/null @@ -1,177 +0,0 @@ -"""Browser manager module for Crawl4AI. - -This module provides a central browser management class that uses the -strategy pattern internally while maintaining the existing API. -It also implements a page pooling mechanism for improved performance. -""" - -from typing import Optional, Tuple, List - -from playwright.async_api import Page, BrowserContext - -from ..async_logger import AsyncLogger -from ..async_configs import BrowserConfig, CrawlerRunConfig - -from .strategies import ( - BaseBrowserStrategy, - PlaywrightBrowserStrategy, - CDPBrowserStrategy, - BuiltinBrowserStrategy, - DockerBrowserStrategy -) - -class BrowserManager: - """Main interface for browser management in Crawl4AI. - - This class maintains backward compatibility with the existing implementation - while using the strategy pattern internally for different browser types. - - Attributes: - config (BrowserConfig): Configuration object containing all browser settings - logger: Logger instance for recording events and errors - browser: The browser instance - default_context: The default browser context - managed_browser: The managed browser instance - playwright: The Playwright instance - sessions: Dictionary to store session information - session_ttl: Session timeout in seconds - """ - - def __init__(self, browser_config: Optional[BrowserConfig] = None, logger: Optional[AsyncLogger] = None): - """Initialize the BrowserManager with a browser configuration. - - Args: - browser_config: Configuration object containing all browser settings - logger: Logger instance for recording events and errors - """ - self.config = browser_config or BrowserConfig() - self.logger = logger - - # Create strategy based on configuration - self.strategy = self._create_strategy() - - # Initialize state variables for compatibility with existing code - self.browser = None - self.default_context = None - self.managed_browser = None - self.playwright = None - - # For session management (from existing implementation) - self.sessions = {} - self.session_ttl = 1800 # 30 minutes - - def _create_strategy(self) -> BaseBrowserStrategy: - """Create appropriate browser strategy based on configuration. - - Returns: - BaseBrowserStrategy: The selected browser strategy - """ - if self.config.browser_mode == "builtin": - return BuiltinBrowserStrategy(self.config, self.logger) - elif self.config.browser_mode == "docker": - if DockerBrowserStrategy is None: - if self.logger: - self.logger.error( - "Docker browser strategy requested but not available. " - "Falling back to PlaywrightBrowserStrategy.", - tag="BROWSER" - ) - return PlaywrightBrowserStrategy(self.config, self.logger) - return DockerBrowserStrategy(self.config, self.logger) - elif self.config.browser_mode == "cdp" or self.config.cdp_url or self.config.use_managed_browser: - return CDPBrowserStrategy(self.config, self.logger) - else: - return PlaywrightBrowserStrategy(self.config, self.logger) - - async def start(self): - """Start the browser instance and set up the default context. - - Returns: - self: For method chaining - """ - # Start the strategy - await self.strategy.start() - - # Update legacy references - self.browser = self.strategy.browser - self.default_context = self.strategy.default_context - - # Set browser process reference (for CDP strategy) - if hasattr(self.strategy, 'browser_process'): - self.managed_browser = self.strategy - - # Set Playwright reference - self.playwright = self.strategy.playwright - - # Sync sessions if needed - if hasattr(self.strategy, 'sessions'): - self.sessions = self.strategy.sessions - self.session_ttl = self.strategy.session_ttl - - return self - - async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - """Get a page for the given configuration. - - Args: - crawlerRunConfig: Configuration object for the crawler run - - Returns: - Tuple of (Page, BrowserContext) - """ - # Delegate to strategy - page, context = await self.strategy.get_page(crawlerRunConfig) - - # Sync sessions if needed - if hasattr(self.strategy, 'sessions'): - self.sessions = self.strategy.sessions - - return page, context - - async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]: - """Get multiple pages with the same configuration. - - This method efficiently creates multiple browser pages using the same configuration, - which is useful for parallel crawling of multiple URLs. - - Args: - crawlerRunConfig: Configuration for the pages - count: Number of pages to create - - Returns: - List of (Page, Context) tuples - """ - # Delegate to strategy - pages = await self.strategy.get_pages(crawlerRunConfig, count) - - # Sync sessions if needed - if hasattr(self.strategy, 'sessions'): - self.sessions = self.strategy.sessions - - return pages - - # Just for legacy compatibility - async def kill_session(self, session_id: str): - """Kill a browser session and clean up resources. - - Args: - session_id: The session ID to kill - """ - # Handle kill_session via our strategy if it supports it - await self.strategy.kill_session(session_id) - - # sync sessions if needed - if hasattr(self.strategy, 'sessions'): - self.sessions = self.strategy.sessions - - async def close(self): - """Close the browser and clean up resources.""" - # Delegate to strategy - await self.strategy.close() - - # Reset legacy references - self.browser = None - self.default_context = None - self.managed_browser = None - self.playwright = None - self.sessions = {} diff --git a/crawl4ai/browser/manager.py b/crawl4ai/browser/manager.py deleted file mode 100644 index 429d2516..00000000 --- a/crawl4ai/browser/manager.py +++ /dev/null @@ -1,853 +0,0 @@ -"""Browser manager module for Crawl4AI. - -This module provides a central browser management class that uses the -strategy pattern internally while maintaining the existing API. -It also implements browser pooling for improved performance. -""" - -import asyncio -import hashlib -import json -import math -from enum import Enum -from typing import Dict, List, Optional, Tuple, Any - -from playwright.async_api import Page, BrowserContext - -from ..async_logger import AsyncLogger -from ..async_configs import BrowserConfig, CrawlerRunConfig - -from .strategies import ( - BaseBrowserStrategy, - PlaywrightBrowserStrategy, - CDPBrowserStrategy, - BuiltinBrowserStrategy, - DockerBrowserStrategy -) - -class UnavailableBehavior(Enum): - """Behavior when no browser is available.""" - ON_DEMAND = "on_demand" # Create new browser on demand - PENDING = "pending" # Wait until a browser is available - EXCEPTION = "exception" # Raise an exception - - -class BrowserManager: - """Main interface for browser management and pooling in Crawl4AI. - - This class maintains backward compatibility with the existing implementation - while using the strategy pattern internally for different browser types. - It also implements browser pooling for improved performance. - - Attributes: - config (BrowserConfig): Default configuration object for browsers - logger (AsyncLogger): Logger instance for recording events and errors - browser_pool (Dict): Dictionary to store browser instances by configuration - browser_in_use (Dict): Dictionary to track which browsers are in use - request_queues (Dict): Queues for pending requests by configuration - unavailable_behavior (UnavailableBehavior): Behavior when no browser is available - """ - - def __init__( - self, - browser_config: Optional[BrowserConfig] = None, - logger: Optional[AsyncLogger] = None, - unavailable_behavior: UnavailableBehavior = UnavailableBehavior.EXCEPTION, - max_browsers_per_config: int = 10, - max_pages_per_browser: int = 5 - ): - """Initialize the BrowserManager with a browser configuration. - - Args: - browser_config: Configuration object containing all browser settings - logger: Logger instance for recording events and errors - unavailable_behavior: Behavior when no browser is available - max_browsers_per_config: Maximum number of browsers per configuration - max_pages_per_browser: Maximum number of pages per browser - """ - self.config = browser_config or BrowserConfig() - self.logger = logger - self.unavailable_behavior = unavailable_behavior - self.max_browsers_per_config = max_browsers_per_config - self.max_pages_per_browser = max_pages_per_browser - - # Browser pool management - self.browser_pool = {} # config_hash -> list of browser strategies - self.browser_in_use = {} # strategy instance -> Boolean - self.request_queues = {} # config_hash -> asyncio.Queue() - self._browser_locks = {} # config_hash -> asyncio.Lock() - self._browser_pool_lock = asyncio.Lock() # Global lock for pool modifications - - # Page pool management - self.page_pool = {} # (browser_config_hash, crawler_config_hash) -> list of (page, context, strategy) - self._page_pool_lock = asyncio.Lock() - - self.browser_page_counts = {} # strategy instance -> current page count - self._page_count_lock = asyncio.Lock() # Lock for thread-safe access to page counts - - # For session management (from existing implementation) - self.sessions = {} - self.session_ttl = 1800 # 30 minutes - - # For legacy compatibility - self.browser = None - self.default_context = None - self.managed_browser = None - self.playwright = None - self.strategy = None - - def _create_browser_config_hash(self, browser_config: BrowserConfig) -> str: - """Create a hash of the browser configuration for browser pooling. - - Args: - browser_config: Browser configuration - - Returns: - str: Hash of the browser configuration - """ - # Convert config to dictionary, excluding any callable objects - config_dict = browser_config.__dict__.copy() - for key in list(config_dict.keys()): - if callable(config_dict[key]): - del config_dict[key] - - # Convert to canonical JSON string - config_json = json.dumps(config_dict, sort_keys=True, default=str) - - # Hash the JSON - config_hash = hashlib.sha256(config_json.encode()).hexdigest() - return config_hash - - def _create_strategy(self, browser_config: BrowserConfig) -> BaseBrowserStrategy: - """Create appropriate browser strategy based on configuration. - - Args: - browser_config: Browser configuration - - Returns: - BaseBrowserStrategy: The selected browser strategy - """ - if browser_config.browser_mode == "builtin": - return BuiltinBrowserStrategy(browser_config, self.logger) - elif browser_config.browser_mode == "docker": - if DockerBrowserStrategy is None: - if self.logger: - self.logger.error( - "Docker browser strategy requested but not available. " - "Falling back to PlaywrightBrowserStrategy.", - tag="BROWSER" - ) - return PlaywrightBrowserStrategy(browser_config, self.logger) - return DockerBrowserStrategy(browser_config, self.logger) - elif browser_config.browser_mode == "cdp" or browser_config.cdp_url or browser_config.use_managed_browser: - return CDPBrowserStrategy(browser_config, self.logger) - else: - return PlaywrightBrowserStrategy(browser_config, self.logger) - - async def initialize_pool( - self, - browser_configs: List[BrowserConfig] = None, - browsers_per_config: int = 1, - page_configs: Optional[List[Tuple[BrowserConfig, CrawlerRunConfig, int]]] = None - ): - """Initialize the browser pool with multiple browser configurations. - - Args: - browser_configs: List of browser configurations to initialize - browsers_per_config: Number of browser instances per configuration - page_configs: Optional list of (browser_config, crawler_run_config, count) tuples - for pre-warming pages - - Returns: - self: For method chaining - """ - if not browser_configs: - browser_configs = [self.config] - - # Calculate how many browsers we'll need based on page_configs - browsers_needed = {} - if page_configs: - for browser_config, _, page_count in page_configs: - config_hash = self._create_browser_config_hash(browser_config) - # Calculate browsers based on max_pages_per_browser - browsers_needed_for_config = math.ceil(page_count / self.max_pages_per_browser) - browsers_needed[config_hash] = max( - browsers_needed.get(config_hash, 0), - browsers_needed_for_config - ) - - # Adjust browsers_per_config if needed to ensure enough capacity - config_browsers_needed = {} - for browser_config in browser_configs: - config_hash = self._create_browser_config_hash(browser_config) - - # Estimate browsers needed based on page requirements - browsers_for_config = browsers_per_config - if config_hash in browsers_needed: - browsers_for_config = max(browsers_for_config, browsers_needed[config_hash]) - - config_browsers_needed[config_hash] = browsers_for_config - - # Update max_browsers_per_config if needed - if browsers_for_config > self.max_browsers_per_config: - self.max_browsers_per_config = browsers_for_config - if self.logger: - self.logger.info( - f"Increased max_browsers_per_config to {browsers_for_config} to accommodate page requirements", - tag="POOL" - ) - - # Initialize locks and queues for each config - async with self._browser_pool_lock: - for browser_config in browser_configs: - config_hash = self._create_browser_config_hash(browser_config) - - # Initialize lock for this config if needed - if config_hash not in self._browser_locks: - self._browser_locks[config_hash] = asyncio.Lock() - - # Initialize queue for this config if needed - if config_hash not in self.request_queues: - self.request_queues[config_hash] = asyncio.Queue() - - # Initialize pool for this config if needed - if config_hash not in self.browser_pool: - self.browser_pool[config_hash] = [] - - # Create browser instances for each configuration in parallel - browser_tasks = [] - - for browser_config in browser_configs: - config_hash = self._create_browser_config_hash(browser_config) - browsers_to_create = config_browsers_needed.get( - config_hash, - browsers_per_config - ) - len(self.browser_pool.get(config_hash, [])) - - if browsers_to_create <= 0: - continue - - for _ in range(browsers_to_create): - # Create a task for each browser initialization - task = self._create_and_add_browser(browser_config, config_hash) - browser_tasks.append(task) - - # Wait for all browser initializations to complete - if browser_tasks: - if self.logger: - self.logger.info(f"Initializing {len(browser_tasks)} browsers in parallel...", tag="POOL") - await asyncio.gather(*browser_tasks) - - # Pre-warm pages if requested - if page_configs: - page_tasks = [] - for browser_config, crawler_run_config, count in page_configs: - task = self._prewarm_pages(browser_config, crawler_run_config, count) - page_tasks.append(task) - - if page_tasks: - if self.logger: - self.logger.info(f"Pre-warming pages with {len(page_tasks)} configurations...", tag="POOL") - await asyncio.gather(*page_tasks) - - # Update legacy references - if self.browser_pool and next(iter(self.browser_pool.values()), []): - strategy = next(iter(self.browser_pool.values()))[0] - self.strategy = strategy - self.browser = strategy.browser - self.default_context = strategy.default_context - self.playwright = strategy.playwright - - return self - - async def _create_and_add_browser(self, browser_config: BrowserConfig, config_hash: str): - """Create and add a browser to the pool. - - Args: - browser_config: Browser configuration - config_hash: Hash of the configuration - """ - try: - strategy = self._create_strategy(browser_config) - await strategy.start() - - async with self._browser_pool_lock: - if config_hash not in self.browser_pool: - self.browser_pool[config_hash] = [] - self.browser_pool[config_hash].append(strategy) - self.browser_in_use[strategy] = False - - if self.logger: - self.logger.debug( - f"Added browser to pool: {browser_config.browser_type} " - f"({browser_config.browser_mode})", - tag="POOL" - ) - except Exception as e: - if self.logger: - self.logger.error( - f"Failed to create browser: {str(e)}", - tag="POOL" - ) - raise - - def _make_config_signature(self, crawlerRunConfig: CrawlerRunConfig) -> str: - """Create a signature hash from crawler configuration. - - Args: - crawlerRunConfig: Crawler run configuration - - Returns: - str: Hash of the crawler configuration - """ - config_dict = crawlerRunConfig.__dict__.copy() - # Exclude items that do not affect page creation - ephemeral_keys = [ - "session_id", - "js_code", - "scraping_strategy", - "extraction_strategy", - "chunking_strategy", - "cache_mode", - "content_filter", - "semaphore_count", - "url" - ] - for key in ephemeral_keys: - if key in config_dict: - del config_dict[key] - - # Convert to canonical JSON string - config_json = json.dumps(config_dict, sort_keys=True, default=str) - - # Hash the JSON - config_hash = hashlib.sha256(config_json.encode("utf-8")).hexdigest() - return config_hash - - async def _prewarm_pages( - self, - browser_config: BrowserConfig, - crawler_run_config: CrawlerRunConfig, - count: int - ): - """Pre-warm pages for a specific configuration. - - Args: - browser_config: Browser configuration - crawler_run_config: Crawler run configuration - count: Number of pages to pre-warm - """ - try: - # Create individual page tasks and run them in parallel - browser_config_hash = self._create_browser_config_hash(browser_config) - crawler_config_hash = self._make_config_signature(crawler_run_config) - async def get_single_page(): - strategy = await self.get_available_browser(browser_config) - try: - page, context = await strategy.get_page(crawler_run_config) - # Store config hashes on the page object for later retrieval - setattr(page, "_browser_config_hash", browser_config_hash) - setattr(page, "_crawler_config_hash", crawler_config_hash) - return page, context, strategy - except Exception as e: - # Release the browser back to the pool - await self.release_browser(strategy, browser_config) - raise e - - # Create tasks for parallel execution - page_tasks = [get_single_page() for _ in range(count)] - - # Execute all page creation tasks in parallel - pages_contexts_strategies = await asyncio.gather(*page_tasks) - - # Add pages to the page pool - browser_config_hash = self._create_browser_config_hash(browser_config) - crawler_config_hash = self._make_config_signature(crawler_run_config) - pool_key = (browser_config_hash, crawler_config_hash) - - async with self._page_pool_lock: - if pool_key not in self.page_pool: - self.page_pool[pool_key] = [] - - # Add all pages to the pool - self.page_pool[pool_key].extend(pages_contexts_strategies) - - if self.logger: - self.logger.debug( - f"Pre-warmed {count} pages in parallel with config {crawler_run_config}", - tag="POOL" - ) - except Exception as e: - if self.logger: - self.logger.error( - f"Failed to pre-warm pages: {str(e)}", - tag="POOL" - ) - raise - - async def get_available_browser( - self, - browser_config: Optional[BrowserConfig] = None - ) -> BaseBrowserStrategy: - """Get an available browser from the pool for the given configuration. - - Args: - browser_config: Browser configuration to match - - Returns: - BaseBrowserStrategy: An available browser strategy - - Raises: - Exception: If no browser is available and behavior is EXCEPTION - """ - browser_config = browser_config or self.config - config_hash = self._create_browser_config_hash(browser_config) - - async with self._browser_locks.get(config_hash, asyncio.Lock()): - # Check if we have browsers for this config - if config_hash not in self.browser_pool or not self.browser_pool[config_hash]: - if self.unavailable_behavior == UnavailableBehavior.ON_DEMAND: - # Create a new browser on demand - if self.logger: - self.logger.info( - f"1> Creating new browser on demand for config {config_hash[:8]}", - tag="POOL" - ) - - # Initialize pool for this config if needed - async with self._browser_pool_lock: - if config_hash not in self.browser_pool: - self.browser_pool[config_hash] = [] - - strategy = self._create_strategy(browser_config) - await strategy.start() - - self.browser_pool[config_hash].append(strategy) - self.browser_in_use[strategy] = False - - elif self.unavailable_behavior == UnavailableBehavior.EXCEPTION: - raise Exception(f"No browsers available for configuration {config_hash[:8]}") - - # Check for an available browser with capacity in the pool - for strategy in self.browser_pool[config_hash]: - # Check if this browser has capacity for more pages - async with self._page_count_lock: - current_pages = self.browser_page_counts.get(strategy, 0) - - if current_pages < self.max_pages_per_browser: - # Increment the page count - self.browser_page_counts[strategy] = current_pages + 1 - - self.browser_in_use[strategy] = True - - # Get browser information for better logging - browser_type = getattr(strategy.config, 'browser_type', 'unknown') - browser_mode = getattr(strategy.config, 'browser_mode', 'unknown') - strategy_id = id(strategy) # Use object ID as a unique identifier - - if self.logger: - self.logger.debug( - f"Selected browser #{strategy_id} ({browser_type}/{browser_mode}) - " - f"pages: {current_pages+1}/{self.max_pages_per_browser}", - tag="POOL" - ) - - return strategy - - # All browsers are at capacity or in use - if self.unavailable_behavior == UnavailableBehavior.ON_DEMAND: - # Check if we've reached the maximum number of browsers - if len(self.browser_pool[config_hash]) >= self.max_browsers_per_config: - if self.logger: - self.logger.warning( - f"Maximum browsers reached for config {config_hash[:8]} and all at page capacity", - tag="POOL" - ) - if self.unavailable_behavior == UnavailableBehavior.EXCEPTION: - raise Exception("Maximum browsers reached and all at page capacity") - - # Create a new browser on demand - if self.logger: - self.logger.info( - f"2> Creating new browser on demand for config {config_hash[:8]}", - tag="POOL" - ) - - strategy = self._create_strategy(browser_config) - await strategy.start() - - async with self._browser_pool_lock: - self.browser_pool[config_hash].append(strategy) - self.browser_in_use[strategy] = True - - return strategy - - # If we get here, either behavior is EXCEPTION or PENDING - if self.unavailable_behavior == UnavailableBehavior.EXCEPTION: - raise Exception(f"All browsers in use or at page capacity for configuration {config_hash[:8]}") - - # For PENDING behavior, set up waiting mechanism - if config_hash not in self.request_queues: - self.request_queues[config_hash] = asyncio.Queue() - - # Create a future to wait on - future = asyncio.Future() - await self.request_queues[config_hash].put(future) - - if self.logger: - self.logger.debug( - f"Waiting for available browser for config {config_hash[:8]}", - tag="POOL" - ) - - # Wait for a browser to become available - strategy = await future - return strategy - - async def get_page( - self, - crawlerRunConfig: CrawlerRunConfig, - browser_config: Optional[BrowserConfig] = None - ) -> Tuple[Page, BrowserContext, BaseBrowserStrategy]: - """Get a page from the browser pool.""" - browser_config = browser_config or self.config - - # Check if we have a pre-warmed page available - browser_config_hash = self._create_browser_config_hash(browser_config) - crawler_config_hash = self._make_config_signature(crawlerRunConfig) - pool_key = (browser_config_hash, crawler_config_hash) - - # Try to get a page from the pool - async with self._page_pool_lock: - if pool_key in self.page_pool and self.page_pool[pool_key]: - # Get a page from the pool - page, context, strategy = self.page_pool[pool_key].pop() - - # Mark browser as in use (it already is, but ensure consistency) - self.browser_in_use[strategy] = True - - if self.logger: - self.logger.debug( - f"Using pre-warmed page for config {crawler_config_hash[:8]}", - tag="POOL" - ) - - # Note: We don't increment page count since it was already counted when created - - return page, context, strategy - - # No pre-warmed page available, create a new one - # get_available_browser already increments the page count - strategy = await self.get_available_browser(browser_config) - - try: - # Get a page from the browser - page, context = await strategy.get_page(crawlerRunConfig) - - # Store config hashes on the page object for later retrieval - setattr(page, "_browser_config_hash", browser_config_hash) - setattr(page, "_crawler_config_hash", crawler_config_hash) - - return page, context, strategy - except Exception as e: - # Release the browser back to the pool and decrement the page count - await self.release_browser(strategy, browser_config, decrement_page_count=True) - raise e - - async def release_page( - self, - page: Page, - strategy: BaseBrowserStrategy, - browser_config: Optional[BrowserConfig] = None, - keep_alive: bool = True, - return_to_pool: bool = True - ): - """Release a page back to the pool.""" - browser_config = browser_config or self.config - - page_url = page.url if page else None - - # If not keeping the page alive, close it and decrement count - if not keep_alive: - try: - await page.close() - except Exception as e: - if self.logger: - self.logger.error( - f"Error closing page: {str(e)}", - tag="POOL" - ) - # Release the browser with page count decrement - await self.release_browser(strategy, browser_config, decrement_page_count=True) - return - - # If returning to pool - if return_to_pool: - # Get the configuration hashes from the page object - browser_config_hash = getattr(page, "_browser_config_hash", None) - crawler_config_hash = getattr(page, "_crawler_config_hash", None) - - if browser_config_hash and crawler_config_hash: - pool_key = (browser_config_hash, crawler_config_hash) - - async with self._page_pool_lock: - if pool_key not in self.page_pool: - self.page_pool[pool_key] = [] - - # Add page back to the pool - self.page_pool[pool_key].append((page, page.context, strategy)) - - if self.logger: - self.logger.debug( - f"Returned page to pool for config {crawler_config_hash[:8]}, url: {page_url}", - tag="POOL" - ) - - # Note: We don't decrement the page count here since the page is still "in use" - # from the browser's perspective, just in our pool - return - else: - # If we can't identify the configuration, log a warning - if self.logger: - self.logger.warning( - "Cannot return page to pool - missing configuration hashes", - tag="POOL" - ) - - # If we got here, we couldn't return to pool, so just release the browser - await self.release_browser(strategy, browser_config, decrement_page_count=True) - - async def release_browser( - self, - strategy: BaseBrowserStrategy, - browser_config: Optional[BrowserConfig] = None, - decrement_page_count: bool = True - ): - """Release a browser back to the pool.""" - browser_config = browser_config or self.config - config_hash = self._create_browser_config_hash(browser_config) - - # Decrement page count - if decrement_page_count: - async with self._page_count_lock: - current_count = self.browser_page_counts.get(strategy, 1) - self.browser_page_counts[strategy] = max(0, current_count - 1) - - if self.logger: - self.logger.debug( - f"Decremented page count for browser (now: {self.browser_page_counts[strategy]})", - tag="POOL" - ) - - # Mark as not in use - self.browser_in_use[strategy] = False - - # Process any waiting requests - if config_hash in self.request_queues and not self.request_queues[config_hash].empty(): - future = await self.request_queues[config_hash].get() - if not future.done(): - future.set_result(strategy) - - async def get_pages( - self, - crawlerRunConfig: CrawlerRunConfig, - count: int = 1, - browser_config: Optional[BrowserConfig] = None - ) -> List[Tuple[Page, BrowserContext, BaseBrowserStrategy]]: - """Get multiple pages from the browser pool. - - Args: - crawlerRunConfig: Configuration for the crawler run - count: Number of pages to get - browser_config: Browser configuration to use - - Returns: - List of (Page, Context, Strategy) tuples - """ - results = [] - for _ in range(count): - try: - result = await self.get_page(crawlerRunConfig, browser_config) - results.append(result) - except Exception as e: - # Release any pages we've already gotten - for page, _, strategy in results: - await self.release_page(page, strategy, browser_config) - raise e - - return results - - async def get_page_pool_status(self) -> Dict[str, Any]: - """Get information about the page pool status. - - Returns: - Dict with page pool status information - """ - status = { - "total_pooled_pages": 0, - "configs": {} - } - - async with self._page_pool_lock: - for (browser_hash, crawler_hash), pages in self.page_pool.items(): - config_key = f"{browser_hash[:8]}_{crawler_hash[:8]}" - status["configs"][config_key] = len(pages) - status["total_pooled_pages"] += len(pages) - - if self.logger: - self.logger.debug( - f"Page pool status: {status['total_pooled_pages']} pages available", - tag="POOL" - ) - - return status - - async def get_pool_status(self) -> Dict[str, Any]: - """Get information about the browser pool status. - - Returns: - Dict with pool status information - """ - status = { - "total_browsers": 0, - "browsers_in_use": 0, - "total_pages": 0, - "configs": {} - } - - for config_hash, strategies in self.browser_pool.items(): - config_pages = 0 - in_use = 0 - - for strategy in strategies: - is_in_use = self.browser_in_use.get(strategy, False) - if is_in_use: - in_use += 1 - - # Get page count for this browser - try: - page_count = len(await strategy.get_opened_pages()) - config_pages += page_count - except Exception as e: - if self.logger: - self.logger.error(f"Error getting page count: {str(e)}", tag="POOL") - - config_status = { - "total_browsers": len(strategies), - "browsers_in_use": in_use, - "pages_open": config_pages, - "waiting_requests": self.request_queues.get(config_hash, asyncio.Queue()).qsize(), - "max_capacity": len(strategies) * self.max_pages_per_browser, - "utilization_pct": round((config_pages / (len(strategies) * self.max_pages_per_browser)) * 100, 1) - if strategies else 0 - } - - status["configs"][config_hash] = config_status - status["total_browsers"] += config_status["total_browsers"] - status["browsers_in_use"] += config_status["browsers_in_use"] - status["total_pages"] += config_pages - - # Add overall utilization - if status["total_browsers"] > 0: - max_capacity = status["total_browsers"] * self.max_pages_per_browser - status["overall_utilization_pct"] = round((status["total_pages"] / max_capacity) * 100, 1) - else: - status["overall_utilization_pct"] = 0 - - return status - - async def start(self): - """Start at least one browser instance in the pool. - - This method is kept for backward compatibility. - - Returns: - self: For method chaining - """ - await self.initialize_pool([self.config], 1) - return self - - async def kill_session(self, session_id: str): - """Kill a browser session and clean up resources. - - Delegated to the strategy. This method is kept for backward compatibility. - - Args: - session_id: The session ID to kill - """ - if not self.strategy: - return - - await self.strategy.kill_session(session_id) - - # Sync sessions - if hasattr(self.strategy, 'sessions'): - self.sessions = self.strategy.sessions - - async def close(self): - """Close all browsers in the pool and clean up resources.""" - # Close all browsers in the pool - for strategies in self.browser_pool.values(): - for strategy in strategies: - try: - await strategy.close() - except Exception as e: - if self.logger: - self.logger.error( - f"Error closing browser: {str(e)}", - tag="POOL" - ) - - # Clear pool data - self.browser_pool = {} - self.browser_in_use = {} - - # Reset legacy references - self.browser = None - self.default_context = None - self.managed_browser = None - self.playwright = None - self.strategy = None - self.sessions = {} - - -async def create_browser_manager( - browser_config: Optional[BrowserConfig] = None, - logger: Optional[AsyncLogger] = None, - unavailable_behavior: UnavailableBehavior = UnavailableBehavior.EXCEPTION, - max_browsers_per_config: int = 10, - initial_pool_size: int = 1, - page_configs: Optional[List[Tuple[BrowserConfig, CrawlerRunConfig, int]]] = None -) -> BrowserManager: - """Factory function to create and initialize a BrowserManager. - - Args: - browser_config: Configuration for the browsers - logger: Logger for recording events - unavailable_behavior: Behavior when no browser is available - max_browsers_per_config: Maximum browsers per configuration - initial_pool_size: Initial number of browsers per configuration - page_configs: Optional configurations for pre-warming pages - - Returns: - Initialized BrowserManager - """ - manager = BrowserManager( - browser_config=browser_config, - logger=logger, - unavailable_behavior=unavailable_behavior, - max_browsers_per_config=max_browsers_per_config - ) - - await manager.initialize_pool( - [browser_config] if browser_config else None, - initial_pool_size, - page_configs - ) - - return manager - - - - - diff --git a/crawl4ai/browser/models.py b/crawl4ai/browser/models.py deleted file mode 100644 index e2ac2b3f..00000000 --- a/crawl4ai/browser/models.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Docker configuration module for Crawl4AI browser automation. - -This module provides configuration classes for Docker-based browser automation, -allowing flexible configuration of Docker containers for browsing. -""" - -from typing import Dict, List, Optional - - -class DockerConfig: - """Configuration for Docker-based browser automation. - - This class contains Docker-specific settings to avoid cluttering BrowserConfig. - - Attributes: - mode (str): Docker operation mode - "connect" or "launch". - - "connect": Uses a container with Chrome already running - - "launch": Dynamically configures and starts Chrome in container - image (str): Docker image to use. If None, defaults from DockerUtils are used. - registry_file (str): Path to container registry file for persistence. - persistent (bool): Keep container running after browser closes. - remove_on_exit (bool): Remove container on exit when not persistent. - network (str): Docker network to use. - volumes (List[str]): Volume mappings (e.g., ["host_path:container_path"]). - env_vars (Dict[str, str]): Environment variables to set in container. - extra_args (List[str]): Additional docker run arguments. - host_port (int): Host port to map to container's 9223 port. - user_data_dir (str): Path to user data directory on host. - container_user_data_dir (str): Path to user data directory in container. - """ - - def __init__( - self, - mode: str = "connect", # "connect" or "launch" - image: Optional[str] = None, # Docker image to use - registry_file: Optional[str] = None, # Path to registry file - persistent: bool = False, # Keep container running after browser closes - remove_on_exit: bool = True, # Remove container on exit when not persistent - network: Optional[str] = None, # Docker network to use - volumes: List[str] = None, # Volume mappings - cpu_limit: float = 1.0, # CPU limit for the container - memory_limit: str = "1.5g", # Memory limit for the container - env_vars: Dict[str, str] = None, # Environment variables - host_port: Optional[int] = None, # Host port to map to container's 9223 - user_data_dir: Optional[str] = None, # Path to user data directory on host - container_user_data_dir: str = "/data", # Path to user data directory in container - extra_args: List[str] = None, # Additional docker run arguments - ): - """Initialize Docker configuration. - - Args: - mode: Docker operation mode ("connect" or "launch") - image: Docker image to use - registry_file: Path to container registry file - persistent: Whether to keep container running after browser closes - remove_on_exit: Whether to remove container on exit when not persistent - network: Docker network to use - volumes: Volume mappings as list of strings - cpu_limit: CPU limit for the container - memory_limit: Memory limit for the container - env_vars: Environment variables as dictionary - extra_args: Additional docker run arguments - host_port: Host port to map to container's 9223 - user_data_dir: Path to user data directory on host - container_user_data_dir: Path to user data directory in container - """ - self.mode = mode - self.image = image # If None, defaults will be used from DockerUtils - self.registry_file = registry_file - self.persistent = persistent - self.remove_on_exit = remove_on_exit - self.network = network - self.volumes = volumes or [] - self.cpu_limit = cpu_limit - self.memory_limit = memory_limit - self.env_vars = env_vars or {} - self.extra_args = extra_args or [] - self.host_port = host_port - self.user_data_dir = user_data_dir - self.container_user_data_dir = container_user_data_dir - - def to_dict(self) -> Dict: - """Convert this configuration to a dictionary. - - Returns: - Dictionary representation of this configuration - """ - return { - "mode": self.mode, - "image": self.image, - "registry_file": self.registry_file, - "persistent": self.persistent, - "remove_on_exit": self.remove_on_exit, - "network": self.network, - "volumes": self.volumes, - "cpu_limit": self.cpu_limit, - "memory_limit": self.memory_limit, - "env_vars": self.env_vars, - "extra_args": self.extra_args, - "host_port": self.host_port, - "user_data_dir": self.user_data_dir, - "container_user_data_dir": self.container_user_data_dir - } - - @staticmethod - def from_kwargs(kwargs: Dict) -> "DockerConfig": - """Create a DockerConfig from a dictionary of keyword arguments. - - Args: - kwargs: Dictionary of configuration options - - Returns: - New DockerConfig instance - """ - return DockerConfig( - mode=kwargs.get("mode", "connect"), - image=kwargs.get("image"), - registry_file=kwargs.get("registry_file"), - persistent=kwargs.get("persistent", False), - remove_on_exit=kwargs.get("remove_on_exit", True), - network=kwargs.get("network"), - volumes=kwargs.get("volumes"), - cpu_limit=kwargs.get("cpu_limit", 1.0), - memory_limit=kwargs.get("memory_limit", "1.5g"), - env_vars=kwargs.get("env_vars"), - extra_args=kwargs.get("extra_args"), - host_port=kwargs.get("host_port"), - user_data_dir=kwargs.get("user_data_dir"), - container_user_data_dir=kwargs.get("container_user_data_dir", "/data") - ) - - def clone(self, **kwargs) -> "DockerConfig": - """Create a copy of this configuration with updated values. - - Args: - **kwargs: Key-value pairs of configuration options to update - - Returns: - DockerConfig: A new instance with the specified updates - """ - config_dict = self.to_dict() - config_dict.update(kwargs) - return DockerConfig.from_kwargs(config_dict) \ No newline at end of file diff --git a/crawl4ai/browser/profiles.py b/crawl4ai/browser/profiles.py deleted file mode 100644 index afd0d78a..00000000 --- a/crawl4ai/browser/profiles.py +++ /dev/null @@ -1,457 +0,0 @@ -"""Browser profile management module for Crawl4AI. - -This module provides functionality for creating and managing browser profiles -that can be used for authenticated browsing. -""" - -import os -import asyncio -import signal -import sys -import datetime -import uuid -import shutil -from typing import List, Dict, Optional, Any -from colorama import Fore, Style, init - -from ..async_configs import BrowserConfig -from ..async_logger import AsyncLogger, AsyncLoggerBase -from ..utils import get_home_folder - -class BrowserProfileManager: - """Manages browser profiles for Crawl4AI. - - This class provides functionality to create and manage browser profiles - that can be used for authenticated browsing with Crawl4AI. - - Profiles are stored by default in ~/.crawl4ai/profiles/ - """ - - def __init__(self, logger: Optional[AsyncLoggerBase] = None): - """Initialize the BrowserProfileManager. - - Args: - logger: Logger for outputting messages. If None, a default AsyncLogger is created. - """ - # Initialize colorama for colorful terminal output - init() - - # Create a logger if not provided - if logger is None: - self.logger = AsyncLogger(verbose=True) - elif not isinstance(logger, AsyncLoggerBase): - self.logger = AsyncLogger(verbose=True) - else: - self.logger = logger - - # Ensure profiles directory exists - self.profiles_dir = os.path.join(get_home_folder(), "profiles") - os.makedirs(self.profiles_dir, exist_ok=True) - - async def create_profile(self, - profile_name: Optional[str] = None, - browser_config: Optional[BrowserConfig] = None) -> Optional[str]: - """Create a browser profile interactively. - - Args: - profile_name: Name for the profile. If None, a name is generated. - browser_config: Configuration for the browser. If None, a default configuration is used. - - Returns: - Path to the created profile directory, or None if creation failed - """ - # Create default browser config if none provided - if browser_config is None: - browser_config = BrowserConfig( - browser_type="chromium", - headless=False, # Must be visible for user interaction - verbose=True - ) - else: - # Ensure headless is False for user interaction - browser_config.headless = False - - # Generate profile name if not provided - if not profile_name: - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - profile_name = f"profile_{timestamp}_{uuid.uuid4().hex[:6]}" - - # Sanitize profile name (replace spaces and special chars) - profile_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in profile_name) - - # Set user data directory - profile_path = os.path.join(self.profiles_dir, profile_name) - os.makedirs(profile_path, exist_ok=True) - - # Print instructions for the user with colorama formatting - border = f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}" - self.logger.info(f"\n{border}", tag="PROFILE") - self.logger.info(f"Creating browser profile: {Fore.GREEN}{profile_name}{Style.RESET_ALL}", tag="PROFILE") - self.logger.info(f"Profile directory: {Fore.YELLOW}{profile_path}{Style.RESET_ALL}", tag="PROFILE") - - self.logger.info("\nInstructions:", tag="PROFILE") - self.logger.info("1. A browser window will open for you to set up your profile.", tag="PROFILE") - self.logger.info(f"2. {Fore.CYAN}Log in to websites{Style.RESET_ALL}, configure settings, etc. as needed.", tag="PROFILE") - self.logger.info(f"3. When you're done, {Fore.YELLOW}press 'q' in this terminal{Style.RESET_ALL} to close the browser.", tag="PROFILE") - self.logger.info("4. The profile will be saved and ready to use with Crawl4AI.", tag="PROFILE") - self.logger.info(f"{border}\n", tag="PROFILE") - - # Import the necessary classes with local imports to avoid circular references - from .strategies import CDPBrowserStrategy - - # Set browser config to use the profile path - browser_config.user_data_dir = profile_path - - # Create a CDP browser strategy for the profile creation - browser_strategy = CDPBrowserStrategy(browser_config, self.logger) - - # Set up signal handlers to ensure cleanup on interrupt - original_sigint = signal.getsignal(signal.SIGINT) - original_sigterm = signal.getsignal(signal.SIGTERM) - - # Define cleanup handler for signals - async def cleanup_handler(sig, frame): - self.logger.warning("\nCleaning up browser process...", tag="PROFILE") - await browser_strategy.close() - # Restore original signal handlers - signal.signal(signal.SIGINT, original_sigint) - signal.signal(signal.SIGTERM, original_sigterm) - if sig == signal.SIGINT: - self.logger.error("Profile creation interrupted. Profile may be incomplete.", tag="PROFILE") - sys.exit(1) - - # Set signal handlers - def sigint_handler(sig, frame): - asyncio.create_task(cleanup_handler(sig, frame)) - - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) - - # Event to signal when user is done with the browser - user_done_event = asyncio.Event() - - # Run keyboard input loop in a separate task - async def listen_for_quit_command(): - import termios - import tty - import select - - # First output the prompt - self.logger.info(f"{Fore.CYAN}Press '{Fore.WHITE}q{Fore.CYAN}' when you've finished using the browser...{Style.RESET_ALL}", tag="PROFILE") - - # Save original terminal settings - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - - try: - # Switch to non-canonical mode (no line buffering) - tty.setcbreak(fd) - - while True: - # Check if input is available (non-blocking) - readable, _, _ = select.select([sys.stdin], [], [], 0.5) - if readable: - key = sys.stdin.read(1) - if key.lower() == 'q': - self.logger.info(f"{Fore.GREEN}Closing browser and saving profile...{Style.RESET_ALL}", tag="PROFILE") - user_done_event.set() - return - - # Check if the browser process has already exited - if browser_strategy.browser_process and browser_strategy.browser_process.poll() is not None: - self.logger.info("Browser already closed. Ending input listener.", tag="PROFILE") - user_done_event.set() - return - - await asyncio.sleep(0.1) - - finally: - # Restore terminal settings - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - try: - # Start the browser - await browser_strategy.start() - - # Check if browser started successfully - if not browser_strategy.browser_process: - self.logger.error("Failed to start browser process.", tag="PROFILE") - return None - - self.logger.info(f"Browser launched. {Fore.CYAN}Waiting for you to finish...{Style.RESET_ALL}", tag="PROFILE") - - # Start listening for keyboard input - listener_task = asyncio.create_task(listen_for_quit_command()) - - # Wait for either the user to press 'q' or for the browser process to exit naturally - while not user_done_event.is_set() and browser_strategy.browser_process.poll() is None: - await asyncio.sleep(0.5) - - # Cancel the listener task if it's still running - if not listener_task.done(): - listener_task.cancel() - try: - await listener_task - except asyncio.CancelledError: - pass - - # If the browser is still running and the user pressed 'q', terminate it - if browser_strategy.browser_process.poll() is None and user_done_event.is_set(): - self.logger.info("Terminating browser process...", tag="PROFILE") - await browser_strategy.close() - - self.logger.success(f"Browser closed. Profile saved at: {Fore.GREEN}{profile_path}{Style.RESET_ALL}", tag="PROFILE") - - except Exception as e: - self.logger.error(f"Error creating profile: {str(e)}", tag="PROFILE") - await browser_strategy.close() - return None - finally: - # Restore original signal handlers - signal.signal(signal.SIGINT, original_sigint) - signal.signal(signal.SIGTERM, original_sigterm) - - # Make sure browser is fully cleaned up - await browser_strategy.close() - - # Return the profile path - return profile_path - - def list_profiles(self) -> List[Dict[str, Any]]: - """List all available browser profiles. - - Returns: - List of dictionaries containing profile information - """ - if not os.path.exists(self.profiles_dir): - return [] - - profiles = [] - - for name in os.listdir(self.profiles_dir): - profile_path = os.path.join(self.profiles_dir, name) - - # Skip if not a directory - if not os.path.isdir(profile_path): - continue - - # Check if this looks like a valid browser profile - # For Chromium: Look for Preferences file - # For Firefox: Look for prefs.js file - is_valid = False - - if os.path.exists(os.path.join(profile_path, "Preferences")) or \ - os.path.exists(os.path.join(profile_path, "Default", "Preferences")): - is_valid = "chromium" - elif os.path.exists(os.path.join(profile_path, "prefs.js")): - is_valid = "firefox" - - if is_valid: - # Get creation time - created = datetime.datetime.fromtimestamp( - os.path.getctime(profile_path) - ) - - profiles.append({ - "name": name, - "path": profile_path, - "created": created, - "type": is_valid - }) - - # Sort by creation time, newest first - profiles.sort(key=lambda x: x["created"], reverse=True) - - return profiles - - def get_profile_path(self, profile_name: str) -> Optional[str]: - """Get the full path to a profile by name. - - Args: - profile_name: Name of the profile (not the full path) - - Returns: - Full path to the profile directory, or None if not found - """ - profile_path = os.path.join(self.profiles_dir, profile_name) - - # Check if path exists and is a valid profile - if not os.path.isdir(profile_path): - # Check if profile_name itself is full path - if os.path.isabs(profile_name): - profile_path = profile_name - else: - return None - - # Look for profile indicators - is_profile = ( - os.path.exists(os.path.join(profile_path, "Preferences")) or - os.path.exists(os.path.join(profile_path, "Default", "Preferences")) or - os.path.exists(os.path.join(profile_path, "prefs.js")) - ) - - if not is_profile: - return None # Not a valid browser profile - - return profile_path - - def delete_profile(self, profile_name_or_path: str) -> bool: - """Delete a browser profile by name or path. - - Args: - profile_name_or_path: Name of the profile or full path to profile directory - - Returns: - True if the profile was deleted successfully, False otherwise - """ - # Determine if input is a name or a path - if os.path.isabs(profile_name_or_path): - # Full path provided - profile_path = profile_name_or_path - else: - # Just a name provided, construct path - profile_path = os.path.join(self.profiles_dir, profile_name_or_path) - - # Check if path exists and is a valid profile - if not os.path.isdir(profile_path): - return False - - # Look for profile indicators - is_profile = ( - os.path.exists(os.path.join(profile_path, "Preferences")) or - os.path.exists(os.path.join(profile_path, "Default", "Preferences")) or - os.path.exists(os.path.join(profile_path, "prefs.js")) - ) - - if not is_profile: - return False # Not a valid browser profile - - # Delete the profile directory - try: - shutil.rmtree(profile_path) - return True - except Exception: - return False - - async def interactive_manager(self, crawl_callback=None): - """Launch an interactive profile management console. - - Args: - crawl_callback: Function to call when selecting option to use - a profile for crawling. It will be called with (profile_path, url). - """ - while True: - self.logger.info(f"\n{Fore.CYAN}Profile Management Options:{Style.RESET_ALL}", tag="MENU") - self.logger.info(f"1. {Fore.GREEN}Create a new profile{Style.RESET_ALL}", tag="MENU") - self.logger.info(f"2. {Fore.YELLOW}List available profiles{Style.RESET_ALL}", tag="MENU") - self.logger.info(f"3. {Fore.RED}Delete a profile{Style.RESET_ALL}", tag="MENU") - - # Only show crawl option if callback provided - if crawl_callback: - self.logger.info(f"4. {Fore.CYAN}Use a profile to crawl a website{Style.RESET_ALL}", tag="MENU") - self.logger.info(f"5. {Fore.MAGENTA}Exit{Style.RESET_ALL}", tag="MENU") - exit_option = "5" - else: - self.logger.info(f"4. {Fore.MAGENTA}Exit{Style.RESET_ALL}", tag="MENU") - exit_option = "4" - - choice = input(f"\n{Fore.CYAN}Enter your choice (1-{exit_option}): {Style.RESET_ALL}") - - if choice == "1": - # Create new profile - name = input(f"{Fore.GREEN}Enter a name for the new profile (or press Enter for auto-generated name): {Style.RESET_ALL}") - await self.create_profile(name or None) - - elif choice == "2": - # List profiles - profiles = self.list_profiles() - - if not profiles: - self.logger.warning(" No profiles found. Create one first with option 1.", tag="PROFILES") - continue - - # Print profile information with colorama formatting - self.logger.info("\nAvailable profiles:", tag="PROFILES") - for i, profile in enumerate(profiles): - self.logger.info(f"[{i+1}] {Fore.CYAN}{profile['name']}{Style.RESET_ALL}", tag="PROFILES") - self.logger.info(f" Path: {Fore.YELLOW}{profile['path']}{Style.RESET_ALL}", tag="PROFILES") - self.logger.info(f" Created: {profile['created'].strftime('%Y-%m-%d %H:%M:%S')}", tag="PROFILES") - self.logger.info(f" Browser type: {profile['type']}", tag="PROFILES") - self.logger.info("", tag="PROFILES") # Empty line for spacing - - elif choice == "3": - # Delete profile - profiles = self.list_profiles() - if not profiles: - self.logger.warning("No profiles found to delete", tag="PROFILES") - continue - - # Display numbered list - self.logger.info(f"\n{Fore.YELLOW}Available profiles:{Style.RESET_ALL}", tag="PROFILES") - for i, profile in enumerate(profiles): - self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES") - - # Get profile to delete - profile_idx = input(f"{Fore.RED}Enter the number of the profile to delete (or 'c' to cancel): {Style.RESET_ALL}") - if profile_idx.lower() == 'c': - continue - - try: - idx = int(profile_idx) - 1 - if 0 <= idx < len(profiles): - profile_name = profiles[idx]["name"] - self.logger.info(f"Deleting profile: {Fore.YELLOW}{profile_name}{Style.RESET_ALL}", tag="PROFILES") - - # Confirm deletion - confirm = input(f"{Fore.RED}Are you sure you want to delete this profile? (y/n): {Style.RESET_ALL}") - if confirm.lower() == 'y': - success = self.delete_profile(profiles[idx]["path"]) - - if success: - self.logger.success(f"Profile {Fore.GREEN}{profile_name}{Style.RESET_ALL} deleted successfully", tag="PROFILES") - else: - self.logger.error(f"Failed to delete profile {Fore.RED}{profile_name}{Style.RESET_ALL}", tag="PROFILES") - else: - self.logger.error("Invalid profile number", tag="PROFILES") - except ValueError: - self.logger.error("Please enter a valid number", tag="PROFILES") - - elif choice == "4" and crawl_callback: - # Use profile to crawl a site - profiles = self.list_profiles() - if not profiles: - self.logger.warning("No profiles found. Create one first.", tag="PROFILES") - continue - - # Display numbered list - self.logger.info(f"\n{Fore.YELLOW}Available profiles:{Style.RESET_ALL}", tag="PROFILES") - for i, profile in enumerate(profiles): - self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES") - - # Get profile to use - profile_idx = input(f"{Fore.CYAN}Enter the number of the profile to use (or 'c' to cancel): {Style.RESET_ALL}") - if profile_idx.lower() == 'c': - continue - - try: - idx = int(profile_idx) - 1 - if 0 <= idx < len(profiles): - profile_path = profiles[idx]["path"] - url = input(f"{Fore.CYAN}Enter the URL to crawl: {Style.RESET_ALL}") - if url: - # Call the provided crawl callback - await crawl_callback(profile_path, url) - else: - self.logger.error("No URL provided", tag="CRAWL") - else: - self.logger.error("Invalid profile number", tag="PROFILES") - except ValueError: - self.logger.error("Please enter a valid number", tag="PROFILES") - - elif (choice == "4" and not crawl_callback) or (choice == "5" and crawl_callback): - # Exit - self.logger.info("Exiting profile management", tag="MENU") - break - - else: - self.logger.error(f"Invalid choice. Please enter a number between 1 and {exit_option}.", tag="MENU") diff --git a/crawl4ai/browser/strategies/__init__.py b/crawl4ai/browser/strategies/__init__.py deleted file mode 100644 index c4f17fd9..00000000 --- a/crawl4ai/browser/strategies/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from .base import BaseBrowserStrategy -from .cdp import CDPBrowserStrategy -from .docker_strategy import DockerBrowserStrategy -from .playwright import PlaywrightBrowserStrategy -from .builtin import BuiltinBrowserStrategy - -__all__ = [ - "BrowserStrategy", - "CDPBrowserStrategy", - "DockerBrowserStrategy", - "PlaywrightBrowserStrategy", - "BuiltinBrowserStrategy", -] \ No newline at end of file diff --git a/crawl4ai/browser/strategies/base.py b/crawl4ai/browser/strategies/base.py deleted file mode 100644 index 14f7464d..00000000 --- a/crawl4ai/browser/strategies/base.py +++ /dev/null @@ -1,601 +0,0 @@ -"""Browser strategies module for Crawl4AI. - -This module implements the browser strategy pattern for different -browser implementations, including Playwright, CDP, and builtin browsers. -""" - -from abc import ABC, abstractmethod -import asyncio -import json -import hashlib -import os -import time -from typing import Optional, Tuple, List - -from playwright.async_api import BrowserContext, Page - -from ...async_logger import AsyncLogger -from ...async_configs import BrowserConfig, CrawlerRunConfig -from ...config import DOWNLOAD_PAGE_TIMEOUT -from ...js_snippet import load_js_script -from ..utils import get_playwright - - -class BaseBrowserStrategy(ABC): - """Base class for all browser strategies. - - This abstract class defines the interface that all browser strategies - must implement. It handles common functionality like context caching, - browser configuration, and session management. - """ - - _playwright_instance = None - - @classmethod - async def get_playwright(cls): - """Get or create a shared Playwright instance. - - Returns: - Playwright: The shared Playwright instance - """ - # For now I dont want Singleton pattern for Playwright - if cls._playwright_instance is None or True: - cls._playwright_instance = await get_playwright() - return cls._playwright_instance - - def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None): - """Initialize the strategy with configuration and logger. - - Args: - config: Browser configuration - logger: Logger for recording events and errors - """ - self.config = config - self.logger = logger - self.browser = None - self.default_context = None - - # Context management - self.contexts_by_config = {} # config_signature -> context - - self._contexts_lock = asyncio.Lock() - - # Session management - self.sessions = {} - self.session_ttl = 1800 # 30 minutes default - - # Playwright instance - self.playwright = None - - @abstractmethod - async def start(self): - """Start the browser. - - This method should be implemented by concrete strategies to initialize - the browser in the appropriate way (direct launch, CDP connection, etc.) - - Returns: - self: For method chaining - """ - # Base implementation gets the playwright instance - self.playwright = await self.get_playwright() - return self - - @abstractmethod - async def _generate_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - pass - - async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - """Get a page with specified configuration. - - This method should be implemented by concrete strategies to create - or retrieve a page according to their browser management approach. - - Args: - crawlerRunConfig: Crawler run configuration - - Returns: - Tuple of (Page, BrowserContext) - """ - # Clean up expired sessions first - self._cleanup_expired_sessions() - - # If a session_id is provided and we already have it, reuse that page + context - if crawlerRunConfig.session_id and crawlerRunConfig.session_id in self.sessions: - context, page, _ = self.sessions[crawlerRunConfig.session_id] - # Update last-used timestamp - self.sessions[crawlerRunConfig.session_id] = (context, page, time.time()) - return page, context - - page, context = await self._generate_page(crawlerRunConfig) - - import uuid - setattr(page, "guid", uuid.uuid4()) - - # If a session_id is specified, store this session so we can reuse later - if crawlerRunConfig.session_id: - self.sessions[crawlerRunConfig.session_id] = (context, page, time.time()) - - return page, context - pass - - async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]: - """Get multiple pages with the same configuration. - - Args: - crawlerRunConfig: Configuration for the pages - count: Number of pages to create - - Returns: - List of (Page, Context) tuples - """ - pages = [] - for _ in range(count): - page, context = await self.get_page(crawlerRunConfig) - pages.append((page, context)) - return pages - - async def get_opened_pages(self) -> List[Page]: - """Get all opened pages in the - browser. - """ - return [page for context in self.contexts_by_config.values() for page in context.pages] - - def _build_browser_args(self) -> dict: - """Build browser launch arguments from config. - - Returns: - dict: Browser launch arguments for Playwright - """ - # Define common browser arguments that improve performance and stability - args = [ - "--no-sandbox", - "--no-first-run", - "--no-default-browser-check", - "--window-position=0,0", - "--ignore-certificate-errors", - "--ignore-certificate-errors-spki-list", - "--window-position=400,0", - "--force-color-profile=srgb", - "--mute-audio", - "--disable-gpu", - "--disable-gpu-compositing", - "--disable-software-rasterizer", - "--disable-dev-shm-usage", - "--disable-infobars", - "--disable-blink-features=AutomationControlled", - "--disable-renderer-backgrounding", - "--disable-ipc-flooding-protection", - "--disable-background-timer-throttling", - f"--window-size={self.config.viewport_width},{self.config.viewport_height}", - ] - - # Define browser disable options for light mode - browser_disable_options = [ - "--disable-backgrounding-occluded-windows", - "--disable-breakpad", - "--disable-client-side-phishing-detection", - "--disable-component-extensions-with-background-pages", - "--disable-default-apps", - "--disable-extensions", - "--disable-features=TranslateUI", - "--disable-hang-monitor", - "--disable-popup-blocking", - "--disable-prompt-on-repost", - "--disable-sync", - "--metrics-recording-only", - "--password-store=basic", - "--use-mock-keychain", - ] - - # Apply light mode settings if enabled - if self.config.light_mode: - args.extend(browser_disable_options) - - # Apply text mode settings if enabled (disables images, JS, etc) - if self.config.text_mode: - args.extend([ - "--blink-settings=imagesEnabled=false", - "--disable-remote-fonts", - "--disable-images", - "--disable-javascript", - "--disable-software-rasterizer", - "--disable-dev-shm-usage", - ]) - - # Add any extra arguments from the config - if self.config.extra_args: - args.extend(self.config.extra_args) - - # Build the core browser args dictionary - browser_args = {"headless": self.config.headless, "args": args} - - # Add chrome channel if specified - if self.config.chrome_channel: - browser_args["channel"] = self.config.chrome_channel - - # Configure downloads - if self.config.accept_downloads: - browser_args["downloads_path"] = self.config.downloads_path or os.path.join( - os.getcwd(), "downloads" - ) - os.makedirs(browser_args["downloads_path"], exist_ok=True) - - # Check for user data directory - if self.config.user_data_dir: - # Ensure the directory exists - os.makedirs(self.config.user_data_dir, exist_ok=True) - browser_args["user_data_dir"] = self.config.user_data_dir - - # Configure proxy settings - if self.config.proxy or self.config.proxy_config: - from playwright.async_api import ProxySettings - - proxy_settings = ( - ProxySettings(server=self.config.proxy) - if self.config.proxy - else ProxySettings( - server=self.config.proxy_config.server, - username=self.config.proxy_config.username, - password=self.config.proxy_config.password, - ) - ) - browser_args["proxy"] = proxy_settings - - return browser_args - - def _make_config_signature(self, crawlerRunConfig: CrawlerRunConfig) -> str: - """Create a signature hash from configuration for context caching. - - Converts the crawlerRunConfig into a dict, excludes ephemeral fields, - then returns a hash of the sorted JSON. This yields a stable signature - that identifies configurations requiring a unique browser context. - - Args: - crawlerRunConfig: Crawler run configuration - - Returns: - str: Unique hash for this configuration - """ - config_dict = crawlerRunConfig.__dict__.copy() - # Exclude items that do not affect browser-level setup - ephemeral_keys = [ - "session_id", - "js_code", - "scraping_strategy", - "extraction_strategy", - "chunking_strategy", - "cache_mode", - "content_filter", - "semaphore_count", - "url" - ] - for key in ephemeral_keys: - if key in config_dict: - del config_dict[key] - - # Convert to canonical JSON string - signature_json = json.dumps(config_dict, sort_keys=True, default=str) - - # Hash the JSON so we get a compact, unique string - signature_hash = hashlib.sha256(signature_json.encode("utf-8")).hexdigest() - return signature_hash - - async def create_browser_context(self, crawlerRunConfig: Optional[CrawlerRunConfig] = None) -> BrowserContext: - """Creates and returns a new browser context with configured settings. - - Args: - crawlerRunConfig: Configuration object for the crawler run - - Returns: - BrowserContext: Browser context object with the specified configurations - """ - if not self.browser: - raise ValueError("Browser must be initialized before creating context") - - # Base settings - user_agent = self.config.headers.get("User-Agent", self.config.user_agent) - viewport_settings = { - "width": self.config.viewport_width, - "height": self.config.viewport_height, - } - proxy_settings = {"server": self.config.proxy} if self.config.proxy else None - - # Define blocked extensions for resource optimization - blocked_extensions = [ - # Images - "jpg", "jpeg", "png", "gif", "webp", "svg", "ico", "bmp", "tiff", "psd", - # Fonts - "woff", "woff2", "ttf", "otf", "eot", - # Media - "mp4", "webm", "ogg", "avi", "mov", "wmv", "flv", "m4v", "mp3", "wav", "aac", - "m4a", "opus", "flac", - # Documents - "pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", - # Archives - "zip", "rar", "7z", "tar", "gz", - # Scripts and data - "xml", "swf", "wasm", - ] - - # Common context settings - context_settings = { - "user_agent": user_agent, - "viewport": viewport_settings, - "proxy": proxy_settings, - "accept_downloads": self.config.accept_downloads, - "storage_state": self.config.storage_state, - "ignore_https_errors": self.config.ignore_https_errors, - "device_scale_factor": 1.0, - "java_script_enabled": self.config.java_script_enabled, - } - - # Apply text mode settings if enabled - if self.config.text_mode: - text_mode_settings = { - "has_touch": False, - "is_mobile": False, - "java_script_enabled": False, # Disable javascript in text mode - } - # Update context settings with text mode settings - context_settings.update(text_mode_settings) - if self.logger: - self.logger.debug("Text mode enabled for browser context", tag="BROWSER") - - # Handle storage state properly - this is key for persistence - if self.config.storage_state: - if self.logger: - if isinstance(self.config.storage_state, str): - self.logger.debug(f"Using storage state from file: {self.config.storage_state}", tag="BROWSER") - else: - self.logger.debug("Using storage state from config object", tag="BROWSER") - - if self.config.user_data_dir: - # For CDP-based browsers, storage persistence is typically handled by the user_data_dir - # at the browser level, but we'll create a storage_state location for Playwright as well - storage_path = os.path.join(self.config.user_data_dir, "storage_state.json") - if not os.path.exists(storage_path): - # Create parent directory if it doesn't exist - os.makedirs(os.path.dirname(storage_path), exist_ok=True) - with open(storage_path, "w") as f: - json.dump({}, f) - self.config.storage_state = storage_path - - if self.logger: - self.logger.debug(f"Using user data directory: {self.config.user_data_dir}", tag="BROWSER") - - # Apply crawler-specific configurations if provided - if crawlerRunConfig: - # Check if there is value for crawlerRunConfig.proxy_config set add that to context - if crawlerRunConfig.proxy_config: - proxy_settings = { - "server": crawlerRunConfig.proxy_config.server, - } - if crawlerRunConfig.proxy_config.username: - proxy_settings.update({ - "username": crawlerRunConfig.proxy_config.username, - "password": crawlerRunConfig.proxy_config.password, - }) - context_settings["proxy"] = proxy_settings - - # Create and return the context - try: - # Create the context with appropriate settings - context = await self.browser.new_context(**context_settings) - - # Apply text mode resource blocking if enabled - if self.config.text_mode: - # Create and apply route patterns for each extension - for ext in blocked_extensions: - await context.route(f"**/*.{ext}", lambda route: route.abort()) - - return context - except Exception as e: - if self.logger: - self.logger.error(f"Error creating browser context: {str(e)}", tag="BROWSER") - # Fallback to basic context creation if the advanced settings fail - return await self.browser.new_context() - - async def setup_context(self, context: BrowserContext, crawlerRunConfig: Optional[CrawlerRunConfig] = None): - """Set up a browser context with the configured options. - - Args: - context: The browser context to set up - crawlerRunConfig: Configuration object containing all browser settings - """ - # Set HTTP headers - if self.config.headers: - await context.set_extra_http_headers(self.config.headers) - - # Add cookies - if self.config.cookies: - await context.add_cookies(self.config.cookies) - - # Apply storage state if provided - if self.config.storage_state: - await context.storage_state(path=None) - - # Configure downloads - if self.config.accept_downloads: - context.set_default_timeout(DOWNLOAD_PAGE_TIMEOUT) - context.set_default_navigation_timeout(DOWNLOAD_PAGE_TIMEOUT) - if self.config.downloads_path: - context._impl_obj._options["accept_downloads"] = True - context._impl_obj._options["downloads_path"] = self.config.downloads_path - - # Handle user agent and browser hints - if self.config.user_agent: - combined_headers = { - "User-Agent": self.config.user_agent, - "sec-ch-ua": self.config.browser_hint, - } - combined_headers.update(self.config.headers) - await context.set_extra_http_headers(combined_headers) - - # Add default cookie - target_url = (crawlerRunConfig and crawlerRunConfig.url) or "https://crawl4ai.com/" - await context.add_cookies( - [ - { - "name": "cookiesEnabled", - "value": "true", - "url": target_url, - } - ] - ) - - # Handle navigator overrides - if crawlerRunConfig: - if ( - crawlerRunConfig.override_navigator - or crawlerRunConfig.simulate_user - or crawlerRunConfig.magic - ): - await context.add_init_script(load_js_script("navigator_overrider")) - - async def kill_session(self, session_id: str): - """Kill a browser session and clean up resources. - - Args: - session_id (str): The session ID to kill. - """ - if session_id not in self.sessions: - return - - context, page, _ = self.sessions[session_id] - - # Close the page - try: - await page.close() - except Exception as e: - if self.logger: - self.logger.error(f"Error closing page for session {session_id}: {str(e)}", tag="BROWSER") - - # Remove session from tracking - del self.sessions[session_id] - - # Clean up any contexts that no longer have pages - await self._cleanup_unused_contexts() - - if self.logger: - self.logger.debug(f"Killed session: {session_id}", tag="BROWSER") - - async def _cleanup_unused_contexts(self): - """Clean up contexts that no longer have any pages.""" - async with self._contexts_lock: - # Get all contexts we're managing - contexts_to_check = list(self.contexts_by_config.values()) - - for context in contexts_to_check: - # Check if the context has any pages left - if not context.pages: - # No pages left, we can close this context - config_signature = next((sig for sig, ctx in self.contexts_by_config.items() - if ctx == context), None) - if config_signature: - try: - await context.close() - del self.contexts_by_config[config_signature] - if self.logger: - self.logger.debug(f"Closed unused context", tag="BROWSER") - except Exception as e: - if self.logger: - self.logger.error(f"Error closing unused context: {str(e)}", tag="BROWSER") - - def _cleanup_expired_sessions(self): - """Clean up expired sessions based on TTL.""" - current_time = time.time() - expired_sessions = [ - sid - for sid, (_, _, last_used) in self.sessions.items() - if current_time - last_used > self.session_ttl - ] - - for sid in expired_sessions: - if self.logger: - self.logger.debug(f"Session expired: {sid}", tag="BROWSER") - asyncio.create_task(self.kill_session(sid)) - - async def close(self): - """Close the browser and clean up resources. - - This method handles common cleanup tasks like: - 1. Persisting storage state if a user_data_dir is configured - 2. Closing all sessions - 3. Closing all browser contexts - 4. Closing the browser - 5. Stopping Playwright - - Child classes should override this method to add their specific cleanup logic, - but should call super().close() to ensure common cleanup tasks are performed. - """ - # Set a flag to prevent race conditions during cleanup - self.shutting_down = True - - try: - # Add brief delay if configured - if self.config.sleep_on_close: - await asyncio.sleep(0.5) - - # Persist storage state if using a user data directory - if self.config.user_data_dir and self.browser: - for context in self.browser.contexts: - try: - # Ensure the directory exists - storage_dir = os.path.join(self.config.user_data_dir, "Default") - os.makedirs(storage_dir, exist_ok=True) - - # Save storage state - storage_path = os.path.join(storage_dir, "storage_state.json") - await context.storage_state(path=storage_path) - - if self.logger: - self.logger.debug("Storage state persisted before closing browser", tag="BROWSER") - except Exception as e: - if self.logger: - self.logger.warning( - message="Failed to ensure storage persistence: {error}", - tag="BROWSER", - params={"error": str(e)} - ) - - # Close all active sessions - session_ids = list(self.sessions.keys()) - for session_id in session_ids: - await self.kill_session(session_id) - - # Close all cached contexts - for ctx in self.contexts_by_config.values(): - try: - await ctx.close() - except Exception as e: - if self.logger: - self.logger.error( - message="Error closing context: {error}", - tag="BROWSER", - params={"error": str(e)} - ) - self.contexts_by_config.clear() - - # Close the browser if it exists - if self.browser: - await self.browser.close() - self.browser = None - - # Stop playwright - if self.playwright: - await self.playwright.stop() - self.playwright = None - - except Exception as e: - if self.logger: - self.logger.error( - message="Error during browser cleanup: {error}", - tag="BROWSER", - params={"error": str(e)} - ) - finally: - # Reset shutting down flag - self.shutting_down = False - - \ No newline at end of file diff --git a/crawl4ai/browser/strategies/builtin.py b/crawl4ai/browser/strategies/builtin.py deleted file mode 100644 index 678346fc..00000000 --- a/crawl4ai/browser/strategies/builtin.py +++ /dev/null @@ -1,468 +0,0 @@ -import asyncio -import os -import time -import json -import subprocess -import shutil -import signal -from typing import Optional, Dict, Any, Tuple - - -from ...async_logger import AsyncLogger -from ...async_configs import CrawlerRunConfig -from playwright.async_api import Page, BrowserContext -from ...async_logger import AsyncLogger -from ...async_configs import BrowserConfig -from ...utils import get_home_folder -from ..utils import get_browser_executable, is_windows, is_browser_running, find_process_by_port, terminate_process - - -from .cdp import CDPBrowserStrategy -from .base import BaseBrowserStrategy - -class BuiltinBrowserStrategy(CDPBrowserStrategy): - """Built-in browser strategy. - - This strategy extends the CDP strategy to use the built-in browser. - """ - - def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None): - """Initialize the built-in browser strategy. - - Args: - config: Browser configuration - logger: Logger for recording events and errors - """ - super().__init__(config, logger) - self.builtin_browser_dir = os.path.join(get_home_folder(), "builtin-browser") if not self.config.user_data_dir else self.config.user_data_dir - self.builtin_config_file = os.path.join(self.builtin_browser_dir, "browser_config.json") - - # Raise error if user data dir is already engaged - if self._check_user_dir_is_engaged(self.builtin_browser_dir): - raise Exception(f"User data directory {self.builtin_browser_dir} is already engaged by another browser instance.") - - os.makedirs(self.builtin_browser_dir, exist_ok=True) - - def _check_user_dir_is_engaged(self, user_data_dir: str) -> bool: - """Check if the user data directory is already in use. - - Returns: - bool: True if the directory is engaged, False otherwise - """ - # Load browser config file, then iterate in port_map values, check "user_data_dir" key if it matches - # the current user data directory - if os.path.exists(self.builtin_config_file): - try: - with open(self.builtin_config_file, 'r') as f: - browser_info_dict = json.load(f) - - # Check if user data dir is already engaged - for port_str, browser_info in browser_info_dict.get("port_map", {}).items(): - if browser_info.get("user_data_dir") == user_data_dir: - return True - except Exception as e: - if self.logger: - self.logger.error(f"Error reading built-in browser config: {str(e)}", tag="BUILTIN") - return False - - async def start(self): - """Start or connect to the built-in browser. - - Returns: - self: For method chaining - """ - # Initialize Playwright instance via base class method - await BaseBrowserStrategy.start(self) - - try: - # Check for existing built-in browser (get_browser_info already checks if running) - browser_info = self.get_browser_info() - if browser_info: - if self.logger: - self.logger.info(f"Using existing built-in browser at {browser_info.get('cdp_url')}", tag="BROWSER") - self.config.cdp_url = browser_info.get('cdp_url') - else: - if self.logger: - self.logger.info("Built-in browser not found, launching new instance...", tag="BROWSER") - cdp_url = await self.launch_builtin_browser( - browser_type=self.config.browser_type, - debugging_port=self.config.debugging_port, - headless=self.config.headless, - ) - if not cdp_url: - if self.logger: - self.logger.warning("Failed to launch built-in browser, falling back to regular CDP strategy", tag="BROWSER") - # Call CDP's start but skip BaseBrowserStrategy.start() since we already called it - return await CDPBrowserStrategy.start(self) - self.config.cdp_url = cdp_url - - # Connect to the browser using CDP protocol - self.browser = await self.playwright.chromium.connect_over_cdp(self.config.cdp_url) - - # Get or create default context - contexts = self.browser.contexts - if contexts: - self.default_context = contexts[0] - else: - self.default_context = await self.create_browser_context() - - await self.setup_context(self.default_context) - - if self.logger: - self.logger.debug(f"Connected to built-in browser at {self.config.cdp_url}", tag="BUILTIN") - - return self - except Exception as e: - if self.logger: - self.logger.error(f"Failed to start built-in browser: {str(e)}", tag="BUILTIN") - - # There is a possibility that at this point I need to clean up some resourece - raise - - def _get_builtin_browser_info(cls, debugging_port: int, config_file: str, logger: Optional[AsyncLogger] = None) -> Optional[Dict[str, Any]]: - """Get information about the built-in browser for a specific debugging port. - - Args: - debugging_port: The debugging port to look for - config_file: Path to the config file - logger: Optional logger for recording events - - Returns: - dict: Browser information or None if no running browser is configured for this port - """ - if not os.path.exists(config_file): - return None - - try: - with open(config_file, 'r') as f: - browser_info_dict = json.load(f) - - # Get browser info from port map - if isinstance(browser_info_dict, dict) and "port_map" in browser_info_dict: - port_str = str(debugging_port) - if port_str in browser_info_dict["port_map"]: - browser_info = browser_info_dict["port_map"][port_str] - - # Check if the browser is still running - pids = browser_info.get('pid', '') - if isinstance(pids, str): - pids = [int(pid) for pid in pids.split() if pid.isdigit()] - elif isinstance(pids, int): - pids = [pids] - else: - pids = [] - - # Check if any of the PIDs are running - if not pids: - if logger: - logger.warning(f"Built-in browser on port {debugging_port} has no valid PID", tag="BUILTIN") - # Remove this port from the dictionary - del browser_info_dict["port_map"][port_str] - with open(config_file, 'w') as f: - json.dump(browser_info_dict, f, indent=2) - return None - # Check if any of the PIDs are running - for pid in pids: - if is_browser_running(pid): - browser_info['pid'] = pid - break - else: - # If none of the PIDs are running, remove this port from the dictionary - if logger: - logger.warning(f"Built-in browser on port {debugging_port} is not running", tag="BUILTIN") - # Remove this port from the dictionary - del browser_info_dict["port_map"][port_str] - with open(config_file, 'w') as f: - json.dump(browser_info_dict, f, indent=2) - return None - - return browser_info - - return None - - except Exception as e: - if logger: - logger.error(f"Error reading built-in browser config: {str(e)}", tag="BUILTIN") - return None - - def get_browser_info(self) -> Optional[Dict[str, Any]]: - """Get information about the current built-in browser instance. - - Returns: - dict: Browser information or None if no running browser is configured - """ - return self._get_builtin_browser_info( - debugging_port=self.config.debugging_port, - config_file=self.builtin_config_file, - logger=self.logger - ) - - async def launch_builtin_browser(self, - browser_type: str = "chromium", - debugging_port: int = 9222, - headless: bool = True) -> Optional[str]: - """Launch a browser in the background for use as the built-in browser. - - Args: - browser_type: Type of browser to launch ('chromium' or 'firefox') - debugging_port: Port to use for CDP debugging - headless: Whether to run in headless mode - - Returns: - str: CDP URL for the browser, or None if launch failed - """ - # Check if there's an existing browser still running - browser_info = self._get_builtin_browser_info( - debugging_port=debugging_port, - config_file=self.builtin_config_file, - logger=self.logger - ) - if browser_info: - if self.logger: - self.logger.info(f"Built-in browser is already running on port {debugging_port}", tag="BUILTIN") - return browser_info.get('cdp_url') - - # Create a user data directory for the built-in browser - user_data_dir = os.path.join(self.builtin_browser_dir, "user_data") - - # Raise error if user data dir is already engaged - if self._check_user_dir_is_engaged(user_data_dir): - raise Exception(f"User data directory {user_data_dir} is already engaged by another browser instance.") - - # Create the user data directory if it doesn't exist - os.makedirs(user_data_dir, exist_ok=True) - - # Prepare browser launch arguments - browser_args = super()._build_browser_args() - browser_path = await get_browser_executable(browser_type) - base_args = [browser_path] - - if browser_type == "chromium": - args = [ - browser_path, - f"--remote-debugging-port={debugging_port}", - f"--user-data-dir={user_data_dir}", - ] - # if headless: - # args.append("--headless=new") - - elif browser_type == "firefox": - args = [ - browser_path, - "--remote-debugging-port", - str(debugging_port), - "--profile", - user_data_dir, - ] - if headless: - args.append("--headless") - else: - if self.logger: - self.logger.error(f"Browser type {browser_type} not supported for built-in browser", tag="BUILTIN") - return None - - args = base_args + browser_args + args - - try: - - # Check if the port is already in use - PID = "" - cdp_url = f"http://localhost:{debugging_port}" - config_json = await self._check_port_in_use(cdp_url) - if config_json: - if self.logger: - self.logger.info(f"Port {debugging_port} is already in use.", tag="BUILTIN") - PID = find_process_by_port(debugging_port) - else: - # Start the browser process detached - process = None - if is_windows(): - process = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP - ) - else: - process = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - preexec_fn=os.setpgrp # Start in a new process group - ) - - # Wait briefly to ensure the process starts successfully - await asyncio.sleep(2.0) - - # Check if the process is still running - if process and process.poll() is not None: - if self.logger: - self.logger.error(f"Browser process exited immediately with code {process.returncode}", tag="BUILTIN") - return None - - PID = process.pid - # Construct CDP URL - config_json = await self._check_port_in_use(cdp_url) - - - # Create browser info - browser_info = { - 'pid': PID, - 'cdp_url': cdp_url, - 'user_data_dir': user_data_dir, - 'browser_type': browser_type, - 'debugging_port': debugging_port, - 'start_time': time.time(), - 'config': config_json - } - - # Read existing config file if it exists - port_map = {} - if os.path.exists(self.builtin_config_file): - try: - with open(self.builtin_config_file, 'r') as f: - existing_data = json.load(f) - - # Check if it already uses port mapping - if isinstance(existing_data, dict) and "port_map" in existing_data: - port_map = existing_data["port_map"] - - # # Convert legacy format to port mapping - # elif isinstance(existing_data, dict) and "debugging_port" in existing_data: - # old_port = str(existing_data.get("debugging_port")) - # if self._is_browser_running(existing_data.get("pid")): - # port_map[old_port] = existing_data - except Exception as e: - if self.logger: - self.logger.warning(f"Could not read existing config: {str(e)}", tag="BUILTIN") - - # Add/update this browser in the port map - port_map[str(debugging_port)] = browser_info - - # Write updated config - with open(self.builtin_config_file, 'w') as f: - json.dump({"port_map": port_map}, f, indent=2) - - # Detach from the browser process - don't keep any references - # This is important to allow the Python script to exit while the browser continues running - process = None - - if self.logger: - self.logger.success(f"Built-in browser launched at CDP URL: {cdp_url}", tag="BUILTIN") - return cdp_url - - except Exception as e: - if self.logger: - self.logger.error(f"Error launching built-in browser: {str(e)}", tag="BUILTIN") - return None - - async def _check_port_in_use(self, cdp_url: str) -> dict: - """Check if a port is already in use by a Chrome DevTools instance. - - Args: - cdp_url: The CDP URL to check - - Returns: - dict: Chrome DevTools protocol version information or None if not found - """ - import aiohttp - json_url = f"{cdp_url}/json/version" - json_config = None - - try: - async with aiohttp.ClientSession() as session: - try: - async with session.get(json_url, timeout=2.0) as response: - if response.status == 200: - json_config = await response.json() - if self.logger: - self.logger.debug(f"Found CDP server running at {cdp_url}", tag="BUILTIN") - return json_config - except (aiohttp.ClientError, asyncio.TimeoutError): - pass - return None - except Exception as e: - if self.logger: - self.logger.debug(f"Error checking CDP port: {str(e)}", tag="BUILTIN") - return None - - async def kill_builtin_browser(self) -> bool: - """Kill the built-in browser if it's running. - - Returns: - bool: True if the browser was killed, False otherwise - """ - browser_info = self.get_browser_info() - if not browser_info: - if self.logger: - self.logger.warning(f"No built-in browser found on port {self.config.debugging_port}", tag="BUILTIN") - return False - - pid = browser_info.get('pid') - if not pid: - return False - - success, error_msg = terminate_process(pid, logger=self.logger) - if success: - # Update config file to remove this browser - with open(self.builtin_config_file, 'r') as f: - browser_info_dict = json.load(f) - - # Remove this port from the dictionary - port_str = str(self.config.debugging_port) - if port_str in browser_info_dict.get("port_map", {}): - del browser_info_dict["port_map"][port_str] - - with open(self.builtin_config_file, 'w') as f: - json.dump(browser_info_dict, f, indent=2) - - # Remove user data directory if it exists - if os.path.exists(self.builtin_browser_dir): - shutil.rmtree(self.builtin_browser_dir) - - # Clear the browser info cache - self.browser = None - self.temp_dir = None - self.shutting_down = True - - if self.logger: - self.logger.success("Built-in browser terminated", tag="BUILTIN") - return True - else: - if self.logger: - self.logger.error(f"Error killing built-in browser: {error_msg}", tag="BUILTIN") - return False - - async def get_builtin_browser_status(self) -> Dict[str, Any]: - """Get status information about the built-in browser. - - Returns: - dict: Status information with running, cdp_url, and info fields - """ - browser_info = self.get_browser_info() - - if not browser_info: - return { - 'running': False, - 'cdp_url': None, - 'info': None, - 'port': self.config.debugging_port - } - - return { - 'running': True, - 'cdp_url': browser_info.get('cdp_url'), - 'info': browser_info, - 'port': self.config.debugging_port - } - - async def close(self): - """Close the built-in browser and clean up resources.""" - # Call parent class close method - await super().close() - - # Clean up built-in browser if we created it and were in shutdown mode - if self.shutting_down: - await self.kill_builtin_browser() - if self.logger: - self.logger.debug("Killed built-in browser during shutdown", tag="BUILTIN") \ No newline at end of file diff --git a/crawl4ai/browser/strategies/cdp.py b/crawl4ai/browser/strategies/cdp.py deleted file mode 100644 index 0bef6fec..00000000 --- a/crawl4ai/browser/strategies/cdp.py +++ /dev/null @@ -1,281 +0,0 @@ -"""Browser strategies module for Crawl4AI. - -This module implements the browser strategy pattern for different -browser implementations, including Playwright, CDP, and builtin browsers. -""" - -import asyncio -import os -import time -import json -import subprocess -import shutil -from typing import Optional, Tuple, List - -from playwright.async_api import BrowserContext, Page - -from ...async_logger import AsyncLogger -from ...async_configs import BrowserConfig, CrawlerRunConfig -from ..utils import get_playwright, get_browser_executable, create_temp_directory, is_windows, check_process_is_running, terminate_process - -from .base import BaseBrowserStrategy - -class CDPBrowserStrategy(BaseBrowserStrategy): - """CDP-based browser strategy. - - This strategy connects to an existing browser using CDP protocol or - launches and connects to a browser using CDP. - """ - - def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None): - """Initialize the CDP browser strategy. - - Args: - config: Browser configuration - logger: Logger for recording events and errors - """ - super().__init__(config, logger) - self.sessions = {} - self.session_ttl = 1800 # 30 minutes - self.browser_process = None - self.temp_dir = None - self.shutting_down = False - - async def start(self): - """Start or connect to the browser using CDP. - - Returns: - self: For method chaining - """ - # Call the base class start to initialize Playwright - await super().start() - - try: - # Get or create CDP URL - cdp_url = await self._get_or_create_cdp_url() - - # Connect to the browser using CDP - self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url) - - # Get or create default context - contexts = self.browser.contexts - if contexts: - self.default_context = contexts[0] - else: - self.default_context = await self.create_browser_context() - - await self.setup_context(self.default_context) - - if self.logger: - self.logger.debug(f"Connected to CDP browser at {cdp_url}", tag="CDP") - - except Exception as e: - if self.logger: - self.logger.error(f"Failed to connect to CDP browser: {str(e)}", tag="CDP") - - # Clean up any resources before re-raising - await self._cleanup_process() - raise - - return self - - async def _get_or_create_cdp_url(self) -> str: - """Get existing CDP URL or launch a browser and return its CDP URL. - - Returns: - str: CDP URL for connecting to the browser - """ - # If CDP URL is provided, just return it - if self.config.cdp_url: - return self.config.cdp_url - - # Create temp dir if needed - if not self.config.user_data_dir: - self.temp_dir = create_temp_directory() - user_data_dir = self.temp_dir - else: - user_data_dir = self.config.user_data_dir - - # Get browser args based on OS and browser type - # args = await self._get_browser_args(user_data_dir) - browser_args = super()._build_browser_args() - browser_path = await get_browser_executable(self.config.browser_type) - base_args = [browser_path] - - if self.config.browser_type == "chromium": - args = [ - f"--remote-debugging-port={self.config.debugging_port}", - f"--user-data-dir={user_data_dir}", - ] - # if self.config.headless: - # args.append("--headless=new") - - elif self.config.browser_type == "firefox": - args = [ - "--remote-debugging-port", - str(self.config.debugging_port), - "--profile", - user_data_dir, - ] - if self.config.headless: - args.append("--headless") - else: - raise NotImplementedError(f"Browser type {self.config.browser_type} not supported") - - args = base_args + browser_args['args'] + args - - # Start browser process - try: - # Use DETACHED_PROCESS flag on Windows to fully detach the process - # On Unix, we'll use preexec_fn=os.setpgrp to start the process in a new process group - if is_windows(): - self.browser_process = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP - ) - else: - self.browser_process = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - preexec_fn=os.setpgrp # Start in a new process group - ) - - # Monitor for a short time to make sure it starts properly - is_running, return_code, stdout, stderr = await check_process_is_running(self.browser_process, delay=2) - if not is_running: - if self.logger: - self.logger.error( - message="Browser process terminated unexpectedly | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}", - tag="ERROR", - params={ - "code": return_code, - "stdout": stdout.decode() if stdout else "", - "stderr": stderr.decode() if stderr else "", - }, - ) - await self._cleanup_process() - raise Exception("Browser process terminated unexpectedly") - - return f"http://localhost:{self.config.debugging_port}" - except Exception as e: - await self._cleanup_process() - raise Exception(f"Failed to start browser: {e}") - - async def _cleanup_process(self): - """Cleanup browser process and temporary directory.""" - # Set shutting_down flag BEFORE any termination actions - self.shutting_down = True - - if self.browser_process: - try: - # Only attempt termination if the process is still running - if self.browser_process.poll() is None: - # Use our robust cross-platform termination utility - success = terminate_process( - pid=self.browser_process.pid, - timeout=1.0, # Equivalent to the previous 10*0.1s wait - logger=self.logger - ) - - if not success and self.logger: - self.logger.warning( - message="Failed to terminate browser process cleanly", - tag="PROCESS" - ) - - except Exception as e: - if self.logger: - self.logger.error( - message="Error during browser process cleanup: {error}", - tag="ERROR", - params={"error": str(e)}, - ) - - if self.temp_dir and os.path.exists(self.temp_dir): - try: - shutil.rmtree(self.temp_dir) - self.temp_dir = None - if self.logger: - self.logger.debug("Removed temporary directory", tag="CDP") - except Exception as e: - if self.logger: - self.logger.error( - message="Error removing temporary directory: {error}", - tag="CDP", - params={"error": str(e)} - ) - - self.browser_process = None - - async def _generate_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - # For CDP, we typically use the shared default_context - context = self.default_context - pages = context.pages - - # Otherwise, check if we have an existing context for this config - config_signature = self._make_config_signature(crawlerRunConfig) - self.contexts_by_config[config_signature] = context - - await self.setup_context(context, crawlerRunConfig) - - # Check if there's already a page with the target URL - page = next((p for p in pages if p.url == crawlerRunConfig.url), None) - - # If not found, create a new page - if not page: - page = await context.new_page() - - return page, context - - async def _get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - """Get a page for the given configuration. - - Args: - crawlerRunConfig: Configuration object for the crawler run - - Returns: - Tuple of (Page, BrowserContext) - """ - # Call parent method to ensure browser is started - await super().get_page(crawlerRunConfig) - - # For CDP, we typically use the shared default_context - context = self.default_context - pages = context.pages - - # Otherwise, check if we have an existing context for this config - config_signature = self._make_config_signature(crawlerRunConfig) - self.contexts_by_config[config_signature] = context - - await self.setup_context(context, crawlerRunConfig) - - # Check if there's already a page with the target URL - page = next((p for p in pages if p.url == crawlerRunConfig.url), None) - - # If not found, create a new page - if not page: - page = await context.new_page() - - # If a session_id is specified, store this session for reuse - if crawlerRunConfig.session_id: - self.sessions[crawlerRunConfig.session_id] = (context, page, time.time()) - - return page, context - - async def close(self): - """Close the CDP browser and clean up resources.""" - # Skip cleanup if using external CDP URL and not launched by us - if self.config.cdp_url and not self.browser_process: - if self.logger: - self.logger.debug("Skipping cleanup for external CDP browser", tag="CDP") - return - - # Call parent implementation for common cleanup - await super().close() - - # Additional CDP-specific cleanup - await asyncio.sleep(0.5) - await self._cleanup_process() diff --git a/crawl4ai/browser/strategies/docker_strategy.py b/crawl4ai/browser/strategies/docker_strategy.py deleted file mode 100644 index 5390fc8a..00000000 --- a/crawl4ai/browser/strategies/docker_strategy.py +++ /dev/null @@ -1,430 +0,0 @@ -"""Docker browser strategy module for Crawl4AI. - -This module provides browser strategies for running browsers in Docker containers, -which offers better isolation, consistency across platforms, and easy scaling. -""" - -import os -import uuid -from typing import List, Optional - - -from ...async_logger import AsyncLogger -from ...async_configs import BrowserConfig -from ..models import DockerConfig -from ..docker_registry import DockerRegistry -from ..docker_utils import DockerUtils -from .builtin import CDPBrowserStrategy -from .base import BaseBrowserStrategy - -class DockerBrowserStrategy(CDPBrowserStrategy): - """Docker-based browser strategy. - - Extends the CDPBrowserStrategy to run browsers in Docker containers. - Supports two modes: - 1. "connect" - Uses a Docker image with Chrome already running - 2. "launch" - Starts Chrome within the container with custom settings - - Attributes: - docker_config: Docker-specific configuration options - container_id: ID of current Docker container - container_name: Name assigned to the container - registry: Registry for tracking and reusing containers - docker_utils: Utilities for Docker operations - chrome_process_id: Process ID of Chrome within container - socat_process_id: Process ID of socat within container - internal_cdp_port: Chrome's internal CDP port - internal_mapped_port: Port that socat maps to internally - """ - - def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None): - """Initialize the Docker browser strategy. - - Args: - config: Browser configuration including Docker-specific settings - logger: Logger for recording events and errors - """ - super().__init__(config, logger) - - # Initialize Docker-specific attributes - self.docker_config = self.config.docker_config or DockerConfig() - self.container_id = None - self.container_name = f"crawl4ai-browser-{uuid.uuid4().hex[:8]}" - - # Use the shared registry file path for consistency with BuiltinBrowserStrategy - registry_file = self.docker_config.registry_file - if registry_file is None and self.config.user_data_dir: - # Use the same registry file as BuiltinBrowserStrategy if possible - registry_file = os.path.join( - os.path.dirname(self.config.user_data_dir), "browser_config.json" - ) - - self.registry = DockerRegistry(self.docker_config.registry_file) - self.docker_utils = DockerUtils(logger) - self.chrome_process_id = None - self.socat_process_id = None - self.internal_cdp_port = 9222 # Chrome's internal CDP port - self.internal_mapped_port = 9223 # Port that socat maps to internally - self.shutting_down = False - - async def start(self): - """Start or connect to a browser running in a Docker container. - - This method initializes Playwright and establishes a connection to - a browser running in a Docker container. Depending on the configured mode: - - "connect": Connects to a container with Chrome already running - - "launch": Creates a container and launches Chrome within it - - Returns: - self: For method chaining - """ - # Initialize Playwright - await BaseBrowserStrategy.start(self) - - if self.logger: - self.logger.info( - f"Starting Docker browser strategy in {self.docker_config.mode} mode", - tag="DOCKER", - ) - - try: - # Get CDP URL by creating or reusing a Docker container - # This handles the container management and browser startup - cdp_url = await self._get_or_create_cdp_url() - - if not cdp_url: - raise Exception( - "Failed to establish CDP connection to Docker container" - ) - - if self.logger: - self.logger.info( - f"Connecting to browser in Docker via CDP: {cdp_url}", tag="DOCKER" - ) - - # Connect to the browser using CDP - self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url) - - # Get existing context or create default context - contexts = self.browser.contexts - if contexts: - self.default_context = contexts[0] - if self.logger: - self.logger.debug("Using existing browser context", tag="DOCKER") - else: - if self.logger: - self.logger.debug("Creating new browser context", tag="DOCKER") - self.default_context = await self.create_browser_context() - await self.setup_context(self.default_context) - - return self - - except Exception as e: - # Clean up resources if startup fails - if self.container_id and not self.docker_config.persistent: - if self.logger: - self.logger.warning( - f"Cleaning up container after failed start: {self.container_id[:12]}", - tag="DOCKER", - ) - await self.docker_utils.remove_container(self.container_id) - self.registry.unregister_container(self.container_id) - self.container_id = None - - if self.playwright: - await self.playwright.stop() - self.playwright = None - - # Re-raise the exception - if self.logger: - self.logger.error( - f"Failed to start Docker browser: {str(e)}", tag="DOCKER" - ) - raise - - async def _generate_config_hash(self) -> str: - """Generate a hash of the configuration for container matching. - - Returns: - Hash string uniquely identifying this configuration - """ - # Create a dict with the relevant parts of the config - config_dict = { - "image": self.docker_config.image, - "mode": self.docker_config.mode, - "browser_type": self.config.browser_type, - "headless": self.config.headless, - } - - # Add browser-specific config if in launch mode - if self.docker_config.mode == "launch": - config_dict.update( - { - "text_mode": self.config.text_mode, - "light_mode": self.config.light_mode, - "viewport_width": self.config.viewport_width, - "viewport_height": self.config.viewport_height, - } - ) - - # Use the utility method to generate the hash - return self.docker_utils.generate_config_hash(config_dict) - - async def _get_or_create_cdp_url(self) -> str: - """Get CDP URL by either creating a new container or using an existing one. - - Returns: - CDP URL for connecting to the browser - - Raises: - Exception: If container creation or browser launch fails - """ - # If CDP URL is explicitly provided, use it - if self.config.cdp_url: - return self.config.cdp_url - - # Ensure Docker image exists (will build if needed) - image_name = await self.docker_utils.ensure_docker_image_exists( - self.docker_config.image, self.docker_config.mode - ) - - # Generate config hash for container matching - config_hash = await self._generate_config_hash() - - # Look for existing container with matching config - container_id = await self.registry.find_container_by_config( - config_hash, self.docker_utils - ) - - if container_id: - # Use existing container - self.container_id = container_id - host_port = self.registry.get_container_host_port(container_id) - if self.logger: - self.logger.info( - f"Using existing Docker container: {container_id[:12]}", - tag="DOCKER", - ) - else: - # Get a port for the new container - host_port = ( - self.docker_config.host_port - or self.registry.get_next_available_port(self.docker_utils) - ) - - # Prepare volumes list - volumes = list(self.docker_config.volumes) - - # Add user data directory if specified - if self.docker_config.user_data_dir: - # Ensure user data directory exists - os.makedirs(self.docker_config.user_data_dir, exist_ok=True) - volumes.append( - f"{self.docker_config.user_data_dir}:{self.docker_config.container_user_data_dir}" - ) - - # # Update config user_data_dir to point to container path - # self.config.user_data_dir = self.docker_config.container_user_data_dir - - # Create a new container - container_id = await self.docker_utils.create_container( - image_name=image_name, - host_port=host_port, - container_name=self.container_name, - volumes=volumes, - network=self.docker_config.network, - env_vars=self.docker_config.env_vars, - cpu_limit=self.docker_config.cpu_limit, - memory_limit=self.docker_config.memory_limit, - extra_args=self.docker_config.extra_args, - ) - - if not container_id: - raise Exception("Failed to create Docker container") - - self.container_id = container_id - - # Wait for container to be ready - await self.docker_utils.wait_for_container_ready(container_id) - - # Handle specific setup based on mode - if self.docker_config.mode == "launch": - # In launch mode, we need to start socat and Chrome - await self.docker_utils.start_socat_in_container(container_id) - - # Build browser arguments - browser_args = self._build_browser_args() - - # Launch Chrome - await self.docker_utils.launch_chrome_in_container( - container_id, browser_args - ) - - # Get PIDs for later cleanup - self.chrome_process_id = ( - await self.docker_utils.get_process_id_in_container( - container_id, "chromium" - ) - ) - self.socat_process_id = ( - await self.docker_utils.get_process_id_in_container( - container_id, "socat" - ) - ) - - # Wait for CDP to be ready - cdp_json_config = await self.docker_utils.wait_for_cdp_ready(host_port) - - if cdp_json_config: - # Register the container in the shared registry - self.registry.register_container( - container_id, host_port, config_hash, cdp_json_config - ) - else: - raise Exception("Failed to get CDP JSON config from Docker container") - - if self.logger: - self.logger.success( - f"Docker container ready: {container_id[:12]} on port {host_port}", - tag="DOCKER", - ) - - # Return CDP URL - return f"http://localhost:{host_port}" - - def _build_browser_args(self) -> List[str]: - """Build Chrome command line arguments based on BrowserConfig. - - Returns: - List of command line arguments for Chrome - """ - # Call parent method to get common arguments - browser_args = super()._build_browser_args() - return browser_args["args"] + [ - f"--remote-debugging-port={self.internal_cdp_port}", - "--remote-debugging-address=0.0.0.0", # Allow external connections - "--disable-dev-shm-usage", - "--headless=new", - ] - - # args = [ - # "--no-sandbox", - # "--disable-gpu", - # f"--remote-debugging-port={self.internal_cdp_port}", - # "--remote-debugging-address=0.0.0.0", # Allow external connections - # "--disable-dev-shm-usage", - # ] - - # if self.config.headless: - # args.append("--headless=new") - - # if self.config.viewport_width and self.config.viewport_height: - # args.append(f"--window-size={self.config.viewport_width},{self.config.viewport_height}") - - # if self.config.user_agent: - # args.append(f"--user-agent={self.config.user_agent}") - - # if self.config.text_mode: - # args.extend([ - # "--blink-settings=imagesEnabled=false", - # "--disable-remote-fonts", - # "--disable-images", - # "--disable-javascript", - # ]) - - # if self.config.light_mode: - # # Import here to avoid circular import - # from ..utils import get_browser_disable_options - # args.extend(get_browser_disable_options()) - - # if self.config.user_data_dir: - # args.append(f"--user-data-dir={self.config.user_data_dir}") - - # if self.config.extra_args: - # args.extend(self.config.extra_args) - - # return args - - async def close(self): - """Close the browser and clean up Docker container if needed.""" - # Set flag to track if we were the ones initiating shutdown - initiated_shutdown = not self.shutting_down - # Storage persistence for Docker needs special handling - # We need to store state before calling super().close() which will close the browser - if ( - self.browser - and self.docker_config.user_data_dir - and self.docker_config.persistent - ): - for context in self.browser.contexts: - try: - # Ensure directory exists - os.makedirs(self.docker_config.user_data_dir, exist_ok=True) - - # Save storage state to user data directory - storage_path = os.path.join( - self.docker_config.user_data_dir, "storage_state.json" - ) - await context.storage_state(path=storage_path) - if self.logger: - self.logger.debug( - "Persisted Docker-specific storage state", tag="DOCKER" - ) - except Exception as e: - if self.logger: - self.logger.warning( - message="Failed to persist Docker storage state: {error}", - tag="DOCKER", - params={"error": str(e)}, - ) - - # Call parent method to handle common cleanup - await super().close() - - # Only perform container cleanup if we initiated shutdown - # and we need to handle Docker-specific resources - if initiated_shutdown: - # Only clean up container if not persistent - if self.container_id and not self.docker_config.persistent: - # Stop Chrome process in "launch" mode - if self.docker_config.mode == "launch" and self.chrome_process_id: - await self.docker_utils.stop_process_in_container( - self.container_id, self.chrome_process_id - ) - if self.logger: - self.logger.debug( - f"Stopped Chrome process {self.chrome_process_id} in container", - tag="DOCKER", - ) - - # Stop socat process in "launch" mode - if self.docker_config.mode == "launch" and self.socat_process_id: - await self.docker_utils.stop_process_in_container( - self.container_id, self.socat_process_id - ) - if self.logger: - self.logger.debug( - f"Stopped socat process {self.socat_process_id} in container", - tag="DOCKER", - ) - - # Remove or stop container based on configuration - if self.docker_config.remove_on_exit: - await self.docker_utils.remove_container(self.container_id) - # Unregister from registry - if hasattr(self, "registry") and self.registry: - self.registry.unregister_container(self.container_id) - if self.logger: - self.logger.debug( - f"Removed Docker container {self.container_id}", - tag="DOCKER", - ) - else: - await self.docker_utils.stop_container(self.container_id) - if self.logger: - self.logger.debug( - f"Stopped Docker container {self.container_id}", - tag="DOCKER", - ) - - self.container_id = None diff --git a/crawl4ai/browser/strategies/playwright.py b/crawl4ai/browser/strategies/playwright.py deleted file mode 100644 index bea99753..00000000 --- a/crawl4ai/browser/strategies/playwright.py +++ /dev/null @@ -1,134 +0,0 @@ -"""Browser strategies module for Crawl4AI. - -This module implements the browser strategy pattern for different -browser implementations, including Playwright, CDP, and builtin browsers. -""" - -import time -from typing import Optional, Tuple - -from playwright.async_api import BrowserContext, Page - -from ...async_logger import AsyncLogger -from ...async_configs import BrowserConfig, CrawlerRunConfig - -from playwright_stealth import StealthConfig - -from .base import BaseBrowserStrategy - -stealth_config = StealthConfig( - webdriver=True, - chrome_app=True, - chrome_csi=True, - chrome_load_times=True, - chrome_runtime=True, - navigator_languages=True, - navigator_plugins=True, - navigator_permissions=True, - webgl_vendor=True, - outerdimensions=True, - navigator_hardware_concurrency=True, - media_codecs=True, -) - -class PlaywrightBrowserStrategy(BaseBrowserStrategy): - """Standard Playwright browser strategy. - - This strategy launches a new browser instance using Playwright - and manages browser contexts. - """ - - def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None): - """Initialize the Playwright browser strategy. - - Args: - config: Browser configuration - logger: Logger for recording events and errors - """ - super().__init__(config, logger) - # No need to re-initialize sessions and session_ttl as they're now in the base class - - async def start(self): - """Start the browser instance. - - Returns: - self: For method chaining - """ - # Call the base class start to initialize Playwright - await super().start() - - # Build browser arguments using the base class method - browser_args = self._build_browser_args() - - try: - # Launch appropriate browser type - if self.config.browser_type == "firefox": - self.browser = await self.playwright.firefox.launch(**browser_args) - elif self.config.browser_type == "webkit": - self.browser = await self.playwright.webkit.launch(**browser_args) - else: - self.browser = await self.playwright.chromium.launch(**browser_args) - - self.default_context = self.browser - - if self.logger: - self.logger.debug(f"Launched {self.config.browser_type} browser", tag="BROWSER") - - except Exception as e: - if self.logger: - self.logger.error(f"Failed to launch browser: {str(e)}", tag="BROWSER") - raise - - return self - - async def _generate_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - # Otherwise, check if we have an existing context for this config - config_signature = self._make_config_signature(crawlerRunConfig) - - async with self._contexts_lock: - if config_signature in self.contexts_by_config: - context = self.contexts_by_config[config_signature] - else: - # Create and setup a new context - context = await self.create_browser_context(crawlerRunConfig) - await self.setup_context(context, crawlerRunConfig) - self.contexts_by_config[config_signature] = context - - # Create a new page from the chosen context - page = await context.new_page() - - return page, context - - async def _get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]: - """Get a page for the given configuration. - - Args: - crawlerRunConfig: Configuration object for the crawler run - - Returns: - Tuple of (Page, BrowserContext) - """ - # Call parent method to ensure browser is started - await super().get_page(crawlerRunConfig) - - # Otherwise, check if we have an existing context for this config - config_signature = self._make_config_signature(crawlerRunConfig) - - async with self._contexts_lock: - if config_signature in self.contexts_by_config: - context = self.contexts_by_config[config_signature] - else: - # Create and setup a new context - context = await self.create_browser_context(crawlerRunConfig) - await self.setup_context(context, crawlerRunConfig) - self.contexts_by_config[config_signature] = context - - # Create a new page from the chosen context - page = await context.new_page() - - # If a session_id is specified, store this session so we can reuse later - if crawlerRunConfig.session_id: - self.sessions[crawlerRunConfig.session_id] = (context, page, time.time()) - - return page, context - diff --git a/crawl4ai/browser/utils.py b/crawl4ai/browser/utils.py deleted file mode 100644 index 421230bf..00000000 --- a/crawl4ai/browser/utils.py +++ /dev/null @@ -1,465 +0,0 @@ -"""Browser utilities module for Crawl4AI. - -This module provides utility functions for browser management, -including process management, CDP connection utilities, -and Playwright instance management. -""" - -import asyncio -import os -import sys -import time -import tempfile -import subprocess -from typing import Optional, Tuple, Union -import signal -import psutil - -from playwright.async_api import async_playwright - -from ..utils import get_chromium_path -from ..async_configs import BrowserConfig, CrawlerRunConfig - -from ..async_logger import AsyncLogger - - -_playwright_instance = None - -async def get_playwright(): - """Get or create the Playwright instance (singleton pattern). - - Returns: - Playwright: The Playwright instance - """ - global _playwright_instance - if _playwright_instance is None or True: - _playwright_instance = await async_playwright().start() - return _playwright_instance - -async def get_browser_executable(browser_type: str) -> str: - """Get the path to browser executable, with platform-specific handling. - - Args: - browser_type: Type of browser (chromium, firefox, webkit) - - Returns: - Path to browser executable - """ - return await get_chromium_path(browser_type) - -def create_temp_directory(prefix="browser-profile-") -> str: - """Create a temporary directory for browser data. - - Args: - prefix: Prefix for the temporary directory name - - Returns: - Path to the created temporary directory - """ - return tempfile.mkdtemp(prefix=prefix) - -def is_windows() -> bool: - """Check if the current platform is Windows. - - Returns: - True if Windows, False otherwise - """ - return sys.platform == "win32" - -def is_macos() -> bool: - """Check if the current platform is macOS. - - Returns: - True if macOS, False otherwise - """ - return sys.platform == "darwin" - -def is_linux() -> bool: - """Check if the current platform is Linux. - - Returns: - True if Linux, False otherwise - """ - return not (is_windows() or is_macos()) - -def is_browser_running(pid: Optional[int]) -> bool: - """Check if a process with the given PID is running. - - Args: - pid: Process ID to check - - Returns: - bool: True if the process is running, False otherwise - """ - if not pid: - return False - - try: - if type(pid) is str: - pid = int(pid) - # Check if the process exists - if is_windows(): - process = subprocess.run(["tasklist", "/FI", f"PID eq {pid}"], - capture_output=True, text=True) - return str(pid) in process.stdout - else: - # Unix-like systems - os.kill(pid, 0) # This doesn't actually kill the process, just checks if it exists - return True - except (ProcessLookupError, PermissionError, OSError): - return False - -def get_browser_disable_options() -> list: - """Get standard list of browser disable options for performance. - - Returns: - List of command-line options to disable various browser features - """ - return [ - "--disable-background-networking", - "--disable-background-timer-throttling", - "--disable-backgrounding-occluded-windows", - "--disable-breakpad", - "--disable-client-side-phishing-detection", - "--disable-component-extensions-with-background-pages", - "--disable-default-apps", - "--disable-extensions", - "--disable-features=TranslateUI", - "--disable-hang-monitor", - "--disable-ipc-flooding-protection", - "--disable-popup-blocking", - "--disable-prompt-on-repost", - "--disable-sync", - "--force-color-profile=srgb", - "--metrics-recording-only", - "--no-first-run", - "--password-store=basic", - "--use-mock-keychain", - ] - - -async def find_optimal_browser_config(total_urls=50, verbose=True, rate_limit_delay=0.2): - """Find optimal browser configuration for crawling a specific number of URLs. - - Args: - total_urls: Number of URLs to crawl - verbose: Whether to print progress - rate_limit_delay: Delay between page loads to avoid rate limiting - - Returns: - dict: Contains fastest, lowest_memory, and optimal configurations - """ - from .manager import BrowserManager - if verbose: - print(f"\n=== Finding optimal configuration for crawling {total_urls} URLs ===\n") - - # Generate test URLs with timestamp to avoid caching - timestamp = int(time.time()) - urls = [f"https://example.com/page_{i}?t={timestamp}" for i in range(total_urls)] - - # Limit browser configurations to test (1 browser to max 10) - max_browsers = min(10, total_urls) - configs_to_test = [] - - # Generate configurations (browser count, pages distribution) - for num_browsers in range(1, max_browsers + 1): - base_pages = total_urls // num_browsers - remainder = total_urls % num_browsers - - # Create distribution array like [3, 3, 2, 2] (some browsers get one more page) - if remainder > 0: - distribution = [base_pages + 1] * remainder + [base_pages] * (num_browsers - remainder) - else: - distribution = [base_pages] * num_browsers - - configs_to_test.append((num_browsers, distribution)) - - results = [] - - # Test each configuration - for browser_count, page_distribution in configs_to_test: - if verbose: - print(f"Testing {browser_count} browsers with distribution {tuple(page_distribution)}") - - try: - # Track memory if possible - try: - import psutil - process = psutil.Process() - start_memory = process.memory_info().rss / (1024 * 1024) # MB - except ImportError: - if verbose: - print("Memory tracking not available (psutil not installed)") - start_memory = 0 - - # Start browsers in parallel - managers = [] - start_tasks = [] - start_time = time.time() - - logger = AsyncLogger(verbose=True, log_file=None) - - for i in range(browser_count): - config = BrowserConfig(headless=True) - manager = BrowserManager(browser_config=config, logger=logger) - start_tasks.append(manager.start()) - managers.append(manager) - - await asyncio.gather(*start_tasks) - - # Distribute URLs among browsers - urls_per_manager = {} - url_index = 0 - - for i, manager in enumerate(managers): - pages_for_this_browser = page_distribution[i] - end_index = url_index + pages_for_this_browser - urls_per_manager[manager] = urls[url_index:end_index] - url_index = end_index - - # Create pages for each browser - all_pages = [] - for manager, manager_urls in urls_per_manager.items(): - if not manager_urls: - continue - pages = await manager.get_pages(CrawlerRunConfig(), count=len(manager_urls)) - all_pages.extend(zip(pages, manager_urls)) - - # Crawl pages with delay to avoid rate limiting - async def crawl_page(page_ctx, url): - page, _ = page_ctx - try: - await page.goto(url) - if rate_limit_delay > 0: - await asyncio.sleep(rate_limit_delay) - title = await page.title() - return title - finally: - await page.close() - - crawl_start = time.time() - crawl_tasks = [crawl_page(page_ctx, url) for page_ctx, url in all_pages] - await asyncio.gather(*crawl_tasks) - crawl_time = time.time() - crawl_start - total_time = time.time() - start_time - - # Measure final memory usage - if start_memory > 0: - end_memory = process.memory_info().rss / (1024 * 1024) - memory_used = end_memory - start_memory - else: - memory_used = 0 - - # Close all browsers - for manager in managers: - await manager.close() - - # Calculate metrics - pages_per_second = total_urls / crawl_time - - # Calculate efficiency score (higher is better) - # This balances speed vs memory - if memory_used > 0: - efficiency = pages_per_second / (memory_used + 1) - else: - efficiency = pages_per_second - - # Store result - result = { - "browser_count": browser_count, - "distribution": tuple(page_distribution), - "crawl_time": crawl_time, - "total_time": total_time, - "memory_used": memory_used, - "pages_per_second": pages_per_second, - "efficiency": efficiency - } - - results.append(result) - - if verbose: - print(f" ✓ Crawled {total_urls} pages in {crawl_time:.2f}s ({pages_per_second:.1f} pages/sec)") - if memory_used > 0: - print(f" ✓ Memory used: {memory_used:.1f}MB ({memory_used/total_urls:.1f}MB per page)") - print(f" ✓ Efficiency score: {efficiency:.4f}") - - except Exception as e: - if verbose: - print(f" ✗ Error: {str(e)}") - - # Clean up - for manager in managers: - try: - await manager.close() - except: - pass - - # If no successful results, return None - if not results: - return None - - # Find best configurations - fastest = sorted(results, key=lambda x: x["crawl_time"])[0] - - # Only consider memory if available - memory_results = [r for r in results if r["memory_used"] > 0] - if memory_results: - lowest_memory = sorted(memory_results, key=lambda x: x["memory_used"])[0] - else: - lowest_memory = fastest - - # Find most efficient (balanced speed vs memory) - optimal = sorted(results, key=lambda x: x["efficiency"], reverse=True)[0] - - # Print summary - if verbose: - print("\n=== OPTIMAL CONFIGURATIONS ===") - print(f"⚡ Fastest: {fastest['browser_count']} browsers {fastest['distribution']}") - print(f" {fastest['crawl_time']:.2f}s, {fastest['pages_per_second']:.1f} pages/sec") - - print(f"💾 Memory-efficient: {lowest_memory['browser_count']} browsers {lowest_memory['distribution']}") - if lowest_memory["memory_used"] > 0: - print(f" {lowest_memory['memory_used']:.1f}MB, {lowest_memory['memory_used']/total_urls:.2f}MB per page") - - print(f"🌟 Balanced optimal: {optimal['browser_count']} browsers {optimal['distribution']}") - print(f" {optimal['crawl_time']:.2f}s, {optimal['pages_per_second']:.1f} pages/sec, score: {optimal['efficiency']:.4f}") - - return { - "fastest": fastest, - "lowest_memory": lowest_memory, - "optimal": optimal, - "all_configs": results - } - - -# Find process ID of the existing browser using os -def find_process_by_port(port: int) -> str: - """Find process ID listening on a specific port. - - Args: - port: Port number to check - - Returns: - str: Process ID or empty string if not found - """ - try: - if is_windows(): - cmd = f"netstat -ano | findstr :{port}" - result = subprocess.check_output(cmd, shell=True).decode() - return result.strip().split()[-1] if result else "" - else: - cmd = f"lsof -i :{port} -t" - return subprocess.check_output(cmd, shell=True).decode().strip() - except subprocess.CalledProcessError: - return "" - -async def check_process_is_running(process: subprocess.Popen, delay: float = 0.5) -> Tuple[bool, Optional[int], bytes, bytes]: - """Perform a quick check to make sure the browser started successfully.""" - if not process: - return False, None, b"", b"" - - # Check that process started without immediate termination - await asyncio.sleep(delay) - if process.poll() is not None: - # Process already terminated - stdout, stderr = b"", b"" - try: - stdout, stderr = process.communicate(timeout=0.5) - except subprocess.TimeoutExpired: - pass - - return False, process.returncode, stdout, stderr - - - return True, 0, b"", b"" - - -def terminate_process( - pid: Union[int, str], - timeout: float = 5.0, - force_kill_timeout: float = 3.0, - logger = None -) -> Tuple[bool, Optional[str]]: - """ - Robustly terminate a process across platforms with verification. - - Args: - pid: Process ID to terminate (int or string) - timeout: Seconds to wait for graceful termination before force killing - force_kill_timeout: Seconds to wait after force kill before considering it failed - logger: Optional logger object with error, warning, and info methods - - Returns: - Tuple of (success: bool, error_message: Optional[str]) - """ - # Convert pid to int if it's a string - if isinstance(pid, str): - try: - pid = int(pid) - except ValueError: - error_msg = f"Invalid PID format: {pid}" - if logger: - logger.error(error_msg) - return False, error_msg - - # Check if process exists - if not psutil.pid_exists(pid): - return True, None # Process already terminated - - try: - process = psutil.Process(pid) - - # First attempt: graceful termination - if logger: - logger.info(f"Attempting graceful termination of process {pid}") - - if os.name == 'nt': # Windows - subprocess.run(["taskkill", "/PID", str(pid)], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=False) - else: # Unix/Linux/MacOS - process.send_signal(signal.SIGTERM) - - # Wait for process to terminate - try: - process.wait(timeout=timeout) - if logger: - logger.info(f"Process {pid} terminated gracefully") - return True, None - except psutil.TimeoutExpired: - if logger: - logger.warning(f"Process {pid} did not terminate gracefully within {timeout} seconds, forcing termination") - - # Second attempt: force kill - if os.name == 'nt': # Windows - subprocess.run(["taskkill", "/F", "/PID", str(pid)], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=False) - else: # Unix/Linux/MacOS - process.send_signal(signal.SIGKILL) - - # Verify process is killed - gone, alive = psutil.wait_procs([process], timeout=force_kill_timeout) - if process in alive: - error_msg = f"Failed to kill process {pid} even after force kill" - if logger: - logger.error(error_msg) - return False, error_msg - - if logger: - logger.info(f"Process {pid} terminated by force") - return True, None - - except psutil.NoSuchProcess: - # Process terminated while we were working with it - if logger: - logger.info(f"Process {pid} already terminated") - return True, None - - except Exception as e: - error_msg = f"Error terminating process {pid}: {str(e)}" - if logger: - logger.error(error_msg) - return False, error_msg \ No newline at end of file