Compare commits


1 Commit

Author SHA1 Message Date
AHMET YILMAZ
c003cb6e4f fix #1563 (cdp): resolve page leaks and race conditions in concurrent crawling
Fix memory leaks and race conditions when using arun_many() with managed CDP browsers. Each crawl now gets proper page isolation with automatic cleanup while maintaining shared browser context.

Key fixes:
- Close non-session pages after crawling to prevent tab accumulation
- Add thread-safe page creation with locks to avoid concurrent access
- Improve page lifecycle management for managed vs non-managed browsers
- Keep session pages alive for authentication persistence
- Prevent TOCTOU (time-of-check-time-of-use) race conditions

This ensures stable parallel crawling without memory growth or browser instability.
2025-11-07 15:42:37 +08:00
9 changed files with 745 additions and 788 deletions

View File

@@ -1047,14 +1047,28 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
            raise e
        finally:
-           # If no session_id is given we should close the page
+           # Clean up page after crawl completes
+           # For managed CDP browsers, close pages that are not part of a session to prevent memory leaks
            all_contexts = page.context.browser.contexts
            total_pages = sum(len(context.pages) for context in all_contexts)
+           should_close_page = False
            if config.session_id:
+               # Session pages are kept alive for reuse
                pass
-           elif total_pages <= 1 and (self.browser_config.use_managed_browser or self.browser_config.headless):
+           elif self.browser_config.use_managed_browser:
+               # For managed browsers (CDP), close non-session pages to prevent tab accumulation
+               # This is especially important for arun_many() with multiple concurrent crawls
+               should_close_page = True
+           elif total_pages <= 1 and self.browser_config.headless:
+               # Keep the last page in headless mode to avoid closing the browser
                pass
            else:
+               # For non-managed browsers, close the page
+               should_close_page = True
+           if should_close_page:
                # Detach listeners before closing to prevent potential errors during close
                if config.capture_network_requests:
                    page.remove_listener("request", handle_request_capture)

View File

@@ -617,17 +617,7 @@ class AsyncWebCrawler:
                    else config.chunking_strategy
                )
                sections = chunking.chunk(content)
-               # extracted_content = config.extraction_strategy.run(url, sections)
-               # Use async version if available for better parallelism
-               if hasattr(config.extraction_strategy, 'arun'):
-                   extracted_content = await config.extraction_strategy.arun(url, sections)
-               else:
-                   # Fallback to sync version run in thread pool to avoid blocking
-                   extracted_content = await asyncio.to_thread(
-                       config.extraction_strategy.run, url, sections
-                   )
+               extracted_content = config.extraction_strategy.run(url, sections)
                extracted_content = json.dumps(
                    extracted_content, indent=4, default=str, ensure_ascii=False
                )

View File

@@ -369,9 +369,6 @@ class ManagedBrowser:
            ]
            if self.headless:
                flags.append("--headless=new")
-           # Add viewport flag if specified in config
-           if self.browser_config.viewport_height and self.browser_config.viewport_width:
-               flags.append(f"--window-size={self.browser_config.viewport_width},{self.browser_config.viewport_height}")
            # merge common launch flags
            flags.extend(self.build_browser_flags(self.browser_config))
        elif self.browser_type == "firefox":
@@ -1038,34 +1035,20 @@ class BrowserManager:
            self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
            return page, context
-       # If using a managed browser, just grab the shared default_context
+       # If using a managed browser, reuse the default context and create new pages
        if self.config.use_managed_browser:
            context = self.default_context
            if self.config.storage_state:
-               context = await self.create_browser_context(crawlerRunConfig)
-               ctx = self.default_context  # default context, one window only
+               # Clone runtime state from storage to the shared context
+               ctx = self.default_context
                ctx = await clone_runtime_state(context, ctx, crawlerRunConfig, self.config)
-               # Avoid concurrent new_page on shared persistent context
-               # See GH-1198: context.pages can be empty under races
-               async with self._page_lock:
-                   page = await ctx.new_page()
-               await self._apply_stealth_to_page(page)
-           else:
-               context = self.default_context
-               pages = context.pages
-               page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
-               if not page:
-                   if pages:
-                       page = pages[0]
-                   else:
-                       # Double-check under lock to avoid TOCTOU and ensure only
-                       # one task calls new_page when pages=[] concurrently
-                       async with self._page_lock:
-                           pages = context.pages
-                           if pages:
-                               page = pages[0]
-                           else:
-                               page = await context.new_page()
-               await self._apply_stealth_to_page(page)
+           # Always create a new page for concurrent safety
+           # The page-level isolation prevents race conditions while sharing the same context
+           async with self._page_lock:
+               page = await context.new_page()
+           await self._apply_stealth_to_page(page)
        else:
            # Otherwise, check if we have an existing context for this config
            config_signature = self._make_config_signature(crawlerRunConfig)

View File

@@ -94,20 +94,6 @@ class ExtractionStrategy(ABC):
extracted_content.extend(future.result())
return extracted_content
async def arun(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Async version: Process sections of text in parallel using asyncio.
Default implementation runs the sync version in a thread pool.
Subclasses can override this for true async processing.
:param url: The URL of the webpage.
:param sections: List of sections (strings) to process.
:return: A list of processed JSON blocks.
"""
import asyncio
return await asyncio.to_thread(self.run, url, sections, *q, **kwargs)
class NoExtractionStrategy(ExtractionStrategy):
    """
@@ -794,177 +780,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
return extracted_content
async def aextract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
"""
Async version: Extract meaningful blocks or chunks from the given HTML using an LLM.
How it works:
1. Construct a prompt with variables.
2. Make an async request to the LLM using the prompt.
3. Parse the response and extract blocks or chunks.
Args:
url: The URL of the webpage.
ix: Index of the block.
html: The HTML content of the webpage.
Returns:
A list of extracted blocks or chunks.
"""
from .utils import aperform_completion_with_backoff
if self.verbose:
print(f"[LOG] Call LLM for {url} - block index: {ix}")
variable_values = {
"URL": url,
"HTML": escape_json_string(sanitize_html(html)),
}
prompt_with_variables = PROMPT_EXTRACT_BLOCKS
if self.instruction:
variable_values["REQUEST"] = self.instruction
prompt_with_variables = PROMPT_EXTRACT_BLOCKS_WITH_INSTRUCTION
if self.extract_type == "schema" and self.schema:
variable_values["SCHEMA"] = json.dumps(self.schema, indent=2)
prompt_with_variables = PROMPT_EXTRACT_SCHEMA_WITH_INSTRUCTION
if self.extract_type == "schema" and not self.schema:
prompt_with_variables = PROMPT_EXTRACT_INFERRED_SCHEMA
for variable in variable_values:
prompt_with_variables = prompt_with_variables.replace(
"{" + variable + "}", variable_values[variable]
)
try:
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
base_url=self.llm_config.base_url,
json_response=self.force_json_response,
extra_args=self.extra_args,
)
# Track usage
usage = TokenUsage(
completion_tokens=response.usage.completion_tokens,
prompt_tokens=response.usage.prompt_tokens,
total_tokens=response.usage.total_tokens,
completion_tokens_details=response.usage.completion_tokens_details.__dict__
if response.usage.completion_tokens_details
else {},
prompt_tokens_details=response.usage.prompt_tokens_details.__dict__
if response.usage.prompt_tokens_details
else {},
)
self.usages.append(usage)
# Update totals
self.total_usage.completion_tokens += usage.completion_tokens
self.total_usage.prompt_tokens += usage.prompt_tokens
self.total_usage.total_tokens += usage.total_tokens
try:
content = response.choices[0].message.content
blocks = None
if self.force_json_response:
blocks = json.loads(content)
if isinstance(blocks, dict):
if len(blocks) == 1 and isinstance(list(blocks.values())[0], list):
blocks = list(blocks.values())[0]
else:
blocks = [blocks]
elif isinstance(blocks, list):
blocks = blocks
else:
blocks = extract_xml_data(["blocks"], content)["blocks"]
blocks = json.loads(blocks)
for block in blocks:
block["error"] = False
except Exception:
parsed, unparsed = split_and_parse_json_objects(
response.choices[0].message.content
)
blocks = parsed
if unparsed:
blocks.append(
{"index": 0, "error": True, "tags": ["error"], "content": unparsed}
)
if self.verbose:
print(
"[LOG] Extracted",
len(blocks),
"blocks from URL:",
url,
"block index:",
ix,
)
return blocks
except Exception as e:
if self.verbose:
print(f"[LOG] Error in LLM extraction: {e}")
return [
{
"index": ix,
"error": True,
"tags": ["error"],
"content": str(e),
}
]
async def arun(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
"""
Async version: Process sections with true parallelism using asyncio.gather.
Args:
url: The URL of the webpage.
sections: List of sections (strings) to process.
Returns:
A list of extracted blocks or chunks.
"""
import asyncio
merged_sections = self._merge(
sections,
self.chunk_token_threshold,
overlap=int(self.chunk_token_threshold * self.overlap_rate),
)
extracted_content = []
# Create tasks for all sections to run in parallel
tasks = [
self.aextract(url, ix, sanitize_input_encode(section))
for ix, section in enumerate(merged_sections)
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for result in results:
if isinstance(result, Exception):
if self.verbose:
print(f"Error in async extraction: {result}")
extracted_content.append(
{
"index": 0,
"error": True,
"tags": ["error"],
"content": str(result),
}
)
else:
extracted_content.extend(result)
return extracted_content
def show_usage(self) -> None:
    """Print a detailed token usage report showing total and per-request usage."""
    print("\n=== Token Usage Summary ===")

View File

@@ -1825,82 +1825,6 @@ def perform_completion_with_backoff(
# ]
async def aperform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
json_response=False,
base_url=None,
**kwargs,
):
"""
Async version: Perform an API completion request with exponential backoff.
How it works:
1. Sends an async completion request to the API.
2. Retries on rate-limit errors with exponential delays (async).
3. Returns the API response or an error after all retries.
Args:
provider (str): The name of the API provider.
prompt_with_variables (str): The input prompt for the completion request.
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
**kwargs: Additional arguments for the API request.
Returns:
dict: The API response or an error message after all retries.
"""
from litellm import acompletion
from litellm.exceptions import RateLimitError
import asyncio
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {"temperature": 0.01, "api_key": api_token, "base_url": base_url}
if json_response:
extra_args["response_format"] = {"type": "json_object"}
if kwargs.get("extra_args"):
extra_args.update(kwargs["extra_args"])
for attempt in range(max_attempts):
try:
response = await acompletion(
model=provider,
messages=[{"role": "user", "content": prompt_with_variables}],
**extra_args,
)
return response # Return the successful response
except RateLimitError as e:
print("Rate limit error:", str(e))
if attempt == max_attempts - 1:
# Last attempt failed, raise the error.
raise
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (2**attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
await asyncio.sleep(delay)
else:
# Return an error response after exhausting all retries
return [
{
"index": 0,
"tags": ["error"],
"content": ["Rate limit error. Please try again later."],
}
]
except Exception as e:
raise e # Raise any other exceptions immediately
def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
    """
    Extract content blocks from website HTML using an AI provider.

View File

@@ -0,0 +1,594 @@
# CDP Browser Crawling
> **New in v0.7.6**: Efficient concurrent crawling with managed CDP (Chrome DevTools Protocol) browsers. Connect to a running browser instance and perform multiple crawls without spawning new windows.
## 1. Overview
When working with CDP browsers, you can connect to an existing browser instance instead of launching a new one for each crawl. This is particularly useful for:
- **Development**: Keep your browser open with DevTools for debugging
- **Persistent Sessions**: Maintain authentication across multiple crawls
- **Resource Efficiency**: Reuse a single browser instance for multiple operations
- **Concurrent Crawling**: Run multiple crawls simultaneously with proper isolation
**Key Benefits:**
- ✅ Single browser window with multiple tabs (no window clutter)
- ✅ Shared state (cookies, localStorage) across crawls
- ✅ Concurrent safety with automatic page isolation
- ✅ Automatic cleanup to prevent memory leaks
- ✅ Works seamlessly with `arun_many()` for parallel crawling
---
## 2. Quick Start
### 2.1 Starting a CDP Browser
Use the Crawl4AI CLI to start a managed CDP browser:
```bash
# Start CDP browser on default port (9222)
crwl cdp
# Start on custom port
crwl cdp -d 9223
# Start in headless mode
crwl cdp --headless
```
The browser will stay running until you press 'q' or close the terminal.
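Before pointing the crawler at the endpoint, you can check that the DevTools interface is actually reachable. The sketch below is not part of Crawl4AI; it only uses the Python standard library and the CDP `/json/version` metadata endpoint, and assumes the default port from the commands above.
```python
import json
import urllib.request

def cdp_is_up(cdp_url: str = "http://localhost:9222") -> bool:
    """Return True if a CDP browser answers on the given endpoint."""
    try:
        # /json/version is the standard DevTools metadata endpoint
        with urllib.request.urlopen(f"{cdp_url}/json/version", timeout=2) as resp:
            info = json.load(resp)
        print(f"Connected to: {info.get('Browser', 'unknown')}")
        return True
    except OSError:
        return False

if not cdp_is_up():
    print("No CDP browser found. Start one with: crwl cdp")
```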
### 2.2 Basic CDP Connection
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def main():
# Configure CDP connection
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222",
verbose=True
)
# Crawl a single URL
async with AsyncWebCrawler(config=browser_cfg) as crawler:
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig()
)
print(f"Success: {result.success}")
print(f"Content length: {len(result.markdown)}")
if __name__ == "__main__":
asyncio.run(main())
```
---
## 3. Concurrent Crawling with arun_many()
The real power of CDP crawling shines with `arun_many()`. The browser manager automatically handles:
- **Page Isolation**: Each crawl gets its own tab
- **Context Sharing**: All tabs share cookies and localStorage
- **Concurrent Safety**: Proper locking prevents race conditions
- **Auto Cleanup**: Tabs are closed after crawling (except sessions)
### 3.1 Basic Concurrent Crawling
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
async def crawl_multiple_urls():
# URLs to crawl
urls = [
"https://example.com",
"https://httpbin.org/html",
"https://www.python.org",
]
# Configure CDP browser
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222",
verbose=False
)
# Configure crawler (bypass cache for fresh data)
crawler_cfg = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS
)
# Crawl all URLs concurrently
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(
urls=urls,
config=crawler_cfg
)
# Process results
for result in results:
print(f"\nURL: {result.url}")
if result.success:
print(f"✓ Success | Content length: {len(result.markdown)}")
else:
print(f"✗ Failed: {result.error_message}")
if __name__ == "__main__":
asyncio.run(crawl_multiple_urls())
```
### 3.2 With Session Management
Use sessions to maintain authentication and state across individual crawls:
```python
async def crawl_with_sessions():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
# First crawl: Login page
login_result = await crawler.arun(
url="https://example.com/login",
config=CrawlerRunConfig(
session_id="my-session", # Session persists
js_code="document.querySelector('#login').click();"
)
)
# Second crawl: Reuse authenticated session
dashboard_result = await crawler.arun(
url="https://example.com/dashboard",
config=CrawlerRunConfig(
session_id="my-session" # Same session, cookies preserved
)
)
```
---
## 4. How It Works
### 4.1 Browser Context Reuse
When using CDP browsers, Crawl4AI:
1. **Connects** to the existing browser via CDP URL
2. **Reuses** the default browser context (single window)
3. **Creates** new pages (tabs) for each crawl
4. **Locks** page creation to prevent concurrent races
5. **Cleans up** pages after crawling (unless it's a session)
```python
# Internal behavior (simplified)
if self.config.use_managed_browser:
context = self.default_context # Shared context
# Thread-safe page creation
async with self._page_lock:
page = await context.new_page() # New tab per crawl
# After crawl completes
if not config.session_id:
await page.close() # Auto cleanup
```
### 4.2 Page Lifecycle
```mermaid
graph TD
A[Start Crawl] --> B{Has session_id?}
B -->|Yes| C[Reuse existing page]
B -->|No| D[Create new page/tab]
D --> E[Navigate & Extract]
C --> E
E --> F{Is session?}
F -->|Yes| G[Keep page open]
F -->|No| H[Close page]
H --> I[End]
G --> I
```
### 4.3 State Sharing
All pages in the same context share:
- 🍪 **Cookies**: Authentication tokens, preferences
- 💾 **localStorage**: Client-side data storage
- 🔐 **sessionStorage**: Per-tab session data
- 🌐 **Network cache**: Shared HTTP cache
This makes it perfect for crawling authenticated sites or maintaining state across multiple pages.
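To see the shared cookie jar in action, here is a minimal sketch (not from the sections above) that assumes a CDP browser is already running on `localhost:9222` and uses httpbin.org's cookie endpoints: the first crawl sets a cookie, and the second crawl, running in a fresh tab on the same context, can read it back.
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

async def demo_shared_cookies():
    browser_cfg = BrowserConfig(browser_type="chromium", cdp_url="http://localhost:9222")
    run_cfg = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=browser_cfg) as crawler:
        # First crawl: httpbin sets a cookie on the shared browser context
        await crawler.arun(url="https://httpbin.org/cookies/set?crawl4ai_demo=1", config=run_cfg)
        # Second crawl: a new tab, same context, so the cookie is visible in the response body
        result = await crawler.arun(url="https://httpbin.org/cookies", config=run_cfg)
        print("Cookie visible in second tab:", "crawl4ai_demo" in result.html)

asyncio.run(demo_shared_cookies())
```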
---
## 5. Configuration Options
### 5.1 BrowserConfig for CDP
```python
browser_cfg = BrowserConfig(
browser_type="chromium", # Must be "chromium" for CDP
cdp_url="http://localhost:9222", # CDP endpoint URL
verbose=True, # Log browser operations
# Optional: Override headers for all requests
headers={
"Accept-Language": "en-US,en;q=0.9",
},
# Optional: Set user agent
user_agent="Mozilla/5.0 ...",
# Optional: Enable stealth mode (requires dedicated browser)
# enable_stealth=False, # Not compatible with CDP
)
```
### 5.2 CrawlerRunConfig Options
```python
crawler_cfg = CrawlerRunConfig(
# Session management
session_id="my-session", # Persist page across calls
# Caching
cache_mode=CacheMode.BYPASS, # Fresh data every time
# Browser location (affects timezone, locale)
locale="en-US",
timezone_id="America/New_York",
geolocation={
"latitude": 40.7128,
"longitude": -74.0060
},
# Proxy (per-crawl override)
proxy_config={
"server": "http://proxy.example.com:8080",
"username": "user",
"password": "pass"
}
)
```
---
## 6. Advanced Patterns
### 6.1 Streaming Results
Process URLs as they complete instead of waiting for all:
```python
async def stream_crawl_results():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
urls = ["https://example.com" for _ in range(100)]
async with AsyncWebCrawler(config=browser_cfg) as crawler:
# Stream results as they complete
async for result in crawler.arun_many(
urls=urls,
config=CrawlerRunConfig(stream=True)
):
if result.success:
print(f"{result.url}: {len(result.markdown)} chars")
# Process immediately instead of waiting for all
await save_to_database(result)
```
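The streaming example calls a `save_to_database()` helper that is not defined in this guide. A minimal stand-in, purely illustrative, appends results to a local JSON-lines file; swap in your real datastore.
```python
import json

async def save_to_database(result):
    """Illustrative stand-in: append one crawl result per line to a JSONL file."""
    record = {
        "url": result.url,
        "success": result.success,
        "markdown": str(result.markdown) if result.success else None,
    }
    # Plain file I/O keeps the sketch dependency-free; replace with your real storage layer.
    with open("crawl_results.jsonl", "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")
```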
### 6.2 Custom Concurrency Control
```python
from crawl4ai import CrawlerRunConfig
# Limit concurrent crawls to 3
crawler_cfg = CrawlerRunConfig(
semaphore_count=3, # Max 3 concurrent requests
mean_delay=0.5, # Average 0.5s delay between requests
max_range=1.0, # +/- 1s random delay
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls, config=crawler_cfg)
```
### 6.3 Multi-Config Crawling
Different configurations for different URL groups:
```python
from crawl4ai import CrawlerRunConfig
# Fast crawl for static pages
fast_config = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=30000
)
# Slow crawl for dynamic pages
slow_config = CrawlerRunConfig(
wait_until="networkidle",
page_timeout=60000,
js_code="window.scrollTo(0, document.body.scrollHeight);"
)
configs = [fast_config, slow_config, fast_config]
urls = ["https://static.com", "https://dynamic.com", "https://static2.com"]
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls, configs=configs)
```
---
## 7. Best Practices
### 7.1 Resource Management
**DO:**
```python
# Use context manager for automatic cleanup
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls)
# Browser connection closed automatically
```
**DON'T:**
```python
# Manual management risks resource leaks
crawler = AsyncWebCrawler(config=browser_cfg)
await crawler.start()
results = await crawler.arun_many(urls)
# Forgot to call crawler.close()!
```
### 7.2 Session Management
**DO:**
```python
# Use sessions for related crawls
config = CrawlerRunConfig(session_id="user-flow")
await crawler.arun(login_url, config=config)
await crawler.arun(dashboard_url, config=config)
await crawler.kill_session("user-flow") # Clean up when done
```
**DON'T:**
```python
# Creating new session IDs unnecessarily
for i in range(100):
config = CrawlerRunConfig(session_id=f"session-{i}")
await crawler.arun(url, config=config)
# 100 unclosed sessions accumulate!
```
### 7.3 Error Handling
```python
async def robust_crawl(urls):
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
try:
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls)
# Separate successes and failures
successes = [r for r in results if r.success]
failures = [r for r in results if not r.success]
print(f"{len(successes)} succeeded")
print(f"{len(failures)} failed")
# Retry failures with different config
if failures:
retry_urls = [r.url for r in failures]
retry_config = CrawlerRunConfig(
page_timeout=120000, # Longer timeout
wait_until="networkidle"
)
retry_results = await crawler.arun_many(
retry_urls,
config=retry_config
)
return successes + retry_results
except Exception as e:
print(f"Fatal error: {e}")
return []
```
---
## 8. Troubleshooting
### 8.1 Connection Issues
**Problem**: `Cannot connect to CDP browser`
```bash
# Check CDP browser is running
$ lsof -i :9222
# Should show: Chromium PID USER FD TYPE ...
# Or start it if not running
$ crwl cdp
```
**Problem**: `ERR_ABORTED` errors in concurrent crawls
**Fixed in v0.7.6**: This issue has been resolved. Pages are now properly isolated with locking.
### 8.2 Performance Issues
**Problem**: Too many open tabs
```python
# Ensure you're not using session_id for everything
config = CrawlerRunConfig() # No session_id
await crawler.arun_many(urls, config=config)
# Pages auto-close after crawling
```
**Problem**: Memory leaks
```python
# Always use context manager
async with AsyncWebCrawler(config=browser_cfg) as crawler:
# Crawling code here
pass
# Automatic cleanup on exit
```
### 8.3 State Issues
**Problem**: Cookies not persisting
```python
# Use the same context (automatic with CDP)
browser_cfg = BrowserConfig(cdp_url="http://localhost:9222")
# All crawls share cookies automatically
```
**Problem**: Need isolated state
```python
# Use different CDP endpoints or non-CDP browsers
browser_cfg_1 = BrowserConfig(cdp_url="http://localhost:9222")
browser_cfg_2 = BrowserConfig(cdp_url="http://localhost:9223")
# Completely isolated browsers
```
---
## 9. Comparison: CDP vs Regular Browsers
| Feature | CDP Browser | Regular Browser |
|---------|-------------|-----------------|
| **Window Management** | ✅ Single window, multiple tabs | ❌ New window per context |
| **Startup Time** | ✅ Instant (already running) | ⏱️ ~2-3s per launch |
| **State Sharing** | ✅ Shared cookies/localStorage | ⚠️ Isolated by default |
| **Concurrent Safety** | ✅ Automatic locking | ✅ Separate processes |
| **Memory Usage** | ✅ Lower (shared browser) | ⚠️ Higher (multiple processes) |
| **Session Persistence** | ✅ Native support | ✅ Via session_id |
| **Stealth Mode** | ❌ Not compatible | ✅ Full support |
| **Best For** | Development, authenticated crawls | Production, isolated crawls |
---
## 10. Real-World Examples
### 10.1 E-commerce Product Scraping
```python
async def scrape_products():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
# Get product URLs from category page
async with AsyncWebCrawler(config=browser_cfg) as crawler:
category_result = await crawler.arun(
url="https://shop.example.com/category",
config=CrawlerRunConfig(
css_selector=".product-link"
)
)
# Extract product URLs
product_urls = extract_urls(category_result.links)
# Crawl all products concurrently
product_results = await crawler.arun_many(
urls=product_urls,
config=CrawlerRunConfig(
css_selector=".product-details",
semaphore_count=5 # Polite crawling
)
)
return [extract_product_data(r) for r in product_results]
```
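`extract_urls()` and `extract_product_data()` above are placeholders whose logic depends entirely on the target site. Hypothetical stand-ins, assuming the `result.links` dict of internal/external links and product URLs that contain `/product`, might look like this:
```python
def extract_urls(links: dict) -> list:
    """Hypothetical helper: collect internal links that look like product pages."""
    internal = (links or {}).get("internal", [])
    return [link["href"] for link in internal if "/product" in link.get("href", "")]

def extract_product_data(result) -> dict:
    """Hypothetical helper: summarize one product page; real code would use an extraction strategy."""
    return {
        "url": result.url,
        "ok": result.success,
        "excerpt": str(result.markdown)[:300] if result.success else None,
    }
```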
### 10.2 News Article Monitoring
```python
import asyncio
from datetime import datetime
async def monitor_news_sites():
browser_cfg = BrowserConfig(
browser_type="chromium",
cdp_url="http://localhost:9222"
)
news_sites = [
"https://news.site1.com",
"https://news.site2.com",
"https://news.site3.com"
]
async with AsyncWebCrawler(config=browser_cfg) as crawler:
while True:
print(f"\n[{datetime.now()}] Checking for updates...")
results = await crawler.arun_many(
urls=news_sites,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, # Always fresh
css_selector=".article-headline"
)
)
for result in results:
if result.success:
headlines = extract_headlines(result)
for headline in headlines:
if is_new(headline):
notify_user(headline)
# Check every 5 minutes
await asyncio.sleep(300)
```
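As in the previous example, `extract_headlines()`, `is_new()`, and `notify_user()` are site-specific placeholders. One hedged sketch, keeping already-seen headlines in memory and printing new ones, could be:
```python
seen_headlines = set()

def extract_headlines(result) -> list:
    """Hypothetical helper: treat markdown heading lines as headlines; real selectors are site-specific."""
    lines = str(result.markdown).splitlines()
    return [line.lstrip("# ").strip() for line in lines if line.startswith("#")]

def is_new(headline: str) -> bool:
    """Remember headlines already reported during this run."""
    if headline in seen_headlines:
        return False
    seen_headlines.add(headline)
    return True

def notify_user(headline: str) -> None:
    """Stand-in notification channel; print instead of email or chat."""
    print(f"NEW: {headline}")
```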
---
## 11. Summary
CDP browser crawling offers:
- 🚀 **Performance**: Faster startup, lower resource usage
- 🔄 **State Management**: Shared cookies and authentication
- 🎯 **Concurrent Safety**: Automatic page isolation and cleanup
- 💻 **Developer Friendly**: Visual debugging with DevTools
**When to use CDP:**
- Development and debugging
- Authenticated crawling (login required)
- Sequential crawls needing state
- Resource-constrained environments
**When to use regular browsers:**
- Production deployments
- Maximum isolation required
- Stealth mode needed
- Distributed/cloud crawling
For most use cases, **CDP browsers provide the best balance** of performance, convenience, and safety.

View File

@@ -1,304 +1,98 @@
-# Proxy & Security
-This guide covers proxy configuration and security features in Crawl4AI, including SSL certificate analysis and proxy rotation strategies.
-## Understanding Proxy Configuration
-Crawl4AI recommends configuring proxies per request through `CrawlerRunConfig.proxy_config`. This gives you precise control, enables rotation strategies, and keeps examples simple enough to copy, paste, and run.
+# Proxy
## Basic Proxy Setup
-Configure proxies that apply to each crawl operation:
+Simple proxy configuration with `BrowserConfig`:
```python
-import asyncio
-from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, ProxyConfig
-run_config = CrawlerRunConfig(proxy_config=ProxyConfig(server="http://proxy.example.com:8080"))
-# run_config = CrawlerRunConfig(proxy_config={"server": "http://proxy.example.com:8080"})
-# run_config = CrawlerRunConfig(proxy_config="http://proxy.example.com:8080")
-async def main():
-    browser_config = BrowserConfig()
-    async with AsyncWebCrawler(config=browser_config) as crawler:
-        result = await crawler.arun(url="https://example.com", config=run_config)
-        print(f"Success: {result.success} -> {result.url}")
-if __name__ == "__main__":
-    asyncio.run(main())
+from crawl4ai.async_configs import BrowserConfig
+# Using HTTP proxy
+browser_config = BrowserConfig(proxy_config={"server": "http://proxy.example.com:8080"})
+async with AsyncWebCrawler(config=browser_config) as crawler:
+    result = await crawler.arun(url="https://example.com")
+# Using SOCKS proxy
+browser_config = BrowserConfig(proxy_config={"server": "socks5://proxy.example.com:1080"})
+async with AsyncWebCrawler(config=browser_config) as crawler:
+    result = await crawler.arun(url="https://example.com")
```
-!!! note "Why request-level?"
-    `CrawlerRunConfig.proxy_config` keeps each request self-contained, so swapping proxies or rotation strategies is just a matter of building a new run configuration.
-## Supported Proxy Formats
-The `ProxyConfig.from_string()` method supports multiple formats:
+## Authenticated Proxy
+Use an authenticated proxy with `BrowserConfig`:
```python
-from crawl4ai import ProxyConfig
-# HTTP proxy with authentication
-proxy1 = ProxyConfig.from_string("http://user:pass@192.168.1.1:8080")
-# HTTPS proxy
-proxy2 = ProxyConfig.from_string("https://proxy.example.com:8080")
-# SOCKS5 proxy
-proxy3 = ProxyConfig.from_string("socks5://proxy.example.com:1080")
-# Simple IP:port format
-proxy4 = ProxyConfig.from_string("192.168.1.1:8080")
-# IP:port:user:pass format
-proxy5 = ProxyConfig.from_string("192.168.1.1:8080:user:pass")
+from crawl4ai.async_configs import BrowserConfig
+browser_config = BrowserConfig(proxy_config={
+    "server": "http://[host]:[port]",
+    "username": "[username]",
+    "password": "[password]",
+})
+async with AsyncWebCrawler(config=browser_config) as crawler:
+    result = await crawler.arun(url="https://example.com")
```
-## Authenticated Proxies
-For proxies requiring authentication:
+## Rotating Proxies
+Example using a proxy rotation service dynamically:
```python
-import asyncio
-from crawl4ai import AsyncWebCrawler,BrowserConfig, CrawlerRunConfig, ProxyConfig
-run_config = CrawlerRunConfig(
-    proxy_config=ProxyConfig(
-        server="http://proxy.example.com:8080",
-        username="your_username",
-        password="your_password",
-    )
-)
-# Or dictionary style:
-# run_config = CrawlerRunConfig(proxy_config={
-#     "server": "http://proxy.example.com:8080",
-#     "username": "your_username",
-#     "password": "your_password",
-# })
-async def main():
-    browser_config = BrowserConfig()
-    async with AsyncWebCrawler(config=browser_config) as crawler:
-        result = await crawler.arun(url="https://example.com", config=run_config)
-        print(f"Success: {result.success} -> {result.url}")
-if __name__ == "__main__":
-    asyncio.run(main())
-```
-## Environment Variable Configuration
-Load proxies from environment variables for easy configuration:
-```python
-import os
-from crawl4ai import ProxyConfig, CrawlerRunConfig
-# Set environment variable
-os.environ["PROXIES"] = "ip1:port1:user1:pass1,ip2:port2:user2:pass2,ip3:port3"
-# Load all proxies
-proxies = ProxyConfig.from_env()
-print(f"Loaded {len(proxies)} proxies")
-# Use first proxy
-if proxies:
-    run_config = CrawlerRunConfig(proxy_config=proxies[0])
-```
-## Rotating Proxies
-Crawl4AI supports automatic proxy rotation to distribute requests across multiple proxy servers. Rotation is applied per request using a rotation strategy on `CrawlerRunConfig`.
-### Proxy Rotation (recommended)
-```python
-import asyncio
import re
-from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, ProxyConfig
-from crawl4ai.proxy_strategy import RoundRobinProxyStrategy
+from crawl4ai import (
+    AsyncWebCrawler,
+    BrowserConfig,
+    CrawlerRunConfig,
+    CacheMode,
+    RoundRobinProxyStrategy,
+)
+import asyncio
+from crawl4ai import ProxyConfig
async def main():
-    # Load proxies from environment
+    # Load proxies and create rotation strategy
    proxies = ProxyConfig.from_env()
+    #eg: export PROXIES="ip1:port1:username1:password1,ip2:port2:username2:password2"
    if not proxies:
-        print("No proxies found! Set PROXIES environment variable.")
+        print("No proxies found in environment. Set PROXIES env variable!")
        return
+    # Create rotation strategy
    proxy_strategy = RoundRobinProxyStrategy(proxies)
-    # Configure per-request with proxy rotation
+    # Create configs
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
-        proxy_rotation_strategy=proxy_strategy,
+        proxy_rotation_strategy=proxy_strategy
    )
    async with AsyncWebCrawler(config=browser_config) as crawler:
        urls = ["https://httpbin.org/ip"] * (len(proxies) * 2)  # Test each proxy twice
-        print(f"🚀 Testing {len(proxies)} proxies with rotation...")
-        results = await crawler.arun_many(urls=urls, config=run_config)
-        for i, result in enumerate(results):
-            if result.success:
-                # Extract IP from response
-                ip_match = re.search(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', result.html)
-                if ip_match:
-                    detected_ip = ip_match.group(0)
-                    proxy_index = i % len(proxies)
-                    expected_ip = proxies[proxy_index].ip
-                    print(f"✅ Request {i+1}: Proxy {proxy_index+1} -> IP {detected_ip}")
-                    if detected_ip == expected_ip:
-                        print("   🎯 IP matches proxy configuration")
-                    else:
-                        print(f"   ⚠️ IP mismatch (expected {expected_ip})")
-                else:
-                    print(f"❌ Request {i+1}: Could not extract IP from response")
-            else:
-                print(f"❌ Request {i+1}: Failed - {result.error_message}")
-if __name__ == "__main__":
-    asyncio.run(main())
+        print("\n📈 Initializing crawler with proxy rotation...")
+        async with AsyncWebCrawler(config=browser_config) as crawler:
+            print("\n🚀 Starting batch crawl with proxy rotation...")
+            results = await crawler.arun_many(
+                urls=urls,
+                config=run_config
+            )
+            for result in results:
+                if result.success:
+                    ip_match = re.search(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', result.html)
+                    current_proxy = run_config.proxy_config if run_config.proxy_config else None
+                    if current_proxy and ip_match:
+                        print(f"URL {result.url}")
+                        print(f"Proxy {current_proxy.server} -> Response IP: {ip_match.group(0)}")
+                        verified = ip_match.group(0) == current_proxy.ip
+                        if verified:
+                            print(f"✅ Proxy working! IP matches: {current_proxy.ip}")
+                        else:
+                            print("❌ Proxy failed or IP mismatch!")
+                        print("---")
+asyncio.run(main())
```
## SSL Certificate Analysis
Combine proxy usage with SSL certificate inspection for enhanced security analysis. SSL certificate fetching is configured per request via `CrawlerRunConfig`.
### Per-Request SSL Certificate Analysis
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
run_config = CrawlerRunConfig(
proxy_config={
"server": "http://proxy.example.com:8080",
"username": "user",
"password": "pass",
},
fetch_ssl_certificate=True, # Enable SSL certificate analysis for this request
)
async def main():
browser_config = BrowserConfig()
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
if result.success:
print(f"✅ Crawled via proxy: {result.url}")
# Analyze SSL certificate
if result.ssl_certificate:
cert = result.ssl_certificate
print("🔒 SSL Certificate Info:")
print(f" Issuer: {cert.issuer}")
print(f" Subject: {cert.subject}")
print(f" Valid until: {cert.valid_until}")
print(f" Fingerprint: {cert.fingerprint}")
# Export certificate
cert.to_json("certificate.json")
print("💾 Certificate exported to certificate.json")
else:
print("⚠️ No SSL certificate information available")
if __name__ == "__main__":
asyncio.run(main())
```
## Security Best Practices
### 1. Proxy Rotation for Anonymity
```python
from crawl4ai import CrawlerRunConfig, ProxyConfig
from crawl4ai.proxy_strategy import RoundRobinProxyStrategy
# Use multiple proxies to avoid IP blocking
proxies = ProxyConfig.from_env("PROXIES")
strategy = RoundRobinProxyStrategy(proxies)
# Configure rotation per request (recommended)
run_config = CrawlerRunConfig(proxy_rotation_strategy=strategy)
# For a fixed proxy across all requests, just reuse the same run_config instance
static_run_config = run_config
```
### 2. SSL Certificate Verification
```python
from crawl4ai import CrawlerRunConfig
# Always verify SSL certificates when possible
# Per-request (affects specific requests)
run_config = CrawlerRunConfig(fetch_ssl_certificate=True)
```
### 3. Environment Variable Security
```bash
# Use environment variables for sensitive proxy credentials
# Avoid hardcoding usernames/passwords in code
export PROXIES="ip1:port1:user1:pass1,ip2:port2:user2:pass2"
```
### 4. SOCKS5 for Enhanced Security
```python
from crawl4ai import CrawlerRunConfig
# Prefer SOCKS5 proxies for better protocol support
run_config = CrawlerRunConfig(proxy_config="socks5://proxy.example.com:1080")
```
## Migration from Deprecated `proxy` Parameter
!!! warning "Deprecation Notice"
The legacy `proxy` argument on `BrowserConfig` is deprecated. Configure proxies through `CrawlerRunConfig.proxy_config` so each request fully describes its network settings.
```python
# Old (deprecated) approach
# from crawl4ai import BrowserConfig
# browser_config = BrowserConfig(proxy="http://proxy.example.com:8080")
# New (preferred) approach
from crawl4ai import CrawlerRunConfig
run_config = CrawlerRunConfig(proxy_config="http://proxy.example.com:8080")
```
### Safe Logging of Proxies
```python
from crawl4ai import ProxyConfig
def safe_proxy_repr(proxy: ProxyConfig):
if getattr(proxy, "username", None):
return f"{proxy.server} (auth: ****)"
return proxy.server
```
## Troubleshooting
### Common Issues
???+ question "Proxy connection failed"
- Verify the proxy server is reachable from your network.
- Double-check authentication credentials.
- Ensure the protocol matches (`http`, `https`, or `socks5`).
???+ question "SSL certificate errors"
- Some proxies break SSL inspection; switch proxies if you see repeated failures.
- Consider temporarily disabling certificate fetching to isolate the issue.
???+ question "Environment variables not loading"
- Confirm `PROXIES` (or your custom env var) is set before running the script.
- Check formatting: `ip:port:user:pass,ip:port:user:pass`.
???+ question "Proxy rotation not working"
- Ensure `ProxyConfig.from_env()` actually loaded entries (`len(proxies) > 0`).
- Attach `proxy_rotation_strategy` to `CrawlerRunConfig`.
- Validate the proxy definitions you pass into the strategy.

View File

@@ -0,0 +1,63 @@
"""
Test for arun_many with managed CDP browser to ensure each crawl gets its own tab.
"""
import pytest
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
@pytest.mark.asyncio
async def test_arun_many_with_cdp():
"""Test arun_many opens a new tab for each url with managed CDP browser."""
# NOTE: Requires a running CDP browser at localhost:9222
# Can be started with: crwl cdp -d 9222
browser_cfg = BrowserConfig(
browser_type="cdp",
cdp_url="http://localhost:9222",
verbose=False,
)
urls = [
"https://example.com",
"https://httpbin.org/html",
"https://www.python.org",
]
crawler_cfg = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = await crawler.arun_many(urls=urls, config=crawler_cfg)
# All results should be successful and distinct
assert len(results) == 3
for result in results:
assert result.success, f"Crawl failed: {result.url} - {result.error_message}"
assert result.markdown is not None
@pytest.mark.asyncio
async def test_arun_many_with_cdp_sequential():
"""Test arun_many sequentially to isolate issues."""
browser_cfg = BrowserConfig(
browser_type="cdp",
cdp_url="http://localhost:9222",
verbose=True,
)
urls = [
"https://example.com",
"https://httpbin.org/html",
"https://www.python.org",
]
crawler_cfg = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
results = []
for url in urls:
result = await crawler.arun(url=url, config=crawler_cfg)
results.append(result)
assert result.success, f"Crawl failed: {result.url} - {result.error_message}"
assert result.markdown is not None
assert len(results) == 3
if __name__ == "__main__":
asyncio.run(test_arun_many_with_cdp())

View File

@@ -1,220 +0,0 @@
"""
Final verification test for Issue #1055 fix
This test demonstrates that LLM extraction now runs in parallel
when using arun_many with multiple URLs.
"""
import os
import sys
import time
import asyncio
grandparent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(grandparent_dir)
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
LLMExtractionStrategy,
LLMConfig,
)
from pydantic import BaseModel
class SimpleData(BaseModel):
title: str
summary: str
def print_section(title):
print("\n" + "=" * 80)
print(title)
print("=" * 80 + "\n")
async def test_without_llm():
"""Baseline: Test crawling without LLM extraction"""
print_section("TEST 1: Crawling WITHOUT LLM Extraction")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Crawling {len(urls)} URLs without LLM extraction...")
print("Expected: Fast and parallel\n")
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(urls=urls, config=config)
duration = time.time() - start_time
print(f"\n✅ Completed in {duration:.2f}s")
print(f" Successful: {sum(1 for r in results if r.success)}/{len(urls)}")
print(f" Average: {duration/len(urls):.2f}s per URL")
return duration
async def test_with_llm_before_fix():
"""Demonstrate the problem: Sequential execution with LLM"""
print_section("TEST 2: What Issue #1055 Reported (LLM Sequential Behavior)")
print("The issue reported that with LLM extraction, URLs would crawl")
print("one after another instead of in parallel.")
print("\nWithout our fix, this would show:")
print(" - URL 1 fetches → extracts → completes")
print(" - URL 2 fetches → extracts → completes")
print(" - URL 3 fetches → extracts → completes")
print("\nTotal time would be approximately sum of all individual times.")
async def test_with_llm_after_fix():
"""Demonstrate the fix: Parallel execution with LLM"""
print_section("TEST 3: After Fix - LLM Extraction in Parallel")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=SimpleData.model_json_schema(),
extraction_type="schema",
instruction="Extract title and summary",
)
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Crawling {len(urls)} URLs WITH LLM extraction...")
print("Expected: Parallel execution with our fix\n")
completion_times = {}
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(urls=urls, config=config)
for result in results:
elapsed = time.time() - start_time
completion_times[result.url] = elapsed
print(f" [{elapsed:5.2f}s] ✓ {result.url[:50]}")
duration = time.time() - start_time
print(f"\n✅ Total time: {duration:.2f}s")
print(f" Successful: {sum(1 for url in urls if url in completion_times)}/{len(urls)}")
# Analyze parallelism
times = list(completion_times.values())
if len(times) >= 2:
# If parallel, completion times should be staggered, not evenly spaced
time_diffs = [times[i+1] - times[i] for i in range(len(times)-1)]
avg_diff = sum(time_diffs) / len(time_diffs)
print(f"\nParallelism Analysis:")
print(f" Completion time differences: {[f'{d:.2f}s' for d in time_diffs]}")
print(f" Average difference: {avg_diff:.2f}s")
# In parallel mode, some tasks complete close together
# In sequential mode, they're evenly spaced (avg ~2-3s apart)
if avg_diff < duration / len(urls):
print(f" ✅ PARALLEL: Tasks completed with overlapping execution")
else:
print(f" ⚠️ SEQUENTIAL: Tasks completed one after another")
return duration
async def test_multiple_arun_calls():
"""Test multiple individual arun() calls in parallel"""
print_section("TEST 4: Multiple arun() Calls with asyncio.gather")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=SimpleData.model_json_schema(),
extraction_type="schema",
instruction="Extract title and summary",
)
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Running {len(urls)} arun() calls with asyncio.gather()...")
print("Expected: True parallel execution\n")
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
tasks = [crawler.arun(url, config=config) for url in urls]
results = await asyncio.gather(*tasks)
duration = time.time() - start_time
print(f"\n✅ Completed in {duration:.2f}s")
print(f" Successful: {sum(1 for r in results if r.success)}/{len(urls)}")
print(f" This proves the async LLM extraction works correctly")
return duration
async def main():
print("\n" + "🚀" * 40)
print("ISSUE #1055 FIX VERIFICATION")
print("Testing: Sequential → Parallel LLM Extraction")
print("🚀" * 40)
# Run tests
await test_without_llm()
await test_with_llm_before_fix()
time_with_llm = await test_with_llm_after_fix()
time_gather = await test_multiple_arun_calls()
# Final summary
print_section("FINAL VERDICT")
print("✅ Fix Verified!")
print("\nWhat changed:")
print(" • Created aperform_completion_with_backoff() using litellm.acompletion")
print(" • Added arun() method to ExtractionStrategy base class")
print(" • Implemented parallel arun() in LLMExtractionStrategy")
print(" • Updated AsyncWebCrawler to use arun() when available")
print("\nResult:")
print(" • LLM extraction now runs in parallel across multiple URLs")
print(" • Backward compatible - existing strategies still work")
print(" • No breaking changes to the API")
print("\n✨ Issue #1055 is RESOLVED!")
print("\n" + "=" * 80 + "\n")
if __name__ == "__main__":
asyncio.run(main())