Compare commits


1 Commit

Author SHA1 Message Date
AHMET YILMAZ
c003cb6e4f fix #1563 (cdp): resolve page leaks and race conditions in concurrent crawling
Fix memory leaks and race conditions when using arun_many() with managed CDP browsers. Each crawl now gets proper page isolation with automatic cleanup while maintaining shared browser context.

Key fixes:
- Close non-session pages after crawling to prevent tab accumulation
- Add thread-safe page creation with locks to avoid concurrent access
- Improve page lifecycle management for managed vs non-managed browsers
- Keep session pages alive for authentication persistence
- Prevent TOCTOU (time-of-check-time-of-use) race conditions

This ensures stable parallel crawling without memory growth or browser instability.
2025-11-07 15:42:37 +08:00
8 changed files with 733 additions and 789 deletions

View File

@@ -1,214 +0,0 @@
# CDP Browser Concurrency Fixes and Improvements
## Overview
This document describes the changes made to fix concurrency issues with CDP (Chrome DevTools Protocol) browsers when using `arun_many` and improve overall browser management.
## Problems Addressed
1. **Race Conditions in Page Creation**: When using managed CDP browsers with concurrent `arun_many` calls, the code attempted to reuse existing pages from `context.pages`, leading to race conditions and "Target page/context closed" errors.
2. **Proxy Configuration Issues**: Proxy credentials were incorrectly embedded in the `--proxy-server` URL, which doesn't work properly with CDP browsers.
3. **Insufficient Startup Checks**: Browser process startup checks were minimal and didn't catch early failures effectively.
4. **Unclear Logging**: Logging messages lacked structure and context, making debugging difficult.
5. **Duplicate Browser Arguments**: Browser launch arguments could contain duplicates despite deduplication attempts.
## Solutions Implemented
### 1. Always Create New Pages in Managed Browser Mode
**File**: `crawl4ai/browser_manager.py` (lines 1106-1113)
**Change**: Modified `get_page()` method to always create new pages instead of attempting to reuse existing ones for managed browsers without `storage_state`.
**Before**:
```python
context = self.default_context
pages = context.pages
page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
if not page:
if pages:
page = pages[0]
else:
# Create new page only if none exist
async with self._page_lock:
page = await context.new_page()
```
**After**:
```python
context = self.default_context
# Always create new pages instead of reusing existing ones
# This prevents race conditions in concurrent scenarios (arun_many with CDP)
# Serialize page creation to avoid 'Target page/context closed' errors
async with self._page_lock:
page = await context.new_page()
await self._apply_stealth_to_page(page)
```
**Benefits**:
- Eliminates race conditions when multiple tasks call `arun_many` concurrently
- Each request gets a fresh, independent page
- Page lock serializes creation to prevent TOCTOU (Time-of-check to time-of-use) issues
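The serialization property of this pattern can be checked in isolation. Below is a minimal sketch using a hypothetical `PageFactory` class (not the actual crawl4ai implementation): an `asyncio.Lock` guards the critical section, so even when many tasks request pages concurrently, at most one is ever inside `new_page()` at a time.

```python
import asyncio

# Hypothetical stand-in for the browser manager's page creation path.
# The lock mirrors self._page_lock in the snippet above.
class PageFactory:
    def __init__(self):
        self._page_lock = asyncio.Lock()
        self._in_critical_section = 0
        self.max_concurrency = 0  # should stay 1 if the lock works
        self.pages_created = 0

    async def new_page(self):
        async with self._page_lock:
            self._in_critical_section += 1
            self.max_concurrency = max(self.max_concurrency,
                                       self._in_critical_section)
            await asyncio.sleep(0)  # yield, as a real new_page() would
            self.pages_created += 1
            self._in_critical_section -= 1
            return f"page-{self.pages_created}"

async def demo():
    factory = PageFactory()
    pages = await asyncio.gather(*(factory.new_page() for _ in range(10)))
    return factory, pages

factory, pages = asyncio.run(demo())
assert factory.max_concurrency == 1  # lock serialized page creation
assert len(set(pages)) == 10         # every task got a fresh page
```

Without the lock, the check-then-create sequence in the "Before" code could interleave across tasks, which is exactly the TOCTOU window the fix closes.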
### 2. Fixed Proxy Flag Formatting
**File**: `crawl4ai/browser_manager.py` (lines 103-109)
**Change**: Removed credentials from proxy URL as they should be handled via separate authentication mechanisms in CDP.
**Before**:
```python
elif config.proxy_config:
creds = ""
if config.proxy_config.username and config.proxy_config.password:
creds = f"{config.proxy_config.username}:{config.proxy_config.password}@"
flags.append(f"--proxy-server={creds}{config.proxy_config.server}")
```
**After**:
```python
elif config.proxy_config:
# Note: For CDP/managed browsers, proxy credentials should be handled
# via authentication, not in the URL. Only pass the server address.
flags.append(f"--proxy-server={config.proxy_config.server}")
```
### 3. Enhanced Startup Checks
**File**: `crawl4ai/browser_manager.py` (lines 298-336)
**Changes**:
- Multiple check intervals (0.1s, 0.2s, 0.3s) to catch early failures
- Capture and log stdout/stderr on failure (limited to 200 chars)
- Raise `RuntimeError` with detailed diagnostics on startup failure
- Log process PID on successful startup in verbose mode
**Benefits**:
- Catches browser crashes during startup
- Provides detailed diagnostic information for debugging
- Fails fast with clear error messages
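The staged check can be sketched as follows. This is an illustrative, synchronous version with hypothetical names; the real crawl4ai implementation is async and reports through its structured logger.

```python
import subprocess
import sys
import time

def initial_startup_check(process, intervals=(0.1, 0.2, 0.3)):
    """Poll the process at increasing delays; raise if it died early."""
    for delay in intervals:
        time.sleep(delay)
        if process.poll() is not None:
            # Process already terminated - capture output for diagnostics
            stdout, stderr = b"", b""
            try:
                stdout, stderr = process.communicate(timeout=0.5)
            except subprocess.TimeoutExpired:
                pass
            msg = "Browser process terminated during startup"
            if stderr:
                msg += f" | STDERR: {stderr.decode()[:200]}"  # cap length
            if stdout:
                msg += f" | STDOUT: {stdout.decode()[:200]}"
            raise RuntimeError(f"Browser failed to start: {msg}")
    return process.pid  # still alive after all checks

# A process that dies immediately is caught by the first interval...
bad = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(1)"],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
    initial_startup_check(bad)
    raise AssertionError("expected RuntimeError")
except RuntimeError:
    pass

# ...while a long-lived process passes and reports its PID.
good = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(5)"])
assert initial_startup_check(good) == good.pid
good.kill()
```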
### 4. Improved Logging
**File**: `crawl4ai/browser_manager.py` (lines 218-291)
**Changes**:
- Structured logging with proper parameter substitution
- Log browser type, port, and headless status at launch
- Format and log full command with proper shell escaping
- Better error messages with context
- Consistent use of logger with null checks
**Example**:
```python
if self.logger and self.browser_config.verbose:
self.logger.debug(
"Launching browser: {browser_type} | Port: {port} | Headless: {headless}",
tag="BROWSER",
params={
"browser_type": self.browser_type,
"port": self.debugging_port,
"headless": self.headless
}
)
```
### 5. Deduplicate Browser Launch Arguments
**File**: `crawl4ai/browser_manager.py` (lines 424-425)
**Change**: Added explicit deduplication after merging all flags.
```python
# merge common launch flags
flags.extend(self.build_browser_flags(self.browser_config))
# Deduplicate flags - use dict.fromkeys to preserve order while removing duplicates
flags = list(dict.fromkeys(flags))
```
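As a quick sanity check on this idiom: dicts preserve insertion order (guaranteed since Python 3.7), so `dict.fromkeys` yields an order-preserving dedup. The flag values below are illustrative, not the exact crawl4ai launch flags.

```python
# Order-preserving dedup via dict.fromkeys (illustrative flag values)
flags = [
    "--headless=new",
    "--disable-gpu",
    "--headless=new",                  # duplicate from merged flag sources
    "--remote-debugging-port=9222",
    "--disable-gpu",                   # another duplicate
]
deduped = list(dict.fromkeys(flags))
assert deduped == [
    "--headless=new",
    "--disable-gpu",
    "--remote-debugging-port=9222",
]
```

Unlike `list(set(flags))`, this keeps the first occurrence of each flag in its original position, which matters because some browser flags are order-sensitive.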
### 6. Import Refactoring
**Files**: `crawl4ai/browser_manager.py`, `crawl4ai/browser_profiler.py`, `tests/browser/test_cdp_concurrency.py`
**Changes**: Organized all imports according to PEP 8:
1. Standard library imports (alphabetized)
2. Third-party imports (alphabetized)
3. Local imports (alphabetized)
**Benefits**:
- Improved code readability
- Easier to spot missing or unused imports
- Consistent style across the codebase
## Testing
### New Test Suite
**File**: `tests/browser/test_cdp_concurrency.py`
Comprehensive test suite covering eight scenarios:
1. **Basic Concurrent arun_many**: Validates multiple URLs can be crawled concurrently
2. **Sequential arun_many Calls**: Ensures multiple sequential batches work correctly
3. **Stress Test**: Multiple concurrent `arun_many` calls to test page lock effectiveness
4. **Page Isolation**: Verifies pages are truly independent
5. **Different Configurations**: Tests with varying viewport sizes and configs
6. **Error Handling**: Ensures errors in one request don't affect others
7. **Large Batches**: Scalability test with 10+ URLs
8. **Smoke Test Script**: Standalone script for quick validation
### Running Tests
**With pytest** (if available):
```bash
cd /path/to/crawl4ai
pytest tests/browser/test_cdp_concurrency.py -v
```
**Standalone smoke test**:
```bash
cd /path/to/crawl4ai
python3 tests/browser/smoke_test_cdp.py
```
## Migration Guide
### For Users
No breaking changes. Existing code will continue to work, but with better reliability in concurrent scenarios.
### For Contributors
When working with managed browsers:
1. Always use the page lock when creating pages in shared contexts
2. Prefer creating new pages over reusing existing ones for concurrent operations
3. Use structured logging with parameter substitution
4. Follow PEP 8 import organization
## Performance Impact
- **Positive**: Eliminates race conditions and crashes in concurrent scenarios
- **Neutral**: Page creation overhead is negligible compared to page navigation
- **Consideration**: More pages may be created, but they are properly closed after use
## Backward Compatibility
All changes are backward compatible. Session-based page reuse still works as before when `session_id` is provided.
## Related Issues
- Fixes race conditions in concurrent `arun_many` calls with CDP browsers
- Addresses "Target page/context closed" errors
- Improves browser startup reliability
## Future Improvements
Consider:
1. Configurable page pooling with proper lifecycle management
2. More granular locks for different contexts
3. Metrics for page creation/reuse patterns
4. Connection pooling for CDP connections

View File

@@ -1047,14 +1047,28 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
raise e
finally:
# If no session_id is given we should close the page
# Clean up page after crawl completes
# For managed CDP browsers, close pages that are not part of a session to prevent memory leaks
all_contexts = page.context.browser.contexts
total_pages = sum(len(context.pages) for context in all_contexts)
should_close_page = False
if config.session_id:
# Session pages are kept alive for reuse
pass
elif total_pages <= 1 and (self.browser_config.use_managed_browser or self.browser_config.headless):
elif self.browser_config.use_managed_browser:
# For managed browsers (CDP), close non-session pages to prevent tab accumulation
# This is especially important for arun_many() with multiple concurrent crawls
should_close_page = True
elif total_pages <= 1 and self.browser_config.headless:
# Keep the last page in headless mode to avoid closing the browser
pass
else:
# For non-managed browsers, close the page
should_close_page = True
if should_close_page:
# Detach listeners before closing to prevent potential errors during close
if config.capture_network_requests:
page.remove_listener("request", handle_request_capture)

View File

@@ -1,26 +1,21 @@
# Standard library imports
import asyncio
import hashlib
import time
from typing import List, Optional
import os
import shlex
import sys
import shutil
import tempfile
import psutil
import signal
import subprocess
import sys
import tempfile
import time
import warnings
from typing import List, Optional
# Third-party imports
import psutil
import shlex
from playwright.async_api import BrowserContext
# Local imports
from .async_configs import BrowserConfig, CrawlerRunConfig
from .config import DOWNLOAD_PAGE_TIMEOUT
import hashlib
from .js_snippet import load_js_script
from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig
from .utils import get_chromium_path
import warnings
BROWSER_DISABLE_OPTIONS = [
@@ -109,9 +104,10 @@ class ManagedBrowser:
if config.proxy:
flags.append(f"--proxy-server={config.proxy}")
elif config.proxy_config:
# Note: For CDP/managed browsers, proxy credentials should be handled
# via authentication, not in the URL. Only pass the server address.
flags.append(f"--proxy-server={config.proxy_config.server}")
creds = ""
if config.proxy_config.username and config.proxy_config.password:
creds = f"{config.proxy_config.username}:{config.proxy_config.password}@"
flags.append(f"--proxy-server={creds}{config.proxy_config.server}")
# dedupe
return list(dict.fromkeys(flags))
@@ -223,27 +219,11 @@ class ManagedBrowser:
os.remove(fp)
except Exception as _e:
# non-fatal — we'll try to start anyway, but log what happened
if self.logger:
self.logger.warning(
"Pre-launch cleanup failed: {error} | Will attempt to start browser anyway",
tag="BROWSER",
params={"error": str(_e)}
)
self.logger.warning(f"pre-launch cleanup failed: {_e}", tag="BROWSER")
# Start browser process
try:
# Log browser launch intent
if self.logger and self.browser_config.verbose:
self.logger.debug(
"Launching browser: {browser_type} | Port: {port} | Headless: {headless}",
tag="BROWSER",
params={
"browser_type": self.browser_type,
"port": self.debugging_port,
"headless": self.headless
}
)
# Use DETACHED_PROCESS flag on Windows to fully detach the process
# On Unix, we'll use preexec_fn=os.setpgrp to start the process in a new process group
if sys.platform == "win32":
@@ -261,36 +241,19 @@ class ManagedBrowser:
preexec_fn=os.setpgrp # Start in a new process group
)
# Log full command if verbose logging is enabled
# If verbose is True print args used to run the process
if self.logger and self.browser_config.verbose:
# Format args for better readability - escape and join
formatted_args = ' '.join(shlex.quote(str(arg)) for arg in args)
self.logger.debug(
"Browser launch command: {command}",
tag="BROWSER",
params={"command": formatted_args}
)
f"Starting browser with args: {' '.join(args)}",
tag="BROWSER"
)
# Perform startup health checks
await asyncio.sleep(0.5) # Initial delay for process startup
# We'll monitor for a short time to make sure it starts properly, but won't keep monitoring
await asyncio.sleep(0.5) # Give browser time to start
await self._initial_startup_check()
await asyncio.sleep(2) # Additional time for browser initialization
cdp_url = f"http://{self.host}:{self.debugging_port}"
if self.logger:
self.logger.info(
"Browser started successfully | CDP URL: {cdp_url}",
tag="BROWSER",
params={"cdp_url": cdp_url}
)
return cdp_url
await asyncio.sleep(2) # Give browser time to start
return f"http://{self.host}:{self.debugging_port}"
except Exception as e:
if self.logger:
self.logger.error(
"Failed to start browser: {error}",
tag="BROWSER",
params={"error": str(e)}
)
await self.cleanup()
raise Exception(f"Failed to start browser: {e}")
@@ -303,41 +266,23 @@ class ManagedBrowser:
return
# Check that process started without immediate termination
# Perform multiple checks with increasing delays to catch early failures
check_intervals = [0.1, 0.2, 0.3] # Total 0.6s
for delay in check_intervals:
await asyncio.sleep(delay)
if self.browser_process.poll() is not None:
# Process already terminated - capture output for debugging
stdout, stderr = b"", b""
try:
stdout, stderr = self.browser_process.communicate(timeout=0.5)
except subprocess.TimeoutExpired:
pass
error_msg = "Browser process terminated during startup"
if stderr:
error_msg += f" | STDERR: {stderr.decode()[:200]}" # Limit output length
if stdout:
error_msg += f" | STDOUT: {stdout.decode()[:200]}"
self.logger.error(
message="{error_msg} | Exit code: {code}",
tag="BROWSER",
params={
"error_msg": error_msg,
"code": self.browser_process.returncode,
},
)
raise RuntimeError(f"Browser failed to start: {error_msg}")
await asyncio.sleep(0.5)
if self.browser_process.poll() is not None:
# Process already terminated
stdout, stderr = b"", b""
try:
stdout, stderr = self.browser_process.communicate(timeout=0.5)
except subprocess.TimeoutExpired:
pass
# Process is still running after checks - log success
if self.logger and self.browser_config.verbose:
self.logger.debug(
"Browser process startup check passed | PID: {pid}",
tag="BROWSER",
params={"pid": self.browser_process.pid}
self.logger.error(
message="Browser process terminated during startup | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}",
tag="ERROR",
params={
"code": self.browser_process.returncode,
"stdout": stdout.decode() if stdout else "",
"stderr": stderr.decode() if stderr else "",
},
)
async def _monitor_browser_process(self):
@@ -426,8 +371,6 @@ class ManagedBrowser:
flags.append("--headless=new")
# merge common launch flags
flags.extend(self.build_browser_flags(self.browser_config))
# Deduplicate flags - use dict.fromkeys to preserve order while removing duplicates
flags = list(dict.fromkeys(flags))
elif self.browser_type == "firefox":
flags = [
"--remote-debugging-port",
@@ -1092,25 +1035,20 @@ class BrowserManager:
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
# If using a managed browser, just grab the shared default_context
# If using a managed browser, reuse the default context and create new pages
if self.config.use_managed_browser:
context = self.default_context
if self.config.storage_state:
context = await self.create_browser_context(crawlerRunConfig)
ctx = self.default_context # default context, one window only
# Clone runtime state from storage to the shared context
ctx = self.default_context
ctx = await clone_runtime_state(context, ctx, crawlerRunConfig, self.config)
# Avoid concurrent new_page on shared persistent context
# See GH-1198: context.pages can be empty under races
async with self._page_lock:
page = await ctx.new_page()
await self._apply_stealth_to_page(page)
else:
context = self.default_context
# Always create new pages instead of reusing existing ones
# This prevents race conditions in concurrent scenarios (arun_many with CDP)
# Serialize page creation to avoid 'Target page/context closed' errors
async with self._page_lock:
page = await context.new_page()
await self._apply_stealth_to_page(page)
# Always create a new page for concurrent safety
# The page-level isolation prevents race conditions while sharing the same context
async with self._page_lock:
page = await context.new_page()
await self._apply_stealth_to_page(page)
else:
# Otherwise, check if we have an existing context for this config
config_signature = self._make_config_signature(crawlerRunConfig)

View File

@@ -5,26 +5,22 @@ This module provides a dedicated class for managing browser profiles
that can be used for identity-based crawling with Crawl4AI.
"""
# Standard library imports
import asyncio
import datetime
import json
import os
import shutil
import asyncio
import signal
import subprocess
import sys
import time
import datetime
import uuid
from typing import Any, Dict, List, Optional
# Third-party imports
import shutil
import json
import subprocess
import time
from typing import List, Dict, Optional, Any
from rich.console import Console
# Local imports
from .async_configs import BrowserConfig
from .async_logger import AsyncLogger, AsyncLoggerBase, LogColor
from .browser_manager import ManagedBrowser
from .async_logger import AsyncLogger, AsyncLoggerBase, LogColor
from .utils import get_home_folder

View File

@@ -0,0 +1,594 @@
# CDP Browser Crawling
> **New in v0.7.6**: Efficient concurrent crawling with managed CDP (Chrome DevTools Protocol) browsers. Connect to a running browser instance and perform multiple crawls without spawning new windows.
## 1. Overview
When working with CDP browsers, you can connect to an existing browser instance instead of launching a new one for each crawl. This is particularly useful for:
- **Development**: Keep your browser open with DevTools for debugging
- **Persistent Sessions**: Maintain authentication across multiple crawls
- **Resource Efficiency**: Reuse a single browser instance for multiple operations
- **Concurrent Crawling**: Run multiple crawls simultaneously with proper isolation
**Key Benefits:**
- ✅ Single browser window with multiple tabs (no window clutter)
- ✅ Shared state (cookies, localStorage) across crawls
- ✅ Concurrent safety with automatic page isolation
- ✅ Automatic cleanup to prevent memory leaks
- ✅ Works seamlessly with `arun_many()` for parallel crawling
---
## 2. Quick Start
### 2.1 Starting a CDP Browser
Use the Crawl4AI CLI to start a managed CDP browser:
```bash
# Start CDP browser on default port (9222)
crwl cdp
# Start on custom port
crwl cdp -d 9223
# Start in headless mode
crwl cdp --headless
```
The browser will stay running until you press 'q' or close the terminal.
### 2.2 Basic CDP Connection
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def main():
# Configure CDP connection
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222",
verbose=True
)
# Crawl a single URL
async with AsyncWebCrawler(config=browser_cfg) as crawler:
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig()
)
print(f"Success: {result.success}")
print(f"Content length: {len(result.markdown)}")
if __name__ == "__main__":
asyncio.run(main())
```
---
## 3. Concurrent Crawling with arun_many()
The real power of CDP crawling shines with `arun_many()`. The browser manager automatically handles:
- **Page Isolation**: Each crawl gets its own tab
- **Context Sharing**: All tabs share cookies and localStorage
- **Concurrent Safety**: Proper locking prevents race conditions
- **Auto Cleanup**: Tabs are closed after crawling (except sessions)
### 3.1 Basic Concurrent Crawling
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
async def crawl_multiple_urls():
# URLs to crawl
urls = [
"https://example.com",
"https://httpbin.org/html",
"https://www.python.org",
]
# Configure CDP browser
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222",
verbose=False
)
# Configure crawler (bypass cache for fresh data)
crawler_cfg = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS
)
# Crawl all URLs concurrently
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(
urls=urls,
config=crawler_cfg
)
# Process results
for result in results:
print(f"\nURL: {result.url}")
if result.success:
print(f"✓ Success | Content length: {len(result.markdown)}")
else:
print(f"✗ Failed: {result.error_message}")
if __name__ == "__main__":
asyncio.run(crawl_multiple_urls())
```
### 3.2 With Session Management
Use sessions to maintain authentication and state across individual crawls:
```python
async def crawl_with_sessions():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
# First crawl: Login page
login_result = await crawler.arun(
url="https://example.com/login",
config=CrawlerRunConfig(
session_id="my-session", # Session persists
js_code="document.querySelector('#login').click();"
)
)
# Second crawl: Reuse authenticated session
dashboard_result = await crawler.arun(
url="https://example.com/dashboard",
config=CrawlerRunConfig(
session_id="my-session" # Same session, cookies preserved
)
)
```
---
## 4. How It Works
### 4.1 Browser Context Reuse
When using CDP browsers, Crawl4AI:
1. **Connects** to the existing browser via CDP URL
2. **Reuses** the default browser context (single window)
3. **Creates** new pages (tabs) for each crawl
4. **Locks** page creation to prevent concurrent races
5. **Cleans up** pages after crawling (unless it's a session)
```python
# Internal behavior (simplified)
if self.config.use_managed_browser:
context = self.default_context # Shared context
# Thread-safe page creation
async with self._page_lock:
page = await context.new_page() # New tab per crawl
# After crawl completes
if not config.session_id:
await page.close() # Auto cleanup
```
### 4.2 Page Lifecycle
```mermaid
graph TD
A[Start Crawl] --> B{Has session_id?}
B -->|Yes| C[Reuse existing page]
B -->|No| D[Create new page/tab]
D --> E[Navigate & Extract]
C --> E
E --> F{Is session?}
F -->|Yes| G[Keep page open]
F -->|No| H[Close page]
H --> I[End]
G --> I
```
### 4.3 State Sharing
All pages in the same context share:
- 🍪 **Cookies**: Authentication tokens, preferences
- 💾 **localStorage**: Client-side data storage
- 🌐 **Network cache**: Shared HTTP cache

Note that `sessionStorage` is scoped to a single tab, so it is not shared across pages even within the same context.
This makes it perfect for crawling authenticated sites or maintaining state across multiple pages.
---
## 5. Configuration Options
### 5.1 BrowserConfig for CDP
```python
browser_cfg = BrowserConfig(
browser_type="chromium", # Must be "chromium" for CDP
cdp_url="http://localhost:9222", # CDP endpoint URL
verbose=True, # Log browser operations
# Optional: Override headers for all requests
headers={
"Accept-Language": "en-US,en;q=0.9",
},
# Optional: Set user agent
user_agent="Mozilla/5.0 ...",
# Optional: Enable stealth mode (requires dedicated browser)
# enable_stealth=False, # Not compatible with CDP
)
```
### 5.2 CrawlerRunConfig Options
```python
crawler_cfg = CrawlerRunConfig(
# Session management
session_id="my-session", # Persist page across calls
# Caching
cache_mode=CacheMode.BYPASS, # Fresh data every time
# Browser location (affects timezone, locale)
locale="en-US",
timezone_id="America/New_York",
geolocation={
"latitude": 40.7128,
"longitude": -74.0060
},
# Proxy (per-crawl override)
proxy_config={
"server": "http://proxy.example.com:8080",
"username": "user",
"password": "pass"
}
)
```
---
## 6. Advanced Patterns
### 6.1 Streaming Results
Process URLs as they complete instead of waiting for all:
```python
async def stream_crawl_results():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
urls = ["https://example.com" for _ in range(100)]
async with AsyncWebCrawler(config=browser_cfg) as crawler:
# Stream results as they complete
async for result in crawler.arun_many(
urls=urls,
config=CrawlerRunConfig(stream=True)
):
if result.success:
print(f"{result.url}: {len(result.markdown)} chars")
# Process immediately instead of waiting for all
await save_to_database(result)
```
### 6.2 Custom Concurrency Control
```python
from crawl4ai import CrawlerRunConfig
# Limit concurrent crawls to 3
crawler_cfg = CrawlerRunConfig(
semaphore_count=3, # Max 3 concurrent requests
mean_delay=0.5, # Average 0.5s delay between requests
max_range=1.0, # +/- 1s random delay
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls, config=crawler_cfg)
```
### 6.3 Multi-Config Crawling
Different configurations for different URL groups:
```python
from crawl4ai import CrawlerRunConfig
# Fast crawl for static pages
fast_config = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=30000
)
# Slow crawl for dynamic pages
slow_config = CrawlerRunConfig(
wait_until="networkidle",
page_timeout=60000,
js_code="window.scrollTo(0, document.body.scrollHeight);"
)
configs = [fast_config, slow_config, fast_config]
urls = ["https://static.com", "https://dynamic.com", "https://static2.com"]
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls, configs=configs)
```
---
## 7. Best Practices
### 7.1 Resource Management
**DO:**
```python
# Use context manager for automatic cleanup
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls)
# Browser connection closed automatically
```
**DON'T:**
```python
# Manual management risks resource leaks
crawler = AsyncWebCrawler(config=browser_cfg)
await crawler.start()
results = await crawler.arun_many(urls)
# Forgot to call crawler.close()!
```
### 7.2 Session Management
**DO:**
```python
# Use sessions for related crawls
config = CrawlerRunConfig(session_id="user-flow")
await crawler.arun(login_url, config=config)
await crawler.arun(dashboard_url, config=config)
await crawler.kill_session("user-flow") # Clean up when done
```
**DON'T:**
```python
# Creating new session IDs unnecessarily
for i in range(100):
config = CrawlerRunConfig(session_id=f"session-{i}")
await crawler.arun(url, config=config)
# 100 unclosed sessions accumulate!
```
### 7.3 Error Handling
```python
async def robust_crawl(urls):
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
try:
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls)
# Separate successes and failures
successes = [r for r in results if r.success]
failures = [r for r in results if not r.success]
print(f"{len(successes)} succeeded")
print(f"{len(failures)} failed")
# Retry failures with different config
if failures:
retry_urls = [r.url for r in failures]
retry_config = CrawlerRunConfig(
page_timeout=120000, # Longer timeout
wait_until="networkidle"
)
retry_results = await crawler.arun_many(
retry_urls,
config=retry_config
)
return successes + retry_results
except Exception as e:
print(f"Fatal error: {e}")
return []
```
---
## 8. Troubleshooting
### 8.1 Connection Issues
**Problem**: `Cannot connect to CDP browser`
```bash
# Check CDP browser is running
$ lsof -i :9222
# Should show: Chromium PID USER FD TYPE ...

# Or start it if not running
$ crwl cdp
```
**Problem**: `ERR_ABORTED` errors in concurrent crawls
**Fixed in v0.7.6**: This issue has been resolved. Pages are now properly isolated with locking.
### 8.2 Performance Issues
**Problem**: Too many open tabs
```python
# Ensure you're not using session_id for everything
config = CrawlerRunConfig() # No session_id
await crawler.arun_many(urls, config=config)
# Pages auto-close after crawling
```
**Problem**: Memory leaks
```python
# Always use context manager
async with AsyncWebCrawler(config=browser_cfg) as crawler:
# Crawling code here
pass
# Automatic cleanup on exit
```
### 8.3 State Issues
**Problem**: Cookies not persisting
```python
# Use the same context (automatic with CDP)
browser_cfg = BrowserConfig(cdp_url="http://localhost:9222")
# All crawls share cookies automatically
```
**Problem**: Need isolated state
```python
# Use different CDP endpoints or non-CDP browsers
browser_cfg_1 = BrowserConfig(cdp_url="http://localhost:9222")
browser_cfg_2 = BrowserConfig(cdp_url="http://localhost:9223")
# Completely isolated browsers
```
---
## 9. Comparison: CDP vs Regular Browsers
| Feature | CDP Browser | Regular Browser |
|---------|-------------|-----------------|
| **Window Management** | ✅ Single window, multiple tabs | ❌ New window per context |
| **Startup Time** | ✅ Instant (already running) | ⏱️ ~2-3s per launch |
| **State Sharing** | ✅ Shared cookies/localStorage | ⚠️ Isolated by default |
| **Concurrent Safety** | ✅ Automatic locking | ✅ Separate processes |
| **Memory Usage** | ✅ Lower (shared browser) | ⚠️ Higher (multiple processes) |
| **Session Persistence** | ✅ Native support | ✅ Via session_id |
| **Stealth Mode** | ❌ Not compatible | ✅ Full support |
| **Best For** | Development, authenticated crawls | Production, isolated crawls |
---
## 10. Real-World Examples
### 10.1 E-commerce Product Scraping
```python
async def scrape_products():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
# Get product URLs from category page
async with AsyncWebCrawler(config=browser_cfg) as crawler:
category_result = await crawler.arun(
url="https://shop.example.com/category",
config=CrawlerRunConfig(
css_selector=".product-link"
)
)
# Extract product URLs
product_urls = extract_urls(category_result.links)
# Crawl all products concurrently
product_results = await crawler.arun_many(
urls=product_urls,
config=CrawlerRunConfig(
css_selector=".product-details",
semaphore_count=5 # Polite crawling
)
)
return [extract_product_data(r) for r in product_results]
```
### 10.2 News Article Monitoring
```python
import asyncio
from datetime import datetime
async def monitor_news_sites():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
news_sites = [
"https://news.site1.com",
"https://news.site2.com",
"https://news.site3.com"
]
async with AsyncWebCrawler(config=browser_cfg) as crawler:
while True:
print(f"\n[{datetime.now()}] Checking for updates...")
results = await crawler.arun_many(
urls=news_sites,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, # Always fresh
css_selector=".article-headline"
)
)
for result in results:
if result.success:
headlines = extract_headlines(result)
for headline in headlines:
if is_new(headline):
notify_user(headline)
# Check every 5 minutes
await asyncio.sleep(300)
```
---
## 11. Summary
CDP browser crawling offers:
- 🚀 **Performance**: Faster startup, lower resource usage
- 🔄 **State Management**: Shared cookies and authentication
- 🎯 **Concurrent Safety**: Automatic page isolation and cleanup
- 💻 **Developer Friendly**: Visual debugging with DevTools
**When to use CDP:**
- Development and debugging
- Authenticated crawling (login required)
- Sequential crawls needing state
- Resource-constrained environments
**When to use regular browsers:**
- Production deployments
- Maximum isolation required
- Stealth mode needed
- Distributed/cloud crawling
For most use cases, **CDP browsers provide the best balance** of performance, convenience, and safety.

View File

@@ -1,165 +0,0 @@
#!/usr/bin/env python3
"""
Simple smoke test for CDP concurrency fixes.
This can be run without pytest to quickly validate the changes.
"""
import asyncio
import sys
import os

# Add the project root to Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode


async def test_basic_cdp():
    """Basic test that CDP browser works"""
    print("Test 1: Basic CDP browser test...")
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            result = await crawler.arun(
                url="https://example.com",
                config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
            )
            assert result.success, f"Failed: {result.error_message}"
            assert len(result.html) > 0, "Empty HTML"
            print(" ✓ Basic CDP test passed")
            return True
    except Exception as e:
        print(f" ✗ Basic CDP test failed: {e}")
        return False


async def test_arun_many_cdp():
    """Test arun_many with CDP browser - the key concurrency fix"""
    print("\nTest 2: arun_many with CDP browser...")
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    urls = [
        "https://example.com",
        "https://httpbin.org/html",
        "https://www.example.org",
    ]
    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            results = await crawler.arun_many(
                urls=urls,
                config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
            )
            assert len(results) == len(urls), f"Expected {len(urls)} results, got {len(results)}"
            success_count = sum(1 for r in results if r.success)
            print(f" ✓ Crawled {success_count}/{len(urls)} URLs successfully")
            if success_count >= len(urls) * 0.8:  # Allow 20% failure for network issues
                print(" ✓ arun_many CDP test passed")
                return True
            else:
                print(f" ✗ Too many failures: {len(urls) - success_count}/{len(urls)}")
                return False
    except Exception as e:
        print(f" ✗ arun_many CDP test failed: {e}")
        import traceback
        traceback.print_exc()
        return False


async def test_concurrent_arun_many():
    """Test concurrent arun_many calls - stress test for page lock"""
    print("\nTest 3: Concurrent arun_many calls...")
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            # Run two arun_many calls concurrently
            task1 = crawler.arun_many(
                urls=["https://example.com", "https://httpbin.org/html"],
                config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
            )
            task2 = crawler.arun_many(
                urls=["https://www.example.org", "https://example.com"],
                config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
            )
            results1, results2 = await asyncio.gather(task1, task2, return_exceptions=True)

            # Check for exceptions
            if isinstance(results1, Exception):
                print(f" ✗ Task 1 raised exception: {results1}")
                return False
            if isinstance(results2, Exception):
                print(f" ✗ Task 2 raised exception: {results2}")
                return False

            total_success = sum(1 for r in results1 if r.success) + sum(1 for r in results2 if r.success)
            total_requests = len(results1) + len(results2)
            print(f"{total_success}/{total_requests} concurrent requests succeeded")
            if total_success >= total_requests * 0.7:  # Allow 30% failure for concurrent stress
                print(" ✓ Concurrent arun_many test passed")
                return True
            else:
                print(" ✗ Too many concurrent failures")
                return False
    except Exception as e:
        print(f" ✗ Concurrent test failed: {e}")
        import traceback
        traceback.print_exc()
        return False


async def main():
    """Run all smoke tests"""
    print("=" * 60)
    print("CDP Concurrency Smoke Tests")
    print("=" * 60)

    results = []
    # Run tests sequentially
    results.append(await test_basic_cdp())
    results.append(await test_arun_many_cdp())
    results.append(await test_concurrent_arun_many())

    print("\n" + "=" * 60)
    passed = sum(results)
    total = len(results)
    if passed == total:
        print(f"✓ All {total} smoke tests passed!")
        print("=" * 60)
        return 0
    else:
        print(f"{total - passed}/{total} smoke tests failed")
        print("=" * 60)
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)


@@ -1,282 +0,0 @@
"""
Test CDP browser concurrency with arun_many.

This test suite validates that the fixes for concurrent page creation
in managed browsers (CDP mode) work correctly, particularly:
1. Always creating new pages instead of reusing
2. Page lock serialization prevents race conditions
3. Multiple concurrent arun_many calls work correctly
"""
# Standard library imports
import asyncio
import os
import sys

# Third-party imports
import pytest

# Add the project root to Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

# Local imports
from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode, CrawlerRunConfig


@pytest.mark.asyncio
async def test_cdp_concurrent_arun_many_basic():
    """
    Test basic concurrent arun_many with CDP browser.
    This tests the fix for always creating new pages.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    urls = [
        "https://example.com",
        "https://www.python.org",
        "https://httpbin.org/html",
    ]
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Run arun_many - should create new pages for each URL
        results = await crawler.arun_many(urls=urls, config=config)

        # Verify all URLs were crawled successfully
        assert len(results) == len(urls), f"Expected {len(urls)} results, got {len(results)}"
        for i, result in enumerate(results):
            assert result is not None, f"Result {i} is None"
            assert result.success, f"Result {i} failed: {result.error_message}"
            assert result.status_code == 200, f"Result {i} has status {result.status_code}"
            assert len(result.html) > 0, f"Result {i} has empty HTML"


@pytest.mark.asyncio
async def test_cdp_multiple_sequential_arun_many():
    """
    Test multiple sequential arun_many calls with CDP browser.
    Each call should work correctly without interference.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    urls_batch1 = [
        "https://example.com",
        "https://httpbin.org/html",
    ]
    urls_batch2 = [
        "https://www.python.org",
        "https://example.org",
    ]
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # First batch
        results1 = await crawler.arun_many(urls=urls_batch1, config=config)
        assert len(results1) == len(urls_batch1)
        for result in results1:
            assert result.success, f"First batch failed: {result.error_message}"

        # Second batch - should work without issues
        results2 = await crawler.arun_many(urls=urls_batch2, config=config)
        assert len(results2) == len(urls_batch2)
        for result in results2:
            assert result.success, f"Second batch failed: {result.error_message}"


@pytest.mark.asyncio
async def test_cdp_concurrent_arun_many_stress():
    """
    Stress test: Multiple concurrent arun_many calls with CDP browser.
    This is the key test for the concurrency fix - ensures page lock works.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    # Create multiple batches of URLs
    num_batches = 3
    urls_per_batch = 3
    batches = [
        [f"https://httpbin.org/delay/{i}?batch={batch}"
         for i in range(urls_per_batch)]
        for batch in range(num_batches)
    ]
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Run multiple arun_many calls concurrently
        tasks = [
            crawler.arun_many(urls=batch, config=config)
            for batch in batches
        ]
        # Execute all batches in parallel
        all_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Verify no exceptions occurred
        for i, results in enumerate(all_results):
            assert not isinstance(results, Exception), f"Batch {i} raised exception: {results}"
            assert len(results) == urls_per_batch, f"Batch {i}: expected {urls_per_batch} results, got {len(results)}"
            # Verify each result
            for j, result in enumerate(results):
                assert result is not None, f"Batch {i}, result {j} is None"
                # Some may fail due to network/timing, but should not crash
                if result.success:
                    assert len(result.html) > 0, f"Batch {i}, result {j} has empty HTML"


@pytest.mark.asyncio
async def test_cdp_page_isolation():
    """
    Test that pages are properly isolated - changes to one don't affect another.
    This validates that we're creating truly independent pages.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    url = "https://example.com"
    # Use different JS codes to verify isolation
    config1 = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        js_code="document.body.setAttribute('data-test', 'page1');"
    )
    config2 = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        js_code="document.body.setAttribute('data-test', 'page2');"
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Run both configs concurrently
        results = await crawler.arun_many(
            urls=[url, url],
            configs=[config1, config2]
        )
        assert len(results) == 2
        assert results[0].success and results[1].success
        # Both should succeed with their own modifications
        # (We can't directly check the data-test attribute, but success indicates isolation)
        assert 'Example Domain' in results[0].html
        assert 'Example Domain' in results[1].html


@pytest.mark.asyncio
async def test_cdp_with_different_viewport_sizes():
    """
    Test concurrent crawling with different viewport configurations.
    Ensures context/page creation handles different configs correctly.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    url = "https://example.com"
    # Different viewport sizes (though in CDP mode these may be limited)
    configs = [
        CrawlerRunConfig(cache_mode=CacheMode.BYPASS),
        CrawlerRunConfig(cache_mode=CacheMode.BYPASS),
        CrawlerRunConfig(cache_mode=CacheMode.BYPASS),
    ]

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(
            urls=[url] * len(configs),
            configs=configs
        )
        assert len(results) == len(configs)
        for i, result in enumerate(results):
            assert result.success, f"Config {i} failed: {result.error_message}"
            assert len(result.html) > 0


@pytest.mark.asyncio
async def test_cdp_error_handling_concurrent():
    """
    Test that errors in one concurrent request don't affect others.
    This ensures proper isolation and error handling.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    urls = [
        "https://example.com",  # Valid
        "https://this-domain-definitely-does-not-exist-12345.com",  # Invalid
        "https://httpbin.org/html",  # Valid
    ]
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(urls=urls, config=config)
        assert len(results) == len(urls)
        # First and third should succeed
        assert results[0].success, "First URL should succeed"
        assert results[2].success, "Third URL should succeed"
        # Second may fail (invalid domain)
        # But its failure shouldn't affect the others


@pytest.mark.asyncio
async def test_cdp_large_batch():
    """
    Test handling a larger batch of URLs to ensure scalability.
    """
    browser_config = BrowserConfig(
        use_managed_browser=True,
        headless=True,
        verbose=False
    )
    # Create 10 URLs
    num_urls = 10
    urls = [f"https://httpbin.org/delay/0?id={i}" for i in range(num_urls)]
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(urls=urls, config=config)
        assert len(results) == num_urls
        # Count successes
        successes = sum(1 for r in results if r.success)
        # Allow some failures due to network issues, but most should succeed
        assert successes >= num_urls * 0.8, f"Only {successes}/{num_urls} succeeded"


if __name__ == "__main__":
    # Run tests with pytest
    pytest.main([__file__, "-v", "-s"])


@@ -0,0 +1,63 @@
"""
Test for arun_many with managed CDP browser to ensure each crawl gets its own tab.
"""
import pytest
import asyncio

from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode


@pytest.mark.asyncio
async def test_arun_many_with_cdp():
    """Test arun_many opens a new tab for each url with managed CDP browser."""
    # NOTE: Requires a running CDP browser at localhost:9222
    # Can be started with: crwl cdp -d 9222
    browser_cfg = BrowserConfig(
        browser_type="cdp",
        cdp_url="http://localhost:9222",
        verbose=False,
    )
    urls = [
        "https://example.com",
        "https://httpbin.org/html",
        "https://www.python.org",
    ]
    crawler_cfg = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
    )

    async with AsyncWebCrawler(config=browser_cfg) as crawler:
        results = await crawler.arun_many(urls=urls, config=crawler_cfg)

        # All results should be successful and distinct
        assert len(results) == 3
        for result in results:
            assert result.success, f"Crawl failed: {result.url} - {result.error_message}"
            assert result.markdown is not None


@pytest.mark.asyncio
async def test_arun_many_with_cdp_sequential():
    """Test crawling the same URLs sequentially with arun to isolate issues."""
    browser_cfg = BrowserConfig(
        browser_type="cdp",
        cdp_url="http://localhost:9222",
        verbose=True,
    )
    urls = [
        "https://example.com",
        "https://httpbin.org/html",
        "https://www.python.org",
    ]
    crawler_cfg = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
    )

    async with AsyncWebCrawler(config=browser_cfg) as crawler:
        results = []
        for url in urls:
            result = await crawler.arun(url=url, config=crawler_cfg)
            results.append(result)
            assert result.success, f"Crawl failed: {result.url} - {result.error_message}"
            assert result.markdown is not None
        assert len(results) == 3


if __name__ == "__main__":
    asyncio.run(test_arun_many_with_cdp())