refactor(browser): consolidate browser strategy implementations

Moves common browser functionality into BaseBrowserStrategy class to reduce code duplication and improve maintainability. Key changes:
- Adds shared browser argument building and session management to base class
- Standardizes storage state handling across strategies
- Improves process cleanup and error handling
- Consolidates CDP URL management and container lifecycle

BREAKING CHANGE: Changes browser_mode="custom" to "cdp" for consistency
This commit is contained in:
UncleCode
2025-03-28 22:47:28 +08:00
parent 64f20ab44a
commit 3ff7eec8f3
12 changed files with 1102 additions and 671 deletions

View File

@@ -29,7 +29,7 @@ from enum import Enum
from .proxy_strategy import ProxyConfig
try:
from .browser.docker_config import DockerConfig
from .browser.models import DockerConfig
except ImportError:
DockerConfig = None
@@ -176,7 +176,7 @@ class BrowserConfig:
browser_mode (str): Determines how the browser should be initialized:
"builtin" - use the builtin CDP browser running in background
"dedicated" - create a new dedicated browser instance each time
"custom" - use explicit CDP settings provided in cdp_url
"cdp" - use explicit CDP settings provided in cdp_url
"docker" - run browser in Docker container with isolation
Default: "dedicated"
use_managed_browser (bool): Launch the browser using a managed approach (e.g., via CDP), allowing
@@ -242,7 +242,7 @@ class BrowserConfig:
channel: str = "chromium",
proxy: str = None,
proxy_config: Union[ProxyConfig, dict, None] = None,
docker_config: Union["DockerConfig", dict, None] = None,
docker_config: Union[DockerConfig, dict, None] = None,
viewport_width: int = 1080,
viewport_height: int = 600,
viewport: dict = None,
@@ -289,6 +289,10 @@ class BrowserConfig:
self.docker_config = DockerConfig.from_kwargs(docker_config)
else:
self.docker_config = docker_config
if self.docker_config:
self.user_data_dir = self.docker_config.user_data_dir
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.viewport = viewport

View File

@@ -80,7 +80,7 @@ class BrowserManager:
)
return PlaywrightBrowserStrategy(self.config, self.logger)
return DockerBrowserStrategy(self.config, self.logger)
elif self.config.cdp_url or self.config.use_managed_browser:
elif self.config.browser_mode == "cdp" or self.config.cdp_url or self.config.use_managed_browser:
return CDPBrowserStrategy(self.config, self.logger)
else:
return PlaywrightBrowserStrategy(self.config, self.logger)
@@ -159,16 +159,12 @@ class BrowserManager:
session_id: The session ID to kill
"""
# Handle kill_session via our strategy if it supports it
if hasattr(self._strategy, '_kill_session'):
await self._strategy._kill_session(session_id)
elif session_id in self.sessions:
context, page, _ = self.sessions[session_id]
await page.close()
# Only close context if not using CDP
if not self.config.use_managed_browser and not self.config.cdp_url and not self.config.browser_mode == "builtin":
await context.close()
del self.sessions[session_id]
await self._strategy.kill_session(session_id)
# sync sessions if needed
if hasattr(self._strategy, 'sessions'):
self.sessions = self._strategy.sessions
def _cleanup_expired_sessions(self):
"""Clean up expired sessions based on TTL."""
# Use strategy's implementation if available

View File

@@ -8,6 +8,8 @@ from abc import ABC, abstractmethod
import asyncio
import json
import hashlib
import os
import time
from typing import Optional, Tuple, List
from playwright.async_api import BrowserContext, Page
@@ -16,14 +18,31 @@ from ...async_logger import AsyncLogger
from ...async_configs import BrowserConfig, CrawlerRunConfig
from ...config import DOWNLOAD_PAGE_TIMEOUT
from ...js_snippet import load_js_script
from ..utils import get_playwright
class BaseBrowserStrategy(ABC):
"""Base class for all browser strategies.
This abstract class defines the interface that all browser strategies
must implement. It handles common functionality like context caching.
must implement. It handles common functionality like context caching,
browser configuration, and session management.
"""
_playwright_instance = None
@classmethod
async def get_playwright(cls):
"""Get or create a shared Playwright instance.
Returns:
Playwright: The shared Playwright instance
"""
# For now I dont want Singleton pattern for Playwright
if cls._playwright_instance is None or True:
cls._playwright_instance = await get_playwright()
return cls._playwright_instance
def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None):
"""Initialize the strategy with configuration and logger.
@@ -35,23 +54,40 @@ class BaseBrowserStrategy(ABC):
self.logger = logger
self.browser = None
self.default_context = None
self.contexts_by_config = {}
self._contexts_lock = asyncio.Lock()
self.playwright = None
# Context management
self.contexts_by_config = {} # config_signature -> context
self._contexts_lock = asyncio.Lock()
# Session management
self.sessions = {}
self.session_ttl = 1800 # 30 minutes default
# Playwright instance
self.playwright = None
@abstractmethod
async def start(self):
"""Start the browser.
This method should be implemented by concrete strategies to initialize
the browser in the appropriate way (direct launch, CDP connection, etc.)
Returns:
self: For method chaining
"""
pass
# Base implementation gets the playwright instance
self.playwright = await self.get_playwright()
return self
@abstractmethod
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page with specified configuration.
This method should be implemented by concrete strategies to create
or retrieve a page according to their browser management approach.
Args:
crawlerRunConfig: Crawler run configuration
@@ -75,15 +111,122 @@ class BaseBrowserStrategy(ABC):
page, context = await self.get_page(crawlerRunConfig)
pages.append((page, context))
return pages
@abstractmethod
async def close(self):
"""Close the browser and clean up resources."""
pass
def _build_browser_args(self) -> dict:
"""Build browser launch arguments from config.
Returns:
dict: Browser launch arguments for Playwright
"""
# Define common browser arguments that improve performance and stability
args = [
"--disable-gpu",
"--disable-gpu-compositing",
"--disable-software-rasterizer",
"--no-sandbox",
"--disable-dev-shm-usage",
"--no-first-run",
"--no-default-browser-check",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
"--disable-blink-features=AutomationControlled",
"--window-position=400,0",
"--disable-renderer-backgrounding",
"--disable-ipc-flooding-protection",
"--force-color-profile=srgb",
"--mute-audio",
"--disable-background-timer-throttling",
f"--window-size={self.config.viewport_width},{self.config.viewport_height}",
]
# Define browser disable options for light mode
browser_disable_options = [
"--disable-background-networking",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-breakpad",
"--disable-client-side-phishing-detection",
"--disable-component-extensions-with-background-pages",
"--disable-default-apps",
"--disable-extensions",
"--disable-features=TranslateUI",
"--disable-hang-monitor",
"--disable-ipc-flooding-protection",
"--disable-popup-blocking",
"--disable-prompt-on-repost",
"--disable-sync",
"--force-color-profile=srgb",
"--metrics-recording-only",
"--no-first-run",
"--password-store=basic",
"--use-mock-keychain",
]
# Apply light mode settings if enabled
if self.config.light_mode:
args.extend(browser_disable_options)
# Apply text mode settings if enabled (disables images, JS, etc)
if self.config.text_mode:
args.extend([
"--blink-settings=imagesEnabled=false",
"--disable-remote-fonts",
"--disable-images",
"--disable-javascript",
"--disable-software-rasterizer",
"--disable-dev-shm-usage",
])
# Add any extra arguments from the config
if self.config.extra_args:
args.extend(self.config.extra_args)
# Build the core browser args dictionary
browser_args = {"headless": self.config.headless, "args": args}
# Add chrome channel if specified
if self.config.chrome_channel:
browser_args["channel"] = self.config.chrome_channel
# Configure downloads
if self.config.accept_downloads:
browser_args["downloads_path"] = self.config.downloads_path or os.path.join(
os.getcwd(), "downloads"
)
os.makedirs(browser_args["downloads_path"], exist_ok=True)
# Check for user data directory
if self.config.user_data_dir:
# Ensure the directory exists
os.makedirs(self.config.user_data_dir, exist_ok=True)
browser_args["user_data_dir"] = self.config.user_data_dir
# Configure proxy settings
if self.config.proxy or self.config.proxy_config:
from playwright.async_api import ProxySettings
proxy_settings = (
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
)
browser_args["proxy"] = proxy_settings
return browser_args
def _make_config_signature(self, crawlerRunConfig: CrawlerRunConfig) -> str:
"""Create a signature hash from configuration for context caching.
Converts the crawlerRunConfig into a dict, excludes ephemeral fields,
then returns a hash of the sorted JSON. This yields a stable signature
that identifies configurations requiring a unique browser context.
Args:
crawlerRunConfig: Crawler run configuration
@@ -157,6 +300,7 @@ class BaseBrowserStrategy(ABC):
"viewport": viewport_settings,
"proxy": proxy_settings,
"accept_downloads": self.config.accept_downloads,
"storage_state": self.config.storage_state,
"ignore_https_errors": self.config.ignore_https_errors,
"device_scale_factor": 1.0,
"java_script_enabled": self.config.java_script_enabled,
@@ -167,8 +311,7 @@ class BaseBrowserStrategy(ABC):
text_mode_settings = {
"has_touch": False,
"is_mobile": False,
# Disable javascript in text mode
"java_script_enabled": False
"java_script_enabled": False, # Disable javascript in text mode
}
# Update context settings with text mode settings
context_settings.update(text_mode_settings)
@@ -177,16 +320,25 @@ class BaseBrowserStrategy(ABC):
# Handle storage state properly - this is key for persistence
if self.config.storage_state:
context_settings["storage_state"] = self.config.storage_state
if self.logger:
if isinstance(self.config.storage_state, str):
self.logger.debug(f"Using storage state from file: {self.config.storage_state}", tag="BROWSER")
else:
self.logger.debug("Using storage state from config object", tag="BROWSER")
# If user_data_dir is specified, browser persistence should be automatic
if self.config.user_data_dir and self.logger:
self.logger.debug(f"Using user data directory: {self.config.user_data_dir}", tag="BROWSER")
if self.config.user_data_dir:
# For CDP-based browsers, storage persistence is typically handled by the user_data_dir
# at the browser level, but we'll create a storage_state location for Playwright as well
storage_path = os.path.join(self.config.user_data_dir, "storage_state.json")
if not os.path.exists(storage_path):
# Create parent directory if it doesn't exist
os.makedirs(os.path.dirname(storage_path), exist_ok=True)
with open(storage_path, "w") as f:
json.dump({}, f)
self.config.storage_state = storage_path
if self.logger:
self.logger.debug(f"Using user data directory: {self.config.user_data_dir}", tag="BROWSER")
# Apply crawler-specific configurations if provided
if crawlerRunConfig:
@@ -227,12 +379,19 @@ class BaseBrowserStrategy(ABC):
context: The browser context to set up
crawlerRunConfig: Configuration object containing all browser settings
"""
# Set HTTP headers
if self.config.headers:
await context.set_extra_http_headers(self.config.headers)
# Add cookies
if self.config.cookies:
await context.add_cookies(self.config.cookies)
# Apply storage state if provided
if self.config.storage_state:
await context.storage_state(path=None)
# Configure downloads
if self.config.accept_downloads:
context.set_default_timeout(DOWNLOAD_PAGE_TIMEOUT)
context.set_default_navigation_timeout(DOWNLOAD_PAGE_TIMEOUT)
@@ -250,12 +409,13 @@ class BaseBrowserStrategy(ABC):
await context.set_extra_http_headers(combined_headers)
# Add default cookie
target_url = (crawlerRunConfig and crawlerRunConfig.url) or "https://crawl4ai.com/"
await context.add_cookies(
[
{
"name": "cookiesEnabled",
"value": "true",
"url": crawlerRunConfig and crawlerRunConfig.url or "https://crawl4ai.com/",
"url": target_url,
}
]
)
@@ -268,3 +428,150 @@ class BaseBrowserStrategy(ABC):
or crawlerRunConfig.magic
):
await context.add_init_script(load_js_script("navigator_overrider"))
async def kill_session(self, session_id: str):
"""Kill a browser session and clean up resources.
Args:
session_id (str): The session ID to kill.
"""
if session_id not in self.sessions:
return
context, page, _ = self.sessions[session_id]
# Close the page
try:
await page.close()
except Exception as e:
if self.logger:
self.logger.error(f"Error closing page for session {session_id}: {str(e)}", tag="BROWSER")
# Remove session from tracking
del self.sessions[session_id]
# Clean up any contexts that no longer have pages
await self._cleanup_unused_contexts()
if self.logger:
self.logger.debug(f"Killed session: {session_id}", tag="BROWSER")
async def _cleanup_unused_contexts(self):
"""Clean up contexts that no longer have any pages."""
async with self._contexts_lock:
# Get all contexts we're managing
contexts_to_check = list(self.contexts_by_config.values())
for context in contexts_to_check:
# Check if the context has any pages left
if not context.pages:
# No pages left, we can close this context
config_signature = next((sig for sig, ctx in self.contexts_by_config.items()
if ctx == context), None)
if config_signature:
try:
await context.close()
del self.contexts_by_config[config_signature]
if self.logger:
self.logger.debug(f"Closed unused context", tag="BROWSER")
except Exception as e:
if self.logger:
self.logger.error(f"Error closing unused context: {str(e)}", tag="BROWSER")
def _cleanup_expired_sessions(self):
"""Clean up expired sessions based on TTL."""
current_time = time.time()
expired_sessions = [
sid
for sid, (_, _, last_used) in self.sessions.items()
if current_time - last_used > self.session_ttl
]
for sid in expired_sessions:
if self.logger:
self.logger.debug(f"Session expired: {sid}", tag="BROWSER")
asyncio.create_task(self.kill_session(sid))
async def close(self):
"""Close the browser and clean up resources.
This method handles common cleanup tasks like:
1. Persisting storage state if a user_data_dir is configured
2. Closing all sessions
3. Closing all browser contexts
4. Closing the browser
5. Stopping Playwright
Child classes should override this method to add their specific cleanup logic,
but should call super().close() to ensure common cleanup tasks are performed.
"""
# Set a flag to prevent race conditions during cleanup
self.shutting_down = True
try:
# Add brief delay if configured
if self.config.sleep_on_close:
await asyncio.sleep(0.5)
# Persist storage state if using a user data directory
if self.config.user_data_dir and self.browser:
for context in self.browser.contexts:
try:
# Ensure the directory exists
storage_dir = os.path.join(self.config.user_data_dir, "Default")
os.makedirs(storage_dir, exist_ok=True)
# Save storage state
storage_path = os.path.join(storage_dir, "storage_state.json")
await context.storage_state(path=storage_path)
if self.logger:
self.logger.debug("Storage state persisted before closing browser", tag="BROWSER")
except Exception as e:
if self.logger:
self.logger.warning(
message="Failed to ensure storage persistence: {error}",
tag="BROWSER",
params={"error": str(e)}
)
# Close all active sessions
session_ids = list(self.sessions.keys())
for session_id in session_ids:
await self.kill_session(session_id)
# Close all cached contexts
for ctx in self.contexts_by_config.values():
try:
await ctx.close()
except Exception as e:
if self.logger:
self.logger.error(
message="Error closing context: {error}",
tag="BROWSER",
params={"error": str(e)}
)
self.contexts_by_config.clear()
# Close the browser if it exists
if self.browser:
await self.browser.close()
self.browser = None
# Stop playwright
if self.playwright:
await self.playwright.stop()
self.playwright = None
except Exception as e:
if self.logger:
self.logger.error(
message="Error during browser cleanup: {error}",
tag="BROWSER",
params={"error": str(e)}
)
finally:
# Reset shutting down flag
self.shutting_down = False

View File

@@ -5,16 +5,20 @@ import json
import subprocess
import shutil
import signal
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Tuple
from ...async_logger import AsyncLogger
from ...async_configs import CrawlerRunConfig
from playwright.async_api import Page, BrowserContext
from ...async_logger import AsyncLogger
from ...async_configs import BrowserConfig
from ...utils import get_home_folder
from ..utils import get_browser_executable, is_windows, is_browser_running
from ..utils import get_browser_executable, is_windows, is_browser_running, find_process_by_port, terminate_process
from .cdp import CDPBrowserStrategy
from .base import BaseBrowserStrategy
class BuiltinBrowserStrategy(CDPBrowserStrategy):
"""Built-in browser strategy.
@@ -67,29 +71,66 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
Returns:
self: For method chaining
"""
# Check for existing built-in browser (get_browser_info already checks if running)
browser_info = self.get_browser_info()
if browser_info:
if self.logger:
self.logger.info(f"Using existing built-in browser at {browser_info.get('cdp_url')}", tag="BROWSER")
self.config.cdp_url = browser_info.get('cdp_url')
else:
if self.logger:
self.logger.info("Built-in browser not found, launching new instance...", tag="BROWSER")
cdp_url = await self.launch_builtin_browser(
browser_type=self.config.browser_type,
debugging_port=self.config.debugging_port,
headless=self.config.headless,
)
if not cdp_url:
if self.logger:
self.logger.warning("Failed to launch built-in browser, falling back to regular CDP strategy", tag="BROWSER")
return await super().start()
self.config.cdp_url = cdp_url
# Initialize Playwright instance via base class method
await BaseBrowserStrategy.start(self)
# Call parent class implementation with updated CDP URL
return await super().start()
try:
# Check for existing built-in browser (get_browser_info already checks if running)
browser_info = self.get_browser_info()
if browser_info:
if self.logger:
self.logger.info(f"Using existing built-in browser at {browser_info.get('cdp_url')}", tag="BROWSER")
self.config.cdp_url = browser_info.get('cdp_url')
else:
if self.logger:
self.logger.info("Built-in browser not found, launching new instance...", tag="BROWSER")
cdp_url = await self.launch_builtin_browser(
browser_type=self.config.browser_type,
debugging_port=self.config.debugging_port,
headless=self.config.headless,
)
if not cdp_url:
if self.logger:
self.logger.warning("Failed to launch built-in browser, falling back to regular CDP strategy", tag="BROWSER")
# Call CDP's start but skip BaseBrowserStrategy.start() since we already called it
return await CDPBrowserStrategy.start(self)
self.config.cdp_url = cdp_url
# Connect to the browser using CDP protocol
self.browser = await self.playwright.chromium.connect_over_cdp(self.config.cdp_url)
# Get or create default context
contexts = self.browser.contexts
if contexts:
self.default_context = contexts[0]
else:
self.default_context = await self.create_browser_context()
await self.setup_context(self.default_context)
if self.logger:
self.logger.debug(f"Connected to built-in browser at {self.config.cdp_url}", tag="BUILTIN")
return self
except Exception as e:
if self.logger:
self.logger.error(f"Failed to start built-in browser: {str(e)}", tag="BUILTIN")
raise
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
Inherits behavior from CDPBrowserStrategy for page management.
Args:
crawlerRunConfig: Configuration object for the crawler run
Returns:
Tuple of (Page, BrowserContext)
"""
# For built-in browsers, we use the same page management as CDP strategy
return await super().get_page(crawlerRunConfig)
@classmethod
def get_builtin_browser_info(cls, debugging_port: int, config_file: str, logger: Optional[AsyncLogger] = None) -> Optional[Dict[str, Any]]:
"""Get information about the built-in browser for a specific debugging port.
@@ -116,7 +157,31 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
browser_info = browser_info_dict["port_map"][port_str]
# Check if the browser is still running
if not is_browser_running(browser_info.get('pid')):
pids = browser_info.get('pid')
if type(pids) == str and len(pids.split("\n")) > 1:
pids = [int(pid) for pid in pids.split("\n") if pid.isdigit()]
elif type(pids) == str and pids.isdigit():
pids = [int(pids)]
elif type(pids) == int:
pids = [pids]
else:
pids = []
# Check if any of the PIDs are running
if not pids:
if logger:
logger.warning(f"Built-in browser on port {debugging_port} has no valid PID", tag="BUILTIN")
# Remove this port from the dictionary
del browser_info_dict["port_map"][port_str]
with open(config_file, 'w') as f:
json.dump(browser_info_dict, f, indent=2)
return None
# Check if any of the PIDs are running
for pid in pids:
if is_browser_running(pid):
browser_info['pid'] = pid
break
else:
# If none of the PIDs are running, remove this port from the dictionary
if logger:
logger.warning(f"Built-in browser on port {debugging_port} is not running", tag="BUILTIN")
# Remove this port from the dictionary
@@ -146,7 +211,6 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
logger=self.logger
)
async def launch_builtin_browser(self,
browser_type: str = "chromium",
debugging_port: int = 9222,
@@ -207,57 +271,50 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
return None
try:
# Start the browser process detached
if is_windows():
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
)
else:
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setpgrp # Start in a new process group
)
# Wait briefly to ensure the process starts successfully
await asyncio.sleep(2.0)
# Check if the process is still running
if process.poll() is not None:
if self.logger:
self.logger.error(f"Browser process exited immediately with code {process.returncode}", tag="BUILTIN")
return None
# Construct CDP URL
# Check if the port is already in use
PID = ""
cdp_url = f"http://localhost:{debugging_port}"
# Try to verify browser is responsive by fetching version info
import aiohttp
json_url = f"{cdp_url}/json/version"
config_json = None
try:
async with aiohttp.ClientSession() as session:
for _ in range(10): # Try multiple times
try:
async with session.get(json_url) as response:
if response.status == 200:
config_json = await response.json()
break
except Exception:
pass
await asyncio.sleep(0.5)
except Exception as e:
config_json = await self._check_port_in_use(cdp_url)
if config_json:
if self.logger:
self.logger.warning(f"Could not verify browser: {str(e)}", tag="BUILTIN")
self.logger.info(f"Port {debugging_port} is already in use.", tag="BUILTIN")
PID = find_process_by_port(debugging_port)
else:
# Start the browser process detached
process = None
if is_windows():
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
)
else:
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setpgrp # Start in a new process group
)
# Wait briefly to ensure the process starts successfully
await asyncio.sleep(2.0)
# Check if the process is still running
if process and process.poll() is not None:
if self.logger:
self.logger.error(f"Browser process exited immediately with code {process.returncode}", tag="BUILTIN")
return None
PID = process.pid
# Construct CDP URL
config_json = await self._check_port_in_use(cdp_url)
# Create browser info
browser_info = {
'pid': process.pid,
'pid': PID,
'cdp_url': cdp_url,
'user_data_dir': user_data_dir,
'browser_type': browser_type,
@@ -304,7 +361,37 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
if self.logger:
self.logger.error(f"Error launching built-in browser: {str(e)}", tag="BUILTIN")
return None
async def _check_port_in_use(self, cdp_url: str) -> dict:
"""Check if a port is already in use by a Chrome DevTools instance.
Args:
cdp_url: The CDP URL to check
Returns:
dict: Chrome DevTools protocol version information or None if not found
"""
import aiohttp
json_url = f"{cdp_url}/json/version"
json_config = None
try:
async with aiohttp.ClientSession() as session:
try:
async with session.get(json_url, timeout=2.0) as response:
if response.status == 200:
json_config = await response.json()
if self.logger:
self.logger.debug(f"Found CDP server running at {cdp_url}", tag="BUILTIN")
return json_config
except (aiohttp.ClientError, asyncio.TimeoutError):
pass
return None
except Exception as e:
if self.logger:
self.logger.debug(f"Error checking CDP port: {str(e)}", tag="BUILTIN")
return None
async def kill_builtin_browser(self) -> bool:
"""Kill the built-in browser if it's running.
@@ -321,20 +408,8 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
if not pid:
return False
try:
if is_windows():
subprocess.run(["taskkill", "/F", "/PID", str(pid)], check=True)
else:
os.kill(pid, signal.SIGTERM)
# Wait for termination
for _ in range(5):
if not is_browser_running(pid):
break
await asyncio.sleep(0.5)
else:
# Force kill if still running
os.kill(pid, signal.SIGKILL)
success, error_msg = terminate_process(pid, logger=self.logger)
if success:
# Update config file to remove this browser
with open(self.builtin_config_file, 'r') as f:
browser_info_dict = json.load(f)
@@ -355,9 +430,9 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
if self.logger:
self.logger.success("Built-in browser terminated", tag="BUILTIN")
return True
except Exception as e:
else:
if self.logger:
self.logger.error(f"Error killing built-in browser: {str(e)}", tag="BUILTIN")
self.logger.error(f"Error killing built-in browser: {error_msg}", tag="BUILTIN")
return False
async def get_builtin_browser_status(self) -> Dict[str, Any]:
@@ -383,12 +458,16 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
'port': self.config.debugging_port
}
# Override the close method to handle built-in browser cleanup
async def close(self):
"""Close the built-in browser and clean up resources."""
# Store the shutting_down state
was_shutting_down = getattr(self, 'shutting_down', False)
# Call parent class close method
await super().close()
# Clean up built-in browser if we created it
if self.shutting_down:
# Clean up built-in browser if we created it and were in shutdown mode
if was_shutting_down:
await self.kill_builtin_browser()
if self.logger:
self.logger.debug("Killed built-in browser during shutdown", tag="BUILTIN")

View File

@@ -16,7 +16,7 @@ from playwright.async_api import BrowserContext, Page
from ...async_logger import AsyncLogger
from ...async_configs import BrowserConfig, CrawlerRunConfig
from ..utils import get_playwright, get_browser_executable, create_temp_directory, is_windows
from ..utils import get_playwright, get_browser_executable, create_temp_directory, is_windows, check_process_is_running, terminate_process
from .base import BaseBrowserStrategy
@@ -47,22 +47,34 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
Returns:
self: For method chaining
"""
self.playwright = await get_playwright()
# Call the base class start to initialize Playwright
await super().start()
# Get or create CDP URL
cdp_url = await self._get_or_create_cdp_url()
# Connect to the browser using CDP
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
# Get or create default context
contexts = self.browser.contexts
if contexts:
self.default_context = contexts[0]
else:
self.default_context = await self.create_browser_context()
await self.setup_context(self.default_context)
try:
# Get or create CDP URL
cdp_url = await self._get_or_create_cdp_url()
# Connect to the browser using CDP
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
# Get or create default context
contexts = self.browser.contexts
if contexts:
self.default_context = contexts[0]
else:
self.default_context = await self.create_browser_context()
await self.setup_context(self.default_context)
if self.logger:
self.logger.debug(f"Connected to CDP browser at {cdp_url}", tag="CDP")
except Exception as e:
if self.logger:
self.logger.error(f"Failed to connect to CDP browser: {str(e)}", tag="CDP")
# Clean up any resources before re-raising
await self._cleanup_process()
raise
return self
async def _get_or_create_cdp_url(self) -> str:
@@ -105,39 +117,25 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
)
# Monitor for a short time to make sure it starts properly
await asyncio.sleep(0.5) # Give browser time to start
await self._initial_startup_check()
await asyncio.sleep(2) # Give browser more time to start
is_running, return_code, stdout, stderr = await check_process_is_running(self.browser_process, delay=2)
if not is_running:
if self.logger:
self.logger.error(
message="Browser process terminated unexpectedly | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}",
tag="ERROR",
params={
"code": return_code,
"stdout": stdout.decode() if stdout else "",
"stderr": stderr.decode() if stderr else "",
},
)
await self._cleanup_process()
raise Exception("Browser process terminated unexpectedly")
return f"http://localhost:{self.config.debugging_port}"
except Exception as e:
await self._cleanup_process()
raise Exception(f"Failed to start browser: {e}")
async def _initial_startup_check(self):
"""Perform a quick check to make sure the browser started successfully."""
if not self.browser_process:
return
# Check that process started without immediate termination
await asyncio.sleep(0.5)
if self.browser_process.poll() is not None:
# Process already terminated
stdout, stderr = b"", b""
try:
stdout, stderr = self.browser_process.communicate(timeout=0.5)
except subprocess.TimeoutExpired:
pass
if self.logger:
self.logger.error(
message="Browser process terminated during startup | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}",
tag="ERROR",
params={
"code": self.browser_process.returncode,
"stdout": stdout.decode() if stdout else "",
"stderr": stderr.decode() if stderr else "",
},
)
raise Exception(f"Failed to start browser: {e}")
async def _get_browser_args(self, user_data_dir: str) -> List[str]:
"""Returns browser-specific command line arguments.
@@ -148,6 +146,7 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
Returns:
List of command-line arguments for the browser
"""
browser_args = super()._build_browser_args()
browser_path = await get_browser_executable(self.config.browser_type)
base_args = [browser_path]
@@ -170,7 +169,7 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
else:
raise NotImplementedError(f"Browser type {self.config.browser_type} not supported")
return base_args + args
return base_args + browser_args + args
async def _cleanup_process(self):
"""Cleanup browser process and temporary directory."""
@@ -179,33 +178,26 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
if self.browser_process:
try:
# Only terminate if we have proper control over the process
if not self.browser_process.poll():
# Process is still running
self.browser_process.terminate()
# Wait for process to end gracefully
for _ in range(10): # 10 attempts, 100ms each
if self.browser_process.poll() is not None:
break
await asyncio.sleep(0.1)
# Force kill if still running
if self.browser_process.poll() is None:
if is_windows():
# On Windows we might need taskkill for detached processes
try:
subprocess.run(["taskkill", "/F", "/PID", str(self.browser_process.pid)])
except Exception:
self.browser_process.kill()
else:
self.browser_process.kill()
await asyncio.sleep(0.1) # Brief wait for kill to take effect
# Only attempt termination if the process is still running
if self.browser_process.poll() is None:
# Use our robust cross-platform termination utility
success = terminate_process(
pid=self.browser_process.pid,
timeout=1.0, # Equivalent to the previous 10*0.1s wait
logger=self.logger
)
if not success and self.logger:
self.logger.warning(
message="Failed to terminate browser process cleanly",
tag="PROCESS"
)
except Exception as e:
if self.logger:
self.logger.error(
message="Error terminating browser: {error}",
tag="ERROR",
message="Error during browser process cleanup: {error}",
tag="ERROR",
params={"error": str(e)},
)
@@ -220,54 +212,6 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
params={"error": str(e)},
)
async def create_browser_context(self, crawlerRunConfig: Optional[CrawlerRunConfig] = None) -> BrowserContext:
"""Create a new browser context.
Uses the base class implementation which handles all configurations.
Args:
crawlerRunConfig: Configuration object for the crawler run
Returns:
BrowserContext: Browser context object
"""
# Handle user_data_dir for CDP browsers
if self.config.user_data_dir:
# For CDP-based browsers, storage persistence is typically handled by the user_data_dir
# at the browser level, but we'll create a storage_state location for Playwright as well
storage_path = os.path.join(self.config.user_data_dir, "storage_state.json")
if not os.path.exists(storage_path):
# Create parent directory if it doesn't exist
os.makedirs(os.path.dirname(storage_path), exist_ok=True)
with open(storage_path, "w") as f:
json.dump({}, f)
self.config.storage_state = storage_path
# Use the base class implementation
return await super().create_browser_context(crawlerRunConfig)
def _cleanup_expired_sessions(self):
"""Clean up expired sessions based on TTL."""
current_time = time.time()
expired_sessions = [
sid
for sid, (_, _, last_used) in self.sessions.items()
if current_time - last_used > self.session_ttl
]
for sid in expired_sessions:
asyncio.create_task(self._kill_session(sid))
async def _kill_session(self, session_id: str):
"""Kill a browser session and clean up resources.
Args:
session_id: The session ID to kill
"""
if session_id in self.sessions:
context, page, _ = self.sessions[session_id]
await page.close()
del self.sessions[session_id]
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
@@ -277,6 +221,7 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
Returns:
Tuple of (Page, BrowserContext)
"""
# Clean up expired sessions using base class method
self._cleanup_expired_sessions()
# If a session_id is provided and we already have it, reuse that page + context
@@ -289,71 +234,56 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
# For CDP, we typically use the shared default_context
context = self.default_context
pages = context.pages
# Otherwise, check if we have an existing context for this config
config_signature = self._make_config_signature(crawlerRunConfig)
self.contexts_by_config[config_signature] = context
await self.setup_context(context, crawlerRunConfig)
# Check if there's already a page with the target URL
page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
# If not found, create a new page
if not page:
page = await context.new_page()
# If a session_id is specified, store this session so we can reuse later
# If a session_id is specified, store this session for reuse
if crawlerRunConfig.session_id:
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
async def close(self):
"""Close the browser and clean up resources."""
"""Close the CDP browser and clean up resources."""
# Skip cleanup if using external CDP URL and not launched by us
if self.config.cdp_url and not self.browser_process:
if self.logger:
self.logger.debug("Skipping cleanup for external CDP browser", tag="CDP")
return
if self.config.sleep_on_close:
await asyncio.sleep(0.5)
# Call parent implementation for common cleanup
await super().close()
# If we have a user_data_dir configured, ensure persistence of storage state
if self.config.user_data_dir and self.browser and self.default_context:
for context in self.browser.contexts:
try:
await context.storage_state(path=os.path.join(self.config.user_data_dir, "Default", "storage_state.json"))
if self.logger:
self.logger.debug("Ensuring storage state is persisted before closing browser", tag="BROWSER")
except Exception as e:
if self.logger:
self.logger.warning(
message="Failed to ensure storage persistence: {error}",
tag="BROWSER",
params={"error": str(e)}
)
# Close all sessions
session_ids = list(self.sessions.keys())
for session_id in session_ids:
await self._kill_session(session_id)
# Close browser
if self.browser:
await self.browser.close()
self.browser = None
# Clean up managed browser if we created it
# Additional CDP-specific cleanup
if self.browser_process:
await asyncio.sleep(0.5)
await self._cleanup_process()
self.browser_process = None
if self.logger:
self.logger.debug("Cleaned up CDP browser process", tag="CDP")
# Close temporary directory
# Clean up temporary directory
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
self.temp_dir = None
if self.logger:
self.logger.debug("Removed temporary directory", tag="CDP")
except Exception as e:
if self.logger:
self.logger.error(
message="Error removing temporary directory: {error}",
tag="ERROR",
params={"error": str(e)},
)
# Stop playwright
if self.playwright:
await self.playwright.stop()
self.playwright = None
tag="CDP",
params={"error": str(e)}
)

View File

@@ -19,12 +19,12 @@ from .builtin import CDPBrowserStrategy
class DockerBrowserStrategy(CDPBrowserStrategy):
"""Docker-based browser strategy.
Extends the CDPBrowserStrategy to run browsers in Docker containers.
Supports two modes:
1. "connect" - Uses a Docker image with Chrome already running
2. "launch" - Starts Chrome within the container with custom settings
Attributes:
docker_config: Docker-specific configuration options
container_id: ID of current Docker container
@@ -36,16 +36,16 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
internal_cdp_port: Chrome's internal CDP port
internal_mapped_port: Port that socat maps to internally
"""
def __init__(self, config: BrowserConfig, logger: Optional[AsyncLogger] = None):
"""Initialize the Docker browser strategy.
Args:
config: Browser configuration including Docker-specific settings
logger: Logger for recording events and errors
"""
super().__init__(config, logger)
# Initialize Docker-specific attributes
self.docker_config = self.config.docker_config or DockerConfig()
self.container_id = None
@@ -56,10 +56,9 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
if registry_file is None and self.config.user_data_dir:
# Use the same registry file as BuiltinBrowserStrategy if possible
registry_file = os.path.join(
os.path.dirname(self.config.user_data_dir),
"browser_config.json"
os.path.dirname(self.config.user_data_dir), "browser_config.json"
)
self.registry = DockerRegistry(self.docker_config.registry_file)
self.docker_utils = DockerUtils(logger)
self.chrome_process_id = None
@@ -70,39 +69,44 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
async def start(self):
"""Start or connect to a browser running in a Docker container.
This method initializes Playwright and establishes a connection to
This method initializes Playwright and establishes a connection to
a browser running in a Docker container. Depending on the configured mode:
- "connect": Connects to a container with Chrome already running
- "launch": Creates a container and launches Chrome within it
Returns:
self: For method chaining
"""
# Initialize Playwright
from ..utils import get_playwright
self.playwright = await get_playwright()
if self.logger:
self.logger.info(
f"Starting Docker browser strategy in {self.docker_config.mode} mode",
tag="DOCKER"
tag="DOCKER",
)
try:
# Get CDP URL by creating or reusing a Docker container
# This handles the container management and browser startup
cdp_url = await self._get_or_create_cdp_url()
if not cdp_url:
raise Exception("Failed to establish CDP connection to Docker container")
raise Exception(
"Failed to establish CDP connection to Docker container"
)
if self.logger:
self.logger.info(f"Connecting to browser in Docker via CDP: {cdp_url}", tag="DOCKER")
self.logger.info(
f"Connecting to browser in Docker via CDP: {cdp_url}", tag="DOCKER"
)
# Connect to the browser using CDP
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
# Get existing context or create default context
contexts = self.browser.contexts
if contexts:
@@ -114,33 +118,35 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
self.logger.debug("Creating new browser context", tag="DOCKER")
self.default_context = await self.create_browser_context()
await self.setup_context(self.default_context)
return self
except Exception as e:
# Clean up resources if startup fails
if self.container_id and not self.docker_config.persistent:
if self.logger:
self.logger.warning(
f"Cleaning up container after failed start: {self.container_id[:12]}",
tag="DOCKER"
tag="DOCKER",
)
await self.docker_utils.remove_container(self.container_id)
self.registry.unregister_container(self.container_id)
self.container_id = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
# Re-raise the exception
if self.logger:
self.logger.error(f"Failed to start Docker browser: {str(e)}", tag="DOCKER")
raise
self.logger.error(
f"Failed to start Docker browser: {str(e)}", tag="DOCKER"
)
raise
async def _generate_config_hash(self) -> str:
"""Generate a hash of the configuration for container matching.
Returns:
Hash string uniquely identifying this configuration
"""
@@ -151,66 +157,77 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
"browser_type": self.config.browser_type,
"headless": self.config.headless,
}
# Add browser-specific config if in launch mode
if self.docker_config.mode == "launch":
config_dict.update({
"text_mode": self.config.text_mode,
"light_mode": self.config.light_mode,
"viewport_width": self.config.viewport_width,
"viewport_height": self.config.viewport_height,
})
config_dict.update(
{
"text_mode": self.config.text_mode,
"light_mode": self.config.light_mode,
"viewport_width": self.config.viewport_width,
"viewport_height": self.config.viewport_height,
}
)
# Use the utility method to generate the hash
return self.docker_utils.generate_config_hash(config_dict)
async def _get_or_create_cdp_url1(self) -> str:
"""Get CDP URL by either creating a new container or using an existing one.
Returns:
CDP URL for connecting to the browser
Raises:
Exception: If container creation or browser launch fails
"""
# If CDP URL is explicitly provided, use it
if self.config.cdp_url:
return self.config.cdp_url
# Ensure Docker image exists (will build if needed)
image_name = await self.docker_utils.ensure_docker_image_exists(
self.docker_config.image,
self.docker_config.mode
self.docker_config.image, self.docker_config.mode
)
# Generate config hash for container matching
config_hash = await self._generate_config_hash()
# Look for existing container with matching config
container_id = self.registry.find_container_by_config(config_hash, self.docker_utils)
container_id = self.registry.find_container_by_config(
config_hash, self.docker_utils
)
if container_id:
# Use existing container
self.container_id = container_id
host_port = self.registry.get_container_host_port(container_id)
if self.logger:
self.logger.info(f"Using existing Docker container: {container_id[:12]}", tag="DOCKER")
self.logger.info(
f"Using existing Docker container: {container_id[:12]}",
tag="DOCKER",
)
else:
# Get a port for the new container
host_port = self.docker_config.host_port or self.registry.get_next_available_port(self.docker_utils)
host_port = (
self.docker_config.host_port
or self.registry.get_next_available_port(self.docker_utils)
)
# Prepare volumes list
volumes = list(self.docker_config.volumes)
# Add user data directory if specified
if self.docker_config.user_data_dir:
# Ensure user data directory exists
os.makedirs(self.docker_config.user_data_dir, exist_ok=True)
volumes.append(f"{self.docker_config.user_data_dir}:{self.docker_config.container_user_data_dir}")
volumes.append(
f"{self.docker_config.user_data_dir}:{self.docker_config.container_user_data_dir}"
)
# Update config user_data_dir to point to container path
self.config.user_data_dir = self.docker_config.container_user_data_dir
# Create a new container
container_id = await self.docker_utils.create_container(
image_name=image_name,
@@ -219,54 +236,63 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
volumes=volumes,
network=self.docker_config.network,
env_vars=self.docker_config.env_vars,
extra_args=self.docker_config.extra_args
extra_args=self.docker_config.extra_args,
)
if not container_id:
raise Exception("Failed to create Docker container")
self.container_id = container_id
# Register the container
self.registry.register_container(container_id, host_port, config_hash)
# Wait for container to be ready
await self.docker_utils.wait_for_container_ready(container_id)
# Handle specific setup based on mode
if self.docker_config.mode == "launch":
# In launch mode, we need to start socat and Chrome
await self.docker_utils.start_socat_in_container(container_id)
# Build browser arguments
browser_args = self._build_browser_args()
# Launch Chrome
await self.docker_utils.launch_chrome_in_container(container_id, browser_args)
await self.docker_utils.launch_chrome_in_container(
container_id, browser_args
)
# Get PIDs for later cleanup
self.chrome_process_id = await self.docker_utils.get_process_id_in_container(
container_id, "chrome"
self.chrome_process_id = (
await self.docker_utils.get_process_id_in_container(
container_id, "chrome"
)
)
self.socat_process_id = await self.docker_utils.get_process_id_in_container(
container_id, "socat"
self.socat_process_id = (
await self.docker_utils.get_process_id_in_container(
container_id, "socat"
)
)
# Wait for CDP to be ready
await self.docker_utils.wait_for_cdp_ready(host_port)
if self.logger:
self.logger.success(f"Docker container ready: {container_id[:12]} on port {host_port}", tag="DOCKER")
self.logger.success(
f"Docker container ready: {container_id[:12]} on port {host_port}",
tag="DOCKER",
)
# Return CDP URL
return f"http://localhost:{host_port}"
async def _get_or_create_cdp_url(self) -> str:
"""Get CDP URL by either creating a new container or using an existing one.
Returns:
CDP URL for connecting to the browser
Raises:
Exception: If container creation or browser launch fails
"""
@@ -276,38 +302,47 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
# Ensure Docker image exists (will build if needed)
image_name = await self.docker_utils.ensure_docker_image_exists(
self.docker_config.image,
self.docker_config.mode
self.docker_config.image, self.docker_config.mode
)
# Generate config hash for container matching
config_hash = await self._generate_config_hash()
# Look for existing container with matching config
container_id = await self.registry.find_container_by_config(config_hash, self.docker_utils)
container_id = await self.registry.find_container_by_config(
config_hash, self.docker_utils
)
if container_id:
# Use existing container
self.container_id = container_id
host_port = self.registry.get_container_host_port(container_id)
if self.logger:
self.logger.info(f"Using existing Docker container: {container_id[:12]}", tag="DOCKER")
self.logger.info(
f"Using existing Docker container: {container_id[:12]}",
tag="DOCKER",
)
else:
# Get a port for the new container
host_port = self.docker_config.host_port or self.registry.get_next_available_port(self.docker_utils)
host_port = (
self.docker_config.host_port
or self.registry.get_next_available_port(self.docker_utils)
)
# Prepare volumes list
volumes = list(self.docker_config.volumes)
# Add user data directory if specified
if self.docker_config.user_data_dir:
# Ensure user data directory exists
os.makedirs(self.docker_config.user_data_dir, exist_ok=True)
volumes.append(f"{self.docker_config.user_data_dir}:{self.docker_config.container_user_data_dir}")
# Update config user_data_dir to point to container path
self.config.user_data_dir = self.docker_config.container_user_data_dir
volumes.append(
f"{self.docker_config.user_data_dir}:{self.docker_config.container_user_data_dir}"
)
# # Update config user_data_dir to point to container path
# self.config.user_data_dir = self.docker_config.container_user_data_dir
# Create a new container
container_id = await self.docker_utils.create_container(
image_name=image_name,
@@ -318,148 +353,196 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
env_vars=self.docker_config.env_vars,
cpu_limit=self.docker_config.cpu_limit,
memory_limit=self.docker_config.memory_limit,
extra_args=self.docker_config.extra_args
extra_args=self.docker_config.extra_args,
)
if not container_id:
raise Exception("Failed to create Docker container")
self.container_id = container_id
self.container_id = container_id
# Wait for container to be ready
await self.docker_utils.wait_for_container_ready(container_id)
# Handle specific setup based on mode
if self.docker_config.mode == "launch":
# In launch mode, we need to start socat and Chrome
await self.docker_utils.start_socat_in_container(container_id)
# Build browser arguments
browser_args = self._build_browser_args()
# Launch Chrome
await self.docker_utils.launch_chrome_in_container(container_id, browser_args)
await self.docker_utils.launch_chrome_in_container(
container_id, browser_args
)
# Get PIDs for later cleanup
self.chrome_process_id = await self.docker_utils.get_process_id_in_container(
container_id, "chromium"
self.chrome_process_id = (
await self.docker_utils.get_process_id_in_container(
container_id, "chromium"
)
)
self.socat_process_id = await self.docker_utils.get_process_id_in_container(
container_id, "socat"
self.socat_process_id = (
await self.docker_utils.get_process_id_in_container(
container_id, "socat"
)
)
# Wait for CDP to be ready
cdp_json_config = await self.docker_utils.wait_for_cdp_ready(host_port)
if cdp_json_config:
# Register the container in the shared registry
self.registry.register_container(container_id, host_port, config_hash, cdp_json_config)
self.registry.register_container(
container_id, host_port, config_hash, cdp_json_config
)
else:
raise Exception("Failed to get CDP JSON config from Docker container")
if self.logger:
self.logger.success(f"Docker container ready: {container_id[:12]} on port {host_port}", tag="DOCKER")
self.logger.success(
f"Docker container ready: {container_id[:12]} on port {host_port}",
tag="DOCKER",
)
# Return CDP URL
return f"http://localhost:{host_port}"
def _build_browser_args(self) -> List[str]:
"""Build Chrome command line arguments based on BrowserConfig.
Returns:
List of command line arguments for Chrome
"""
args = [
"--no-sandbox",
"--disable-gpu",
# Call parent method to get common arguments
browser_args = super()._build_browser_args()
return browser_args["args"] + [
f"--remote-debugging-port={self.internal_cdp_port}",
"--remote-debugging-address=0.0.0.0", # Allow external connections
"--disable-dev-shm-usage",
"--headless=new",
]
if self.config.headless:
args.append("--headless=new")
if self.config.viewport_width and self.config.viewport_height:
args.append(f"--window-size={self.config.viewport_width},{self.config.viewport_height}")
if self.config.user_agent:
args.append(f"--user-agent={self.config.user_agent}")
if self.config.text_mode:
args.extend([
"--blink-settings=imagesEnabled=false",
"--disable-remote-fonts",
"--disable-images",
"--disable-javascript",
])
if self.config.light_mode:
# Import here to avoid circular import
from ..utils import get_browser_disable_options
args.extend(get_browser_disable_options())
if self.config.user_data_dir:
args.append(f"--user-data-dir={self.config.user_data_dir}")
if self.config.extra_args:
args.extend(self.config.extra_args)
return args
# args = [
# "--no-sandbox",
# "--disable-gpu",
# f"--remote-debugging-port={self.internal_cdp_port}",
# "--remote-debugging-address=0.0.0.0", # Allow external connections
# "--disable-dev-shm-usage",
# ]
# if self.config.headless:
# args.append("--headless=new")
# if self.config.viewport_width and self.config.viewport_height:
# args.append(f"--window-size={self.config.viewport_width},{self.config.viewport_height}")
# if self.config.user_agent:
# args.append(f"--user-agent={self.config.user_agent}")
# if self.config.text_mode:
# args.extend([
# "--blink-settings=imagesEnabled=false",
# "--disable-remote-fonts",
# "--disable-images",
# "--disable-javascript",
# ])
# if self.config.light_mode:
# # Import here to avoid circular import
# from ..utils import get_browser_disable_options
# args.extend(get_browser_disable_options())
# if self.config.user_data_dir:
# args.append(f"--user-data-dir={self.config.user_data_dir}")
# if self.config.extra_args:
# args.extend(self.config.extra_args)
# return args
async def close(self):
"""Close the browser and clean up Docker container if needed."""
# Set shutting_down flag to prevent race conditions
self.shutting_down = True
# Store state if needed before closing
if self.browser and self.docker_config.user_data_dir and self.docker_config.persistent:
# Set flag to track if we were the ones initiating shutdown
initiated_shutdown = not getattr(self, "shutting_down", False)
# Storage persistence for Docker needs special handling
# We need to store state before calling super().close() which will close the browser
if (
self.browser
and self.docker_config.user_data_dir
and self.docker_config.persistent
):
for context in self.browser.contexts:
try:
storage_path = os.path.join(self.docker_config.user_data_dir, "storage_state.json")
# Ensure directory exists
os.makedirs(self.docker_config.user_data_dir, exist_ok=True)
# Save storage state to user data directory
storage_path = os.path.join(
self.docker_config.user_data_dir, "storage_state.json"
)
await context.storage_state(path=storage_path)
if self.logger:
self.logger.debug("Persisted storage state before closing browser", tag="DOCKER")
self.logger.debug(
"Persisted Docker-specific storage state", tag="DOCKER"
)
except Exception as e:
if self.logger:
self.logger.warning(
message="Failed to persist storage state: {error}",
message="Failed to persist Docker storage state: {error}",
tag="DOCKER",
params={"error": str(e)}
params={"error": str(e)},
)
# Close browser connection (but not container)
if self.browser:
await self.browser.close()
self.browser = None
# Only clean up container if not persistent
if self.container_id and not self.docker_config.persistent:
# Stop Chrome process in "launch" mode
if self.docker_config.mode == "launch" and self.chrome_process_id:
await self.docker_utils.stop_process_in_container(
self.container_id, self.chrome_process_id
)
# Stop socat process in "launch" mode
if self.docker_config.mode == "launch" and self.socat_process_id:
await self.docker_utils.stop_process_in_container(
self.container_id, self.socat_process_id
)
# Remove or stop container based on configuration
if self.docker_config.remove_on_exit:
await self.docker_utils.remove_container(self.container_id)
# Unregister from registry
self.registry.unregister_container(self.container_id)
else:
await self.docker_utils.stop_container(self.container_id)
self.container_id = None
# Close Playwright
if self.playwright:
await self.playwright.stop()
self.playwright = None
self.shutting_down = False
# Call parent method to handle common cleanup
await super().close()
# Only perform container cleanup if we initiated shutdown
# and we need to handle Docker-specific resources
if initiated_shutdown:
# Only clean up container if not persistent
if self.container_id and not self.docker_config.persistent:
# Stop Chrome process in "launch" mode
if self.docker_config.mode == "launch" and self.chrome_process_id:
await self.docker_utils.stop_process_in_container(
self.container_id, self.chrome_process_id
)
if self.logger:
self.logger.debug(
f"Stopped Chrome process {self.chrome_process_id} in container",
tag="DOCKER",
)
# Stop socat process in "launch" mode
if self.docker_config.mode == "launch" and self.socat_process_id:
await self.docker_utils.stop_process_in_container(
self.container_id, self.socat_process_id
)
if self.logger:
self.logger.debug(
f"Stopped socat process {self.socat_process_id} in container",
tag="DOCKER",
)
# Remove or stop container based on configuration
if self.docker_config.remove_on_exit:
await self.docker_utils.remove_container(self.container_id)
# Unregister from registry
if hasattr(self, "registry") and self.registry:
self.registry.unregister_container(self.container_id)
if self.logger:
self.logger.debug(
f"Removed Docker container {self.container_id}",
tag="DOCKER",
)
else:
await self.docker_utils.stop_container(self.container_id)
if self.logger:
self.logger.debug(
f"Stopped Docker container {self.container_id}",
tag="DOCKER",
)
self.container_id = None

View File

@@ -4,17 +4,13 @@ This module implements the browser strategy pattern for different
browser implementations, including Playwright, CDP, and builtin browsers.
"""
import asyncio
import os
import time
import json
from typing import Optional, Tuple
from playwright.async_api import BrowserContext, Page, ProxySettings
from playwright.async_api import BrowserContext, Page
from ...async_logger import AsyncLogger
from ...async_configs import BrowserConfig, CrawlerRunConfig
from ..utils import get_playwright, get_browser_disable_options
from playwright_stealth import StealthConfig
@@ -50,9 +46,7 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy):
logger: Logger for recording events and errors
"""
super().__init__(config, logger)
# Add session management
self.sessions = {}
self.session_ttl = 1800 # 30 minutes
# No need to re-initialize sessions and session_ttl as they're now in the base class
async def start(self):
"""Start the browser instance.
@@ -60,142 +54,32 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy):
Returns:
self: For method chaining
"""
self.playwright = await get_playwright()
# Call the base class start to initialize Playwright
await super().start()
# Build browser arguments using the base class method
browser_args = self._build_browser_args()
# Launch appropriate browser type
if self.config.browser_type == "firefox":
self.browser = await self.playwright.firefox.launch(**browser_args)
elif self.config.browser_type == "webkit":
self.browser = await self.playwright.webkit.launch(**browser_args)
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
self.default_context = self.browser
return self
def _build_browser_args(self) -> dict:
"""Build browser launch arguments from config.
Returns:
dict: Browser launch arguments
"""
args = [
"--disable-gpu",
"--disable-gpu-compositing",
"--disable-software-rasterizer",
"--no-sandbox",
"--disable-dev-shm-usage",
"--no-first-run",
"--no-default-browser-check",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
"--disable-blink-features=AutomationControlled",
"--window-position=400,0",
"--disable-renderer-backgrounding",
"--disable-ipc-flooding-protection",
"--force-color-profile=srgb",
"--mute-audio",
"--disable-background-timer-throttling",
f"--window-size={self.config.viewport_width},{self.config.viewport_height}",
]
if self.config.light_mode:
args.extend(get_browser_disable_options())
if self.config.text_mode:
args.extend(
[
"--blink-settings=imagesEnabled=false",
"--disable-remote-fonts",
"--disable-images",
"--disable-javascript",
"--disable-software-rasterizer",
"--disable-dev-shm-usage",
]
)
if self.config.extra_args:
args.extend(self.config.extra_args)
browser_args = {"headless": self.config.headless, "args": args}
if self.config.chrome_channel:
browser_args["channel"] = self.config.chrome_channel
if self.config.accept_downloads:
browser_args["downloads_path"] = self.config.downloads_path or os.path.join(
os.getcwd(), "downloads"
)
os.makedirs(browser_args["downloads_path"], exist_ok=True)
if self.config.proxy or self.config.proxy_config:
proxy_settings = (
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
)
browser_args["proxy"] = proxy_settings
return browser_args
async def create_browser_context(self, crawlerRunConfig: Optional[CrawlerRunConfig] = None) -> BrowserContext:
"""Creates and returns a new browser context with configured settings.
This implementation extends the base class version to handle user_data_dir specifically.
Args:
crawlerRunConfig: Configuration object for the crawler run
Returns:
BrowserContext: Browser context object with the specified configurations
"""
# Handle user_data_dir explicitly to ensure storage persistence
if self.config.user_data_dir:
# Create a storage state file path if none exists
storage_path = os.path.join(self.config.user_data_dir, "Default", "storage_state.json")
# Create the file if it doesn't exist
if not os.path.exists(storage_path):
os.makedirs(os.path.dirname(storage_path), exist_ok=True)
with open(storage_path, "w") as f:
json.dump({}, f)
# Override storage_state with our specific path
self.config.storage_state = storage_path
if self.logger:
self.logger.debug(f"Using persistent storage state at: {storage_path}", tag="BROWSER")
try:
# Launch appropriate browser type
if self.config.browser_type == "firefox":
self.browser = await self.playwright.firefox.launch(**browser_args)
elif self.config.browser_type == "webkit":
self.browser = await self.playwright.webkit.launch(**browser_args)
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
# Now call the base class implementation which handles everything else
return await super().create_browser_context(crawlerRunConfig)
def _cleanup_expired_sessions(self):
"""Clean up expired sessions based on TTL."""
current_time = time.time()
expired_sessions = [
sid
for sid, (_, _, last_used) in self.sessions.items()
if current_time - last_used > self.session_ttl
]
for sid in expired_sessions:
asyncio.create_task(self._kill_session(sid))
async def _kill_session(self, session_id: str):
"""Kill a browser session and clean up resources.
Args:
session_id: The session ID to kill
"""
if session_id in self.sessions:
context, page, _ = self.sessions[session_id]
await page.close()
del self.sessions[session_id]
self.default_context = self.browser
if self.logger:
self.logger.debug(f"Launched {self.config.browser_type} browser", tag="BROWSER")
except Exception as e:
if self.logger:
self.logger.error(f"Failed to launch browser: {str(e)}", tag="BROWSER")
raise
return self
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
@@ -236,49 +120,9 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy):
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
async def close(self):
"""Close the browser and clean up resources."""
if self.config.sleep_on_close:
await asyncio.sleep(0.5)
# If we have a user_data_dir configured, ensure persistence of storage state
if self.config.user_data_dir and self.browser and self.default_context:
for context in self.browser.contexts:
try:
await context.storage_state(path=os.path.join(self.config.user_data_dir, "Default", "storage_state.json"))
if self.logger:
self.logger.debug("Ensuring storage state is persisted before closing browser", tag="BROWSER")
except Exception as e:
if self.logger:
self.logger.warning(
message="Failed to ensure storage persistence: {error}",
tag="BROWSER",
params={"error": str(e)}
)
# Close all sessions
session_ids = list(self.sessions.keys())
for session_id in session_ids:
await self._kill_session(session_id)
# Close all contexts we created
for ctx in self.contexts_by_config.values():
try:
await ctx.close()
except Exception as e:
if self.logger:
self.logger.error(
message="Error closing context: {error}",
tag="ERROR",
params={"error": str(e)}
)
self.contexts_by_config.clear()
if self.browser:
await self.browser.close()
self.browser = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
async def close(self):
"""Close the Playwright browser and clean up resources."""
# The base implementation already handles everything needed for Playwright
# including storage persistence, sessions, contexts, browser and playwright
await super().close()

View File

@@ -11,7 +11,9 @@ import sys
import time
import tempfile
import subprocess
from typing import Optional
from typing import Optional, Tuple, Union
import signal
import psutil
from playwright.async_api import async_playwright
@@ -93,6 +95,8 @@ def is_browser_running(pid: Optional[int]) -> bool:
return False
try:
if type(pid) is str:
pid = int(pid)
# Check if the process exists
if is_windows():
process = subprocess.run(["tasklist", "/FI", f"PID eq {pid}"],
@@ -326,3 +330,136 @@ async def find_optimal_browser_config(total_urls=50, verbose=True, rate_limit_de
"optimal": optimal,
"all_configs": results
}
# Find process ID of the existing browser using os
def find_process_by_port(port: int) -> str:
"""Find process ID listening on a specific port.
Args:
port: Port number to check
Returns:
str: Process ID or empty string if not found
"""
try:
if is_windows():
cmd = f"netstat -ano | findstr :{port}"
result = subprocess.check_output(cmd, shell=True).decode()
return result.strip().split()[-1] if result else ""
else:
cmd = f"lsof -i :{port} -t"
return subprocess.check_output(cmd, shell=True).decode().strip()
except subprocess.CalledProcessError:
return ""
async def check_process_is_running(process: subprocess.Popen, delay: float = 0.5) -> Tuple[bool, Optional[int], bytes, bytes]:
"""Perform a quick check to make sure the browser started successfully."""
if not process:
return False, None, b"", b""
# Check that process started without immediate termination
await asyncio.sleep(delay)
if process.poll() is not None:
# Process already terminated
stdout, stderr = b"", b""
try:
stdout, stderr = process.communicate(timeout=0.5)
except subprocess.TimeoutExpired:
pass
return False, process.returncode, stdout, stderr
return True, 0, b"", b""
def terminate_process(
pid: Union[int, str],
timeout: float = 5.0,
force_kill_timeout: float = 3.0,
logger = None
) -> Tuple[bool, Optional[str]]:
"""
Robustly terminate a process across platforms with verification.
Args:
pid: Process ID to terminate (int or string)
timeout: Seconds to wait for graceful termination before force killing
force_kill_timeout: Seconds to wait after force kill before considering it failed
logger: Optional logger object with error, warning, and info methods
Returns:
Tuple of (success: bool, error_message: Optional[str])
"""
# Convert pid to int if it's a string
if isinstance(pid, str):
try:
pid = int(pid)
except ValueError:
error_msg = f"Invalid PID format: {pid}"
if logger:
logger.error(error_msg)
return False, error_msg
# Check if process exists
if not psutil.pid_exists(pid):
return True, None # Process already terminated
try:
process = psutil.Process(pid)
# First attempt: graceful termination
if logger:
logger.info(f"Attempting graceful termination of process {pid}")
if os.name == 'nt': # Windows
subprocess.run(["taskkill", "/PID", str(pid)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False)
else: # Unix/Linux/MacOS
process.send_signal(signal.SIGTERM)
# Wait for process to terminate
try:
process.wait(timeout=timeout)
if logger:
logger.info(f"Process {pid} terminated gracefully")
return True, None
except psutil.TimeoutExpired:
if logger:
logger.warning(f"Process {pid} did not terminate gracefully within {timeout} seconds, forcing termination")
# Second attempt: force kill
if os.name == 'nt': # Windows
subprocess.run(["taskkill", "/F", "/PID", str(pid)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False)
else: # Unix/Linux/MacOS
process.send_signal(signal.SIGKILL)
# Verify process is killed
gone, alive = psutil.wait_procs([process], timeout=force_kill_timeout)
if process in alive:
error_msg = f"Failed to kill process {pid} even after force kill"
if logger:
logger.error(error_msg)
return False, error_msg
if logger:
logger.info(f"Process {pid} terminated by force")
return True, None
except psutil.NoSuchProcess:
# Process terminated while we were working with it
if logger:
logger.info(f"Process {pid} already terminated")
return True, None
except Exception as e:
error_msg = f"Error terminating process {pid}: {str(e)}"
if logger:
logger.error(error_msg)
return False, error_msg

View File

@@ -614,9 +614,9 @@ async def run_tests():
# return
# Run browser tests
# results.append(await test_docker_connect_mode())
# results.append(await test_docker_launch_mode())
# results.append(await test_docker_persistent_storage())
results.append(await test_docker_connect_mode())
results.append(await test_docker_launch_mode())
results.append(await test_docker_persistent_storage())
results.append(await test_docker_parallel_pages())
results.append(await test_docker_registry_reuse())

View File

@@ -77,7 +77,7 @@ async def test_builtin_browser_creation():
# Step 4: Get browser info from the strategy
print(f"\n{INFO}4. Getting browser information{RESET}")
browser_info = manager._strategy.get_builtin_browser_info()
browser_info = manager._strategy.get_browser_info()
if browser_info:
print(f"{SUCCESS}Browser info retrieved:{RESET}")
for key, value in browser_info.items():
@@ -205,7 +205,7 @@ async def test_multiple_managers():
# Step 1: Create first manager
print(f"\n{INFO}1. Creating first browser manager{RESET}")
browser_config1 = (BrowserConfig(browser_mode="builtin", headless=True),)
browser_config1 = BrowserConfig(browser_mode="builtin", headless=True)
manager1 = BrowserManager(browser_config=browser_config1, logger=logger)
# Step 2: Create second manager
@@ -781,15 +781,16 @@ async def main():
# await manager.close()
# Run multiple managers test
# await test_multiple_managers()
await test_multiple_managers()
# Run performance scaling test
await test_performance_scaling()
# Run cleanup test
# await cleanup_browsers()
await cleanup_browsers()
# Run edge cases test
# await test_edge_cases()
await test_edge_cases()
print(f"\n{SUCCESS}All tests completed!{RESET}")

View File

@@ -25,6 +25,7 @@ async def test_cdp_launch_connect():
browser_config = BrowserConfig(
use_managed_browser=True,
browser_mode="cdp",
headless=True
)
@@ -70,8 +71,8 @@ async def test_cdp_with_user_data_dir():
logger.info(f"Created temporary user data directory: {user_data_dir}", tag="TEST")
browser_config = BrowserConfig(
use_managed_browser=True,
headless=True,
browser_mode="cdp",
user_data_dir=user_data_dir
)
@@ -210,7 +211,7 @@ async def run_tests():
results = []
# results.append(await test_cdp_launch_connect())
# results.append(await test_cdp_with_user_data_dir())
results.append(await test_cdp_with_user_data_dir())
results.append(await test_cdp_session_management())
# Print summary

View File

@@ -6,6 +6,7 @@ and serve as functional tests.
import asyncio
import os
import re
import sys
# Add the project root to Python path if running directly
@@ -19,6 +20,53 @@ from crawl4ai.async_logger import AsyncLogger
# Create a logger for clear terminal output
logger = AsyncLogger(verbose=True, log_file=None)
async def test_start_close():
# Create browser config for standard Playwright
browser_config = BrowserConfig(
headless=True,
viewport_width=1280,
viewport_height=800
)
# Create browser manager with the config
manager = BrowserManager(browser_config=browser_config, logger=logger)
try:
for _ in range(4):
# Start the browser
await manager.start()
logger.info("Browser started successfully", tag="TEST")
# Get a page
page, context = await manager.get_page(CrawlerRunConfig())
logger.info("Got page successfully", tag="TEST")
# Navigate to a website
await page.goto("https://example.com")
logger.info("Navigated to example.com", tag="TEST")
# Get page title
title = await page.title()
logger.info(f"Page title: {title}", tag="TEST")
# Clean up
await manager.close()
logger.info("Browser closed successfully", tag="TEST")
await asyncio.sleep(1) # Wait for a moment before restarting
except Exception as e:
logger.error(f"Test failed: {str(e)}", tag="TEST")
# Ensure cleanup
try:
await manager.close()
except:
pass
return False
return True
async def test_playwright_basic():
"""Test basic Playwright browser functionality."""
logger.info("Testing standard Playwright browser", tag="TEST")
@@ -248,9 +296,10 @@ async def run_tests():
"""Run all tests sequentially."""
results = []
results.append(await test_playwright_basic())
results.append(await test_playwright_text_mode())
results.append(await test_playwright_context_reuse())
# results.append(await test_start_close())
# results.append(await test_playwright_basic())
# results.append(await test_playwright_text_mode())
# results.append(await test_playwright_context_reuse())
results.append(await test_playwright_session_management())
# Print summary