Compare commits

..

5 Commits

Author SHA1 Message Date
AHMET YILMAZ
0541b61405 feat(browser-profiler): implement cross-platform keyboard listeners and improve quit handling 2025-08-08 11:18:34 +08:00
Nasrin
6735c68288 Merge pull request #1170 from prokopis3/fix/create-profile
fix(browser_profiler): cross-platform 'q' to quit - create profile
2025-08-06 16:29:14 +08:00
prokopis3
c4d625fb3c chore(profile-test): fix filename typo ( test_crteate_profile.py → test_create_profile.py )
- Rename file to correct spelling
- No content changes
2025-06-12 14:38:32 +03:00
prokopis3
ef722766f0 fix(browser_profiler): improve keyboard input handling
- fix handling of special keys in Windows msvcrt implementation
- Guard against UnicodeDecodeError from multi-byte key sequences
- Filter out non-printable characters and control sequences
- Add error handling to prevent coroutine crashes
- Add unit test to verify keyboard input handling

Key changes:
- Safe UTF-8 decoding with try/except for special keys
- Skip non-printable and multi-byte character sequences
- Add broad exception handling in keyboard listener

Test runs on Windows only due to msvcrt dependency.
2025-06-12 14:33:12 +03:00
prokopis3
4bcb7171a3 fix(browser_profiler): cross-platform 'q' to quit
This commit introduces platform-specific handling for the 'q' key press to quit the browser profiler, ensuring compatibility with both Windows and Unix-like systems. It also adds a check to see if the browser process has already exited, terminating the input listener if so.

- Implemented `msvcrt` for Windows to capture keyboard input without requiring a newline.
- Retained `termios`, `tty`, and `select` for Unix-like systems.
- Added a check for browser process termination to gracefully exit the input listener.
- Updated logger messages to use colored output for better user experience.
2025-05-30 14:43:18 +03:00
45 changed files with 813 additions and 8039 deletions
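
The last two commits above describe how the profiler's quit handling was made cross-platform: `msvcrt` polling on Windows, `termios`/`tty`/`select` elsewhere, with undecodable special-key sequences skipped. A minimal, synchronous sketch of that pattern (not the project's exact async listener, which appears in the `BrowserProfiler` diff further down):

```python
import sys
import time


def wait_for_quit() -> None:
    """Block until the user presses 'q'; msvcrt on Windows, termios/tty/select elsewhere."""
    if sys.platform.startswith("win") or sys.platform == "cygwin":
        import msvcrt
        while True:
            if msvcrt.kbhit():
                raw = msvcrt.getch()
                try:
                    key = raw.decode("utf-8")
                except UnicodeDecodeError:
                    continue  # skip multi-byte special-key sequences
                if key.isprintable() and key.lower() == "q":
                    return
            time.sleep(0.1)  # avoid busy waiting
    else:
        import select
        import termios
        import tty
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setcbreak(fd)  # single keystrokes, no Enter required
            while True:
                readable, _, _ = select.select([sys.stdin], [], [], 0.5)
                if readable and sys.stdin.read(1).lower() == "q":
                    return
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
```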

View File

@@ -618,16 +618,16 @@ Read the full details in our [0.7.0 Release Notes](https://docs.crawl4ai.com/blo
# Process results
raw_df = pd.DataFrame()
for result in results:
if result.success and result.tables:
if result.success and result.media["tables"]:
raw_df = pd.DataFrame(
result.tables[0]["rows"],
columns=result.tables[0]["headers"],
result.media["tables"][0]["rows"],
columns=result.media["tables"][0]["headers"],
)
break
print(raw_df.head())
finally:
await crawler.close()
await crawler.stop()
```
- **🚀 Browser Pooling**: Pages launch hot with pre-warmed browser instances for lower latency and memory usage

View File

@@ -88,13 +88,6 @@ from .script import (
ErrorDetail
)
# Browser Adapters
from .browser_adapter import (
BrowserAdapter,
PlaywrightAdapter,
UndetectedAdapter
)
from .utils import (
start_colab_display_server,
setup_colab_environment
@@ -181,10 +174,6 @@ __all__ = [
"CompilationResult",
"ValidationResult",
"ErrorDetail",
# Browser Adapters
"BrowserAdapter",
"PlaywrightAdapter",
"UndetectedAdapter",
"LinkPreviewConfig"
]

View File

@@ -390,8 +390,6 @@ class BrowserConfig:
light_mode (bool): Disables certain background features for performance gains. Default: False.
extra_args (list): Additional command-line arguments passed to the browser.
Default: [].
enable_stealth (bool): If True, applies playwright-stealth to bypass basic bot detection.
Cannot be used with use_undetected browser mode. Default: False.
"""
def __init__(
@@ -432,7 +430,6 @@ class BrowserConfig:
extra_args: list = None,
debugging_port: int = 9222,
host: str = "localhost",
enable_stealth: bool = False,
):
self.browser_type = browser_type
self.headless = headless
@@ -473,7 +470,6 @@ class BrowserConfig:
self.verbose = verbose
self.debugging_port = debugging_port
self.host = host
self.enable_stealth = enable_stealth
fa_user_agenr_generator = ValidUAGenerator()
if self.user_agent_mode == "random":
@@ -505,13 +501,6 @@ class BrowserConfig:
# If persistent context is requested, ensure managed browser is enabled
if self.use_persistent_context:
self.use_managed_browser = True
# Validate stealth configuration
if self.enable_stealth and self.use_managed_browser and self.browser_mode == "builtin":
raise ValueError(
"enable_stealth cannot be used with browser_mode='builtin'. "
"Stealth mode requires a dedicated browser instance."
)
@staticmethod
def from_kwargs(kwargs: dict) -> "BrowserConfig":
@@ -548,7 +537,6 @@ class BrowserConfig:
extra_args=kwargs.get("extra_args", []),
debugging_port=kwargs.get("debugging_port", 9222),
host=kwargs.get("host", "localhost"),
enable_stealth=kwargs.get("enable_stealth", False),
)
def to_dict(self):
@@ -583,7 +571,6 @@ class BrowserConfig:
"verbose": self.verbose,
"debugging_port": self.debugging_port,
"host": self.host,
"enable_stealth": self.enable_stealth,
}
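
With `enable_stealth` removed from `BrowserConfig`, a config is built from the remaining parameters. A minimal sketch using `from_kwargs` as shown in this hunk (the top-level import path and the `headless` key being read by `from_kwargs` are assumptions):

```python
from crawl4ai import BrowserConfig  # import path assumed

# enable_stealth is no longer accepted after this change.
config = BrowserConfig.from_kwargs({
    "headless": True,                 # constructor parameter; assumed to be read by from_kwargs
    "extra_args": ["--disable-gpu"],  # illustrative value
    "debugging_port": 9222,
    "host": "localhost",
})
print(config.to_dict()["host"])  # -> localhost
```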

File diff suppressed because it is too large

View File

@@ -21,7 +21,6 @@ from .async_logger import AsyncLogger
from .ssl_certificate import SSLCertificate
from .user_agent_generator import ValidUAGenerator
from .browser_manager import BrowserManager
from .browser_adapter import BrowserAdapter, PlaywrightAdapter, UndetectedAdapter
import aiofiles
import aiohttp
@@ -72,7 +71,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
def __init__(
self, browser_config: BrowserConfig = None, logger: AsyncLogger = None, browser_adapter: BrowserAdapter = None, **kwargs
self, browser_config: BrowserConfig = None, logger: AsyncLogger = None, **kwargs
):
"""
Initialize the AsyncPlaywrightCrawlerStrategy with a browser configuration.
@@ -81,16 +80,11 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
browser_config (BrowserConfig): Configuration object containing browser settings.
If None, will be created from kwargs for backwards compatibility.
logger: Logger instance for recording events and errors.
browser_adapter (BrowserAdapter): Browser adapter for handling browser-specific operations.
If None, defaults to PlaywrightAdapter.
**kwargs: Additional arguments for backwards compatibility and extending functionality.
"""
# Initialize browser config, either from provided object or kwargs
self.browser_config = browser_config or BrowserConfig.from_kwargs(kwargs)
self.logger = logger
# Initialize browser adapter
self.adapter = browser_adapter or PlaywrightAdapter()
# Initialize session management
self._downloaded_files = []
@@ -110,9 +104,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Initialize browser manager with config
self.browser_manager = BrowserManager(
browser_config=self.browser_config,
logger=self.logger,
use_undetected=isinstance(self.adapter, UndetectedAdapter)
browser_config=self.browser_config, logger=self.logger
)
async def __aenter__(self):
@@ -330,7 +322,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
try:
result = await self.adapter.evaluate(page, wrapper_js)
result = await page.evaluate(wrapper_js)
return result
except Exception as e:
if "Error evaluating condition" in str(e):
@@ -375,7 +367,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Replace the iframe with a div containing the extracted content
_iframe = iframe_content.replace("`", "\\`")
await self.adapter.evaluate(page,
await page.evaluate(
f"""
() => {{
const iframe = document.getElementById('iframe-{i}');
@@ -636,16 +628,91 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
page.on("requestfailed", handle_request_failed_capture)
# Console Message Capturing
handle_console = None
handle_error = None
if config.capture_console_messages:
# Set up console capture using adapter
handle_console = await self.adapter.setup_console_capture(page, captured_console)
handle_error = await self.adapter.setup_error_capture(page, captured_console)
def handle_console_capture(msg):
try:
message_type = "unknown"
try:
message_type = msg.type
except:
pass
message_text = "unknown"
try:
message_text = msg.text
except:
pass
# Basic console message with minimal content
entry = {
"type": message_type,
"text": message_text,
"timestamp": time.time()
}
captured_console.append(entry)
except Exception as e:
if self.logger:
self.logger.warning(f"Error capturing console message: {e}", tag="CAPTURE")
# Still add something to the list even on error
captured_console.append({
"type": "console_capture_error",
"error": str(e),
"timestamp": time.time()
})
def handle_pageerror_capture(err):
try:
error_message = "Unknown error"
try:
error_message = err.message
except:
pass
error_stack = ""
try:
error_stack = err.stack
except:
pass
captured_console.append({
"type": "error",
"text": error_message,
"stack": error_stack,
"timestamp": time.time()
})
except Exception as e:
if self.logger:
self.logger.warning(f"Error capturing page error: {e}", tag="CAPTURE")
captured_console.append({
"type": "pageerror_capture_error",
"error": str(e),
"timestamp": time.time()
})
# Add event listeners directly
page.on("console", handle_console_capture)
page.on("pageerror", handle_pageerror_capture)
# Set up console logging if requested
# Note: For undetected browsers, console logging won't work directly
# but captured messages can still be logged after retrieval
if config.log_console:
def log_consol(
msg, console_log_type="debug"
): # Corrected the parameter syntax
if console_log_type == "error":
self.logger.error(
message=f"Console error: {msg}", # Use f-string for variable interpolation
tag="CONSOLE"
)
elif console_log_type == "debug":
self.logger.debug(
message=f"Console: {msg}", # Use f-string for variable interpolation
tag="CONSOLE"
)
page.on("console", log_consol)
page.on("pageerror", lambda e: log_consol(e, "error"))
try:
# Get SSL certificate information if requested and URL is HTTPS
@@ -931,7 +998,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await page.wait_for_load_state("domcontentloaded", timeout=5)
except PlaywrightTimeoutError:
pass
await self.adapter.evaluate(page, update_image_dimensions_js)
await page.evaluate(update_image_dimensions_js)
except Exception as e:
self.logger.error(
message="Error updating image dimensions: {error}",
@@ -960,7 +1027,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
for selector in selectors:
try:
content = await self.adapter.evaluate(page,
content = await page.evaluate(
f"""Array.from(document.querySelectorAll("{selector}"))
.map(el => el.outerHTML)
.join('')"""
@@ -1018,11 +1085,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await asyncio.sleep(delay)
return await page.content()
# For undetected browsers, retrieve console messages before returning
if config.capture_console_messages and hasattr(self.adapter, 'retrieve_console_messages'):
final_messages = await self.adapter.retrieve_console_messages(page)
captured_console.extend(final_messages)
# Return complete response
return AsyncCrawlResponse(
html=html,
@@ -1061,13 +1123,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
page.remove_listener("response", handle_response_capture)
page.remove_listener("requestfailed", handle_request_failed_capture)
if config.capture_console_messages:
# Retrieve any final console messages for undetected browsers
if hasattr(self.adapter, 'retrieve_console_messages'):
final_messages = await self.adapter.retrieve_console_messages(page)
captured_console.extend(final_messages)
# Clean up console capture
await self.adapter.cleanup_console_capture(page, handle_console, handle_error)
page.remove_listener("console", handle_console_capture)
page.remove_listener("pageerror", handle_pageerror_capture)
# Close the page
await page.close()
@@ -1297,7 +1354,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
# Execute virtual scroll capture
result = await self.adapter.evaluate(page, virtual_scroll_js, config.to_dict())
result = await page.evaluate(virtual_scroll_js, config.to_dict())
if result.get("replaced", False):
self.logger.success(
@@ -1381,7 +1438,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
remove_overlays_js = load_js_script("remove_overlay_elements")
try:
await self.adapter.evaluate(page,
await page.evaluate(
f"""
(() => {{
try {{
@@ -1786,7 +1843,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.
# """
result = await self.adapter.evaluate(page,
result = await page.evaluate(
f"""
(async () => {{
try {{
@@ -1908,7 +1965,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
for script in scripts:
try:
# Execute the script and wait for network idle
result = await self.adapter.evaluate(page,
result = await page.evaluate(
f"""
(() => {{
return new Promise((resolve) => {{
@@ -1992,7 +2049,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Returns:
Boolean indicating visibility
"""
return await self.adapter.evaluate(page,
return await page.evaluate(
"""
() => {
const element = document.body;
@@ -2033,7 +2090,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Dict containing scroll status and position information
"""
try:
result = await self.adapter.evaluate(page,
result = await page.evaluate(
f"""() => {{
try {{
const startX = window.scrollX;
@@ -2090,7 +2147,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Returns:
Dict containing width and height of the page
"""
return await self.adapter.evaluate(page,
return await page.evaluate(
"""
() => {
const {scrollWidth, scrollHeight} = document.documentElement;
@@ -2110,7 +2167,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
bool: True if page needs scrolling
"""
try:
need_scroll = await self.adapter.evaluate(page,
need_scroll = await page.evaluate(
"""
() => {
const scrollHeight = document.documentElement.scrollHeight;
@@ -2129,3 +2186,265 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return True # Default to scrolling if check fails
####################################################################################################
# HTTP Crawler Strategy
####################################################################################################
class HTTPCrawlerError(Exception):
"""Base error class for HTTP crawler specific exceptions"""
pass
class ConnectionTimeoutError(HTTPCrawlerError):
"""Raised when connection timeout occurs"""
pass
class HTTPStatusError(HTTPCrawlerError):
"""Raised for unexpected status codes"""
def __init__(self, status_code: int, message: str):
self.status_code = status_code
super().__init__(f"HTTP {status_code}: {message}")
class AsyncHTTPCrawlerStrategy(AsyncCrawlerStrategy):
"""
Fast, lightweight HTTP-only crawler strategy optimized for memory efficiency.
"""
__slots__ = ('logger', 'max_connections', 'dns_cache_ttl', 'chunk_size', '_session', 'hooks', 'browser_config')
DEFAULT_TIMEOUT: Final[int] = 30
DEFAULT_CHUNK_SIZE: Final[int] = 64 * 1024
DEFAULT_MAX_CONNECTIONS: Final[int] = min(32, (os.cpu_count() or 1) * 4)
DEFAULT_DNS_CACHE_TTL: Final[int] = 300
VALID_SCHEMES: Final = frozenset({'http', 'https', 'file', 'raw'})
_BASE_HEADERS: Final = MappingProxyType({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def __init__(
self,
browser_config: Optional[HTTPCrawlerConfig] = None,
logger: Optional[AsyncLogger] = None,
max_connections: int = DEFAULT_MAX_CONNECTIONS,
dns_cache_ttl: int = DEFAULT_DNS_CACHE_TTL,
chunk_size: int = DEFAULT_CHUNK_SIZE
):
"""Initialize the HTTP crawler with config"""
self.browser_config = browser_config or HTTPCrawlerConfig()
self.logger = logger
self.max_connections = max_connections
self.dns_cache_ttl = dns_cache_ttl
self.chunk_size = chunk_size
self._session: Optional[aiohttp.ClientSession] = None
self.hooks = {
k: partial(self._execute_hook, k)
for k in ('before_request', 'after_request', 'on_error')
}
# Set default hooks
self.set_hook('before_request', lambda *args, **kwargs: None)
self.set_hook('after_request', lambda *args, **kwargs: None)
self.set_hook('on_error', lambda *args, **kwargs: None)
async def __aenter__(self) -> AsyncHTTPCrawlerStrategy:
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
@contextlib.asynccontextmanager
async def _session_context(self):
try:
if not self._session:
await self.start()
yield self._session
finally:
pass
def set_hook(self, hook_type: str, hook_func: Callable) -> None:
if hook_type in self.hooks:
self.hooks[hook_type] = partial(self._execute_hook, hook_type, hook_func)
else:
raise ValueError(f"Invalid hook type: {hook_type}")
async def _execute_hook(
self,
hook_type: str,
hook_func: Callable,
*args: Any,
**kwargs: Any
) -> Any:
if asyncio.iscoroutinefunction(hook_func):
return await hook_func(*args, **kwargs)
return hook_func(*args, **kwargs)
async def start(self) -> None:
if not self._session:
connector = aiohttp.TCPConnector(
limit=self.max_connections,
ttl_dns_cache=self.dns_cache_ttl,
use_dns_cache=True,
force_close=False
)
self._session = aiohttp.ClientSession(
headers=dict(self._BASE_HEADERS),
connector=connector,
timeout=ClientTimeout(total=self.DEFAULT_TIMEOUT)
)
async def close(self) -> None:
if self._session and not self._session.closed:
try:
await asyncio.wait_for(self._session.close(), timeout=5.0)
except asyncio.TimeoutError:
if self.logger:
self.logger.warning(
message="Session cleanup timed out",
tag="CLEANUP"
)
finally:
self._session = None
async def _stream_file(self, path: str) -> AsyncGenerator[memoryview, None]:
async with aiofiles.open(path, mode='rb') as f:
while chunk := await f.read(self.chunk_size):
yield memoryview(chunk)
async def _handle_file(self, path: str) -> AsyncCrawlResponse:
if not os.path.exists(path):
raise FileNotFoundError(f"Local file not found: {path}")
chunks = []
async for chunk in self._stream_file(path):
chunks.append(chunk.tobytes().decode('utf-8', errors='replace'))
return AsyncCrawlResponse(
html=''.join(chunks),
response_headers={},
status_code=200
)
async def _handle_raw(self, content: str) -> AsyncCrawlResponse:
return AsyncCrawlResponse(
html=content,
response_headers={},
status_code=200
)
async def _handle_http(
self,
url: str,
config: CrawlerRunConfig
) -> AsyncCrawlResponse:
async with self._session_context() as session:
timeout = ClientTimeout(
total=config.page_timeout or self.DEFAULT_TIMEOUT,
connect=10,
sock_read=30
)
headers = dict(self._BASE_HEADERS)
if self.browser_config.headers:
headers.update(self.browser_config.headers)
request_kwargs = {
'timeout': timeout,
'allow_redirects': self.browser_config.follow_redirects,
'ssl': self.browser_config.verify_ssl,
'headers': headers
}
if self.browser_config.method == "POST":
if self.browser_config.data:
request_kwargs['data'] = self.browser_config.data
if self.browser_config.json:
request_kwargs['json'] = self.browser_config.json
await self.hooks['before_request'](url, request_kwargs)
try:
async with session.request(self.browser_config.method, url, **request_kwargs) as response:
content = memoryview(await response.read())
if not (200 <= response.status < 300):
raise HTTPStatusError(
response.status,
f"Unexpected status code for {url}"
)
encoding = response.charset
if not encoding:
encoding = chardet.detect(content.tobytes())['encoding'] or 'utf-8'
result = AsyncCrawlResponse(
html=content.tobytes().decode(encoding, errors='replace'),
response_headers=dict(response.headers),
status_code=response.status,
redirected_url=str(response.url)
)
await self.hooks['after_request'](result)
return result
except aiohttp.ServerTimeoutError as e:
await self.hooks['on_error'](e)
raise ConnectionTimeoutError(f"Request timed out: {str(e)}")
except aiohttp.ClientConnectorError as e:
await self.hooks['on_error'](e)
raise ConnectionError(f"Connection failed: {str(e)}")
except aiohttp.ClientError as e:
await self.hooks['on_error'](e)
raise HTTPCrawlerError(f"HTTP client error: {str(e)}")
except asyncio.exceptions.TimeoutError as e:
await self.hooks['on_error'](e)
raise ConnectionTimeoutError(f"Request timed out: {str(e)}")
except Exception as e:
await self.hooks['on_error'](e)
raise HTTPCrawlerError(f"HTTP request failed: {str(e)}")
async def crawl(
self,
url: str,
config: Optional[CrawlerRunConfig] = None,
**kwargs
) -> AsyncCrawlResponse:
config = config or CrawlerRunConfig.from_kwargs(kwargs)
parsed = urlparse(url)
scheme = parsed.scheme.rstrip('/')
if scheme not in self.VALID_SCHEMES:
raise ValueError(f"Unsupported URL scheme: {scheme}")
try:
if scheme == 'file':
return await self._handle_file(parsed.path)
elif scheme == 'raw':
return await self._handle_raw(parsed.path)
else: # http or https
return await self._handle_http(url, config)
except Exception as e:
if self.logger:
self.logger.error(
message="Crawl failed: {error}",
tag="CRAWL",
params={"error": str(e), "url": url}
)
raise
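
`AsyncHTTPCrawlerStrategy` is an async context manager whose `crawl()` dispatches on the URL scheme. A minimal usage sketch based on the methods in this hunk (import paths and `CrawlerRunConfig` defaults are assumptions):

```python
import asyncio

from crawl4ai import CrawlerRunConfig                                 # import path assumed
from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy  # module path assumed


async def main() -> None:
    # __aenter__/__aexit__ call start()/close(), so the aiohttp session is managed for us.
    async with AsyncHTTPCrawlerStrategy() as strategy:
        response = await strategy.crawl("https://example.com", config=CrawlerRunConfig())
        print(response.status_code, len(response.html))


asyncio.run(main())
```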

View File

@@ -1,293 +0,0 @@
# browser_adapter.py
"""
Browser adapter for Crawl4AI to support both Playwright and undetected browsers
with minimal changes to existing codebase.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Callable
import time
import json
# Import both, but use conditionally
try:
from playwright.async_api import Page
except ImportError:
Page = Any
try:
from patchright.async_api import Page as UndetectedPage
except ImportError:
UndetectedPage = Any
class BrowserAdapter(ABC):
"""Abstract adapter for browser-specific operations"""
@abstractmethod
async def evaluate(self, page: Page, expression: str, arg: Any = None) -> Any:
"""Execute JavaScript in the page"""
pass
@abstractmethod
async def setup_console_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console message capturing, returns handler function if needed"""
pass
@abstractmethod
async def setup_error_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capturing, returns handler function if needed"""
pass
@abstractmethod
async def retrieve_console_messages(self, page: Page) -> List[Dict]:
"""Retrieve captured console messages (for undetected browsers)"""
pass
@abstractmethod
async def cleanup_console_capture(self, page: Page, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Clean up console event listeners"""
pass
@abstractmethod
def get_imports(self) -> tuple:
"""Get the appropriate imports for this adapter"""
pass
class PlaywrightAdapter(BrowserAdapter):
"""Adapter for standard Playwright"""
async def evaluate(self, page: Page, expression: str, arg: Any = None) -> Any:
"""Standard Playwright evaluate"""
if arg is not None:
return await page.evaluate(expression, arg)
return await page.evaluate(expression)
async def setup_console_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console capture using Playwright's event system"""
def handle_console_capture(msg):
try:
message_type = "unknown"
try:
message_type = msg.type
except:
pass
message_text = "unknown"
try:
message_text = msg.text
except:
pass
entry = {
"type": message_type,
"text": message_text,
"timestamp": time.time()
}
captured_console.append(entry)
except Exception as e:
captured_console.append({
"type": "console_capture_error",
"error": str(e),
"timestamp": time.time()
})
page.on("console", handle_console_capture)
return handle_console_capture
async def setup_error_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capture using Playwright's event system"""
def handle_pageerror_capture(err):
try:
error_message = "Unknown error"
try:
error_message = err.message
except:
pass
error_stack = ""
try:
error_stack = err.stack
except:
pass
captured_console.append({
"type": "error",
"text": error_message,
"stack": error_stack,
"timestamp": time.time()
})
except Exception as e:
captured_console.append({
"type": "pageerror_capture_error",
"error": str(e),
"timestamp": time.time()
})
page.on("pageerror", handle_pageerror_capture)
return handle_pageerror_capture
async def retrieve_console_messages(self, page: Page) -> List[Dict]:
"""Not needed for Playwright - messages are captured via events"""
return []
async def cleanup_console_capture(self, page: Page, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Remove event listeners"""
if handle_console:
page.remove_listener("console", handle_console)
if handle_error:
page.remove_listener("pageerror", handle_error)
def get_imports(self) -> tuple:
"""Return Playwright imports"""
from playwright.async_api import Page, Error
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
return Page, Error, PlaywrightTimeoutError
class UndetectedAdapter(BrowserAdapter):
"""Adapter for undetected browser automation with stealth features"""
def __init__(self):
self._console_script_injected = {}
async def evaluate(self, page: UndetectedPage, expression: str, arg: Any = None) -> Any:
"""Undetected browser evaluate with isolated context"""
# For most evaluations, use isolated context for stealth
# Only use non-isolated when we need to access our injected console capture
isolated = not (
"__console" in expression or
"__captured" in expression or
"__error" in expression or
"window.__" in expression
)
if arg is not None:
return await page.evaluate(expression, arg, isolated_context=isolated)
return await page.evaluate(expression, isolated_context=isolated)
async def setup_console_capture(self, page: UndetectedPage, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console capture using JavaScript injection for undetected browsers"""
if not self._console_script_injected.get(page, False):
await page.add_init_script("""
// Initialize console capture
window.__capturedConsole = [];
window.__capturedErrors = [];
// Store original console methods
const originalConsole = {};
['log', 'info', 'warn', 'error', 'debug'].forEach(method => {
originalConsole[method] = console[method];
console[method] = function(...args) {
try {
window.__capturedConsole.push({
type: method,
text: args.map(arg => {
try {
if (typeof arg === 'object') {
return JSON.stringify(arg);
}
return String(arg);
} catch (e) {
return '[Object]';
}
}).join(' '),
timestamp: Date.now()
});
} catch (e) {
// Fail silently to avoid detection
}
// Call original method
originalConsole[method].apply(console, args);
};
});
""")
self._console_script_injected[page] = True
return None # No handler function needed for undetected browser
async def setup_error_capture(self, page: UndetectedPage, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capture using JavaScript injection for undetected browsers"""
if not self._console_script_injected.get(page, False):
await page.add_init_script("""
// Capture errors
window.addEventListener('error', (event) => {
try {
window.__capturedErrors.push({
type: 'error',
text: event.message,
stack: event.error ? event.error.stack : '',
filename: event.filename,
lineno: event.lineno,
colno: event.colno,
timestamp: Date.now()
});
} catch (e) {
// Fail silently
}
});
// Capture unhandled promise rejections
window.addEventListener('unhandledrejection', (event) => {
try {
window.__capturedErrors.push({
type: 'unhandledrejection',
text: event.reason ? String(event.reason) : 'Unhandled Promise Rejection',
stack: event.reason && event.reason.stack ? event.reason.stack : '',
timestamp: Date.now()
});
} catch (e) {
// Fail silently
}
});
""")
self._console_script_injected[page] = True
return None # No handler function needed for undetected browser
async def retrieve_console_messages(self, page: UndetectedPage) -> List[Dict]:
"""Retrieve captured console messages and errors from the page"""
messages = []
try:
# Get console messages
console_messages = await page.evaluate(
"() => { const msgs = window.__capturedConsole || []; window.__capturedConsole = []; return msgs; }",
isolated_context=False
)
messages.extend(console_messages)
# Get errors
errors = await page.evaluate(
"() => { const errs = window.__capturedErrors || []; window.__capturedErrors = []; return errs; }",
isolated_context=False
)
messages.extend(errors)
# Convert timestamps from JS to Python format
for msg in messages:
if 'timestamp' in msg and isinstance(msg['timestamp'], (int, float)):
msg['timestamp'] = msg['timestamp'] / 1000.0 # Convert from ms to seconds
except Exception:
# If retrieval fails, return empty list
pass
return messages
async def cleanup_console_capture(self, page: UndetectedPage, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Clean up for undetected browser - retrieve final messages"""
# For undetected browser, we don't have event listeners to remove
# but we should retrieve any final messages
final_messages = await self.retrieve_console_messages(page)
return final_messages
def get_imports(self) -> tuple:
"""Return undetected browser imports"""
from patchright.async_api import Page, Error
from patchright.async_api import TimeoutError as PlaywrightTimeoutError
return Page, Error, PlaywrightTimeoutError

View File

@@ -573,26 +573,21 @@ class BrowserManager:
_playwright_instance = None
@classmethod
async def get_playwright(cls, use_undetected: bool = False):
if use_undetected:
from patchright.async_api import async_playwright
else:
from playwright.async_api import async_playwright
async def get_playwright(cls):
from playwright.async_api import async_playwright
cls._playwright_instance = await async_playwright().start()
return cls._playwright_instance
def __init__(self, browser_config: BrowserConfig, logger=None, use_undetected: bool = False):
def __init__(self, browser_config: BrowserConfig, logger=None):
"""
Initialize the BrowserManager with a browser configuration.
Args:
browser_config (BrowserConfig): Configuration object containing all browser settings
logger: Logger instance for recording events and errors
use_undetected (bool): Whether to use undetected browser (Patchright)
"""
self.config: BrowserConfig = browser_config
self.logger = logger
self.use_undetected = use_undetected
# Browser state
self.browser = None
@@ -606,11 +601,7 @@ class BrowserManager:
# Keep track of contexts by a "config signature," so each unique config reuses a single context
self.contexts_by_config = {}
self._contexts_lock = asyncio.Lock()
# Stealth-related attributes
self._stealth_instance = None
self._stealth_cm = None
self._contexts_lock = asyncio.Lock()
# Initialize ManagedBrowser if needed
if self.config.use_managed_browser:
@@ -639,21 +630,9 @@ class BrowserManager:
if self.playwright is not None:
await self.close()
if self.use_undetected:
from patchright.async_api import async_playwright
else:
from playwright.async_api import async_playwright
from playwright.async_api import async_playwright
# Initialize playwright with or without stealth
if self.config.enable_stealth and not self.use_undetected:
# Import stealth only when needed
from playwright_stealth import Stealth
# Use the recommended stealth wrapper approach
self._stealth_instance = Stealth()
self._stealth_cm = self._stealth_instance.use_async(async_playwright())
self.playwright = await self._stealth_cm.__aenter__()
else:
self.playwright = await async_playwright().start()
self.playwright = await async_playwright().start()
if self.config.cdp_url or self.config.use_managed_browser:
self.config.use_managed_browser = True
@@ -1115,19 +1094,5 @@ class BrowserManager:
self.managed_browser = None
if self.playwright:
# Handle stealth context manager cleanup if it exists
if hasattr(self, '_stealth_cm') and self._stealth_cm is not None:
try:
await self._stealth_cm.__aexit__(None, None, None)
except Exception as e:
if self.logger:
self.logger.error(
message="Error closing stealth context: {error}",
tag="ERROR",
params={"error": str(e)}
)
self._stealth_cm = None
self._stealth_instance = None
else:
await self.playwright.stop()
await self.playwright.stop()
self.playwright = None
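
After this change, `BrowserManager` no longer switches between Playwright and Patchright. A minimal sketch of the simplified surface, intended only to illustrate the signatures shown in this hunk (import paths assumed):

```python
import asyncio

from crawl4ai.async_configs import BrowserConfig     # import path assumed
from crawl4ai.browser_manager import BrowserManager  # import path assumed


async def main() -> None:
    # use_undetected is gone from the constructor after this change.
    manager = BrowserManager(browser_config=BrowserConfig(headless=True), logger=None)
    playwright = await BrowserManager.get_playwright()  # always standard Playwright now
    print(type(manager).__name__, playwright is not None)
    await playwright.stop()


asyncio.run(main())
```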

View File

@@ -65,6 +65,213 @@ class BrowserProfiler:
self.builtin_config_file = os.path.join(self.builtin_browser_dir, "browser_config.json")
os.makedirs(self.builtin_browser_dir, exist_ok=True)
def _is_windows(self) -> bool:
"""Check if running on Windows platform."""
return sys.platform.startswith('win') or sys.platform == 'cygwin'
def _is_macos(self) -> bool:
"""Check if running on macOS platform."""
return sys.platform == 'darwin'
def _is_linux(self) -> bool:
"""Check if running on Linux platform."""
return sys.platform.startswith('linux')
def _get_quit_message(self, tag: str) -> str:
"""Get appropriate quit message based on context."""
if tag == "PROFILE":
return "Closing browser and saving profile..."
elif tag == "CDP":
return "Closing browser..."
else:
return "Closing browser..."
async def _listen_windows(self, user_done_event, check_browser_process, tag: str):
"""Windows-specific keyboard listener using msvcrt."""
try:
import msvcrt
except ImportError:
raise ImportError("msvcrt module not available on this platform")
while True:
try:
# Check for keyboard input
if msvcrt.kbhit():
raw = msvcrt.getch()
# Handle Unicode decoding more robustly
key = None
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
try:
# Try different encodings
key = raw.decode("latin1")
except UnicodeDecodeError:
# Skip if we can't decode
continue
# Validate key
if not key or len(key) != 1:
continue
# Check for printable characters only
if not key.isprintable():
continue
# Check for quit command
if key.lower() == "q":
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
# Check if browser process ended
if await check_browser_process():
return
# Small delay to prevent busy waiting
await asyncio.sleep(0.1)
except Exception as e:
self.logger.warning(f"Error in Windows keyboard listener: {e}", tag=tag)
# Continue trying instead of failing completely
await asyncio.sleep(0.1)
continue
async def _listen_unix(self, user_done_event: asyncio.Event, check_browser_process, tag: str):
"""Unix/Linux/macOS keyboard listener using termios and select."""
try:
import termios
import tty
import select
except ImportError:
raise ImportError("termios/tty/select modules not available on this platform")
# Get stdin file descriptor
try:
fd = sys.stdin.fileno()
except (AttributeError, OSError):
raise ImportError("stdin is not a terminal")
# Save original terminal settings
old_settings = None
try:
old_settings = termios.tcgetattr(fd)
except termios.error as e:
raise ImportError(f"Cannot get terminal attributes: {e}")
try:
# Switch to non-canonical mode (cbreak mode)
tty.setcbreak(fd)
while True:
try:
# Use select to check if input is available (non-blocking)
# Timeout of 0.5 seconds to periodically check browser process
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
# Read one character
key = sys.stdin.read(1)
if key and key.lower() == "q":
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
# Check if browser process ended
if await check_browser_process():
return
# Small delay to prevent busy waiting
await asyncio.sleep(0.1)
except (KeyboardInterrupt, EOFError):
# Handle Ctrl+C or EOF gracefully
self.logger.info("Keyboard interrupt received", tag=tag)
user_done_event.set()
return
except Exception as e:
self.logger.warning(f"Error in Unix keyboard listener: {e}", tag=tag)
await asyncio.sleep(0.1)
continue
finally:
# Always restore terminal settings
if old_settings is not None:
try:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
except Exception as e:
self.logger.error(f"Failed to restore terminal settings: {e}", tag=tag)
async def _listen_fallback(self, user_done_event: asyncio.Event, check_browser_process, tag: str):
"""Fallback keyboard listener using simple input() method."""
self.logger.info("Using fallback input mode. Type 'q' and press Enter to quit.", tag=tag)
# Run input in a separate thread to avoid blocking
import threading
import queue
input_queue = queue.Queue()
def input_thread():
"""Thread function to handle input."""
try:
while not user_done_event.is_set():
try:
# Use input() with a prompt
user_input = input("Press 'q' + Enter to quit: ").strip().lower()
input_queue.put(user_input)
if user_input == 'q':
break
except (EOFError, KeyboardInterrupt):
input_queue.put('q')
break
except Exception as e:
self.logger.warning(f"Error in input thread: {e}", tag=tag)
break
except Exception as e:
self.logger.error(f"Input thread failed: {e}", tag=tag)
# Start input thread
thread = threading.Thread(target=input_thread, daemon=True)
thread.start()
try:
while not user_done_event.is_set():
# Check for user input
try:
user_input = input_queue.get_nowait()
if user_input == 'q':
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
except queue.Empty:
pass
# Check if browser process ended
if await check_browser_process():
return
# Small delay
await asyncio.sleep(0.5)
except Exception as e:
self.logger.error(f"Fallback listener failed: {e}", tag=tag)
user_done_event.set()
async def create_profile(self,
profile_name: Optional[str] = None,
browser_config: Optional[BrowserConfig] = None) -> Optional[str]:
@@ -180,42 +387,38 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
"""Cross-platform keyboard listener that waits for 'q' key press."""
# First output the prompt
self.logger.info("Press 'q' when you've finished using the browser...", tag="PROFILE")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
self.logger.info(
"Press {segment} when you've finished using the browser...",
tag="PROFILE",
params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW},
base_color=LogColor.CYAN
)
async def check_browser_process():
"""Check if browser process is still running."""
if (
managed_browser.browser_process
and managed_browser.browser_process.poll() is not None
):
self.logger.info(
"Browser already closed. Ending input listener.", tag="PROFILE"
)
user_done_event.set()
return True
return False
# Try platform-specific implementations with fallback
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info("Closing browser and saving profile...", tag="PROFILE", base_color=LogColor.GREEN)
user_done_event.set()
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="PROFILE")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
if self._is_windows():
await self._listen_windows(user_done_event, check_browser_process, "PROFILE")
else:
await self._listen_unix(user_done_event, check_browser_process, "PROFILE")
except Exception as e:
self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="PROFILE")
self.logger.info("Falling back to simple input mode...", tag="PROFILE")
await self._listen_fallback(user_done_event, check_browser_process, "PROFILE")
try:
from playwright.async_api import async_playwright
@@ -682,42 +885,33 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
"""Cross-platform keyboard listener that waits for 'q' key press."""
# First output the prompt
self.logger.info("Press 'q' to stop the browser and exit...", tag="CDP")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
self.logger.info(
"Press {segment} to stop the browser and exit...",
tag="CDP",
params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW},
base_color=LogColor.CYAN
)
async def check_browser_process():
"""Check if browser process is still running."""
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return True
return False
# Try platform-specific implementations with fallback
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
if self._is_windows():
await self._listen_windows(user_done_event, check_browser_process, "CDP")
else:
await self._listen_unix(user_done_event, check_browser_process, "CDP")
except Exception as e:
self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="CDP")
self.logger.info("Falling back to simple input mode...", tag="CDP")
await self._listen_fallback(user_done_event, check_browser_process, "CDP")
# Function to retrieve and display CDP JSON config
async def get_cdp_json(port):

View File

@@ -119,32 +119,6 @@ def install_playwright():
logger.warning(
f"Please run '{sys.executable} -m playwright install --with-deps' manually after the installation."
)
# Install Patchright browsers for undetected browser support
logger.info("Installing Patchright browsers for undetected mode...", tag="INIT")
try:
subprocess.check_call(
[
sys.executable,
"-m",
"patchright",
"install",
"--with-deps",
"--force",
"chromium",
]
)
logger.success(
"Patchright installation completed successfully.", tag="COMPLETE"
)
except subprocess.CalledProcessError:
logger.warning(
f"Please run '{sys.executable} -m patchright install --with-deps' manually after the installation."
)
except Exception:
logger.warning(
f"Please run '{sys.executable} -m patchright install --with-deps' manually after the installation."
)
def run_migration():

View File

@@ -1056,7 +1056,7 @@ Your output must:
</output_requirements>
"""
GENERATE_SCRIPT_PROMPT = r"""You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
GENERATE_SCRIPT_PROMPT = """You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
Your scripts run **before the crawl** to handle dynamic content, user interactions, and other obstacles. You are a master of two tools: raw **JavaScript** and the high-level **Crawl4ai Script (c4a)**.

View File

@@ -419,15 +419,13 @@ async def handle_crawl_request(
urls: List[str],
browser_config: dict,
crawler_config: dict,
config: dict,
hooks_config: Optional[dict] = None
config: dict
) -> dict:
"""Handle non-streaming crawl requests with optional hooks."""
"""Handle non-streaming crawl requests."""
start_mem_mb = _get_memory_mb() # <--- Get memory before
start_time = time.time()
mem_delta_mb = None
peak_mem_mb = start_mem_mb
hook_manager = None
try:
urls = [('https://' + url) if not url.startswith(('http://', 'https://')) else url for url in urls]
@@ -447,19 +445,6 @@ async def handle_crawl_request(
# crawler: AsyncWebCrawler = AsyncWebCrawler(config=browser_config)
# await crawler.start()
# Attach hooks if provided
hooks_status = {}
if hooks_config:
from hook_manager import attach_user_hooks_to_crawler, UserHookManager
hook_manager = UserHookManager(timeout=hooks_config.get('timeout', 30))
hooks_status, hook_manager = await attach_user_hooks_to_crawler(
crawler,
hooks_config.get('code', {}),
timeout=hooks_config.get('timeout', 30),
hook_manager=hook_manager
)
logger.info(f"Hooks attachment status: {hooks_status['status']}")
base_config = config["crawler"]["base_config"]
# Iterate on key-value pairs in global_config then use hasattr to set them
for key, value in base_config.items():
@@ -473,10 +458,6 @@ async def handle_crawl_request(
config=crawler_config,
dispatcher=dispatcher)
results = await partial_func()
# Ensure results is always a list
if not isinstance(results, list):
results = [results]
# await crawler.close()
@@ -491,68 +472,19 @@ async def handle_crawl_request(
# Process results to handle PDF bytes
processed_results = []
for result in results:
try:
# Check if result has model_dump method (is a proper CrawlResult)
if hasattr(result, 'model_dump'):
result_dict = result.model_dump()
elif isinstance(result, dict):
result_dict = result
else:
# Handle unexpected result type
logger.warning(f"Unexpected result type: {type(result)}")
result_dict = {
"url": str(result) if hasattr(result, '__str__') else "unknown",
"success": False,
"error_message": f"Unexpected result type: {type(result).__name__}"
}
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None and isinstance(result_dict.get('pdf'), bytes):
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
processed_results.append(result_dict)
except Exception as e:
logger.error(f"Error processing result: {e}")
processed_results.append({
"url": "unknown",
"success": False,
"error_message": str(e)
})
result_dict = result.model_dump()
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
processed_results.append(result_dict)
response = {
return {
"success": True,
"results": processed_results,
"server_processing_time_s": end_time - start_time,
"server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb
}
# Add hooks information if hooks were used
if hooks_config and hook_manager:
from hook_manager import UserHookManager
if isinstance(hook_manager, UserHookManager):
try:
# Ensure all hook data is JSON serializable
import json
hook_data = {
"status": hooks_status,
"execution_log": hook_manager.execution_log,
"errors": hook_manager.errors,
"summary": hook_manager.get_summary()
}
# Test that it's serializable
json.dumps(hook_data)
response["hooks"] = hook_data
except (TypeError, ValueError) as e:
logger.error(f"Hook data not JSON serializable: {e}")
response["hooks"] = {
"status": {"status": "error", "message": "Hook data serialization failed"},
"execution_log": [],
"errors": [{"error": str(e)}],
"summary": {}
}
return response
except Exception as e:
logger.error(f"Crawl error: {str(e)}", exc_info=True)
@@ -581,11 +513,9 @@ async def handle_stream_crawl_request(
urls: List[str],
browser_config: dict,
crawler_config: dict,
config: dict,
hooks_config: Optional[dict] = None
) -> Tuple[AsyncWebCrawler, AsyncGenerator, Optional[Dict]]:
"""Handle streaming crawl requests with optional hooks."""
hooks_info = None
config: dict
) -> Tuple[AsyncWebCrawler, AsyncGenerator]:
"""Handle streaming crawl requests."""
try:
browser_config = BrowserConfig.load(browser_config)
# browser_config.verbose = True # Set to False or remove for production stress testing
@@ -606,20 +536,6 @@ async def handle_stream_crawl_request(
# crawler = AsyncWebCrawler(config=browser_config)
# await crawler.start()
# Attach hooks if provided
if hooks_config:
from hook_manager import attach_user_hooks_to_crawler, UserHookManager
hook_manager = UserHookManager(timeout=hooks_config.get('timeout', 30))
hooks_status, hook_manager = await attach_user_hooks_to_crawler(
crawler,
hooks_config.get('code', {}),
timeout=hooks_config.get('timeout', 30),
hook_manager=hook_manager
)
logger.info(f"Hooks attachment status for streaming: {hooks_status['status']}")
# Include hook manager in hooks_info for proper tracking
hooks_info = {'status': hooks_status, 'manager': hook_manager}
results_gen = await crawler.arun_many(
urls=urls,
@@ -627,7 +543,7 @@ async def handle_stream_crawl_request(
dispatcher=dispatcher
)
return crawler, results_gen, hooks_info
return crawler, results_gen
except Exception as e:
# Make sure to close crawler if started during an error here

View File

@@ -1,512 +0,0 @@
"""
Hook Manager for User-Provided Hook Functions
Handles validation, compilation, and safe execution of user-provided hook code
"""
import ast
import asyncio
import traceback
from typing import Dict, Callable, Optional, Tuple, List, Any
import logging
logger = logging.getLogger(__name__)
class UserHookManager:
"""Manages user-provided hook functions with error isolation"""
# Expected signatures for each hook point
HOOK_SIGNATURES = {
"on_browser_created": ["browser"],
"on_page_context_created": ["page", "context"],
"before_goto": ["page", "context", "url"],
"after_goto": ["page", "context", "url", "response"],
"on_user_agent_updated": ["page", "context", "user_agent"],
"on_execution_started": ["page", "context"],
"before_retrieve_html": ["page", "context"],
"before_return_html": ["page", "context", "html"]
}
# Default timeout for hook execution (in seconds)
DEFAULT_TIMEOUT = 30
def __init__(self, timeout: int = DEFAULT_TIMEOUT):
self.timeout = timeout
self.errors: List[Dict[str, Any]] = []
self.compiled_hooks: Dict[str, Callable] = {}
self.execution_log: List[Dict[str, Any]] = []
def validate_hook_structure(self, hook_code: str, hook_point: str) -> Tuple[bool, str]:
"""
Validate the structure of user-provided hook code
Args:
hook_code: The Python code string containing the hook function
hook_point: The hook point name (e.g., 'on_page_context_created')
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Parse the code
tree = ast.parse(hook_code)
# Check if it's empty
if not tree.body:
return False, "Hook code is empty"
# Find the function definition
func_def = None
for node in tree.body:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
func_def = node
break
if not func_def:
return False, "Hook must contain a function definition (def or async def)"
# Check if it's async (all hooks should be async)
if not isinstance(func_def, ast.AsyncFunctionDef):
return False, f"Hook function must be async (use 'async def' instead of 'def')"
# Get function name for better error messages
func_name = func_def.name
# Validate parameters
expected_params = self.HOOK_SIGNATURES.get(hook_point, [])
if not expected_params:
return False, f"Unknown hook point: {hook_point}"
func_params = [arg.arg for arg in func_def.args.args]
# Check if it has **kwargs for flexibility
has_kwargs = func_def.args.kwarg is not None
# Must have at least the expected parameters
missing_params = []
for expected in expected_params:
if expected not in func_params:
missing_params.append(expected)
if missing_params and not has_kwargs:
return False, f"Hook function '{func_name}' must accept parameters: {', '.join(expected_params)} (missing: {', '.join(missing_params)})"
# Check if it returns something (should return page or browser)
has_return = any(isinstance(node, ast.Return) for node in ast.walk(func_def))
if not has_return:
# Warning, not error - we'll handle this
logger.warning(f"Hook function '{func_name}' should return the {expected_params[0]} object")
return True, "Valid"
except SyntaxError as e:
return False, f"Syntax error at line {e.lineno}: {str(e)}"
except Exception as e:
return False, f"Failed to parse hook code: {str(e)}"
def compile_hook(self, hook_code: str, hook_point: str) -> Optional[Callable]:
"""
Compile user-provided hook code into a callable function
Args:
hook_code: The Python code string
hook_point: The hook point name
Returns:
Compiled function or None if compilation failed
"""
try:
# Create a safe namespace for the hook
# Use a more complete builtins that includes __import__
import builtins
safe_builtins = {}
# Add safe built-in functions
allowed_builtins = [
'print', 'len', 'str', 'int', 'float', 'bool',
'list', 'dict', 'set', 'tuple', 'range', 'enumerate',
'zip', 'map', 'filter', 'any', 'all', 'sum', 'min', 'max',
'sorted', 'reversed', 'abs', 'round', 'isinstance', 'type',
'getattr', 'hasattr', 'setattr', 'callable', 'iter', 'next',
'__import__', '__build_class__' # Required for exec
]
for name in allowed_builtins:
if hasattr(builtins, name):
safe_builtins[name] = getattr(builtins, name)
namespace = {
'__name__': f'user_hook_{hook_point}',
'__builtins__': safe_builtins
}
# Add commonly needed imports
exec("import asyncio", namespace)
exec("import json", namespace)
exec("import re", namespace)
exec("from typing import Dict, List, Optional", namespace)
# Execute the code to define the function
exec(hook_code, namespace)
# Find the async function in the namespace
for name, obj in namespace.items():
if callable(obj) and not name.startswith('_') and asyncio.iscoroutinefunction(obj):
return obj
# If no async function found, look for any function
for name, obj in namespace.items():
if callable(obj) and not name.startswith('_'):
logger.warning(f"Found non-async function '{name}' - wrapping it")
# Wrap sync function in async
async def async_wrapper(*args, **kwargs):
return obj(*args, **kwargs)
return async_wrapper
raise ValueError("No callable function found in hook code")
except Exception as e:
error = {
'hook_point': hook_point,
'error': f"Failed to compile hook: {str(e)}",
'type': 'compilation_error',
'traceback': traceback.format_exc()
}
self.errors.append(error)
logger.error(f"Hook compilation failed for {hook_point}: {str(e)}")
return None
async def execute_hook_safely(
self,
hook_func: Callable,
hook_point: str,
*args,
**kwargs
) -> Tuple[Any, Optional[Dict]]:
"""
Execute a user hook with error isolation and timeout
Args:
hook_func: The compiled hook function
hook_point: The hook point name
*args, **kwargs: Arguments to pass to the hook
Returns:
Tuple of (result, error_dict)
"""
start_time = asyncio.get_event_loop().time()
try:
# Add timeout to prevent infinite loops
result = await asyncio.wait_for(
hook_func(*args, **kwargs),
timeout=self.timeout
)
# Log successful execution
execution_time = asyncio.get_event_loop().time() - start_time
self.execution_log.append({
'hook_point': hook_point,
'status': 'success',
'execution_time': execution_time,
'timestamp': start_time
})
return result, None
except asyncio.TimeoutError:
error = {
'hook_point': hook_point,
'error': f'Hook execution timed out ({self.timeout}s limit)',
'type': 'timeout',
'execution_time': self.timeout
}
self.errors.append(error)
self.execution_log.append({
'hook_point': hook_point,
'status': 'timeout',
'error': error['error'],
'execution_time': self.timeout,
'timestamp': start_time
})
# Return the first argument (usually page/browser) to continue
return args[0] if args else None, error
except Exception as e:
execution_time = asyncio.get_event_loop().time() - start_time
error = {
'hook_point': hook_point,
'error': str(e),
'type': type(e).__name__,
'traceback': traceback.format_exc(),
'execution_time': execution_time
}
self.errors.append(error)
self.execution_log.append({
'hook_point': hook_point,
'status': 'failed',
'error': str(e),
'error_type': type(e).__name__,
'execution_time': execution_time,
'timestamp': start_time
})
# Return the first argument (usually page/browser) to continue
return args[0] if args else None, error
def get_summary(self) -> Dict[str, Any]:
"""Get a summary of hook execution"""
total_hooks = len(self.execution_log)
successful = sum(1 for log in self.execution_log if log['status'] == 'success')
failed = sum(1 for log in self.execution_log if log['status'] == 'failed')
timed_out = sum(1 for log in self.execution_log if log['status'] == 'timeout')
return {
'total_executions': total_hooks,
'successful': successful,
'failed': failed,
'timed_out': timed_out,
'success_rate': (successful / total_hooks * 100) if total_hooks > 0 else 0,
'total_errors': len(self.errors)
}
class IsolatedHookWrapper:
"""Wraps user hooks with error isolation and reporting"""
def __init__(self, hook_manager: UserHookManager):
self.hook_manager = hook_manager
def create_hook_wrapper(self, user_hook: Callable, hook_point: str) -> Callable:
"""
Create a wrapper that isolates hook errors from main process
Args:
user_hook: The compiled user hook function
hook_point: The hook point name
Returns:
Wrapped async function that handles errors gracefully
"""
async def wrapped_hook(*args, **kwargs):
"""Wrapped hook with error isolation"""
# Get the main return object (page/browser)
# This ensures we always have something to return
return_obj = None
if args:
return_obj = args[0]
elif 'page' in kwargs:
return_obj = kwargs['page']
elif 'browser' in kwargs:
return_obj = kwargs['browser']
try:
# Execute user hook with safety
result, error = await self.hook_manager.execute_hook_safely(
user_hook,
hook_point,
*args,
**kwargs
)
if error:
# Hook failed but we continue with original object
logger.warning(f"User hook failed at {hook_point}: {error['error']}")
return return_obj
# Hook succeeded - return its result or the original object
if result is None:
logger.debug(f"Hook at {hook_point} returned None, using original object")
return return_obj
return result
except Exception as e:
# This should rarely happen due to execute_hook_safely
logger.error(f"Unexpected error in hook wrapper for {hook_point}: {e}")
return return_obj
# Set function name for debugging
wrapped_hook.__name__ = f"wrapped_{hook_point}"
return wrapped_hook
async def process_user_hooks(
hooks_input: Dict[str, str],
timeout: int = 30
) -> Tuple[Dict[str, Callable], List[Dict], UserHookManager]:
"""
Process and compile user-provided hook functions
Args:
hooks_input: Dictionary mapping hook points to code strings
timeout: Timeout for each hook execution
Returns:
Tuple of (compiled_hooks, validation_errors, hook_manager)
"""
hook_manager = UserHookManager(timeout=timeout)
wrapper = IsolatedHookWrapper(hook_manager)
compiled_hooks = {}
validation_errors = []
for hook_point, hook_code in hooks_input.items():
# Skip empty hooks
if not hook_code or not hook_code.strip():
continue
# Validate hook point
if hook_point not in UserHookManager.HOOK_SIGNATURES:
validation_errors.append({
'hook_point': hook_point,
'error': f'Unknown hook point. Valid points: {", ".join(UserHookManager.HOOK_SIGNATURES.keys())}',
'code_preview': hook_code[:100] + '...' if len(hook_code) > 100 else hook_code
})
continue
# Validate structure
is_valid, message = hook_manager.validate_hook_structure(hook_code, hook_point)
if not is_valid:
validation_errors.append({
'hook_point': hook_point,
'error': message,
'code_preview': hook_code[:100] + '...' if len(hook_code) > 100 else hook_code
})
continue
# Compile the hook
hook_func = hook_manager.compile_hook(hook_code, hook_point)
if hook_func:
# Wrap with error isolation
wrapped_hook = wrapper.create_hook_wrapper(hook_func, hook_point)
compiled_hooks[hook_point] = wrapped_hook
logger.info(f"Successfully compiled hook for {hook_point}")
else:
validation_errors.append({
'hook_point': hook_point,
'error': 'Failed to compile hook function - check syntax and structure',
'code_preview': hook_code[:100] + '...' if len(hook_code) > 100 else hook_code
})
return compiled_hooks, validation_errors, hook_manager
async def process_user_hooks_with_manager(
hooks_input: Dict[str, str],
hook_manager: UserHookManager
) -> Tuple[Dict[str, Callable], List[Dict]]:
"""
Process and compile user-provided hook functions with existing manager
Args:
hooks_input: Dictionary mapping hook points to code strings
hook_manager: Existing UserHookManager instance
Returns:
Tuple of (compiled_hooks, validation_errors)
"""
wrapper = IsolatedHookWrapper(hook_manager)
compiled_hooks = {}
validation_errors = []
for hook_point, hook_code in hooks_input.items():
# Skip empty hooks
if not hook_code or not hook_code.strip():
continue
# Validate hook point
if hook_point not in UserHookManager.HOOK_SIGNATURES:
validation_errors.append({
'hook_point': hook_point,
'error': f'Unknown hook point. Valid points: {", ".join(UserHookManager.HOOK_SIGNATURES.keys())}',
'code_preview': hook_code[:100] + '...' if len(hook_code) > 100 else hook_code
})
continue
# Validate structure
is_valid, message = hook_manager.validate_hook_structure(hook_code, hook_point)
if not is_valid:
validation_errors.append({
'hook_point': hook_point,
'error': message,
'code_preview': hook_code[:100] + '...' if len(hook_code) > 100 else hook_code
})
continue
# Compile the hook
hook_func = hook_manager.compile_hook(hook_code, hook_point)
if hook_func:
# Wrap with error isolation
wrapped_hook = wrapper.create_hook_wrapper(hook_func, hook_point)
compiled_hooks[hook_point] = wrapped_hook
logger.info(f"Successfully compiled hook for {hook_point}")
else:
validation_errors.append({
'hook_point': hook_point,
'error': 'Failed to compile hook function - check syntax and structure',
'code_preview': hook_code[:100] + '...' if len(hook_code) > 100 else hook_code
})
return compiled_hooks, validation_errors
async def attach_user_hooks_to_crawler(
crawler, # AsyncWebCrawler instance
user_hooks: Dict[str, str],
timeout: int = 30,
hook_manager: Optional[UserHookManager] = None
) -> Tuple[Dict[str, Any], UserHookManager]:
"""
Attach user-provided hooks to crawler with full error reporting
Args:
crawler: AsyncWebCrawler instance
user_hooks: Dictionary mapping hook points to code strings
timeout: Timeout for each hook execution
hook_manager: Optional existing UserHookManager instance
Returns:
Tuple of (status_dict, hook_manager)
"""
# Use provided hook_manager or create a new one
if hook_manager is None:
hook_manager = UserHookManager(timeout=timeout)
# Process hooks with the hook_manager
compiled_hooks, validation_errors = await process_user_hooks_with_manager(
user_hooks, hook_manager
)
# Log validation errors
if validation_errors:
logger.warning(f"Hook validation errors: {validation_errors}")
# Attach successfully compiled hooks
attached_hooks = []
for hook_point, wrapped_hook in compiled_hooks.items():
try:
crawler.crawler_strategy.set_hook(hook_point, wrapped_hook)
attached_hooks.append(hook_point)
logger.info(f"Attached hook to {hook_point}")
except Exception as e:
logger.error(f"Failed to attach hook to {hook_point}: {e}")
validation_errors.append({
'hook_point': hook_point,
'error': f'Failed to attach hook: {str(e)}'
})
status = 'success' if not validation_errors else ('partial' if attached_hooks else 'failed')
status_dict = {
'status': status,
'attached_hooks': attached_hooks,
'validation_errors': validation_errors,
'total_hooks_provided': len(user_hooks),
'successfully_attached': len(attached_hooks),
'failed_validation': len(validation_errors)
}
return status_dict, hook_manager
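Taken together, a rough usage sketch of these helpers, assuming an already-constructed AsyncWebCrawler named `crawler` (the hook body and values are illustrative, not part of this change):
# Hypothetical usage sketch; `crawler` and the hook code below are assumptions
user_hooks = {
    "before_goto": (
        "async def hook(page, context, url, **kwargs):\n"
        "    await page.set_extra_http_headers({'X-Demo': '1'})\n"
        "    return page"
    ),
}
status, manager = await attach_user_hooks_to_crawler(crawler, user_hooks, timeout=15)
print(status["attached_hooks"], manager.get_summary())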

View File

@@ -9,50 +9,6 @@ class CrawlRequest(BaseModel):
browser_config: Optional[Dict] = Field(default_factory=dict)
crawler_config: Optional[Dict] = Field(default_factory=dict)
class HookConfig(BaseModel):
"""Configuration for user-provided hooks"""
code: Dict[str, str] = Field(
default_factory=dict,
description="Map of hook points to Python code strings"
)
timeout: int = Field(
default=30,
ge=1,
le=120,
description="Timeout in seconds for each hook execution"
)
class Config:
schema_extra = {
"example": {
"code": {
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Block images to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif}", lambda route: route.abort())
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Scroll to load lazy content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(2000)
return page
"""
},
"timeout": 30
}
}
class CrawlRequestWithHooks(CrawlRequest):
"""Extended crawl request with hooks support"""
hooks: Optional[HookConfig] = Field(
default=None,
description="Optional user-provided hook functions"
)
class MarkdownRequest(BaseModel):
"""Request body for the /md endpoint."""
url: str = Field(..., description="Absolute http/https URL to fetch")

View File

@@ -23,7 +23,7 @@ from api import (
stream_results
)
from schemas import (
CrawlRequestWithHooks,
CrawlRequest,
MarkdownRequest,
RawCode,
HTMLRequest,
@@ -414,72 +414,6 @@ async def get_schema():
"crawler": CrawlerRunConfig().dump()}
@app.get("/hooks/info")
async def get_hooks_info():
"""Get information about available hook points and their signatures"""
from hook_manager import UserHookManager
hook_info = {}
for hook_point, params in UserHookManager.HOOK_SIGNATURES.items():
hook_info[hook_point] = {
"parameters": params,
"description": get_hook_description(hook_point),
"example": get_hook_example(hook_point)
}
return JSONResponse({
"available_hooks": hook_info,
"timeout_limits": {
"min": 1,
"max": 120,
"default": 30
}
})
def get_hook_description(hook_point: str) -> str:
"""Get description for each hook point"""
descriptions = {
"on_browser_created": "Called after browser instance is created",
"on_page_context_created": "Called after page and context are created - ideal for authentication",
"before_goto": "Called before navigating to the target URL",
"after_goto": "Called after navigation is complete",
"on_user_agent_updated": "Called when user agent is updated",
"on_execution_started": "Called when custom JavaScript execution begins",
"before_retrieve_html": "Called before retrieving the final HTML - ideal for scrolling",
"before_return_html": "Called just before returning the HTML content"
}
return descriptions.get(hook_point, "")
def get_hook_example(hook_point: str) -> str:
"""Get example code for each hook point"""
examples = {
"on_page_context_created": """async def hook(page, context, **kwargs):
# Add authentication cookie
await context.add_cookies([{
'name': 'session',
'value': 'my-session-id',
'domain': '.example.com'
}])
return page""",
"before_retrieve_html": """async def hook(page, context, **kwargs):
# Scroll to load lazy content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(2000)
return page""",
"before_goto": """async def hook(page, context, url, **kwargs):
# Set custom headers
await page.set_extra_http_headers({
'X-Custom-Header': 'value'
})
return page"""
}
return examples.get(hook_point, "# Implement your hook logic here\nreturn page")
@app.get(config["observability"]["health_check"]["endpoint"])
async def health():
return {"status": "ok", "timestamp": time.time(), "version": __version__}
@@ -495,30 +429,19 @@ async def metrics():
@mcp_tool("crawl")
async def crawl(
request: Request,
crawl_request: CrawlRequestWithHooks,
crawl_request: CrawlRequest,
_td: Dict = Depends(token_dep),
):
"""
Crawl a list of URLs and return the results as JSON.
Supports optional user-provided hook functions for customization.
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
# Prepare hooks config if provided
hooks_config = None
if crawl_request.hooks:
hooks_config = {
'code': crawl_request.hooks.code,
'timeout': crawl_request.hooks.timeout
}
res = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
hooks_config=hooks_config
)
return JSONResponse(res)
@@ -527,42 +450,25 @@ async def crawl(
@limiter.limit(config["rate_limiting"]["default_limit"])
async def crawl_stream(
request: Request,
crawl_request: CrawlRequestWithHooks,
crawl_request: CrawlRequest,
_td: Dict = Depends(token_dep),
):
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
# Prepare hooks config if provided
hooks_config = None
if crawl_request.hooks:
hooks_config = {
'code': crawl_request.hooks.code,
'timeout': crawl_request.hooks.timeout
}
crawler, gen, hooks_info = await handle_stream_crawl_request(
crawler, gen = await handle_stream_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
hooks_config=hooks_config
)
# Add hooks info to response headers if available
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Stream-Status": "active",
}
if hooks_info:
import json
headers["X-Hooks-Status"] = json.dumps(hooks_info['status']['status'])
return StreamingResponse(
stream_results(crawler, gen),
media_type="application/x-ndjson",
headers=headers,
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Stream-Status": "active",
},
)
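A minimal client for this streaming endpoint could consume the NDJSON output roughly as sketched below; the route path, base URL, and payload are illustrative assumptions:
# Hypothetical streaming client sketch; endpoint path and payload are assumptions
import json
import requests

payload = {"urls": ["https://example.com"], "browser_config": {}, "crawler_config": {}}
with requests.post("http://localhost:11235/crawl/stream", json=payload, stream=True) as resp:
    for line in resp.iter_lines():
        if line:
            print(json.loads(line).get("url"))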

View File

@@ -3,8 +3,8 @@ C4A-Script API Usage Examples
Shows how to use the new Result-based API in various scenarios
"""
from crawl4ai.script.c4a_compile import compile, validate, compile_file
from crawl4ai.script.c4a_result import CompilationResult, ValidationResult
from c4a_compile import compile, validate, compile_file
from c4a_result import CompilationResult, ValidationResult
import json

View File

@@ -3,7 +3,7 @@ C4A-Script Hello World
A concise example showing how to use the C4A-Script compiler
"""
from crawl4ai.script.c4a_compile import compile
from c4a_compile import compile
# Define your C4A-Script
script = """

View File

@@ -3,7 +3,7 @@ C4A-Script Hello World - Error Example
Shows how error handling works
"""
from crawl4ai.script.c4a_compile import compile
from c4a_compile import compile
# Define a script with an error (missing THEN)
script = """

View File

@@ -1,513 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive test demonstrating all hook types from hooks_example.py
adapted for the Docker API with real URLs
"""
import requests
import json
import time
from typing import Dict, Any
# API_BASE_URL = "http://localhost:11234"
API_BASE_URL = "http://localhost:11235"
def test_all_hooks_demo():
"""Demonstrate all 8 hook types with practical examples"""
print("=" * 70)
print("Testing: All Hooks Comprehensive Demo")
print("=" * 70)
hooks_code = {
"on_browser_created": """
async def hook(browser, **kwargs):
# Hook called after browser is created
print("[HOOK] on_browser_created - Browser is ready!")
# Browser-level configurations would go here
return browser
""",
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Hook called after a new page and context are created
print("[HOOK] on_page_context_created - New page created!")
# Set viewport size for consistent rendering
await page.set_viewport_size({"width": 1920, "height": 1080})
# Add cookies for the session (using httpbin.org domain)
await context.add_cookies([
{
"name": "test_session",
"value": "abc123xyz",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Block ads and tracking scripts to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg}", lambda route: route.abort())
await context.route("**/analytics/*", lambda route: route.abort())
await context.route("**/ads/*", lambda route: route.abort())
print("[HOOK] Viewport set, cookies added, and ads blocked")
return page
""",
"on_user_agent_updated": """
async def hook(page, context, user_agent, **kwargs):
# Hook called when user agent is updated
print(f"[HOOK] on_user_agent_updated - User agent: {user_agent[:50]}...")
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
# Hook called before navigating to each URL
print(f"[HOOK] before_goto - About to visit: {url}")
# Add custom headers for the request
await page.set_extra_http_headers({
"X-Custom-Header": "crawl4ai-test",
"Accept-Language": "en-US,en;q=0.9",
"DNT": "1"
})
return page
""",
"after_goto": """
async def hook(page, context, url, response, **kwargs):
# Hook called after navigating to each URL
print(f"[HOOK] after_goto - Successfully loaded: {url}")
# Wait a moment for dynamic content to load
await page.wait_for_timeout(1000)
# Check if specific elements exist (with error handling)
try:
# For httpbin.org, wait for body element
await page.wait_for_selector("body", timeout=2000)
print("[HOOK] Body element found and loaded")
except:
print("[HOOK] Timeout waiting for body, continuing anyway")
return page
""",
"on_execution_started": """
async def hook(page, context, **kwargs):
# Hook called after custom JavaScript execution
print("[HOOK] on_execution_started - Custom JS executed!")
# You could inject additional JavaScript here if needed
await page.evaluate("console.log('[INJECTED] Hook JS running');")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Hook called before retrieving the HTML content
print("[HOOK] before_retrieve_html - Preparing to get HTML")
# Scroll to bottom to trigger lazy loading
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(500)
# Scroll back to top
await page.evaluate("window.scrollTo(0, 0);")
await page.wait_for_timeout(500)
# One more scroll to middle for good measure
await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 2);")
print("[HOOK] Scrolling completed for lazy-loaded content")
return page
""",
"before_return_html": """
async def hook(page, context, html, **kwargs):
# Hook called before returning the HTML content
print(f"[HOOK] before_return_html - HTML length: {len(html)} characters")
# Log some page metrics
metrics = await page.evaluate('''() => {
return {
images: document.images.length,
links: document.links.length,
scripts: document.scripts.length
}
}''')
print(f"[HOOK] Page metrics - Images: {metrics['images']}, Links: {metrics['links']}, Scripts: {metrics['scripts']}")
return page
"""
}
# Create request payload
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 30
},
"crawler_config": {
"js_code": "window.scrollTo(0, document.body.scrollHeight);",
"wait_for": "body",
"cache_mode": "bypass"
}
}
print("\nSending request with all 8 hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("\n✅ Request successful!")
# Check hooks execution
if 'hooks' in data:
hooks_info = data['hooks']
print("\n📊 Hooks Execution Summary:")
print(f" Status: {hooks_info['status']['status']}")
print(f" Attached hooks: {len(hooks_info['status']['attached_hooks'])}")
for hook_name in hooks_info['status']['attached_hooks']:
print(f"{hook_name}")
if 'summary' in hooks_info:
summary = hooks_info['summary']
print(f"\n📈 Execution Statistics:")
print(f" Total executions: {summary['total_executions']}")
print(f" Successful: {summary['successful']}")
print(f" Failed: {summary['failed']}")
print(f" Timed out: {summary['timed_out']}")
print(f" Success rate: {summary['success_rate']:.1f}%")
if hooks_info.get('execution_log'):
print(f"\n📝 Execution Log:")
for log_entry in hooks_info['execution_log']:
status_icon = "" if log_entry['status'] == 'success' else ""
exec_time = log_entry.get('execution_time', 0)
print(f" {status_icon} {log_entry['hook_point']}: {exec_time:.3f}s")
# Check crawl results
if 'results' in data and len(data['results']) > 0:
print(f"\n📄 Crawl Results:")
for result in data['results']:
print(f" URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
if result.get('html'):
print(f" HTML length: {len(result['html'])} characters")
else:
print(f"❌ Error: {response.status_code}")
try:
error_data = response.json()
print(f"Error details: {json.dumps(error_data, indent=2)}")
except:
print(f"Error text: {response.text[:500]}")
def test_authentication_flow():
"""Test a complete authentication flow with multiple hooks"""
print("\n" + "=" * 70)
print("Testing: Authentication Flow with Multiple Hooks")
print("=" * 70)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("[HOOK] Setting up authentication context")
# Add authentication cookies
await context.add_cookies([
{
"name": "auth_token",
"value": "fake_jwt_token_here",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Set localStorage items (for SPA authentication)
await page.evaluate('''
localStorage.setItem('user_id', '12345');
localStorage.setItem('auth_time', new Date().toISOString());
''')
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
print(f"[HOOK] Adding auth headers for {url}")
# Add Authorization header
import base64
credentials = base64.b64encode(b"user:passwd").decode('ascii')
await page.set_extra_http_headers({
'Authorization': f'Basic {credentials}',
'X-API-Key': 'test-api-key-123'
})
return page
"""
}
payload = {
"urls": [
"https://httpbin.org/basic-auth/user/passwd"
],
"hooks": {
"code": hooks_code,
"timeout": 15
}
}
print("\nTesting authentication with httpbin endpoints...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Authentication test completed")
if 'results' in data:
for i, result in enumerate(data['results']):
print(f"\n URL {i+1}: {result['url']}")
if result.get('success'):
# Check for authentication success indicators
html_content = result.get('html', '')
if '"authenticated"' in html_content and 'true' in html_content:
print(" ✅ Authentication successful! Basic auth worked.")
else:
print(" ⚠️ Page loaded but auth status unclear")
else:
print(f" ❌ Failed: {result.get('error_message', 'Unknown error')}")
else:
print(f"❌ Error: {response.status_code}")
def test_performance_optimization_hooks():
"""Test hooks for performance optimization"""
print("\n" + "=" * 70)
print("Testing: Performance Optimization Hooks")
print("=" * 70)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("[HOOK] Optimizing page for performance")
# Block resource-heavy content
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg,ico}", lambda route: route.abort())
await context.route("**/*.{woff,woff2,ttf,otf}", lambda route: route.abort())
await context.route("**/*.{mp4,webm,ogg,mp3,wav}", lambda route: route.abort())
await context.route("**/googletagmanager.com/*", lambda route: route.abort())
await context.route("**/google-analytics.com/*", lambda route: route.abort())
await context.route("**/doubleclick.net/*", lambda route: route.abort())
await context.route("**/facebook.com/*", lambda route: route.abort())
# Disable animations and transitions
await page.add_style_tag(content='''
*, *::before, *::after {
animation-duration: 0s !important;
animation-delay: 0s !important;
transition-duration: 0s !important;
transition-delay: 0s !important;
}
''')
print("[HOOK] Performance optimizations applied")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Removing unnecessary elements before extraction")
# Remove ads, popups, and other unnecessary elements
await page.evaluate('''() => {
// Remove common ad containers
const adSelectors = [
'.ad', '.ads', '.advertisement', '[id*="ad-"]', '[class*="ad-"]',
'.popup', '.modal', '.overlay', '.cookie-banner', '.newsletter-signup'
];
adSelectors.forEach(selector => {
document.querySelectorAll(selector).forEach(el => el.remove());
});
// Remove script tags to clean up HTML
document.querySelectorAll('script').forEach(el => el.remove());
// Remove style tags we don't need
document.querySelectorAll('style').forEach(el => el.remove());
}''')
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 10
}
}
print("\nTesting performance optimization hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("✅ Performance optimization test completed")
if 'results' in data and len(data['results']) > 0:
result = data['results'][0]
if result.get('html'):
print(f" HTML size: {len(result['html'])} characters")
print(" Resources blocked, ads removed, animations disabled")
else:
print(f"❌ Error: {response.status_code}")
def test_content_extraction_hooks():
"""Test hooks for intelligent content extraction"""
print("\n" + "=" * 70)
print("Testing: Content Extraction Hooks")
print("=" * 70)
hooks_code = {
"after_goto": """
async def hook(page, context, url, response, **kwargs):
print(f"[HOOK] Waiting for dynamic content on {url}")
# Wait for any lazy-loaded content
await page.wait_for_timeout(2000)
# Trigger any "Load More" buttons
try:
load_more = await page.query_selector('[class*="load-more"], [class*="show-more"], button:has-text("Load More")')
if load_more:
await load_more.click()
await page.wait_for_timeout(1000)
print("[HOOK] Clicked 'Load More' button")
except:
pass
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Extracting structured data")
# Extract metadata
metadata = await page.evaluate('''() => {
const getMeta = (name) => {
const element = document.querySelector(`meta[name="${name}"], meta[property="${name}"]`);
return element ? element.getAttribute('content') : null;
};
return {
title: document.title,
description: getMeta('description') || getMeta('og:description'),
author: getMeta('author'),
keywords: getMeta('keywords'),
ogTitle: getMeta('og:title'),
ogImage: getMeta('og:image'),
canonical: document.querySelector('link[rel="canonical"]')?.href,
jsonLd: Array.from(document.querySelectorAll('script[type="application/ld+json"]'))
.map(el => el.textContent).filter(Boolean)
};
}''')
print(f"[HOOK] Extracted metadata: {json.dumps(metadata, indent=2)}")
# Infinite scroll handling
for i in range(3):
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(1000)
print(f"[HOOK] Scroll iteration {i+1}/3")
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html", "https://httpbin.org/json"],
"hooks": {
"code": hooks_code,
"timeout": 20
}
}
print("\nTesting content extraction hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Content extraction test completed")
if 'hooks' in data and 'summary' in data['hooks']:
summary = data['hooks']['summary']
print(f" Hooks executed: {summary['successful']}/{summary['total_executions']}")
if 'results' in data:
for result in data['results']:
print(f"\n URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
else:
print(f"❌ Error: {response.status_code}")
def main():
"""Run comprehensive hook tests"""
print("🔧 Crawl4AI Docker API - Comprehensive Hooks Testing")
print("Based on docs/examples/hooks_example.py")
print("=" * 70)
tests = [
("All Hooks Demo", test_all_hooks_demo),
("Authentication Flow", test_authentication_flow),
("Performance Optimization", test_performance_optimization_hooks),
("Content Extraction", test_content_extraction_hooks),
]
for i, (name, test_func) in enumerate(tests, 1):
print(f"\n📌 Test {i}/{len(tests)}: {name}")
try:
test_func()
print(f"{name} completed")
except Exception as e:
print(f"{name} failed: {e}")
import traceback
traceback.print_exc()
print("\n" + "=" * 70)
print("🎉 All comprehensive hook tests completed!")
print("=" * 70)
if __name__ == "__main__":
main()

View File

@@ -1,57 +0,0 @@
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
DefaultMarkdownGenerator,
PruningContentFilter,
CrawlResult,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def main():
# Create browser config
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create the crawler strategy with the undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Configure the crawl
crawler_config = CrawlerRunConfig(
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter()
),
capture_console_messages=True, # Enable console capture to test adapter
)
# Test on a site that typically detects bots
print("Testing undetected adapter...")
result: CrawlResult = await crawler.arun(
url="https://www.helloworld.org",
config=crawler_config
)
print(f"Status: {result.status_code}")
print(f"Success: {result.success}")
print(f"Console messages captured: {len(result.console_messages or [])}")
print(f"Markdown content (first 500 chars):\n{result.markdown.raw_markdown[:500]}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,59 +0,0 @@
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Example 1: Stealth Mode
async def stealth_mode_example():
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun("https://example.com")
return result.html[:500]
# Example 2: Undetected Browser
async def undetected_browser_example():
browser_config = BrowserConfig(
headless=False
)
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun("https://example.com")
return result.html[:500]
# Example 3: Both Combined
async def combined_example():
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun("https://example.com")
return result.html[:500]
# Run examples
if __name__ == "__main__":
asyncio.run(stealth_mode_example())
asyncio.run(undetected_browser_example())
asyncio.run(combined_example())

View File

@@ -1,522 +0,0 @@
"""
Stealth Mode Example with Crawl4AI
This example demonstrates how to use the stealth mode feature to bypass basic bot detection.
The stealth mode uses playwright-stealth to modify browser fingerprints and behaviors
that are commonly used to detect automated browsers.
Key features demonstrated:
1. Comparing crawling with and without stealth mode
2. Testing against bot detection sites
3. Accessing sites that block automated browsers
4. Best practices for stealth crawling
"""
import asyncio
import json
from typing import Dict, Any
from colorama import Fore, Style, init
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
# Initialize colorama for colored output
init()
# Create a logger for better output
logger = AsyncLogger(verbose=True)
async def test_bot_detection(use_stealth: bool = False) -> Dict[str, Any]:
"""Test against a bot detection service"""
logger.info(
f"Testing bot detection with stealth={'ON' if use_stealth else 'OFF'}",
tag="STEALTH"
)
# Configure browser with or without stealth
browser_config = BrowserConfig(
headless=False, # Use False to see the browser in action
enable_stealth=use_stealth,
viewport_width=1280,
viewport_height=800
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# JavaScript to extract bot detection results
detection_script = """
// Comprehensive bot detection checks
(() => {
const detectionResults = {
// Basic WebDriver detection
webdriver: navigator.webdriver,
// Chrome specific
chrome: !!window.chrome,
chromeRuntime: !!window.chrome?.runtime,
// Automation indicators
automationControlled: navigator.webdriver,
// Permissions API
permissionsPresent: !!navigator.permissions?.query,
// Plugins
pluginsLength: navigator.plugins.length,
pluginsArray: Array.from(navigator.plugins).map(p => p.name),
// Languages
languages: navigator.languages,
language: navigator.language,
// User agent
userAgent: navigator.userAgent,
// Screen and window properties
screen: {
width: screen.width,
height: screen.height,
availWidth: screen.availWidth,
availHeight: screen.availHeight,
colorDepth: screen.colorDepth,
pixelDepth: screen.pixelDepth
},
// WebGL vendor
webglVendor: (() => {
try {
const canvas = document.createElement('canvas');
const gl = canvas.getContext('webgl') || canvas.getContext('experimental-webgl');
const ext = gl.getExtension('WEBGL_debug_renderer_info');
return gl.getParameter(ext.UNMASKED_VENDOR_WEBGL);
} catch (e) {
return 'Error';
}
})(),
// Platform
platform: navigator.platform,
// Hardware concurrency
hardwareConcurrency: navigator.hardwareConcurrency,
// Device memory
deviceMemory: navigator.deviceMemory,
// Connection
connection: navigator.connection?.effectiveType
};
// Log results for console capture
console.log('DETECTION_RESULTS:', JSON.stringify(detectionResults, null, 2));
// Return results
return detectionResults;
})();
"""
# Crawl bot detection test page
config = CrawlerRunConfig(
js_code=detection_script,
capture_console_messages=True,
wait_until="networkidle",
delay_before_return_html=2.0 # Give time for all checks to complete
)
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=config
)
if result.success:
# Extract detection results from console
detection_data = None
for msg in result.console_messages or []:
if "DETECTION_RESULTS:" in msg.get("text", ""):
try:
json_str = msg["text"].replace("DETECTION_RESULTS:", "").strip()
detection_data = json.loads(json_str)
except:
pass
# Also try to get from JavaScript execution result
if not detection_data and result.js_execution_result:
detection_data = result.js_execution_result
return {
"success": True,
"url": result.url,
"detection_data": detection_data,
"page_title": result.metadata.get("title", ""),
"stealth_enabled": use_stealth
}
else:
return {
"success": False,
"error": result.error_message,
"stealth_enabled": use_stealth
}
async def test_cloudflare_site(use_stealth: bool = False) -> Dict[str, Any]:
"""Test accessing a Cloudflare-protected site"""
logger.info(
f"Testing Cloudflare site with stealth={'ON' if use_stealth else 'OFF'}",
tag="STEALTH"
)
browser_config = BrowserConfig(
headless=True, # Cloudflare detection works better in headless mode with stealth
enable_stealth=use_stealth,
viewport_width=1920,
viewport_height=1080
)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
wait_until="networkidle",
page_timeout=30000, # 30 seconds
delay_before_return_html=3.0
)
# Test on a site that often shows Cloudflare challenges
result = await crawler.arun(
url="https://nowsecure.nl",
config=config
)
# Check if we hit Cloudflare challenge
cloudflare_detected = False
if result.html:
cloudflare_indicators = [
"Checking your browser",
"Just a moment",
"cf-browser-verification",
"cf-challenge",
"ray ID"
]
cloudflare_detected = any(indicator in result.html for indicator in cloudflare_indicators)
return {
"success": result.success,
"url": result.url,
"cloudflare_challenge": cloudflare_detected,
"status_code": result.status_code,
"page_title": result.metadata.get("title", "") if result.metadata else "",
"stealth_enabled": use_stealth,
"html_snippet": result.html[:500] if result.html else ""
}
async def test_anti_bot_site(use_stealth: bool = False) -> Dict[str, Any]:
"""Test against sites with anti-bot measures"""
logger.info(
f"Testing anti-bot site with stealth={'ON' if use_stealth else 'OFF'}",
tag="STEALTH"
)
browser_config = BrowserConfig(
headless=False,
enable_stealth=use_stealth,
# Additional browser arguments that help with stealth
extra_args=[
"--disable-blink-features=AutomationControlled",
"--disable-features=site-per-process"
] if not use_stealth else [] # These are automatically applied with stealth
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Some sites check for specific behaviors
behavior_script = """
(async () => {
// Simulate human-like behavior
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
// Random mouse movement
const moveX = Math.random() * 100;
const moveY = Math.random() * 100;
// Simulate reading time
await sleep(1000 + Math.random() * 2000);
// Scroll slightly
window.scrollBy(0, 100 + Math.random() * 200);
console.log('Human behavior simulation complete');
return true;
})()
"""
config = CrawlerRunConfig(
js_code=behavior_script,
wait_until="networkidle",
delay_before_return_html=5.0, # Longer delay to appear more human
capture_console_messages=True
)
# Test on a site that implements anti-bot measures
result = await crawler.arun(
url="https://www.g2.com/",
config=config
)
# Check for common anti-bot blocks
blocked_indicators = [
"Access Denied",
"403 Forbidden",
"Security Check",
"Verify you are human",
"captcha",
"challenge"
]
blocked = False
if result.html:
blocked = any(indicator.lower() in result.html.lower() for indicator in blocked_indicators)
return {
"success": result.success and not blocked,
"url": result.url,
"blocked": blocked,
"status_code": result.status_code,
"page_title": result.metadata.get("title", "") if result.metadata else "",
"stealth_enabled": use_stealth
}
async def compare_results():
"""Run all tests with and without stealth mode and compare results"""
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Crawl4AI Stealth Mode Comparison{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")
# Test 1: Bot Detection
print(f"{Fore.YELLOW}1. Bot Detection Test (bot.sannysoft.com){Style.RESET_ALL}")
print("-" * 40)
# Without stealth
regular_detection = await test_bot_detection(use_stealth=False)
if regular_detection["success"] and regular_detection["detection_data"]:
print(f"{Fore.RED}Without Stealth:{Style.RESET_ALL}")
data = regular_detection["detection_data"]
print(f" • WebDriver detected: {data.get('webdriver', 'Unknown')}")
print(f" • Chrome: {data.get('chrome', 'Unknown')}")
print(f" • Languages: {data.get('languages', 'Unknown')}")
print(f" • Plugins: {data.get('pluginsLength', 'Unknown')}")
print(f" • User Agent: {data.get('userAgent', 'Unknown')[:60]}...")
# With stealth
stealth_detection = await test_bot_detection(use_stealth=True)
if stealth_detection["success"] and stealth_detection["detection_data"]:
print(f"\n{Fore.GREEN}With Stealth:{Style.RESET_ALL}")
data = stealth_detection["detection_data"]
print(f" • WebDriver detected: {data.get('webdriver', 'Unknown')}")
print(f" • Chrome: {data.get('chrome', 'Unknown')}")
print(f" • Languages: {data.get('languages', 'Unknown')}")
print(f" • Plugins: {data.get('pluginsLength', 'Unknown')}")
print(f" • User Agent: {data.get('userAgent', 'Unknown')[:60]}...")
# Test 2: Cloudflare Site
print(f"\n\n{Fore.YELLOW}2. Cloudflare Protected Site Test{Style.RESET_ALL}")
print("-" * 40)
# Without stealth
regular_cf = await test_cloudflare_site(use_stealth=False)
print(f"{Fore.RED}Without Stealth:{Style.RESET_ALL}")
print(f" • Success: {regular_cf['success']}")
print(f" • Cloudflare Challenge: {regular_cf['cloudflare_challenge']}")
print(f" • Status Code: {regular_cf['status_code']}")
print(f" • Page Title: {regular_cf['page_title']}")
# With stealth
stealth_cf = await test_cloudflare_site(use_stealth=True)
print(f"\n{Fore.GREEN}With Stealth:{Style.RESET_ALL}")
print(f" • Success: {stealth_cf['success']}")
print(f" • Cloudflare Challenge: {stealth_cf['cloudflare_challenge']}")
print(f" • Status Code: {stealth_cf['status_code']}")
print(f" • Page Title: {stealth_cf['page_title']}")
# Test 3: Anti-bot Site
print(f"\n\n{Fore.YELLOW}3. Anti-Bot Site Test{Style.RESET_ALL}")
print("-" * 40)
# Without stealth
regular_antibot = await test_anti_bot_site(use_stealth=False)
print(f"{Fore.RED}Without Stealth:{Style.RESET_ALL}")
print(f" • Success: {regular_antibot['success']}")
print(f" • Blocked: {regular_antibot['blocked']}")
print(f" • Status Code: {regular_antibot['status_code']}")
print(f" • Page Title: {regular_antibot['page_title']}")
# With stealth
stealth_antibot = await test_anti_bot_site(use_stealth=True)
print(f"\n{Fore.GREEN}With Stealth:{Style.RESET_ALL}")
print(f" • Success: {stealth_antibot['success']}")
print(f" • Blocked: {stealth_antibot['blocked']}")
print(f" • Status Code: {stealth_antibot['status_code']}")
print(f" • Page Title: {stealth_antibot['page_title']}")
# Summary
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Summary:{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"\nStealth mode helps bypass basic bot detection by:")
print(f" • Hiding webdriver property")
print(f" • Modifying browser fingerprints")
print(f" • Adjusting navigator properties")
print(f" • Emulating real browser plugin behavior")
print(f"\n{Fore.YELLOW}Note:{Style.RESET_ALL} Stealth mode is not a silver bullet.")
print(f"Advanced anti-bot systems may still detect automation.")
print(f"Always respect robots.txt and website terms of service.")
async def stealth_best_practices():
"""Demonstrate best practices for using stealth mode"""
print(f"\n\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Stealth Mode Best Practices{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")
# Best Practice 1: Combine with realistic behavior
print(f"{Fore.YELLOW}1. Combine with Realistic Behavior:{Style.RESET_ALL}")
browser_config = BrowserConfig(
headless=False,
enable_stealth=True,
viewport_width=1920,
viewport_height=1080
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Simulate human-like behavior
human_behavior_script = """
(async () => {
// Wait random time between actions
const randomWait = () => Math.random() * 2000 + 1000;
// Simulate reading
await new Promise(resolve => setTimeout(resolve, randomWait()));
// Smooth scroll
const smoothScroll = async () => {
const totalHeight = document.body.scrollHeight;
const viewHeight = window.innerHeight;
let currentPosition = 0;
while (currentPosition < totalHeight - viewHeight) {
const scrollAmount = Math.random() * 300 + 100;
window.scrollBy({
top: scrollAmount,
behavior: 'smooth'
});
currentPosition += scrollAmount;
await new Promise(resolve => setTimeout(resolve, randomWait()));
}
};
await smoothScroll();
console.log('Human-like behavior simulation completed');
return true;
})()
"""
config = CrawlerRunConfig(
js_code=human_behavior_script,
wait_until="networkidle",
delay_before_return_html=3.0,
capture_console_messages=True
)
result = await crawler.arun(
url="https://example.com",
config=config
)
print(f" ✓ Simulated human-like scrolling and reading patterns")
print(f" ✓ Added random delays between actions")
print(f" ✓ Result: {result.success}")
# Best Practice 2: Use appropriate viewport and user agent
print(f"\n{Fore.YELLOW}2. Use Realistic Viewport and User Agent:{Style.RESET_ALL}")
# Get a realistic user agent
from crawl4ai.user_agent_generator import UserAgentGenerator
ua_generator = UserAgentGenerator()
browser_config = BrowserConfig(
headless=True,
enable_stealth=True,
viewport_width=1920,
viewport_height=1080,
user_agent=ua_generator.generate(device_type="desktop", browser_type="chrome")
)
print(f" ✓ Using realistic viewport: 1920x1080")
print(f" ✓ Using current Chrome user agent")
print(f" ✓ Stealth mode will ensure consistency")
# Best Practice 3: Manage request rate
print(f"\n{Fore.YELLOW}3. Manage Request Rate:{Style.RESET_ALL}")
print(f" ✓ Add delays between requests")
print(f" ✓ Randomize timing patterns")
print(f" ✓ Respect robots.txt")
# Best Practice 4: Session management
print(f"\n{Fore.YELLOW}4. Use Session Management:{Style.RESET_ALL}")
browser_config = BrowserConfig(
headless=False,
enable_stealth=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Create a session for multiple requests
session_id = "stealth_session_1"
config = CrawlerRunConfig(
session_id=session_id,
wait_until="domcontentloaded"
)
# First request
result1 = await crawler.arun(
url="https://example.com",
config=config
)
# Subsequent request reuses the same browser context
result2 = await crawler.arun(
url="https://example.com/about",
config=config
)
print(f" ✓ Reused browser session for multiple requests")
print(f" ✓ Maintains cookies and state between requests")
print(f" ✓ More efficient and realistic browsing pattern")
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
async def main():
"""Run all examples"""
# Run comparison tests
await compare_results()
# Show best practices
await stealth_best_practices()
print(f"\n{Fore.GREEN}Examples completed!{Style.RESET_ALL}")
print(f"\n{Fore.YELLOW}Remember:{Style.RESET_ALL}")
print(f"• Stealth mode helps with basic bot detection")
print(f"• Always respect website terms of service")
print(f"• Consider rate limiting and ethical scraping practices")
print(f"• For advanced protection, consider additional measures")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,215 +0,0 @@
"""
Quick Start: Using Stealth Mode in Crawl4AI
This example shows practical use cases for the stealth mode feature.
Stealth mode helps bypass basic bot detection mechanisms.
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def example_1_basic_stealth():
"""Example 1: Basic stealth mode usage"""
print("\n=== Example 1: Basic Stealth Mode ===")
# Enable stealth mode in browser config
browser_config = BrowserConfig(
enable_stealth=True, # This is the key parameter
headless=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
print(f"✓ Crawled {result.url} successfully")
print(f"✓ Title: {result.metadata.get('title', 'N/A')}")
async def example_2_stealth_with_screenshot():
"""Example 2: Stealth mode with screenshot to show detection results"""
print("\n=== Example 2: Stealth Mode Visual Verification ===")
browser_config = BrowserConfig(
enable_stealth=True,
headless=False # Set to False to see the browser
)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
screenshot=True,
wait_until="networkidle"
)
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=config
)
if result.success:
print(f"✓ Successfully crawled bot detection site")
print(f"✓ With stealth enabled, many detection tests should show as passed")
if result.screenshot:
# Save screenshot for verification
import base64
with open("stealth_detection_results.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print(f"✓ Screenshot saved as 'stealth_detection_results.png'")
print(f" Check the screenshot to see detection results!")
async def example_3_stealth_for_protected_sites():
"""Example 3: Using stealth for sites with bot protection"""
print("\n=== Example 3: Stealth for Protected Sites ===")
browser_config = BrowserConfig(
enable_stealth=True,
headless=True,
viewport_width=1920,
viewport_height=1080
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Add human-like behavior
config = CrawlerRunConfig(
wait_until="networkidle",
delay_before_return_html=2.0, # Wait 2 seconds
js_code="""
// Simulate human-like scrolling
window.scrollTo({
top: document.body.scrollHeight / 2,
behavior: 'smooth'
});
"""
)
# Try accessing a site that might have bot protection
result = await crawler.arun(
url="https://www.g2.com/products/slack/reviews",
config=config
)
if result.success:
print(f"✓ Successfully accessed protected site")
print(f"✓ Retrieved {len(result.html)} characters of HTML")
else:
print(f"✗ Failed to access site: {result.error_message}")
async def example_4_stealth_with_sessions():
"""Example 4: Stealth mode with session management"""
print("\n=== Example 4: Stealth + Session Management ===")
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
session_id = "my_stealth_session"
# First request - establish session
config = CrawlerRunConfig(
session_id=session_id,
wait_until="domcontentloaded"
)
result1 = await crawler.arun(
url="https://news.ycombinator.com",
config=config
)
print(f"✓ First request completed: {result1.url}")
# Second request - reuse session
await asyncio.sleep(2) # Brief delay between requests
result2 = await crawler.arun(
url="https://news.ycombinator.com/best",
config=config
)
print(f"✓ Second request completed: {result2.url}")
print(f"✓ Session reused, maintaining cookies and state")
async def example_5_stealth_comparison():
"""Example 5: Compare results with and without stealth using screenshots"""
print("\n=== Example 5: Stealth Mode Comparison ===")
test_url = "https://bot.sannysoft.com"
# First test WITHOUT stealth
print("\nWithout stealth:")
regular_config = BrowserConfig(
enable_stealth=False,
headless=True
)
async with AsyncWebCrawler(config=regular_config) as crawler:
config = CrawlerRunConfig(
screenshot=True,
wait_until="networkidle"
)
result = await crawler.arun(url=test_url, config=config)
if result.success and result.screenshot:
import base64
with open("comparison_without_stealth.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print(f" ✓ Screenshot saved: comparison_without_stealth.png")
print(f" Many tests will show as FAILED (red)")
# Then test WITH stealth
print("\nWith stealth:")
stealth_config = BrowserConfig(
enable_stealth=True,
headless=True
)
async with AsyncWebCrawler(config=stealth_config) as crawler:
config = CrawlerRunConfig(
screenshot=True,
wait_until="networkidle"
)
result = await crawler.arun(url=test_url, config=config)
if result.success and result.screenshot:
import base64
with open("comparison_with_stealth.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print(f" ✓ Screenshot saved: comparison_with_stealth.png")
print(f" More tests should show as PASSED (green)")
print("\nCompare the two screenshots to see the difference!")
async def main():
"""Run all examples"""
print("Crawl4AI Stealth Mode Examples")
print("==============================")
# Run basic example
await example_1_basic_stealth()
# Run screenshot verification example
await example_2_stealth_with_screenshot()
# Run protected site example
await example_3_stealth_for_protected_sites()
# Run session example
await example_4_stealth_with_sessions()
# Run comparison example
await example_5_stealth_comparison()
print("\n" + "="*50)
print("Tips for using stealth mode effectively:")
print("- Use realistic viewport sizes (1920x1080, 1366x768)")
print("- Add delays between requests to appear more human")
print("- Combine with session management for better results")
print("- Remember: stealth mode is for legitimate scraping only")
print("="*50)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,62 +0,0 @@
"""
Simple test to verify stealth mode is working
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def test_stealth():
"""Test stealth mode effectiveness"""
# Test WITHOUT stealth
print("=== WITHOUT Stealth ===")
config1 = BrowserConfig(
headless=False,
enable_stealth=False
)
async with AsyncWebCrawler(config=config1) as crawler:
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=CrawlerRunConfig(
wait_until="networkidle",
screenshot=True
)
)
print(f"Success: {result.success}")
# Take screenshot
if result.screenshot:
with open("without_stealth.png", "wb") as f:
import base64
f.write(base64.b64decode(result.screenshot))
print("Screenshot saved: without_stealth.png")
# Test WITH stealth
print("\n=== WITH Stealth ===")
config2 = BrowserConfig(
headless=False,
enable_stealth=True
)
async with AsyncWebCrawler(config=config2) as crawler:
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=CrawlerRunConfig(
wait_until="networkidle",
screenshot=True
)
)
print(f"Success: {result.success}")
# Take screenshot
if result.screenshot:
with open("with_stealth.png", "wb") as f:
import base64
f.write(base64.b64decode(result.screenshot))
print("Screenshot saved: with_stealth.png")
print("\nCheck the screenshots to see the difference in bot detection results!")
if __name__ == "__main__":
asyncio.run(test_stealth())

View File

@@ -1,74 +0,0 @@
"""
Basic Undetected Browser Test
Simple example to test if undetected mode works
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig
async def test_regular_mode():
"""Test with regular browser"""
print("Testing Regular Browser Mode...")
browser_config = BrowserConfig(
headless=False,
verbose=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://www.example.com")
print(f"Regular Mode - Success: {result.success}")
print(f"Regular Mode - Status: {result.status_code}")
print(f"Regular Mode - Content length: {len(result.markdown.raw_markdown)}")
print(f"Regular Mode - First 100 chars: {result.markdown.raw_markdown[:100]}...")
return result.success
async def test_undetected_mode():
"""Test with undetected browser"""
print("\nTesting Undetected Browser Mode...")
from crawl4ai import UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
browser_config = BrowserConfig(
headless=False,
verbose=True
)
# Create undetected adapter
undetected_adapter = UndetectedAdapter()
# Create strategy with undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
result = await crawler.arun(url="https://www.example.com")
print(f"Undetected Mode - Success: {result.success}")
print(f"Undetected Mode - Status: {result.status_code}")
print(f"Undetected Mode - Content length: {len(result.markdown.raw_markdown)}")
print(f"Undetected Mode - First 100 chars: {result.markdown.raw_markdown[:100]}...")
return result.success
async def main():
"""Run both tests"""
print("🤖 Crawl4AI Basic Adapter Test\n")
# Test regular mode
regular_success = await test_regular_mode()
# Test undetected mode
undetected_success = await test_undetected_mode()
# Summary
print("\n" + "="*50)
print("Summary:")
print(f"Regular Mode: {'✅ Success' if regular_success else '❌ Failed'}")
print(f"Undetected Mode: {'✅ Success' if undetected_success else '❌ Failed'}")
print("="*50)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,155 +0,0 @@
"""
Bot Detection Test - Compare Regular vs Undetected
Tests browser fingerprinting differences at bot.sannysoft.com
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter,
CrawlResult
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Bot detection test site
TEST_URL = "https://bot.sannysoft.com"
def analyze_bot_detection(result: CrawlResult) -> dict:
"""Analyze bot detection results from the page"""
detections = {
"webdriver": False,
"headless": False,
"automation": False,
"user_agent": False,
"total_tests": 0,
"failed_tests": 0
}
if not result.success or not result.html:
return detections
# Look for specific test results in the HTML
html_lower = result.html.lower()
# Check for common bot indicators
if "webdriver" in html_lower and ("fail" in html_lower or "true" in html_lower):
detections["webdriver"] = True
detections["failed_tests"] += 1
if "headless" in html_lower and ("fail" in html_lower or "true" in html_lower):
detections["headless"] = True
detections["failed_tests"] += 1
if "automation" in html_lower and "detected" in html_lower:
detections["automation"] = True
detections["failed_tests"] += 1
# Count total tests (approximate)
detections["total_tests"] = html_lower.count("test") + html_lower.count("check")
return detections
async def test_browser_mode(adapter_name: str, adapter=None):
"""Test a browser mode and return results"""
print(f"\n{'='*60}")
print(f"Testing: {adapter_name}")
print(f"{'='*60}")
browser_config = BrowserConfig(
headless=False, # Run in headed mode for better results
verbose=True,
viewport_width=1920,
viewport_height=1080,
)
if adapter:
# Use undetected mode
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
crawler = AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
)
else:
# Use regular mode
crawler = AsyncWebCrawler(config=browser_config)
async with crawler:
config = CrawlerRunConfig(
delay_before_return_html=3.0, # Let detection scripts run
wait_for_images=True,
screenshot=True,
simulate_user=False, # Don't simulate for accurate detection
)
result = await crawler.arun(url=TEST_URL, config=config)
print(f"\n✓ Success: {result.success}")
print(f"✓ Status Code: {result.status_code}")
if result.success:
# Analyze detection results
detections = analyze_bot_detection(result)
print(f"\n🔍 Bot Detection Analysis:")
print(f" - WebDriver Detected: {'❌ Yes' if detections['webdriver'] else '✅ No'}")
print(f" - Headless Detected: {'❌ Yes' if detections['headless'] else '✅ No'}")
print(f" - Automation Detected: {'❌ Yes' if detections['automation'] else '✅ No'}")
print(f" - Failed Tests: {detections['failed_tests']}")
# Show some content
if result.markdown.raw_markdown:
print(f"\nContent preview:")
lines = result.markdown.raw_markdown.split('\n')
for line in lines[:20]: # Show first 20 lines
if any(keyword in line.lower() for keyword in ['test', 'pass', 'fail', 'yes', 'no']):
print(f" {line.strip()}")
return result, detections if result.success else {}
async def main():
"""Run the comparison"""
print("🤖 Crawl4AI - Bot Detection Test")
print(f"Testing at: {TEST_URL}")
print("This site runs various browser fingerprinting tests\n")
# Test regular browser
regular_result, regular_detections = await test_browser_mode("Regular Browser")
# Small delay
await asyncio.sleep(2)
# Test undetected browser
undetected_adapter = UndetectedAdapter()
undetected_result, undetected_detections = await test_browser_mode(
"Undetected Browser",
undetected_adapter
)
# Summary comparison
print(f"\n{'='*60}")
print("COMPARISON SUMMARY")
print(f"{'='*60}")
print(f"\n{'Test':<25} {'Regular':<15} {'Undetected':<15}")
print(f"{'-'*55}")
if regular_detections and undetected_detections:
print(f"{'WebDriver Detection':<25} {'❌ Detected' if regular_detections['webdriver'] else '✅ Passed':<15} {'❌ Detected' if undetected_detections['webdriver'] else '✅ Passed':<15}")
print(f"{'Headless Detection':<25} {'❌ Detected' if regular_detections['headless'] else '✅ Passed':<15} {'❌ Detected' if undetected_detections['headless'] else '✅ Passed':<15}")
print(f"{'Automation Detection':<25} {'❌ Detected' if regular_detections['automation'] else '✅ Passed':<15} {'❌ Detected' if undetected_detections['automation'] else '✅ Passed':<15}")
print(f"{'Failed Tests':<25} {regular_detections['failed_tests']:<15} {undetected_detections['failed_tests']:<15}")
print(f"\n{'='*60}")
if undetected_detections.get('failed_tests', 0) < regular_detections.get('failed_tests', 1):
print("✅ Undetected browser performed better at evading detection!")
else:
print(" Both browsers had similar detection results")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,164 +0,0 @@
"""
Undetected Browser Test - Cloudflare Protected Site
Tests the difference between regular and undetected modes on a Cloudflare-protected site
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Test URL with Cloudflare protection
TEST_URL = "https://nowsecure.nl"
async def test_regular_browser():
"""Test with regular browser - likely to be blocked"""
print("=" * 60)
print("Testing with Regular Browser")
print("=" * 60)
browser_config = BrowserConfig(
headless=False,
verbose=True,
viewport_width=1920,
viewport_height=1080,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
delay_before_return_html=2.0,
simulate_user=True,
magic=True, # Try with magic mode too
)
result = await crawler.arun(url=TEST_URL, config=config)
print(f"\n✓ Success: {result.success}")
print(f"✓ Status Code: {result.status_code}")
print(f"✓ HTML Length: {len(result.html)}")
# Check for Cloudflare challenge
if result.html:
cf_indicators = [
"Checking your browser",
"Please stand by",
"cloudflare",
"cf-browser-verification",
"Access denied",
"Ray ID"
]
detected = False
for indicator in cf_indicators:
if indicator.lower() in result.html.lower():
print(f"⚠️ Cloudflare Challenge Detected: '{indicator}' found")
detected = True
break
if not detected and len(result.markdown.raw_markdown) > 100:
print("✅ Successfully bypassed Cloudflare!")
print(f"Content preview: {result.markdown.raw_markdown[:200]}...")
elif not detected:
print("⚠️ Page loaded but content seems minimal")
return result
async def test_undetected_browser():
"""Test with undetected browser - should bypass Cloudflare"""
print("\n" + "=" * 60)
print("Testing with Undetected Browser")
print("=" * 60)
browser_config = BrowserConfig(
headless=False, # Headless is easier to detect
verbose=True,
viewport_width=1920,
viewport_height=1080,
)
# Create undetected adapter
undetected_adapter = UndetectedAdapter()
# Create strategy with undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
config = CrawlerRunConfig(
delay_before_return_html=2.0,
simulate_user=True,
)
result = await crawler.arun(url=TEST_URL, config=config)
print(f"\n✓ Success: {result.success}")
print(f"✓ Status Code: {result.status_code}")
print(f"✓ HTML Length: {len(result.html)}")
# Check for Cloudflare challenge
if result.html:
cf_indicators = [
"Checking your browser",
"Please stand by",
"cloudflare",
"cf-browser-verification",
"Access denied",
"Ray ID"
]
detected = False
for indicator in cf_indicators:
if indicator.lower() in result.html.lower():
print(f"⚠️ Cloudflare Challenge Detected: '{indicator}' found")
detected = True
break
if not detected and len(result.markdown.raw_markdown) > 100:
print("✅ Successfully bypassed Cloudflare!")
print(f"Content preview: {result.markdown.raw_markdown[:200]}...")
elif not detected:
print("⚠️ Page loaded but content seems minimal")
return result
async def main():
"""Compare regular vs undetected browser"""
print("🤖 Crawl4AI - Cloudflare Bypass Test")
print(f"Testing URL: {TEST_URL}\n")
# Test regular browser
regular_result = await test_regular_browser()
# Small delay
await asyncio.sleep(2)
# Test undetected browser
undetected_result = await test_undetected_browser()
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
print(f"Regular Browser:")
print(f" - Success: {regular_result.success}")
print(f" - Content Length: {len(regular_result.markdown.raw_markdown) if regular_result.markdown else 0}")
print(f"\nUndetected Browser:")
print(f" - Success: {undetected_result.success}")
print(f" - Content Length: {len(undetected_result.markdown.raw_markdown) if undetected_result.markdown else 0}")
if undetected_result.success and len(undetected_result.markdown.raw_markdown) > len(regular_result.markdown.raw_markdown):
print("\n✅ Undetected browser successfully bypassed protection!")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,184 +0,0 @@
"""
Undetected vs Regular Browser Comparison
This example demonstrates the difference between regular and undetected browser modes
when accessing sites with bot detection services.
Based on tested anti-bot services:
- Cloudflare
- Kasada
- Akamai
- DataDome
- Bet365
- And others
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
PlaywrightAdapter,
UndetectedAdapter,
CrawlResult
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Test URLs for various bot detection services
TEST_SITES = {
"Cloudflare Protected": "https://nowsecure.nl",
# "Bot Detection Test": "https://bot.sannysoft.com",
# "Fingerprint Test": "https://fingerprint.com/products/bot-detection",
# "Browser Scan": "https://browserscan.net",
# "CreepJS": "https://abrahamjuliot.github.io/creepjs",
}
async def test_with_adapter(url: str, adapter_name: str, adapter):
"""Test a URL with a specific adapter"""
browser_config = BrowserConfig(
headless=False, # Better for avoiding detection
viewport_width=1920,
viewport_height=1080,
verbose=True,
)
# Create the crawler strategy with the adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
print(f"\n{'='*60}")
print(f"Testing with {adapter_name} adapter")
print(f"URL: {url}")
print(f"{'='*60}")
try:
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
crawler_config = CrawlerRunConfig(
delay_before_return_html=3.0, # Give page time to load
wait_for_images=True,
screenshot=True,
simulate_user=True, # Add user simulation
)
result: CrawlResult = await crawler.arun(
url=url,
config=crawler_config
)
# Check results
print(f"✓ Status Code: {result.status_code}")
print(f"✓ Success: {result.success}")
print(f"✓ HTML Length: {len(result.html)}")
print(f"✓ Markdown Length: {len(result.markdown.raw_markdown)}")
# Check for common bot detection indicators
detection_indicators = [
"Access denied",
"Please verify you are human",
"Checking your browser",
"Enable JavaScript",
"captcha",
"403 Forbidden",
"Bot detection",
"Security check"
]
content_lower = result.markdown.raw_markdown.lower()
detected = False
for indicator in detection_indicators:
if indicator.lower() in content_lower:
print(f"⚠️ Possible detection: Found '{indicator}'")
detected = True
break
if not detected:
print("✅ No obvious bot detection triggered!")
# Show first 200 chars of content
print(f"Content preview: {result.markdown.raw_markdown[:200]}...")
return result.success and not detected
except Exception as e:
print(f"❌ Error: {str(e)}")
return False
async def compare_adapters(url: str, site_name: str):
"""Compare regular and undetected adapters on the same URL"""
print(f"\n{'#'*60}")
print(f"# Testing: {site_name}")
print(f"{'#'*60}")
# Test with regular adapter
regular_adapter = PlaywrightAdapter()
regular_success = await test_with_adapter(url, "Regular", regular_adapter)
# Small delay between tests
await asyncio.sleep(2)
# Test with undetected adapter
undetected_adapter = UndetectedAdapter()
undetected_success = await test_with_adapter(url, "Undetected", undetected_adapter)
# Summary
print(f"\n{'='*60}")
print(f"Summary for {site_name}:")
print(f"Regular Adapter: {'✅ Passed' if regular_success else '❌ Blocked/Detected'}")
print(f"Undetected Adapter: {'✅ Passed' if undetected_success else '❌ Blocked/Detected'}")
print(f"{'='*60}")
return regular_success, undetected_success
async def main():
"""Run comparison tests on multiple sites"""
print("🤖 Crawl4AI Browser Adapter Comparison")
print("Testing regular vs undetected browser modes\n")
results = {}
# Test each site
for site_name, url in TEST_SITES.items():
regular, undetected = await compare_adapters(url, site_name)
results[site_name] = {
"regular": regular,
"undetected": undetected
}
# Delay between different sites
await asyncio.sleep(3)
# Final summary
print(f"\n{'#'*60}")
print("# FINAL RESULTS")
print(f"{'#'*60}")
print(f"{'Site':<30} {'Regular':<15} {'Undetected':<15}")
print(f"{'-'*60}")
for site, result in results.items():
regular_status = "✅ Passed" if result["regular"] else "❌ Blocked"
undetected_status = "✅ Passed" if result["undetected"] else "❌ Blocked"
print(f"{site:<30} {regular_status:<15} {undetected_status:<15}")
# Calculate success rates
regular_success = sum(1 for r in results.values() if r["regular"])
undetected_success = sum(1 for r in results.values() if r["undetected"])
total = len(results)
print(f"\n{'='*60}")
print(f"Success Rates:")
print(f"Regular Adapter: {regular_success}/{total} ({regular_success/total*100:.1f}%)")
print(f"Undetected Adapter: {undetected_success}/{total} ({undetected_success/total*100:.1f}%)")
print(f"{'='*60}")
if __name__ == "__main__":
# Note: This example may take a while to run as it tests multiple sites
# You can comment out sites in TEST_SITES to run faster tests
asyncio.run(main())

View File

@@ -1,118 +0,0 @@
"""
Simple Undetected Browser Demo
Demonstrates the basic usage of undetected browser mode
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def crawl_with_regular_browser(url: str):
"""Crawl with regular browser"""
print("\n[Regular Browser Mode]")
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
delay_before_return_html=2.0
)
)
print(f"Success: {result.success}")
print(f"Status: {result.status_code}")
print(f"Content length: {len(result.markdown.raw_markdown)}")
# Check for bot detection keywords
content = result.markdown.raw_markdown.lower()
if any(word in content for word in ["cloudflare", "checking your browser", "please wait"]):
print("⚠️ Bot detection triggered!")
else:
print("✅ Page loaded successfully")
return result
async def crawl_with_undetected_browser(url: str):
"""Crawl with undetected browser"""
print("\n[Undetected Browser Mode]")
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create undetected adapter and strategy
undetected_adapter = UndetectedAdapter()
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
delay_before_return_html=2.0
)
)
print(f"Success: {result.success}")
print(f"Status: {result.status_code}")
print(f"Content length: {len(result.markdown.raw_markdown)}")
# Check for bot detection keywords
content = result.markdown.raw_markdown.lower()
if any(word in content for word in ["cloudflare", "checking your browser", "please wait"]):
print("⚠️ Bot detection triggered!")
else:
print("✅ Page loaded successfully")
return result
async def main():
"""Demo comparing regular vs undetected modes"""
print("🤖 Crawl4AI Undetected Browser Demo")
print("="*50)
# Test URLs - you can change these
test_urls = [
"https://www.example.com", # Simple site
"https://httpbin.org/headers", # Shows request headers
]
for url in test_urls:
print(f"\n📍 Testing URL: {url}")
# Test with regular browser
regular_result = await crawl_with_regular_browser(url)
# Small delay
await asyncio.sleep(2)
# Test with undetected browser
undetected_result = await crawl_with_undetected_browser(url)
# Compare results
print(f"\n📊 Comparison for {url}:")
print(f"Regular browser content: {len(regular_result.markdown.raw_markdown)} chars")
print(f"Undetected browser content: {len(undetected_result.markdown.raw_markdown)} chars")
if url == "https://httpbin.org/headers":
# Show headers for comparison
print("\nHeaders seen by server:")
print("Regular:", regular_result.markdown.raw_markdown[:500])
print("\nUndetected:", undetected_result.markdown.raw_markdown[:500])
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -358,77 +358,9 @@ if __name__ == "__main__":
---
---
## 7. Anti-Bot Features (Stealth Mode & Undetected Browser)
Crawl4AI provides two powerful features to bypass bot detection:
### 7.1 Stealth Mode
Stealth mode uses playwright-stealth to modify browser fingerprints and behaviors. Enable it with a simple flag:
```python
browser_config = BrowserConfig(
enable_stealth=True, # Activates stealth mode
headless=False
)
```
**When to use**: Sites with basic bot detection (checking navigator.webdriver, plugins, etc.)
### 7.2 Undetected Browser
For advanced bot detection, use the undetected browser adapter:
```python
from crawl4ai import UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Create undetected adapter
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(crawler_strategy=strategy, config=browser_config) as crawler:
# Your crawling code
```
**When to use**: Sites with sophisticated bot detection (Cloudflare, DataDome, etc.)
### 7.3 Combining Both
For maximum evasion, combine stealth mode with undetected browser:
```python
browser_config = BrowserConfig(
enable_stealth=True, # Enable stealth
headless=False
)
adapter = UndetectedAdapter() # Use undetected browser
```
### Choosing the Right Approach
| Detection Level | Recommended Approach |
|----------------|---------------------|
| No protection | Regular browser |
| Basic checks | Regular + Stealth mode |
| Advanced protection | Undetected browser |
| Maximum evasion | Undetected + Stealth mode |
**Best Practice**: Start with regular browser + stealth mode. Only use undetected browser if needed, as it may be slightly slower.
See [Undetected Browser Mode](undetected-browser.md) for detailed examples.
---
## Conclusion & Next Steps
You've now explored several **advanced** features:
You've now explored several **advanced** features:
- **Proxy Usage**
- **PDF & Screenshot** capturing for large or critical pages
@@ -436,10 +368,7 @@ You've now explored several **advanced** features:
- **Custom Headers** for language or specialized requests
- **Session Persistence** via storage state
- **Robots.txt Compliance**
- **Anti-Bot Features** (Stealth Mode & Undetected Browser)
With these power tools, you can build robust scraping workflows that mimic real user behavior, handle secure sites, capture detailed snapshots, manage sessions across multiple runs, and bypass bot detection—streamlining your entire data collection pipeline.
With these power tools, you can build robust scraping workflows that mimic real user behavior, handle secure sites, capture detailed snapshots, and manage sessions across multiple runs—streamlining your entire data collection pipeline.
**Note**: In future versions, we may enable stealth mode and undetected browser by default. For now, users should explicitly enable these features when needed.
**Last Updated**: 2025-01-17
**Last Updated**: 2025-01-01

View File

@@ -1,394 +0,0 @@
# Undetected Browser Mode
## Overview
Crawl4AI offers two powerful anti-bot features to help you access websites with bot detection:
1. **Stealth Mode** - Uses playwright-stealth to modify browser fingerprints and behaviors
2. **Undetected Browser Mode** - Advanced browser adapter with deep-level patches for sophisticated bot detection
This guide covers both features and helps you choose the right approach for your needs.
## Anti-Bot Features Comparison
| Feature | Regular Browser | Stealth Mode | Undetected Browser |
|---------|----------------|--------------|-------------------|
| WebDriver Detection | ❌ | ✅ | ✅ |
| Navigator Properties | ❌ | ✅ | ✅ |
| Plugin Emulation | ❌ | ✅ | ✅ |
| CDP Detection | ❌ | Partial | ✅ |
| Deep Browser Patches | ❌ | ❌ | ✅ |
| Performance Impact | None | Minimal | Moderate |
| Setup Complexity | None | None | Minimal |
## When to Use Each Approach
### Use Regular Browser + Stealth Mode When:
- Sites have basic bot detection (checking navigator.webdriver, plugins, etc.)
- You need good performance with basic protection
- Sites check for common automation indicators
### Use Undetected Browser When:
- Sites employ sophisticated bot detection services (Cloudflare, DataDome, etc.)
- Stealth mode alone isn't sufficient
- You're willing to trade some performance for better evasion
### Best Practice: Progressive Enhancement
1. **Start with**: Regular browser + Stealth mode
2. **If blocked**: Switch to Undetected browser
3. **If still blocked**: Combine Undetected browser + Stealth mode
## Stealth Mode
Stealth mode is the simpler anti-bot solution that works with both regular and undetected browsers:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig
# Enable stealth mode with regular browser
browser_config = BrowserConfig(
enable_stealth=True, # Simple flag to enable
headless=False # Better for avoiding detection
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun("https://example.com")
```
### What Stealth Mode Does:
- Removes `navigator.webdriver` flag
- Modifies browser fingerprints
- Emulates realistic plugin behavior
- Adjusts navigator properties
- Fixes common automation leaks
## Undetected Browser Mode
For sites with sophisticated bot detection that stealth mode can't bypass, use the undetected browser adapter:
### Key Features
- **Drop-in Replacement**: Uses the same API as regular browser mode
- **Enhanced Stealth**: Built-in patches to evade common detection methods
- **Browser Adapter Pattern**: Seamlessly switch between regular and undetected modes
- **Automatic Installation**: `crawl4ai-setup` installs all necessary browser dependencies
### Quick Start
```python
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def main():
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create browser config
browser_config = BrowserConfig(
headless=False, # Headless mode can be detected easier
verbose=True,
)
# Create the crawler strategy with undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Your crawling code here
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig()
)
print(result.markdown[:500])
asyncio.run(main())
```
## Combining Both Features
For maximum evasion, combine stealth mode with undetected browser:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig, UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Create browser config with stealth enabled
browser_config = BrowserConfig(
enable_stealth=True, # Enable stealth mode
headless=False
)
# Create undetected adapter
adapter = UndetectedAdapter()
# Create strategy with both features
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun("https://protected-site.com")
```
## Examples
### Example 1: Basic Stealth Mode
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def test_stealth_mode():
# Simple stealth mode configuration
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=CrawlerRunConfig(screenshot=True)
)
if result.success:
print("✓ Successfully accessed bot detection test site")
# Save screenshot to verify detection results
if result.screenshot:
import base64
with open("stealth_test.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print("✓ Screenshot saved - check for green (passed) tests")
asyncio.run(test_stealth_mode())
```
### Example 2: Undetected Browser Mode
```python
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CrawlResult,
UndetectedAdapter,
DefaultMarkdownGenerator
)
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def main():
# Create browser config
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create the crawler strategy with the undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Configure the crawl
crawler_config = CrawlerRunConfig(
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter()
),
capture_console_messages=True, # Test adapter console capture
)
# Test on a site that typically detects bots
print("Testing undetected adapter...")
result: CrawlResult = await crawler.arun(
url="https://www.helloworld.org",
config=crawler_config
)
print(f"Status: {result.status_code}")
print(f"Success: {result.success}")
print(f"Console messages captured: {len(result.console_messages or [])}")
print(f"Markdown content (first 500 chars):\n{result.markdown.raw_markdown[:500]}")
if __name__ == "__main__":
asyncio.run(main())
```
## Browser Adapter Pattern
The undetected browser support is implemented using an adapter pattern, allowing seamless switching between different browser implementations:
```python
# Regular browser adapter (default)
from crawl4ai import PlaywrightAdapter
regular_adapter = PlaywrightAdapter()
# Undetected browser adapter
from crawl4ai import UndetectedAdapter
undetected_adapter = UndetectedAdapter()
```
The adapter handles:
- JavaScript execution
- Console message capture
- Error handling
- Browser-specific optimizations
## Best Practices
1. **Avoid Headless Mode**: Detection is easier in headless mode
```python
browser_config = BrowserConfig(headless=False)
```
2. **Use Reasonable Delays**: Don't rush through pages
```python
crawler_config = CrawlerRunConfig(
wait_time=3.0, # Wait 3 seconds after page load
delay_before_return_html=2.0 # Additional delay
)
```
3. **Rotate User Agents**: You can customize user agents
```python
browser_config = BrowserConfig(
headers={"User-Agent": "your-user-agent"}
)
```
4. **Handle Failures Gracefully**: Some sites may still detect and block
```python
if not result.success:
print(f"Crawl failed: {result.error_message}")
```
## Advanced Usage Tips
### Progressive Detection Handling
```python
async def crawl_with_progressive_evasion(url):
# Step 1: Try regular browser with stealth
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url)
if result.success and "Access Denied" not in result.html:
return result
# Step 2: If blocked, try undetected browser
print("Regular + stealth blocked, trying undetected browser...")
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun(url)
return result
```
## Installation
The undetected browser dependencies are automatically installed when you run:
```bash
crawl4ai-setup
```
This command installs all necessary browser dependencies for both regular and undetected modes.
## Limitations
- **Performance**: Slightly slower than regular mode due to additional patches
- **Headless Detection**: Some sites can still detect headless mode
- **Resource Usage**: May use more resources than regular mode
- **Not 100% Guaranteed**: Advanced anti-bot services are constantly evolving
## Troubleshooting
### Browser Not Found
Run the setup command:
```bash
crawl4ai-setup
```
### Detection Still Occurring
Try combining with other features:
```python
crawler_config = CrawlerRunConfig(
simulate_user=True, # Add user simulation
magic=True, # Enable magic mode
wait_time=5.0, # Longer waits
)
```
### Performance Issues
If experiencing slow performance:
```python
# Use selective undetected mode only for protected sites
if is_protected_site(url):
adapter = UndetectedAdapter()
else:
adapter = PlaywrightAdapter() # Default adapter
```
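Note that `is_protected_site` is not a Crawl4AI helper; it stands in for whatever logic you use to decide when the extra cost of the undetected adapter is worth paying. A minimal sketch, assuming you keep your own list of domains that have blocked regular crawls before:
```python
from urllib.parse import urlparse

# Hand-maintained list of domains known to sit behind aggressive anti-bot
# services; extend it as you discover sites that block regular crawls.
PROTECTED_DOMAINS = {
    "nowsecure.nl",  # Cloudflare-protected demo site used above
}

def is_protected_site(url: str) -> bool:
    """Return True if the URL's host matches (or is a subdomain of) a protected domain."""
    host = (urlparse(url).hostname or "").lower()
    return any(host == d or host.endswith("." + d) for d in PROTECTED_DOMAINS)
```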
## Future Plans
**Note**: In future versions of Crawl4AI, we may enable stealth mode and undetected browser by default to provide better out-of-the-box success rates. For now, users should explicitly enable these features when needed.
## Conclusion
Crawl4AI provides flexible anti-bot solutions:
1. **Start Simple**: Use regular browser + stealth mode for most sites
2. **Escalate if Needed**: Switch to undetected browser for sophisticated protection
3. **Combine for Maximum Effect**: Use both features together when facing the toughest challenges
Remember:
- Always respect robots.txt and website terms of service
- Use appropriate delays to avoid overwhelming servers
- Consider the performance trade-offs of each approach
- Test progressively to find the minimum necessary evasion level
## See Also
- [Advanced Features](advanced-features.md) - Overview of all advanced features
- [Proxy & Security](proxy-security.md) - Using proxies with anti-bot features
- [Session Management](session-management.md) - Maintaining sessions across requests
- [Identity Based Crawling](identity-based-crawling.md) - Additional anti-detection strategies

View File

@@ -29,7 +29,6 @@ class BrowserConfig:
text_mode=False,
light_mode=False,
extra_args=None,
enable_stealth=False,
# ... other advanced parameters omitted here
):
...
@@ -85,11 +84,6 @@ class BrowserConfig:
- Additional flags for the underlying browser.
- E.g. `["--disable-extensions"]`.
11. **`enable_stealth`**:
- If `True`, enables stealth mode using playwright-stealth.
- Modifies browser fingerprints to avoid basic bot detection.
- Default is `False`. Recommended for sites with bot protection.
### Helper Methods
Both configuration classes provide a `clone()` method to create modified copies:

View File

@@ -187,7 +187,7 @@ Here:
---
## 5. More Fields: Links, Media, Tables and More
## 5. More Fields: Links, Media, and More
### 5.1 `links`
@@ -207,77 +207,7 @@ for img in images:
print("Image URL:", img["src"], "Alt:", img.get("alt"))
```
### 5.3 `tables`
The `tables` field contains structured data extracted from HTML tables found on the crawled page. Tables are analyzed based on various criteria to determine if they are actual data tables (as opposed to layout tables), including:
- Presence of thead and tbody sections
- Use of th elements for headers
- Column consistency
- Text density
- And other factors
Tables that score above the threshold (default: 7) are extracted and stored in result.tables.
### Accessing Table data:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
async def main():
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://www.w3schools.com/html/html_tables.asp",
config=CrawlerRunConfig(
table_score_threshold=7 # Minimum score for table detection
)
)
if result.success and result.tables:
print(f"Found {len(result.tables)} tables")
for i, table in enumerate(result.tables):
print(f"\nTable {i+1}:")
print(f"Caption: {table.get('caption', 'No caption')}")
print(f"Headers: {table['headers']}")
print(f"Rows: {len(table['rows'])}")
# Print first few rows as example
for j, row in enumerate(table['rows'][:3]):
print(f" Row {j+1}: {row}")
if __name__ == "__main__":
asyncio.run(main())
```
### Configuring Table Extraction:
You can adjust the sensitivity of the table detection algorithm with:
```python
config = CrawlerRunConfig(
table_score_threshold=5 # Lower value = more tables detected (default: 7)
)
```
Each extracted table contains:
- `headers`: Column header names
- `rows`: List of rows, each containing cell values
- `caption`: Table caption text (if available)
- `summary`: Table summary attribute (if specified)
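Because each table is a plain dictionary, it maps directly onto tabular tooling. A minimal sketch, assuming pandas is installed (it is not a Crawl4AI dependency) and `result` comes from a successful crawl like the one above:
```python
import pandas as pd

if result.success and result.tables:
    table = result.tables[0]
    # Build a DataFrame from the extracted headers and rows
    df = pd.DataFrame(table["rows"], columns=table["headers"])
    print(df.head())
```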
### Table Extraction Tips
- Not all HTML tables are extracted - only those detected as "data tables" vs. layout tables.
- Tables with inconsistent cell counts, nested tables, or those used purely for layout may be skipped.
- If you're missing tables, try adjusting the `table_score_threshold` to a lower value (default is 7).
The table detection algorithm scores tables based on features like consistent columns, presence of headers, text density, and more. Tables scoring above the threshold are considered data tables worth extracting.
### 5.4 `screenshot`, `pdf`, and `mhtml`
### 5.3 `screenshot`, `pdf`, and `mhtml`
If you set `screenshot=True`, `pdf=True`, or `capture_mhtml=True` in **`CrawlerRunConfig`**, then:
@@ -298,7 +228,7 @@ if result.mhtml:
The MHTML (MIME HTML) format is particularly useful as it captures the entire web page including all of its resources (CSS, images, scripts, etc.) in a single file, making it perfect for archiving or offline viewing.
### 5.5 `ssl_certificate`
### 5.4 `ssl_certificate`
If `fetch_ssl_certificate=True`, `result.ssl_certificate` holds details about the site's SSL cert, such as issuer, validity dates, etc.

View File

@@ -405,409 +405,6 @@ Executes JavaScript snippets on the specified URL and returns the full crawl res
---
## User-Provided Hooks API
The Docker API supports user-provided hook functions, allowing you to customize the crawling behavior by injecting your own Python code at specific points in the crawling pipeline. This powerful feature enables authentication, performance optimization, and custom content extraction without modifying the server code.
> ⚠️ **IMPORTANT SECURITY WARNING**:
> - **Never use hooks with untrusted code or on untrusted websites**
> - **Be extremely careful when crawling sites that might be phishing or malicious**
> - **Hook code has access to page context and can interact with the website**
> - **Always validate and sanitize any data extracted through hooks**
> - **Never expose credentials or sensitive data in hook code**
> - **Consider running the Docker container in an isolated network when testing**
### Hook Information Endpoint
```
GET /hooks/info
```
Returns information about available hook points and their signatures:
```bash
curl http://localhost:11235/hooks/info
```
### Available Hook Points
The API supports 8 hook points that match the local SDK:
| Hook Point | Parameters | Description | Best Use Cases |
|------------|------------|-------------|----------------|
| `on_browser_created` | `browser` | After browser instance creation | Light setup tasks |
| `on_page_context_created` | `page, context` | After page/context creation | **Authentication, cookies, route blocking** |
| `before_goto` | `page, context, url` | Before navigating to URL | Custom headers, logging |
| `after_goto` | `page, context, url, response` | After navigation completes | Verification, waiting for elements |
| `on_user_agent_updated` | `page, context, user_agent` | When user agent changes | UA-specific logic |
| `on_execution_started` | `page, context` | When JS execution begins | JS-related setup |
| `before_retrieve_html` | `page, context` | Before getting final HTML | **Scrolling, lazy loading** |
| `before_return_html` | `page, context, html` | Before returning HTML | Final modifications, metrics |
### Using Hooks in Requests
Add hooks to any crawl request by including the `hooks` parameter:
```json
{
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": {
"hook_point_name": "async def hook(...): ...",
"another_hook": "async def hook(...): ..."
},
"timeout": 30 // Optional, default 30 seconds (max 120)
}
}
```
### Hook Examples with Real URLs
#### 1. Authentication with Cookies (GitHub)
```python
import requests
# Example: Setting GitHub session cookie (use your actual session)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Add authentication cookies for GitHub
# WARNING: Never hardcode real credentials!
await context.add_cookies([
{
'name': 'user_session',
'value': 'your_github_session_token', # Replace with actual token
'domain': '.github.com',
'path': '/',
'httpOnly': True,
'secure': True,
'sameSite': 'Lax'
}
])
return page
"""
}
response = requests.post("http://localhost:11235/crawl", json={
"urls": ["https://github.com/settings/profile"], # Protected page
"hooks": {"code": hooks_code, "timeout": 30}
})
```
#### 2. Basic Authentication (httpbin.org for testing)
```python
# Safe testing with httpbin.org (a service designed for HTTP testing)
hooks_code = {
"before_goto": """
async def hook(page, context, url, **kwargs):
import base64
# httpbin.org/basic-auth expects username="user" and password="passwd"
credentials = base64.b64encode(b"user:passwd").decode('ascii')
await page.set_extra_http_headers({
'Authorization': f'Basic {credentials}'
})
return page
"""
}
response = requests.post("http://localhost:11235/crawl", json={
"urls": ["https://httpbin.org/basic-auth/user/passwd"],
"hooks": {"code": hooks_code, "timeout": 15}
})
```
#### 3. Performance Optimization (News Sites)
```python
# Example: Optimizing crawling of news sites like CNN or BBC
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Block images, fonts, and media to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg,ico}", lambda route: route.abort())
await context.route("**/*.{woff,woff2,ttf,otf,eot}", lambda route: route.abort())
await context.route("**/*.{mp4,webm,ogg,mp3,wav,flac}", lambda route: route.abort())
# Block common tracking and ad domains
await context.route("**/googletagmanager.com/*", lambda route: route.abort())
await context.route("**/google-analytics.com/*", lambda route: route.abort())
await context.route("**/doubleclick.net/*", lambda route: route.abort())
await context.route("**/facebook.com/tr/*", lambda route: route.abort())
await context.route("**/amazon-adsystem.com/*", lambda route: route.abort())
# Disable CSS animations for faster rendering
await page.add_style_tag(content='''
*, *::before, *::after {
animation-duration: 0s !important;
transition-duration: 0s !important;
}
''')
return page
"""
}
response = requests.post("http://localhost:11235/crawl", json={
"urls": ["https://www.bbc.com/news"], # Heavy news site
"hooks": {"code": hooks_code, "timeout": 30}
})
```
#### 4. Handling Infinite Scroll (Twitter/X)
```python
# Example: Scrolling on Twitter/X (requires authentication)
hooks_code = {
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Scroll to load more tweets
previous_height = 0
for i in range(5): # Limit scrolls to avoid infinite loop
current_height = await page.evaluate("document.body.scrollHeight")
if current_height == previous_height:
break # No more content to load
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(2000) # Wait for content to load
previous_height = current_height
return page
"""
}
# Note: Twitter requires authentication for most content
response = requests.post("http://localhost:11235/crawl", json={
"urls": ["https://twitter.com/nasa"], # Public profile
"hooks": {"code": hooks_code, "timeout": 30}
})
```
#### 5. E-commerce Login (Example Pattern)
```python
# SECURITY WARNING: This is a pattern example.
# Never use real credentials in code!
# Always use environment variables or secure vaults.
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Example pattern for e-commerce sites
# DO NOT use real credentials here!
# Navigate to login page first
await page.goto("https://example-shop.com/login")
# Wait for login form to load
await page.wait_for_selector("#email", timeout=5000)
# Fill login form (use environment variables in production!)
await page.fill("#email", "test@example.com") # Never use real email
await page.fill("#password", "test_password") # Never use real password
# Handle "Remember Me" checkbox if present
try:
await page.uncheck("#remember_me") # Don't remember on shared systems
except:
pass
# Submit form
await page.click("button[type='submit']")
# Wait for redirect after login
await page.wait_for_url("**/account/**", timeout=10000)
return page
"""
}
```
#### 6. Extracting Structured Data (Wikipedia)
```python
# Safe example using Wikipedia
hooks_code = {
"after_goto": """
async def hook(page, context, url, response, **kwargs):
# Wait for Wikipedia content to load
await page.wait_for_selector("#content", timeout=5000)
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Extract structured data from Wikipedia infobox
metadata = await page.evaluate('''() => {
const infobox = document.querySelector('.infobox');
if (!infobox) return null;
const data = {};
const rows = infobox.querySelectorAll('tr');
rows.forEach(row => {
const header = row.querySelector('th');
const value = row.querySelector('td');
if (header && value) {
data[header.innerText.trim()] = value.innerText.trim();
}
});
return data;
}''')
if metadata:
print("Extracted metadata:", metadata)
return page
"""
}
response = requests.post("http://localhost:11235/crawl", json={
"urls": ["https://en.wikipedia.org/wiki/Python_(programming_language)"],
"hooks": {"code": hooks_code, "timeout": 20}
})
```
### Security Best Practices
> 🔒 **Critical Security Guidelines**:
1. **Never Trust User Input**: If accepting hook code from users, always validate and sandbox it
2. **Avoid Phishing Sites**: Never use hooks on suspicious or unverified websites
3. **Protect Credentials**:
- Never hardcode passwords, tokens, or API keys in hook code
- Use environment variables or secure secret management (see the sketch after this list)
- Rotate credentials regularly
4. **Network Isolation**: Run the Docker container in an isolated network when testing
5. **Audit Hook Code**: Always review hook code before execution
6. **Limit Permissions**: Use the least privileged access needed
7. **Monitor Execution**: Check hook execution logs for suspicious behavior
8. **Timeout Protection**: Always set reasonable timeouts (default 30s)
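For point 3 above, one pattern is to read secrets from environment variables on the client and only interpolate the resulting values into the hook string you send. A minimal sketch (the environment variable name, cookie, and domain are illustrative, not a Crawl4AI convention):
```python
import os

session_token = os.environ["MY_SESSION_TOKEN"]  # hypothetical variable set outside the code

hooks_code = {
    "on_page_context_created": f"""
async def hook(page, context, **kwargs):
    await context.add_cookies([{{
        'name': 'user_session',
        'value': '{session_token}',
        'domain': '.example.com',
        'path': '/'
    }}])
    return page
"""
}
```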
### Hook Response Information
When hooks are used, the response includes detailed execution information:
```json
{
"success": true,
"results": [...],
"hooks": {
"status": {
"status": "success", // or "partial" or "failed"
"attached_hooks": ["on_page_context_created", "before_retrieve_html"],
"validation_errors": [],
"successfully_attached": 2,
"failed_validation": 0
},
"execution_log": [
{
"hook_point": "on_page_context_created",
"status": "success",
"execution_time": 0.523,
"timestamp": 1234567890.123
}
],
"errors": [], // Any runtime errors
"summary": {
"total_executions": 2,
"successful": 2,
"failed": 0,
"timed_out": 0,
"success_rate": 100.0
}
}
}
```
### Error Handling
The hooks system is designed to be resilient:
1. **Validation Errors**: Caught before execution (syntax errors, wrong parameters)
2. **Runtime Errors**: Handled gracefully - crawl continues with original page object
3. **Timeout Protection**: Hooks automatically terminated after timeout (configurable 1-120s)
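You can check these outcomes programmatically from the `hooks` object described above. A small sketch, assuming `response` is the `requests` response returned by a `/crawl` call like the ones in the earlier examples:
```python
data = response.json()
hook_info = data.get("hooks", {})

# Hooks that failed validation were never attached or executed
for err in hook_info.get("status", {}).get("validation_errors", []):
    print("Validation error:", err)

# Runtime errors are collected here; the crawl itself still completes
for err in hook_info.get("errors", []):
    print("Runtime error:", err)

# Per-hook timing and status from the execution log
for entry in hook_info.get("execution_log", []):
    print(f"{entry['hook_point']}: {entry['status']} ({entry['execution_time']:.3f}s)")
```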
### Complete Example: Safe Multi-Hook Crawling
```python
import requests
import json
import os
# Safe example using httpbin.org for testing
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Set viewport and test cookies
await page.set_viewport_size({"width": 1920, "height": 1080})
await context.add_cookies([
{"name": "test_cookie", "value": "test_value", "domain": ".httpbin.org", "path": "/"}
])
# Block unnecessary resources for httpbin
await context.route("**/*.{png,jpg,jpeg}", lambda route: route.abort())
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
# Add custom headers for testing
await page.set_extra_http_headers({
"X-Test-Header": "crawl4ai-test",
"Accept-Language": "en-US,en;q=0.9"
})
print(f"[HOOK] Navigating to: {url}")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Simple scroll for any lazy-loaded content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(1000)
return page
"""
}
# Make the request to safe testing endpoints
response = requests.post("http://localhost:11235/crawl", json={
"urls": [
"https://httpbin.org/html",
"https://httpbin.org/json"
],
"hooks": {
"code": hooks_code,
"timeout": 30
},
"crawler_config": {
"cache_mode": "bypass"
}
})
# Check results
if response.status_code == 200:
data = response.json()
# Check hook execution
if data['hooks']['status']['status'] == 'success':
print(f"✅ All {len(data['hooks']['status']['attached_hooks'])} hooks executed successfully")
print(f"Execution stats: {data['hooks']['summary']}")
# Process crawl results
for result in data['results']:
print(f"Crawled: {result['url']} - Success: {result['success']}")
else:
print(f"Error: {response.status_code}")
```
> 💡 **Remember**: Always test your hooks on safe, known websites first before using them on production sites. Never crawl sites that you don't have permission to access or that might be malicious.
---
## Dockerfile Parameters
You can customize the image build process using build arguments (`--build-arg`). These are typically used via `docker buildx build` or within the `docker-compose.yml` file.

View File

@@ -54,16 +54,6 @@ This page provides a comprehensive list of example scripts that demonstrate vari
| Crypto Analysis | Demonstrates how to crawl and analyze cryptocurrency data. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/crypto_analysis_example.py) |
| SERP API | Demonstrates using Crawl4AI with search engine result pages. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/serp_api_project_11_feb.py) |
## Anti-Bot & Stealth Features
| Example | Description | Link |
|---------|-------------|------|
| Stealth Mode Quick Start | Five practical examples showing how to use stealth mode for bypassing basic bot detection. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/stealth_mode_quick_start.py) |
| Stealth Mode Comprehensive | Comprehensive demonstration of stealth mode features with bot detection testing and comparisons. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/stealth_mode_example.py) |
| Undetected Browser | Simple example showing how to use the undetected browser adapter. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/hello_world_undetected.py) |
| Undetected Browser Demo | Basic demo comparing regular and undetected browser modes. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/undetected_simple_demo.py) |
| Undetected Tests | Advanced tests comparing regular vs undetected browsers on various bot detection services. | [View Folder](https://github.com/unclecode/crawl4ai/tree/main/docs/examples/undetectability/) |
## Customization & Security
| Example | Description | Link |

View File

@@ -18,7 +18,7 @@ crawl4ai-setup
```
**What does it do?**
- Installs or updates required browser dependencies for both regular and undetected modes
- Installs or updates required Playwright browsers (Chromium, Firefox, etc.)
- Performs OS-level checks (e.g., missing libs on Linux)
- Confirms your environment is ready to crawl

View File

@@ -520,8 +520,7 @@ This approach is handy when you still want external links but need to block cert
### 4.1 Accessing `result.media`
By default, Crawl4AI collects images, audio and video URLs it finds on the page. These are stored in `result.media`, a dictionary keyed by media type (e.g., `images`, `videos`, `audio`).
**Note: Tables have been moved from `result.media["tables"]` to the new `result.tables` format for better organization and direct access.**
By default, Crawl4AI collects images, audio, video URLs, and data tables it finds on the page. These are stored in `result.media`, a dictionary keyed by media type (e.g., `images`, `videos`, `audio`, `tables`).
**Basic Example**:
@@ -535,6 +534,14 @@ if result.success:
print(f" Alt text: {img.get('alt', '')}")
print(f" Score: {img.get('score')}")
print(f" Description: {img.get('desc', '')}\n")
# Get tables
tables = result.media.get("tables", [])
print(f"Found {len(tables)} data tables in total.")
for i, table in enumerate(tables):
print(f"[Table {i}] Caption: {table.get('caption', 'No caption')}")
print(f" Columns: {len(table.get('headers', []))}")
print(f" Rows: {len(table.get('rows', []))}")
```
**Structure Example**:
@@ -561,6 +568,19 @@ result.media = {
"audio": [
# Similar structure but with audio-specific fields
],
"tables": [
{
"headers": ["Name", "Age", "Location"],
"rows": [
["John Doe", "34", "New York"],
["Jane Smith", "28", "San Francisco"],
["Alex Johnson", "42", "Chicago"]
],
"caption": "Employee Directory",
"summary": "Directory of company employees"
},
# More tables if present
]
}
```
@@ -588,7 +608,53 @@ crawler_cfg = CrawlerRunConfig(
This setting attempts to discard images from outside the primary domain, keeping only those from the site you're crawling.
### 4.3 Additional Media Config
### 3.3 Working with Tables
Crawl4AI can detect and extract structured data from HTML tables. Tables are analyzed based on various criteria to determine if they are actual data tables (as opposed to layout tables), including:
- Presence of thead and tbody sections
- Use of th elements for headers
- Column consistency
- Text density
- And other factors
Tables that score above the threshold (default: 7) are extracted and stored in `result.media["tables"]`.
**Accessing Table Data**:
```python
if result.success:
tables = result.media.get("tables", [])
print(f"Found {len(tables)} data tables on the page")
if tables:
# Access the first table
first_table = tables[0]
print(f"Table caption: {first_table.get('caption', 'No caption')}")
print(f"Headers: {first_table.get('headers', [])}")
# Print the first 3 rows
for i, row in enumerate(first_table.get('rows', [])[:3]):
print(f"Row {i+1}: {row}")
```
**Configuring Table Extraction**:
You can adjust the sensitivity of the table detection algorithm with:
```python
crawler_cfg = CrawlerRunConfig(
table_score_threshold=5 # Lower value = more tables detected (default: 7)
)
```
Each extracted table contains:
- `headers`: Column header names
- `rows`: List of rows, each containing cell values
- `caption`: Table caption text (if available)
- `summary`: Table summary attribute (if specified)
### 3.4 Additional Media Config
- **`screenshot`**: Set to `True` if you want a full-page screenshot stored as `base64` in `result.screenshot`.
- **`pdf`**: Set to `True` if you want a PDF version of the page in `result.pdf`.
@@ -629,7 +695,7 @@ The MHTML format is particularly useful because:
---
## 5. Putting It All Together: Link & Media Filtering
## 4. Putting It All Together: Link & Media Filtering
Here's a combined example demonstrating how to filter out external links, skip certain domains, and exclude external images:
@@ -677,7 +743,7 @@ if __name__ == "__main__":
---
## 6. Common Pitfalls & Tips
## 5. Common Pitfalls & Tips
1. **Conflicting Flags**:
- `exclude_external_links=True` but then also specifying `exclude_social_media_links=True` is typically fine, but understand that the first setting already discards *all* external links. The second becomes somewhat redundant.
@@ -696,3 +762,10 @@ if __name__ == "__main__":
---
**That's it for Link & Media Analysis!** You're now equipped to filter out unwanted sites and zero in on the images and videos that matter for your project.
### Table Extraction Tips
- Not all HTML tables are extracted - only those detected as "data tables" vs. layout tables.
- Tables with inconsistent cell counts, nested tables, or those used purely for layout may be skipped.
- If you're missing tables, try adjusting the `table_score_threshold` to a lower value (default is 7).
The table detection algorithm scores tables based on features like consistent columns, presence of headers, text density, and more. Tables scoring above the threshold are considered data tables worth extracting.
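If you want to persist an extracted table without extra dependencies, the standard-library `csv` module is enough. A short sketch, assuming `result` comes from a crawl like the ones above:
```python
import csv

tables = result.media.get("tables", [])
if tables:
    table = tables[0]
    with open("table_0.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(table.get("headers", []))
        writer.writerows(table.get("rows", []))
```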

View File

@@ -45,7 +45,6 @@ nav:
- "Lazy Loading": "advanced/lazy-loading.md"
- "Hooks & Auth": "advanced/hooks-auth.md"
- "Proxy & Security": "advanced/proxy-security.md"
- "Undetected Browser": "advanced/undetected-browser.md"
- "Session Management": "advanced/session-management.md"
- "Multi-URL Crawling": "advanced/multi-url-crawling.md"
- "Crawl Dispatcher": "advanced/crawl-dispatcher.md"

View File

@@ -13,34 +13,34 @@ authors = [
{name = "Unclecode", email = "unclecode@kidocode.com"}
]
dependencies = [
"aiofiles>=24.1.0",
"aiohttp>=3.11.11",
"aiosqlite~=0.20",
"anyio>=4.0.0",
"lxml~=5.3",
"litellm>=1.53.1",
"numpy>=1.26.0,<3",
"pillow>=10.4",
"playwright>=1.49.0",
"patchright>=1.49.0",
"python-dotenv~=1.0",
"requests~=2.26",
"beautifulsoup4~=4.12",
"tf-playwright-stealth>=1.1.0",
"xxhash~=3.4",
"rank-bm25~=0.2",
"aiofiles>=24.1.0",
"snowballstemmer~=2.2",
"pydantic>=2.10",
"pyOpenSSL>=24.3.0",
"psutil>=6.1.1",
"PyYAML>=6.0",
"nltk>=3.9.1",
"playwright",
"rich>=13.9.4",
"cssselect>=1.2.0",
"httpx>=0.27.2",
"httpx[http2]>=0.27.2",
"fake-useragent>=2.0.3",
"click>=8.1.7",
"pyperclip>=1.8.2",
"chardet>=5.2.0",
"aiohttp>=3.11.11",
"brotli>=1.1.0",
"humanize>=4.10.0",
"lark>=1.2.2",

View File

@@ -1,29 +1,26 @@
# Note: These requirements are also specified in pyproject.toml
# This file is kept for development environment setup and compatibility
aiofiles>=24.1.0
aiohttp>=3.11.11
aiosqlite~=0.20
anyio>=4.0.0
lxml~=5.3
litellm>=1.53.1
numpy>=1.26.0,<3
pillow>=10.4
playwright>=1.49.0
patchright>=1.49.0
python-dotenv~=1.0
requests~=2.26
beautifulsoup4~=4.12
tf-playwright-stealth>=1.1.0
xxhash~=3.4
rank-bm25~=0.2
aiofiles>=24.1.0
colorama~=0.4
snowballstemmer~=2.2
pydantic>=2.10
pyOpenSSL>=24.3.0
psutil>=6.1.1
PyYAML>=6.0
nltk>=3.9.1
rich>=13.9.4
cssselect>=1.2.0
chardet>=5.2.0
brotli>=1.1.0
httpx[http2]>=0.27.2

View File

@@ -10,11 +10,13 @@ import sys
import uuid
import shutil
from crawl4ai import BrowserProfiler
from crawl4ai.browser_manager import BrowserManager
# Add the project root to Python path if running directly
if __name__ == "__main__":
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from crawl4ai.browser import BrowserManager, BrowserProfileManager
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
@@ -25,7 +27,7 @@ async def test_profile_creation():
"""Test creating and managing browser profiles."""
logger.info("Testing profile creation and management", tag="TEST")
profile_manager = BrowserProfileManager(logger=logger)
profile_manager = BrowserProfiler(logger=logger)
try:
# List existing profiles
@@ -83,7 +85,7 @@ async def test_profile_with_browser():
"""Test using a profile with a browser."""
logger.info("Testing using a profile with a browser", tag="TEST")
profile_manager = BrowserProfileManager(logger=logger)
profile_manager = BrowserProfiler(logger=logger)
test_profile_name = f"test-browser-profile-{uuid.uuid4().hex[:8]}"
profile_path = None
@@ -101,6 +103,8 @@ async def test_profile_with_browser():
# Now use this profile with a browser
browser_config = BrowserConfig(
user_data_dir=profile_path,
use_managed_browser=True,
use_persistent_context=True,
headless=True
)

View File

@@ -1,344 +0,0 @@
#!/usr/bin/env python3
"""
Dependency checker for Crawl4AI
Analyzes imports in the codebase and shows which files use them
"""
import ast
import os
import sys
from pathlib import Path
from typing import Set, Dict, List, Tuple
from collections import defaultdict
import re
import toml
# Standard library modules to ignore
STDLIB_MODULES = {
'abc', 'argparse', 'asyncio', 'base64', 'collections', 'concurrent', 'contextlib',
'copy', 'datetime', 'decimal', 'email', 'enum', 'functools', 'glob', 'hashlib',
'http', 'importlib', 'io', 'itertools', 'json', 'logging', 'math', 'mimetypes',
'multiprocessing', 'os', 'pathlib', 'pickle', 'platform', 'pprint', 'random',
're', 'shutil', 'signal', 'socket', 'sqlite3', 'string', 'subprocess', 'sys',
'tempfile', 'threading', 'time', 'traceback', 'typing', 'unittest', 'urllib',
'uuid', 'warnings', 'weakref', 'xml', 'zipfile', 'dataclasses', 'secrets',
'statistics', 'textwrap', 'queue', 'csv', 'gzip', 'tarfile', 'configparser',
'inspect', 'operator', 'struct', 'binascii', 'codecs', 'locale', 'gc',
'atexit', 'builtins', 'html', 'errno', 'fcntl', 'pwd', 'grp', 'resource',
'termios', 'tty', 'pty', 'select', 'selectors', 'ssl', 'zlib', 'bz2',
'lzma', 'types', 'copy', 'pydoc', 'profile', 'cProfile', 'timeit',
'trace', 'doctest', 'pdb', 'contextvars', 'dataclasses', 'graphlib',
'zoneinfo', 'tomllib', 'cgi', 'wsgiref', 'fileinput', 'linecache',
'tokenize', 'tabnanny', 'compileall', 'dis', 'pickletools', 'formatter',
'__future__', 'array', 'ctypes', 'heapq', 'bisect', 'array', 'weakref',
'types', 'copy', 'pprint', 'repr', 'numbers', 'cmath', 'fractions',
'statistics', 'itertools', 'functools', 'operator', 'pathlib', 'fileinput',
'stat', 'filecmp', 'tempfile', 'glob', 'fnmatch', 'linecache', 'shutil',
'pickle', 'copyreg', 'shelve', 'marshal', 'dbm', 'sqlite3', 'zlib', 'gzip',
'bz2', 'lzma', 'zipfile', 'tarfile', 'configparser', 'netrc', 'xdrlib',
'plistlib', 'hashlib', 'hmac', 'secrets', 'os', 'io', 'time', 'argparse',
'getopt', 'logging', 'getpass', 'curses', 'platform', 'errno', 'ctypes',
'threading', 'multiprocessing', 'concurrent', 'subprocess', 'sched', 'queue',
'contextvars', 'asyncio', 'socket', 'ssl', 'email', 'json', 'mailcap',
'mailbox', 'mimetypes', 'base64', 'binhex', 'binascii', 'quopri', 'uu',
'html', 'xml', 'webbrowser', 'cgi', 'cgitb', 'wsgiref', 'urllib', 'http',
'ftplib', 'poplib', 'imaplib', 'nntplib', 'smtplib', 'smtpd', 'telnetlib',
'uuid', 'socketserver', 'xmlrpc', 'ipaddress', 'audioop', 'aifc', 'sunau',
'wave', 'chunk', 'colorsys', 'imghdr', 'sndhdr', 'ossaudiodev', 'gettext',
'locale', 'turtle', 'cmd', 'shlex', 'tkinter', 'typing', 'pydoc', 'doctest',
'unittest', 'test', '2to3', 'distutils', 'venv', 'ensurepip', 'zipapp',
'py_compile', 'compileall', 'dis', 'pickletools', 'pdb', 'timeit', 'trace',
'tracemalloc', 'warnings', 'faulthandler', 'pdb', 'dataclasses', 'cgi',
'cgitb', 'chunk', 'crypt', 'imghdr', 'mailcap', 'nis', 'nntplib', 'optparse',
'ossaudiodev', 'pipes', 'smtpd', 'sndhdr', 'spwd', 'sunau', 'telnetlib',
'uu', 'xdrlib', 'msilib', 'pstats', 'rlcompleter', 'tkinter', 'ast'
}
# Known package name mappings (import name -> package name)
PACKAGE_MAPPINGS = {
'bs4': 'beautifulsoup4',
'PIL': 'pillow',
'cv2': 'opencv-python',
'sklearn': 'scikit-learn',
'yaml': 'PyYAML',
'OpenSSL': 'pyOpenSSL',
'sqlalchemy': 'SQLAlchemy',
'playwright': 'playwright',
'patchright': 'patchright',
'dotenv': 'python-dotenv',
'fake_useragent': 'fake-useragent',
'playwright_stealth': 'tf-playwright-stealth',
'sentence_transformers': 'sentence-transformers',
'rank_bm25': 'rank-bm25',
'snowballstemmer': 'snowballstemmer',
'PyPDF2': 'PyPDF2',
'pdf2image': 'pdf2image',
}
class ImportVisitor(ast.NodeVisitor):
"""AST visitor to extract imports from Python files"""
def __init__(self):
self.imports = {} # Changed to dict to store line numbers
self.from_imports = {}
def visit_Import(self, node):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name not in self.imports:
self.imports[module_name] = []
self.imports[module_name].append(node.lineno)
def visit_ImportFrom(self, node):
if node.module and node.level == 0: # absolute imports only
module_name = node.module.split('.')[0]
if module_name not in self.from_imports:
self.from_imports[module_name] = []
self.from_imports[module_name].append(node.lineno)
def extract_imports_from_file(filepath: Path) -> Dict[str, List[int]]:
"""Extract all imports from a Python file with line numbers"""
all_imports = {}
try:
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
visitor = ImportVisitor()
visitor.visit(tree)
# Merge imports and from_imports
for module, lines in visitor.imports.items():
if module not in all_imports:
all_imports[module] = []
all_imports[module].extend(lines)
for module, lines in visitor.from_imports.items():
if module not in all_imports:
all_imports[module] = []
all_imports[module].extend(lines)
except Exception as e:
# Silently skip files that can't be parsed
pass
return all_imports
def get_codebase_imports_with_files(root_dir: Path) -> Dict[str, List[Tuple[str, List[int]]]]:
"""Get all imports from the crawl4ai library and docs folders with file locations and line numbers"""
import_to_files = defaultdict(list)
# Only scan crawl4ai library folder and docs folder
target_dirs = [
root_dir / 'crawl4ai',
root_dir / 'docs'
]
for target_dir in target_dirs:
if not target_dir.exists():
continue
for py_file in target_dir.rglob('*.py'):
# Skip __pycache__ directories
if '__pycache__' in py_file.parts:
continue
# Skip setup.py and similar files
if py_file.name in ['setup.py', 'setup.cfg', 'conf.py']:
continue
imports = extract_imports_from_file(py_file)
# Map each import to the file and line numbers
for imp, line_numbers in imports.items():
relative_path = py_file.relative_to(root_dir)
import_to_files[imp].append((str(relative_path), sorted(line_numbers)))
return dict(import_to_files)
def get_declared_dependencies() -> Set[str]:
"""Get declared dependencies from pyproject.toml and requirements.txt"""
declared = set()
# Read from pyproject.toml
if Path('pyproject.toml').exists():
with open('pyproject.toml', 'r') as f:
data = toml.load(f)
# Get main dependencies
deps = data.get('project', {}).get('dependencies', [])
for dep in deps:
# Parse dependency string (e.g., "numpy>=1.26.0,<3")
match = re.match(r'^([a-zA-Z0-9_-]+)', dep)
if match:
pkg_name = match.group(1).lower()
declared.add(pkg_name)
# Get optional dependencies
optional = data.get('project', {}).get('optional-dependencies', {})
for group, deps in optional.items():
for dep in deps:
match = re.match(r'^([a-zA-Z0-9_-]+)', dep)
if match:
pkg_name = match.group(1).lower()
declared.add(pkg_name)
# Also check requirements.txt as backup
if Path('requirements.txt').exists():
with open('requirements.txt', 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
match = re.match(r'^([a-zA-Z0-9_-]+)', line)
if match:
pkg_name = match.group(1).lower()
declared.add(pkg_name)
return declared
def normalize_package_name(name: str) -> str:
"""Normalize package name for comparison"""
# Handle known mappings first
if name in PACKAGE_MAPPINGS:
return PACKAGE_MAPPINGS[name].lower()
# Basic normalization
return name.lower().replace('_', '-')
def check_missing_dependencies():
"""Main function to check for missing dependencies"""
print("🔍 Analyzing crawl4ai library and docs folders...\n")
# Get all imports with their file locations
root_dir = Path('.')
import_to_files = get_codebase_imports_with_files(root_dir)
# Get declared dependencies
declared_deps = get_declared_dependencies()
# Normalize declared dependencies
normalized_declared = {normalize_package_name(dep) for dep in declared_deps}
# Categorize imports
external_imports = {}
local_imports = {}
# Known local packages
local_packages = {'crawl4ai'}
for imp, file_info in import_to_files.items():
# Skip standard library
if imp in STDLIB_MODULES:
continue
# Check if it's a local import
if any(imp.startswith(local) for local in local_packages):
local_imports[imp] = file_info
else:
external_imports[imp] = file_info
# Check which external imports are not declared
not_declared = {}
declared_imports = {}
for imp, file_info in external_imports.items():
normalized_imp = normalize_package_name(imp)
# Check if import is covered by declared dependencies
found = False
for declared in normalized_declared:
if normalized_imp == declared or normalized_imp.startswith(declared + '.') or declared.startswith(normalized_imp):
found = True
break
if found:
declared_imports[imp] = file_info
else:
not_declared[imp] = file_info
# Print results
print(f"📊 Summary:")
print(f" - Total unique imports: {len(import_to_files)}")
print(f" - External imports: {len(external_imports)}")
print(f" - Declared dependencies: {len(declared_deps)}")
print(f" - External imports NOT in dependencies: {len(not_declared)}\n")
if not_declared:
print("❌ External imports NOT declared in pyproject.toml or requirements.txt:\n")
# Sort by import name
for imp in sorted(not_declared.keys()):
file_info = not_declared[imp]
print(f" 📦 {imp}")
if imp in PACKAGE_MAPPINGS:
print(f" → Package name: {PACKAGE_MAPPINGS[imp]}")
# Show up to 3 files that use this import
for i, (file_path, line_numbers) in enumerate(file_info[:3]):
# Format line numbers for clickable output
if len(line_numbers) == 1:
print(f" - {file_path}:{line_numbers[0]}")
else:
# Show first few line numbers
line_str = ','.join(str(ln) for ln in line_numbers[:3])
if len(line_numbers) > 3:
line_str += f"... ({len(line_numbers)} imports)"
print(f" - {file_path}: lines {line_str}")
if len(file_info) > 3:
print(f" ... and {len(file_info) - 3} more files")
print()
# Check for potentially unused dependencies
print("\n🔎 Checking declared dependencies usage...\n")
# Get all used external packages
used_packages = set()
for imp in external_imports.keys():
normalized = normalize_package_name(imp)
used_packages.add(normalized)
# Find unused
unused = []
for dep in declared_deps:
normalized_dep = normalize_package_name(dep)
# Check if any import uses this dependency
found_usage = False
for used in used_packages:
if used == normalized_dep or used.startswith(normalized_dep) or normalized_dep.startswith(used):
found_usage = True
break
if not found_usage:
# Some packages are commonly unused directly
indirect_deps = {'wheel', 'setuptools', 'pip', 'colorama', 'certifi', 'packaging', 'urllib3'}
if normalized_dep not in indirect_deps:
unused.append(dep)
if unused:
print("⚠️ Declared dependencies with NO imports found:")
for dep in sorted(unused):
print(f" - {dep}")
print("\n Note: These might be used indirectly or by other dependencies")
else:
print("✅ All declared dependencies have corresponding imports")
print("\n" + "="*60)
print("💡 How to use this report:")
print(" 1. Check each ❌ import to see if it's legitimate")
print(" 2. If legitimate, add the package to pyproject.toml")
print(" 3. If it's an internal module or typo, fix the import")
print(" 4. Review unused dependencies - remove if truly not needed")
print("="*60)
if __name__ == '__main__':
check_missing_dependencies()
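For orientation, the heart of this checker is a plain `ast` walk that records top-level module names and their line numbers. The standalone sketch below reproduces that idea on a throwaway source string; the sample string and variable names are illustrative only, not taken from the file above.

import ast

source = "import os\nfrom requests import get\nfrom .local import helper\n"

found = {}
for node in ast.walk(ast.parse(source)):
    if isinstance(node, ast.Import):
        for alias in node.names:
            found.setdefault(alias.name.split('.')[0], []).append(node.lineno)
    elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
        # level > 0 would be a relative import, which the checker skips
        found.setdefault(node.module.split('.')[0], []).append(node.lineno)

print(found)  # {'os': [1], 'requests': [2]}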

View File

@@ -1,372 +0,0 @@
#!/usr/bin/env python3
"""
Test client for demonstrating user-provided hooks in Crawl4AI Docker API
"""
import requests
import json
from typing import Dict, Any
API_BASE_URL = "http://localhost:11234" # Adjust if needed
def test_hooks_info():
"""Get information about available hooks"""
print("=" * 70)
print("Testing: GET /hooks/info")
print("=" * 70)
response = requests.get(f"{API_BASE_URL}/hooks/info")
if response.status_code == 200:
data = response.json()
print("Available Hook Points:")
for hook, info in data['available_hooks'].items():
print(f"\n{hook}:")
print(f" Parameters: {', '.join(info['parameters'])}")
print(f" Description: {info['description']}")
else:
print(f"Error: {response.status_code}")
print(response.text)
def test_basic_crawl_with_hooks():
"""Test basic crawling with user-provided hooks"""
print("\n" + "=" * 70)
print("Testing: POST /crawl with hooks")
print("=" * 70)
# Define hooks as Python code strings
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("Hook: Setting up page context")
# Block images to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif,webp}", lambda route: route.abort())
print("Hook: Images blocked")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("Hook: Before retrieving HTML")
# Scroll to bottom to load lazy content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(1000)
print("Hook: Scrolled to bottom")
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
print(f"Hook: About to navigate to {url}")
# Add custom headers
await page.set_extra_http_headers({
'X-Test-Header': 'crawl4ai-hooks-test'
})
return page
"""
}
# Create request payload
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 30
}
}
print("Sending request with hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("\n✅ Crawl successful!")
# Check hooks status
if 'hooks' in data:
hooks_info = data['hooks']
print("\nHooks Execution Summary:")
print(f" Status: {hooks_info['status']['status']}")
print(f" Attached hooks: {', '.join(hooks_info['status']['attached_hooks'])}")
if hooks_info['status']['validation_errors']:
print("\n⚠️ Validation Errors:")
for error in hooks_info['status']['validation_errors']:
print(f" - {error['hook_point']}: {error['error']}")
if 'summary' in hooks_info:
summary = hooks_info['summary']
print(f"\nExecution Statistics:")
print(f" Total executions: {summary['total_executions']}")
print(f" Successful: {summary['successful']}")
print(f" Failed: {summary['failed']}")
print(f" Timed out: {summary['timed_out']}")
print(f" Success rate: {summary['success_rate']:.1f}%")
if hooks_info['execution_log']:
print("\nExecution Log:")
for log_entry in hooks_info['execution_log']:
status_icon = "" if log_entry['status'] == 'success' else ""
print(f" {status_icon} {log_entry['hook_point']}: {log_entry['status']} ({log_entry.get('execution_time', 0):.2f}s)")
if hooks_info['errors']:
print("\n❌ Hook Errors:")
for error in hooks_info['errors']:
print(f" - {error['hook_point']}: {error['error']}")
# Show crawl results
if 'results' in data:
print(f"\nCrawled {len(data['results'])} URL(s)")
for result in data['results']:
print(f" - {result['url']}: {'' if result['success'] else ''}")
else:
print(f"❌ Error: {response.status_code}")
print(response.text)
def test_invalid_hook():
"""Test with an invalid hook to see error handling"""
print("\n" + "=" * 70)
print("Testing: Invalid hook handling")
print("=" * 70)
# Intentionally broken hook
hooks_code = {
"on_page_context_created": """
def hook(page, context): # Missing async!
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# This will cause an error
await page.non_existent_method()
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 5
}
}
print("Sending request with invalid hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
if 'hooks' in data:
hooks_info = data['hooks']
print(f"\nHooks Status: {hooks_info['status']['status']}")
if hooks_info['status']['validation_errors']:
print("\n✅ Validation caught errors (as expected):")
for error in hooks_info['status']['validation_errors']:
print(f" - {error['hook_point']}: {error['error']}")
if hooks_info['errors']:
print("\n✅ Runtime errors handled gracefully:")
for error in hooks_info['errors']:
print(f" - {error['hook_point']}: {error['error']}")
# The crawl should still succeed despite hook errors
if data.get('success'):
print("\n✅ Crawl succeeded despite hook errors (error isolation working!)")
else:
print(f"Error: {response.status_code}")
print(response.text)
def test_authentication_hook():
"""Test authentication using hooks"""
print("\n" + "=" * 70)
print("Testing: Authentication with hooks")
print("=" * 70)
hooks_code = {
"before_goto": """
async def hook(page, context, url, **kwargs):
# For httpbin.org basic auth test, set Authorization header
import base64
# httpbin.org/basic-auth/user/passwd expects username="user" and password="passwd"
credentials = base64.b64encode(b"user:passwd").decode('ascii')
await page.set_extra_http_headers({
'Authorization': f'Basic {credentials}'
})
print(f"Hook: Set Authorization header for {url}")
return page
""",
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Example: Add cookies for session tracking
await context.add_cookies([
{
'name': 'session_id',
'value': 'test_session_123',
'domain': '.httpbin.org',
'path': '/',
'httpOnly': True,
'secure': True
}
])
print("Hook: Added session cookie")
return page
"""
}
payload = {
"urls": ["https://httpbin.org/basic-auth/user/passwd"],
"hooks": {
"code": hooks_code,
"timeout": 30
}
}
print("Sending request with authentication hook...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
if data.get('success'):
print("✅ Crawl with authentication hook successful")
# Check if hooks executed
if 'hooks' in data:
hooks_info = data['hooks']
if hooks_info.get('summary', {}).get('successful', 0) > 0:
print(f"✅ Authentication hooks executed: {hooks_info['summary']['successful']} successful")
# Check for any hook errors
if hooks_info.get('errors'):
print("⚠️ Hook errors:")
for error in hooks_info['errors']:
print(f" - {error}")
# Check if authentication worked by looking at the result
if 'results' in data and len(data['results']) > 0:
result = data['results'][0]
if result.get('success'):
print("✅ Page crawled successfully (authentication worked!)")
# httpbin.org/basic-auth returns JSON with authenticated=true when successful
if 'authenticated' in str(result.get('html', '')):
print("✅ Authentication confirmed in response content")
else:
print(f"❌ Crawl failed: {result.get('error_message', 'Unknown error')}")
else:
print("❌ Request failed")
print(f"Response: {json.dumps(data, indent=2)}")
else:
print(f"❌ Error: {response.status_code}")
try:
error_data = response.json()
print(f"Error details: {json.dumps(error_data, indent=2)}")
except:
print(f"Error text: {response.text[:500]}")
def test_streaming_with_hooks():
"""Test streaming endpoint with hooks"""
print("\n" + "=" * 70)
print("Testing: POST /crawl/stream with hooks")
print("=" * 70)
hooks_code = {
"before_retrieve_html": """
async def hook(page, context, **kwargs):
await page.evaluate("document.querySelectorAll('img').forEach(img => img.remove())")
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html", "https://httpbin.org/json"],
"hooks": {
"code": hooks_code,
"timeout": 10
}
}
print("Sending streaming request with hooks...")
with requests.post(f"{API_BASE_URL}/crawl/stream", json=payload, stream=True) as response:
if response.status_code == 200:
# Check headers for hooks status
hooks_status = response.headers.get('X-Hooks-Status')
if hooks_status:
print(f"Hooks Status (from header): {hooks_status}")
print("\nStreaming results:")
for line in response.iter_lines():
if line:
try:
result = json.loads(line)
if 'url' in result:
print(f" Received: {result['url']}")
elif 'status' in result:
print(f" Stream status: {result['status']}")
except json.JSONDecodeError:
print(f" Raw: {line.decode()}")
else:
print(f"Error: {response.status_code}")
def test_basic_without_hooks():
"""Test basic crawl without hooks"""
print("\n" + "=" * 70)
print("Testing: POST /crawl with no hooks")
print("=" * 70)
payload = {
"urls": ["https://httpbin.org/html", "https://httpbin.org/json"]
}
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print(f"Response: {json.dumps(data, indent=2)}")
else:
print(f"Error: {response.status_code}")
def main():
"""Run all tests"""
print("🔧 Crawl4AI Docker API - Hooks Testing")
print("=" * 70)
# Test 1: Get hooks information
# test_hooks_info()
# Test 2: Basic crawl with hooks
# test_basic_crawl_with_hooks()
# Test 3: Invalid hooks (error handling)
test_invalid_hook()
# # Test 4: Authentication hook
# test_authentication_hook()
# # Test 5: Streaming with hooks
# test_streaming_with_hooks()
# # Test 6: Basic crawl without hooks
# test_basic_without_hooks()
print("\n" + "=" * 70)
print("✅ All tests completed!")
print("=" * 70)
if __name__ == "__main__":
main()
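Stripped to its essentials, the request shape this client exercises is a list of URLs plus a `hooks.code` mapping of hook-point names to async function source strings. A minimal sketch, assuming the same local endpoint the client targets and a server that is already running:

import requests

payload = {
    "urls": ["https://httpbin.org/html"],
    "hooks": {
        "code": {
            "before_goto": (
                "async def hook(page, context, url, **kwargs):\n"
                "    await page.set_extra_http_headers({'X-Demo': '1'})\n"
                "    return page\n"
            )
        },
        "timeout": 10,
    },
}

resp = requests.post("http://localhost:11234/crawl", json=payload)
print(resp.json().get("hooks", {}).get("status"))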

View File

@@ -1,512 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive test demonstrating all hook types from hooks_example.py
adapted for the Docker API with real URLs
"""
import requests
import json
import time
from typing import Dict, Any
API_BASE_URL = "http://localhost:11234"
def test_all_hooks_demo():
"""Demonstrate all 8 hook types with practical examples"""
print("=" * 70)
print("Testing: All Hooks Comprehensive Demo")
print("=" * 70)
hooks_code = {
"on_browser_created": """
async def hook(browser, **kwargs):
# Hook called after browser is created
print("[HOOK] on_browser_created - Browser is ready!")
# Browser-level configurations would go here
return browser
""",
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Hook called after a new page and context are created
print("[HOOK] on_page_context_created - New page created!")
# Set viewport size for consistent rendering
await page.set_viewport_size({"width": 1920, "height": 1080})
# Add cookies for the session (using httpbin.org domain)
await context.add_cookies([
{
"name": "test_session",
"value": "abc123xyz",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Block ads and tracking scripts to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg}", lambda route: route.abort())
await context.route("**/analytics/*", lambda route: route.abort())
await context.route("**/ads/*", lambda route: route.abort())
print("[HOOK] Viewport set, cookies added, and ads blocked")
return page
""",
"on_user_agent_updated": """
async def hook(page, context, user_agent, **kwargs):
# Hook called when user agent is updated
print(f"[HOOK] on_user_agent_updated - User agent: {user_agent[:50]}...")
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
# Hook called before navigating to each URL
print(f"[HOOK] before_goto - About to visit: {url}")
# Add custom headers for the request
await page.set_extra_http_headers({
"X-Custom-Header": "crawl4ai-test",
"Accept-Language": "en-US,en;q=0.9",
"DNT": "1"
})
return page
""",
"after_goto": """
async def hook(page, context, url, response, **kwargs):
# Hook called after navigating to each URL
print(f"[HOOK] after_goto - Successfully loaded: {url}")
# Wait a moment for dynamic content to load
await page.wait_for_timeout(1000)
# Check if specific elements exist (with error handling)
try:
# For httpbin.org, wait for body element
await page.wait_for_selector("body", timeout=2000)
print("[HOOK] Body element found and loaded")
except:
print("[HOOK] Timeout waiting for body, continuing anyway")
return page
""",
"on_execution_started": """
async def hook(page, context, **kwargs):
# Hook called after custom JavaScript execution
print("[HOOK] on_execution_started - Custom JS executed!")
# You could inject additional JavaScript here if needed
await page.evaluate("console.log('[INJECTED] Hook JS running');")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Hook called before retrieving the HTML content
print("[HOOK] before_retrieve_html - Preparing to get HTML")
# Scroll to bottom to trigger lazy loading
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(500)
# Scroll back to top
await page.evaluate("window.scrollTo(0, 0);")
await page.wait_for_timeout(500)
# One more scroll to middle for good measure
await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 2);")
print("[HOOK] Scrolling completed for lazy-loaded content")
return page
""",
"before_return_html": """
async def hook(page, context, html, **kwargs):
# Hook called before returning the HTML content
print(f"[HOOK] before_return_html - HTML length: {len(html)} characters")
# Log some page metrics
metrics = await page.evaluate('''() => {
return {
images: document.images.length,
links: document.links.length,
scripts: document.scripts.length
}
}''')
print(f"[HOOK] Page metrics - Images: {metrics['images']}, Links: {metrics['links']}, Scripts: {metrics['scripts']}")
return page
"""
}
# Create request payload
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 30
},
"crawler_config": {
"js_code": "window.scrollTo(0, document.body.scrollHeight);",
"wait_for": "body",
"cache_mode": "bypass"
}
}
print("\nSending request with all 8 hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("\n✅ Request successful!")
# Check hooks execution
if 'hooks' in data:
hooks_info = data['hooks']
print("\n📊 Hooks Execution Summary:")
print(f" Status: {hooks_info['status']['status']}")
print(f" Attached hooks: {len(hooks_info['status']['attached_hooks'])}")
for hook_name in hooks_info['status']['attached_hooks']:
print(f"{hook_name}")
if 'summary' in hooks_info:
summary = hooks_info['summary']
print(f"\n📈 Execution Statistics:")
print(f" Total executions: {summary['total_executions']}")
print(f" Successful: {summary['successful']}")
print(f" Failed: {summary['failed']}")
print(f" Timed out: {summary['timed_out']}")
print(f" Success rate: {summary['success_rate']:.1f}%")
if hooks_info.get('execution_log'):
print(f"\n📝 Execution Log:")
for log_entry in hooks_info['execution_log']:
status_icon = "" if log_entry['status'] == 'success' else ""
exec_time = log_entry.get('execution_time', 0)
print(f" {status_icon} {log_entry['hook_point']}: {exec_time:.3f}s")
# Check crawl results
if 'results' in data and len(data['results']) > 0:
print(f"\n📄 Crawl Results:")
for result in data['results']:
print(f" URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
if result.get('html'):
print(f" HTML length: {len(result['html'])} characters")
else:
print(f"❌ Error: {response.status_code}")
try:
error_data = response.json()
print(f"Error details: {json.dumps(error_data, indent=2)}")
except:
print(f"Error text: {response.text[:500]}")
def test_authentication_flow():
"""Test a complete authentication flow with multiple hooks"""
print("\n" + "=" * 70)
print("Testing: Authentication Flow with Multiple Hooks")
print("=" * 70)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("[HOOK] Setting up authentication context")
# Add authentication cookies
await context.add_cookies([
{
"name": "auth_token",
"value": "fake_jwt_token_here",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Set localStorage items (for SPA authentication)
await page.evaluate('''
localStorage.setItem('user_id', '12345');
localStorage.setItem('auth_time', new Date().toISOString());
''')
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
print(f"[HOOK] Adding auth headers for {url}")
# Add Authorization header
import base64
credentials = base64.b64encode(b"user:passwd").decode('ascii')
await page.set_extra_http_headers({
'Authorization': f'Basic {credentials}',
'X-API-Key': 'test-api-key-123'
})
return page
"""
}
payload = {
"urls": [
"https://httpbin.org/basic-auth/user/passwd"
],
"hooks": {
"code": hooks_code,
"timeout": 15
}
}
print("\nTesting authentication with httpbin endpoints...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Authentication test completed")
if 'results' in data:
for i, result in enumerate(data['results']):
print(f"\n URL {i+1}: {result['url']}")
if result.get('success'):
# Check for authentication success indicators
html_content = result.get('html', '')
if '"authenticated"' in html_content and 'true' in html_content:
print(" ✅ Authentication successful! Basic auth worked.")
else:
print(" ⚠️ Page loaded but auth status unclear")
else:
print(f" ❌ Failed: {result.get('error_message', 'Unknown error')}")
else:
print(f"❌ Error: {response.status_code}")
def test_performance_optimization_hooks():
"""Test hooks for performance optimization"""
print("\n" + "=" * 70)
print("Testing: Performance Optimization Hooks")
print("=" * 70)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("[HOOK] Optimizing page for performance")
# Block resource-heavy content
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg,ico}", lambda route: route.abort())
await context.route("**/*.{woff,woff2,ttf,otf}", lambda route: route.abort())
await context.route("**/*.{mp4,webm,ogg,mp3,wav}", lambda route: route.abort())
await context.route("**/googletagmanager.com/*", lambda route: route.abort())
await context.route("**/google-analytics.com/*", lambda route: route.abort())
await context.route("**/doubleclick.net/*", lambda route: route.abort())
await context.route("**/facebook.com/*", lambda route: route.abort())
# Disable animations and transitions
await page.add_style_tag(content='''
*, *::before, *::after {
animation-duration: 0s !important;
animation-delay: 0s !important;
transition-duration: 0s !important;
transition-delay: 0s !important;
}
''')
print("[HOOK] Performance optimizations applied")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Removing unnecessary elements before extraction")
# Remove ads, popups, and other unnecessary elements
await page.evaluate('''() => {
// Remove common ad containers
const adSelectors = [
'.ad', '.ads', '.advertisement', '[id*="ad-"]', '[class*="ad-"]',
'.popup', '.modal', '.overlay', '.cookie-banner', '.newsletter-signup'
];
adSelectors.forEach(selector => {
document.querySelectorAll(selector).forEach(el => el.remove());
});
// Remove script tags to clean up HTML
document.querySelectorAll('script').forEach(el => el.remove());
// Remove style tags we don't need
document.querySelectorAll('style').forEach(el => el.remove());
}''')
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 10
}
}
print("\nTesting performance optimization hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("✅ Performance optimization test completed")
if 'results' in data and len(data['results']) > 0:
result = data['results'][0]
if result.get('html'):
print(f" HTML size: {len(result['html'])} characters")
print(" Resources blocked, ads removed, animations disabled")
else:
print(f"❌ Error: {response.status_code}")
def test_content_extraction_hooks():
"""Test hooks for intelligent content extraction"""
print("\n" + "=" * 70)
print("Testing: Content Extraction Hooks")
print("=" * 70)
hooks_code = {
"after_goto": """
async def hook(page, context, url, response, **kwargs):
print(f"[HOOK] Waiting for dynamic content on {url}")
# Wait for any lazy-loaded content
await page.wait_for_timeout(2000)
# Trigger any "Load More" buttons
try:
load_more = await page.query_selector('[class*="load-more"], [class*="show-more"], button:has-text("Load More")')
if load_more:
await load_more.click()
await page.wait_for_timeout(1000)
print("[HOOK] Clicked 'Load More' button")
except:
pass
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Extracting structured data")
# Extract metadata
metadata = await page.evaluate('''() => {
const getMeta = (name) => {
const element = document.querySelector(`meta[name="${name}"], meta[property="${name}"]`);
return element ? element.getAttribute('content') : null;
};
return {
title: document.title,
description: getMeta('description') || getMeta('og:description'),
author: getMeta('author'),
keywords: getMeta('keywords'),
ogTitle: getMeta('og:title'),
ogImage: getMeta('og:image'),
canonical: document.querySelector('link[rel="canonical"]')?.href,
jsonLd: Array.from(document.querySelectorAll('script[type="application/ld+json"]'))
.map(el => el.textContent).filter(Boolean)
};
}''')
print(f"[HOOK] Extracted metadata: {json.dumps(metadata, indent=2)}")
# Infinite scroll handling
for i in range(3):
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(1000)
print(f"[HOOK] Scroll iteration {i+1}/3")
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html", "https://httpbin.org/json"],
"hooks": {
"code": hooks_code,
"timeout": 20
}
}
print("\nTesting content extraction hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Content extraction test completed")
if 'hooks' in data and 'summary' in data['hooks']:
summary = data['hooks']['summary']
print(f" Hooks executed: {summary['successful']}/{summary['total_executions']}")
if 'results' in data:
for result in data['results']:
print(f"\n URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
else:
print(f"❌ Error: {response.status_code}")
def main():
"""Run comprehensive hook tests"""
print("🔧 Crawl4AI Docker API - Comprehensive Hooks Testing")
print("Based on docs/examples/hooks_example.py")
print("=" * 70)
tests = [
("All Hooks Demo", test_all_hooks_demo),
("Authentication Flow", test_authentication_flow),
("Performance Optimization", test_performance_optimization_hooks),
("Content Extraction", test_content_extraction_hooks),
]
for i, (name, test_func) in enumerate(tests, 1):
print(f"\n📌 Test {i}/{len(tests)}: {name}")
try:
test_func()
print(f"{name} completed")
except Exception as e:
print(f"{name} failed: {e}")
import traceback
traceback.print_exc()
print("\n" + "=" * 70)
print("🎉 All comprehensive hook tests completed!")
print("=" * 70)
if __name__ == "__main__":
main()
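The performance hooks above lean on Playwright's `context.route` to abort requests by glob pattern before they reach the network. As a standalone illustration of that technique (the URL and patterns are arbitrary, not tied to the tests above):

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        # Abort image requests at the context level, mirroring the hook above.
        await context.route("**/*.{png,jpg,jpeg,gif,webp,svg}", lambda route: route.abort())
        page = await context.new_page()
        await page.goto("https://httpbin.org/html")
        print(len(await page.content()))
        await browser.close()

asyncio.run(main())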

View File

@@ -0,0 +1,55 @@
import sys
import pytest
import asyncio
from unittest.mock import patch, MagicMock
from crawl4ai.browser_profiler import BrowserProfiler
@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific msvcrt test")
async def test_keyboard_input_handling():
# Mock sequence of keystrokes: arrow key followed by 'q'
mock_keys = [b'\x00K', b'q']
mock_kbhit = MagicMock(side_effect=[True, True, False])
mock_getch = MagicMock(side_effect=mock_keys)
with patch('msvcrt.kbhit', mock_kbhit), patch('msvcrt.getch', mock_getch):
# profiler = BrowserProfiler()
user_done_event = asyncio.Event()
# Create a local async function to simulate the keyboard input handling
async def test_listen_for_quit_command():
if sys.platform == "win32":
while True:
try:
if mock_kbhit():
raw = mock_getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
continue
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
user_done_event.set()
return
await asyncio.sleep(0.1)
except Exception as e:
continue
# Run the listener
listener_task = asyncio.create_task(test_listen_for_quit_command())
# Wait for the event to be set
try:
await asyncio.wait_for(user_done_event.wait(), timeout=1.0)
assert user_done_event.is_set()
finally:
if not listener_task.done():
listener_task.cancel()
try:
await listener_task
except asyncio.CancelledError:
pass
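The test above only covers the msvcrt branch. On Unix-like systems the commits describe an equivalent listener built on termios, tty, and select; a rough, hedged sketch of that approach (illustrative only, not the profiler's actual code) is shown below. In real use it would also stop once the browser process exits, mirroring the Windows path tested above.

import asyncio
import select
import sys
import termios
import tty

async def listen_for_quit(user_done_event: asyncio.Event) -> None:
    # Rough Unix-only sketch: put stdin in cbreak mode and poll for 'q'.
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setcbreak(fd)
        while not user_done_event.is_set():
            ready, _, _ = select.select([sys.stdin], [], [], 0)
            if ready:
                key = sys.stdin.read(1)
                if key.lower() == "q":
                    user_done_event.set()
                    return
            await asyncio.sleep(0.1)
    finally:
        # Always restore the terminal, even if the coroutine is cancelled.
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)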