feat: add undetected browser support with adapter pattern

This commit is contained in:
unclecode
2025-07-14 17:29:50 +08:00
parent 7b80eb6b99
commit 5c33cbcca2
5 changed files with 2495 additions and 5 deletions

File diff suppressed because it is too large Load Diff

293
crawl4ai/browser_adapter.py Normal file
View File

@@ -0,0 +1,293 @@
# browser_adapter.py
"""
Browser adapter for Crawl4AI to support both Playwright and undetected browsers
with minimal changes to existing codebase.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Callable
import time
import json
# Import both, but use conditionally
try:
from playwright.async_api import Page
except ImportError:
Page = Any
try:
from patchright.async_api import Page as UndetectedPage
except ImportError:
UndetectedPage = Any
class BrowserAdapter(ABC):
"""Abstract adapter for browser-specific operations"""
@abstractmethod
async def evaluate(self, page: Page, expression: str, arg: Any = None) -> Any:
"""Execute JavaScript in the page"""
pass
@abstractmethod
async def setup_console_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console message capturing, returns handler function if needed"""
pass
@abstractmethod
async def setup_error_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capturing, returns handler function if needed"""
pass
@abstractmethod
async def retrieve_console_messages(self, page: Page) -> List[Dict]:
"""Retrieve captured console messages (for undetected browsers)"""
pass
@abstractmethod
async def cleanup_console_capture(self, page: Page, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Clean up console event listeners"""
pass
@abstractmethod
def get_imports(self) -> tuple:
"""Get the appropriate imports for this adapter"""
pass
class PlaywrightAdapter(BrowserAdapter):
"""Adapter for standard Playwright"""
async def evaluate(self, page: Page, expression: str, arg: Any = None) -> Any:
"""Standard Playwright evaluate"""
if arg is not None:
return await page.evaluate(expression, arg)
return await page.evaluate(expression)
async def setup_console_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console capture using Playwright's event system"""
def handle_console_capture(msg):
try:
message_type = "unknown"
try:
message_type = msg.type
except:
pass
message_text = "unknown"
try:
message_text = msg.text
except:
pass
entry = {
"type": message_type,
"text": message_text,
"timestamp": time.time()
}
captured_console.append(entry)
except Exception as e:
captured_console.append({
"type": "console_capture_error",
"error": str(e),
"timestamp": time.time()
})
page.on("console", handle_console_capture)
return handle_console_capture
async def setup_error_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capture using Playwright's event system"""
def handle_pageerror_capture(err):
try:
error_message = "Unknown error"
try:
error_message = err.message
except:
pass
error_stack = ""
try:
error_stack = err.stack
except:
pass
captured_console.append({
"type": "error",
"text": error_message,
"stack": error_stack,
"timestamp": time.time()
})
except Exception as e:
captured_console.append({
"type": "pageerror_capture_error",
"error": str(e),
"timestamp": time.time()
})
page.on("pageerror", handle_pageerror_capture)
return handle_pageerror_capture
async def retrieve_console_messages(self, page: Page) -> List[Dict]:
"""Not needed for Playwright - messages are captured via events"""
return []
async def cleanup_console_capture(self, page: Page, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Remove event listeners"""
if handle_console:
page.remove_listener("console", handle_console)
if handle_error:
page.remove_listener("pageerror", handle_error)
def get_imports(self) -> tuple:
"""Return Playwright imports"""
from playwright.async_api import Page, Error
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
return Page, Error, PlaywrightTimeoutError
class UndetectedAdapter(BrowserAdapter):
"""Adapter for undetected browser automation with stealth features"""
def __init__(self):
self._console_script_injected = {}
async def evaluate(self, page: UndetectedPage, expression: str, arg: Any = None) -> Any:
"""Undetected browser evaluate with isolated context"""
# For most evaluations, use isolated context for stealth
# Only use non-isolated when we need to access our injected console capture
isolated = not (
"__console" in expression or
"__captured" in expression or
"__error" in expression or
"window.__" in expression
)
if arg is not None:
return await page.evaluate(expression, arg, isolated_context=isolated)
return await page.evaluate(expression, isolated_context=isolated)
async def setup_console_capture(self, page: UndetectedPage, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console capture using JavaScript injection for undetected browsers"""
if not self._console_script_injected.get(page, False):
await page.add_init_script("""
// Initialize console capture
window.__capturedConsole = [];
window.__capturedErrors = [];
// Store original console methods
const originalConsole = {};
['log', 'info', 'warn', 'error', 'debug'].forEach(method => {
originalConsole[method] = console[method];
console[method] = function(...args) {
try {
window.__capturedConsole.push({
type: method,
text: args.map(arg => {
try {
if (typeof arg === 'object') {
return JSON.stringify(arg);
}
return String(arg);
} catch (e) {
return '[Object]';
}
}).join(' '),
timestamp: Date.now()
});
} catch (e) {
// Fail silently to avoid detection
}
// Call original method
originalConsole[method].apply(console, args);
};
});
""")
self._console_script_injected[page] = True
return None # No handler function needed for undetected browser
async def setup_error_capture(self, page: UndetectedPage, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capture using JavaScript injection for undetected browsers"""
if not self._console_script_injected.get(page, False):
await page.add_init_script("""
// Capture errors
window.addEventListener('error', (event) => {
try {
window.__capturedErrors.push({
type: 'error',
text: event.message,
stack: event.error ? event.error.stack : '',
filename: event.filename,
lineno: event.lineno,
colno: event.colno,
timestamp: Date.now()
});
} catch (e) {
// Fail silently
}
});
// Capture unhandled promise rejections
window.addEventListener('unhandledrejection', (event) => {
try {
window.__capturedErrors.push({
type: 'unhandledrejection',
text: event.reason ? String(event.reason) : 'Unhandled Promise Rejection',
stack: event.reason && event.reason.stack ? event.reason.stack : '',
timestamp: Date.now()
});
} catch (e) {
// Fail silently
}
});
""")
self._console_script_injected[page] = True
return None # No handler function needed for undetected browser
async def retrieve_console_messages(self, page: UndetectedPage) -> List[Dict]:
"""Retrieve captured console messages and errors from the page"""
messages = []
try:
# Get console messages
console_messages = await page.evaluate(
"() => { const msgs = window.__capturedConsole || []; window.__capturedConsole = []; return msgs; }",
isolated_context=False
)
messages.extend(console_messages)
# Get errors
errors = await page.evaluate(
"() => { const errs = window.__capturedErrors || []; window.__capturedErrors = []; return errs; }",
isolated_context=False
)
messages.extend(errors)
# Convert timestamps from JS to Python format
for msg in messages:
if 'timestamp' in msg and isinstance(msg['timestamp'], (int, float)):
msg['timestamp'] = msg['timestamp'] / 1000.0 # Convert from ms to seconds
except Exception:
# If retrieval fails, return empty list
pass
return messages
async def cleanup_console_capture(self, page: UndetectedPage, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Clean up for undetected browser - retrieve final messages"""
# For undetected browser, we don't have event listeners to remove
# but we should retrieve any final messages
final_messages = await self.retrieve_console_messages(page)
return final_messages
def get_imports(self) -> tuple:
"""Return undetected browser imports"""
from patchright.async_api import Page, Error
from patchright.async_api import TimeoutError as PlaywrightTimeoutError
return Page, Error, PlaywrightTimeoutError

View File

@@ -588,21 +588,26 @@ class BrowserManager:
_playwright_instance = None
@classmethod
async def get_playwright(cls):
from playwright.async_api import async_playwright
async def get_playwright(cls, use_undetected: bool = False):
if use_undetected:
from patchright.async_api import async_playwright
else:
from playwright.async_api import async_playwright
cls._playwright_instance = await async_playwright().start()
return cls._playwright_instance
def __init__(self, browser_config: BrowserConfig, logger=None):
def __init__(self, browser_config: BrowserConfig, logger=None, use_undetected: bool = False):
"""
Initialize the BrowserManager with a browser configuration.
Args:
browser_config (BrowserConfig): Configuration object containing all browser settings
logger: Logger instance for recording events and errors
use_undetected (bool): Whether to use undetected browser (Patchright)
"""
self.config: BrowserConfig = browser_config
self.logger = logger
self.use_undetected = use_undetected
# Browser state
self.browser = None
@@ -645,7 +650,10 @@ class BrowserManager:
if self.playwright is not None:
await self.close()
from playwright.async_api import async_playwright
if self.use_undetected:
from patchright.async_api import async_playwright
else:
from playwright.async_api import async_playwright
self.playwright = await async_playwright().start()

View File

@@ -1056,7 +1056,7 @@ Your output must:
</output_requirements>
"""
GENERATE_SCRIPT_PROMPT = """You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
GENERATE_SCRIPT_PROMPT = r"""You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
Your scripts run **before the crawl** to handle dynamic content, user interactions, and other obstacles. You are a master of two tools: raw **JavaScript** and the high-level **Crawl4ai Script (c4a)**.

View File

@@ -0,0 +1,58 @@
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
DefaultMarkdownGenerator,
PruningContentFilter,
CrawlResult
)
# Import the custom strategy and adapter from the _ud file
from crawl4ai.async_crawler_strategy_ud import AsyncPlaywrightCrawlerStrategy
from crawl4ai.browser_adapter import UndetectedAdapter
async def main():
# Create browser config
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create the crawler strategy with the undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Configure the crawl
crawler_config = CrawlerRunConfig(
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter()
),
capture_console_messages=True, # Enable console capture to test adapter
)
# Test on a site that typically detects bots
print("Testing undetected adapter...")
result: CrawlResult = await crawler.arun(
url="https://www.helloworld.org",
config=crawler_config
)
print(f"Status: {result.status_code}")
print(f"Success: {result.success}")
print(f"Console messages captured: {len(result.console_messages or [])}")
print(f"Markdown content (first 500 chars):\n{result.markdown.raw_markdown[:500]}")
if __name__ == "__main__":
asyncio.run(main())