Compare commits

...

22 Commits

Author SHA1 Message Date
AHMET YILMAZ
89cf5aba2b #1057 : enhance ProxyConfig initialization to support dict and string formats 2025-08-06 18:34:58 +08:00
ntohidi
a5bcac4c9d feat(docs): enhance table data access example with a real url 2025-08-06 15:19:37 +08:00
Nasrin
45d8327d23 Merge pull request #1366 from unclecode/fix/update-tables-documentation
docs: Update README.md and modify Media and Tables Documentation.(#1271)
2025-08-06 15:15:24 +08:00
ntohidi
437395e490 Merge branch 'feat/undetected-browser' into develop-future 2025-08-06 15:03:30 +08:00
Soham Kukreti
fddae303fb docs: Update README.md and modify Media and Tables Documentation.(#1271)
- Update Table-to-DataFrame Extraction example in README.md
- Replace old method of accessing tables via result.media directly with result.tables in the documentation
- Remove tables section from links & media page.
- Add tables section to crawler result page.
2025-08-05 23:29:19 +05:30
ntohidi
ff6ea41ac3 feat(docker): add flexible LLM provider configuration
- Support LLM_PROVIDER env var to override default provider (openai/gpt-4o-mini)
- Add optional 'provider' parameter to API endpoints for per-request overrides
- Implement provider validation to ensure API keys exist
- Update documentation and examples with new configuration options

Closes the need to hardcode providers in config.yml
2025-08-05 14:09:54 +08:00
ntohidi
31a435fb0e Merge branch 'develop' of https://github.com/unclecode/crawl4ai into develop 2025-08-04 19:12:19 +08:00
Nasrin
5de6a28055 Merge pull request #1361 from unclecode/fix/crawler-result-docs
Update CrawlResult documentation with missing fields
2025-08-04 19:12:09 +08:00
ntohidi
de1561ad14 Merge branch 'develop' of https://github.com/unclecode/crawl4ai into develop 2025-08-04 19:04:50 +08:00
Nasrin
337b588732 Merge pull request #1358 from shonenada/patch-1
Fix typos in examples.md
2025-08-04 19:04:42 +08:00
ntohidi
7a6ad547f0 Squashed commit of the following:
commit 2def6524cdacb69c72760bf55a41089257c0bb07
Author: ntohidi <nasrin@kidocode.com>
Date:   Mon Aug 4 18:59:10 2025 +0800

    refactor: consolidate WebScrapingStrategy to use LXML implementation only

    BREAKING CHANGE: None - full backward compatibility maintained

    This commit simplifies the content scraping architecture by removing the
    redundant BeautifulSoup-based WebScrapingStrategy implementation and making
    it an alias for LXMLWebScrapingStrategy.

    Changes:
    - Remove ~1000 lines of BeautifulSoup-based WebScrapingStrategy code
    - Make WebScrapingStrategy an alias for LXMLWebScrapingStrategy
    - Update LXMLWebScrapingStrategy to inherit directly from ContentScrapingStrategy
    - Add required methods (scrap, ascrap, process_element, _log) to LXMLWebScrapingStrategy
    - Maintain 100% backward compatibility - existing code continues to work

    Code changes:
    - crawl4ai/content_scraping_strategy.py: Remove WebScrapingStrategy class, add alias
    - crawl4ai/async_configs.py: Remove WebScrapingStrategy from imports
    - crawl4ai/__init__.py: Update imports to show alias relationship
    - crawl4ai/types.py: Update type definitions
    - crawl4ai/legacy/web_crawler.py: Update import to use alias
    - tests/async/test_content_scraper_strategy.py: Update to use LXMLWebScrapingStrategy
    - docs/examples/scraping_strategies_performance.py: Update to use single strategy

    Documentation updates:
    - docs/md_v2/core/content-selection.md: Update scraping modes section
    - docs/md_v2/migration/webscraping-strategy-migration.md: Add migration guide
    - CHANGELOG.md: Document the refactoring under [Unreleased]

    Benefits:
    - 10-20x faster HTML parsing for large documents
    - Reduced memory usage and simplified codebase
    - Consistent parsing behavior
    - No migration required for existing users

    All existing code using WebScrapingStrategy continues to work without
    modification, while benefiting from LXML's superior performance.
2025-08-04 19:02:01 +08:00
Soham Kukreti
e6692b987d docs: Update CrawlResult documentation with missing fields.
- Add missing fields: fit_html, js_execution_result, redirected_url, network_requests, console_messages, tables
2025-08-04 15:43:40 +05:30
ntohidi
307fe28b32 fix: Correct URL matcher fallback behavior and improve memory monitoring
Fix critical issue where unmatched URLs incorrectly used the first config instead of failing safely. Also clarify that configs without url_matcher match ALL URLs by design, and improve memory usage monitoring.

Bug fixes:
- Change select_config() to return None when no config matches instead of using first config
- Add proper error handling in dispatchers when no config matches a URL
- Return failed CrawlResult with "No matching configuration found" error message
- Fix is_match() to return True when url_matcher is None (matches all URLs)
- Import and use get_true_memory_usage_percent() for more accurate memory monitoring

Behavior clarification:
- CrawlerRunConfig with url_matcher=None matches ALL URLs (not nothing)
- This is the intended behavior for default/fallback configurations
- Enables clean pattern: specific configs first, default config last

Documentation updates:
- Clarify that configs without url_matcher match everything
- Explain "No matching configuration found" error when no default config
- Add examples showing proper default config usage
- Update all relevant docs: multi-url-crawling.md, arun_many.md, parameters.md
- Simplify API config examples by removing extraction_strategy

Demo and test updates:
- Update demo_multi_config_clean.py with commented default config to show behavior
- Change example URL to w3schools.com to demonstrate no-match scenario
- Uncomment all test URLs in test_multi_config.py for comprehensive testing

Breaking changes: None - this restores the intended behavior

This ensures URLs only get processed with appropriate configs, preventing
issues like HTML pages being processed with PDF extraction strategies.
2025-08-03 16:50:54 +08:00
Yaoda Liu
438a103b17 Fix typos in examples.md 2025-08-03 14:33:10 +08:00
ntohidi
a03e68fa2f feat: Add URL-specific crawler configurations for multi-URL crawling
Implement dynamic configuration selection based on URL patterns to optimize crawling for different content types. This feature enables users to apply different crawling strategies (PDF extraction, content filtering, JavaScript execution) based on URL matching patterns.

Key additions:
- Add url_matcher and match_mode parameters to CrawlerRunConfig
- Implement is_match() method supporting string patterns, functions, and mixed lists
- Add MatchMode enum for OR/AND logic when combining multiple matchers
- Update AsyncWebCrawler.arun_many() to accept List[CrawlerRunConfig]
- Add select_config() method to dispatchers for runtime config selection
- First matching config wins, with fallback to default

Pattern matching supports:
- Glob-style strings: *.pdf, */blog/*, *api*
- Lambda functions: lambda url: 'github.com' in url
- Mixed patterns with AND/OR logic for complex matching

This enables optimal per-URL configuration:
- PDFs: Use PDFContentScrapingStrategy without JavaScript
- Blogs: Apply content filtering to reduce noise
- APIs: Skip JavaScript, use JSON extraction
- Dynamic sites: Execute only necessary JavaScript

Breaking changes: None - fully backward compatible
2025-08-02 19:10:36 +08:00
Nasrin
864d87afb2 Merge pull request #1339 from charlaie/fix-sitemap-redirect
Fix: URL Seeder sitemap redirect
2025-07-31 15:21:03 +08:00
Charlie C
508b6fc233 fix: Enable following redirects in sitemap fetching for seeder 2025-07-31 12:06:10 +08:00
UncleCode
e3281935bc fix: Add write permissions for GitHub release creation 2025-07-25 18:22:45 +08:00
unclecode
805c498adf docs: add simple anti-bot examples
- Add simple_anti_bot_examples.py with minimal code examples
- Demonstrates stealth mode, undetected browser, and combined usage
- Clean examples without logging for easy reference

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-17 17:05:35 +08:00
unclecode
6a728cbe5b feat: add stealth mode and enhance undetected browser support
- Add playwright-stealth integration with enable_stealth parameter in BrowserConfig
- Merge undetected browser strategy into main async_crawler_strategy.py using adapter pattern
- Add browser adapters (BrowserAdapter, PlaywrightAdapter, UndetectedAdapter) for flexible browser switching
- Update install.py to install both playwright and patchright browsers automatically
- Add comprehensive documentation for anti-bot features (stealth mode + undetected browser)
- Create examples demonstrating stealth mode usage and comparison tests
- Update pyproject.toml and requirements.txt with patchright>=1.49.0 and other dependencies
- Remove duplicate/unused dependencies (alphashape, cssselect, pyperclip, shapely, selenium)
- Add dependency checker tool in tests/check_dependencies.py

Breaking changes: None - all existing functionality preserved

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-17 16:59:10 +08:00
unclecode
5c33cbcca2 feat: add undetected browser support with adapter pattern 2025-07-14 17:29:50 +08:00
UncleCode
7b80eb6b99 docs: Add missing documentation pages to mkdocs.yml
- Added Adaptive Crawling to Core section
- Added URL Seeding to Core section
- Added Adaptive Strategies to Advanced section
2025-07-12 19:55:35 +08:00
69 changed files with 7952 additions and 1443 deletions

View File

@@ -8,6 +8,8 @@ on:
jobs:
release:
runs-on: ubuntu-latest
permissions:
contents: write # Required for creating releases
steps:
- name: Checkout code
@@ -95,12 +97,10 @@ jobs:
platforms: linux/amd64,linux/arm64
- name: Create GitHub Release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
uses: softprops/action-gh-release@v2
with:
tag_name: v${{ steps.get_version.outputs.VERSION }}
release_name: Release v${{ steps.get_version.outputs.VERSION }}
name: Release v${{ steps.get_version.outputs.VERSION }}
body: |
## 🎉 Crawl4AI v${{ steps.get_version.outputs.VERSION }} Released!
@@ -121,6 +121,7 @@ jobs:
See [CHANGELOG.md](https://github.com/${{ github.repository }}/blob/main/CHANGELOG.md) for details.
draft: false
prerelease: false
token: ${{ secrets.GITHUB_TOKEN }}
- name: Summary
run: |

View File

@@ -21,6 +21,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- **Flexible LLM Provider Configuration** (Docker):
- Support for `LLM_PROVIDER` environment variable to override default provider
- Per-request provider override via optional `provider` parameter in API endpoints
- Automatic provider validation with clear error messages
- Updated Docker documentation and examples
### Changed
- **WebScrapingStrategy Refactoring**: Simplified content scraping architecture
- `WebScrapingStrategy` is now an alias for `LXMLWebScrapingStrategy` for backward compatibility
- Removed redundant BeautifulSoup-based implementation (~1000 lines of code)
- `LXMLWebScrapingStrategy` now inherits directly from `ContentScrapingStrategy`
- All existing code using `WebScrapingStrategy` continues to work without modification
- Default scraping strategy remains `LXMLWebScrapingStrategy` for optimal performance
### Added
- **AsyncUrlSeeder**: High-performance URL discovery system for intelligent crawling at scale
- Discover URLs from sitemaps and Common Crawl index

View File

@@ -618,16 +618,16 @@ Read the full details in our [0.7.0 Release Notes](https://docs.crawl4ai.com/blo
# Process results
raw_df = pd.DataFrame()
for result in results:
if result.success and result.media["tables"]:
if result.success and result.tables:
raw_df = pd.DataFrame(
result.media["tables"][0]["rows"],
columns=result.media["tables"][0]["headers"],
result.tables[0]["rows"],
columns=result.tables[0]["headers"],
)
break
print(raw_df.head())
finally:
await crawler.stop()
await crawler.close()
```
- **🚀 Browser Pooling**: Pages launch hot with pre-warmed browser instances for lower latency and memory usage

View File

@@ -3,12 +3,12 @@ import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
# MODIFIED: Add SeedingConfig and VirtualScrollConfig here
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig, LinkPreviewConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig, LinkPreviewConfig, MatchMode
from .content_scraping_strategy import (
ContentScrapingStrategy,
WebScrapingStrategy,
LXMLWebScrapingStrategy,
WebScrapingStrategy, # Backward compatibility alias
)
from .async_logger import (
AsyncLoggerBase,
@@ -88,6 +88,13 @@ from .script import (
ErrorDetail
)
# Browser Adapters
from .browser_adapter import (
BrowserAdapter,
PlaywrightAdapter,
UndetectedAdapter
)
from .utils import (
start_colab_display_server,
setup_colab_environment
@@ -132,6 +139,7 @@ __all__ = [
"CrawlResult",
"CrawlerHub",
"CacheMode",
"MatchMode",
"ContentScrapingStrategy",
"WebScrapingStrategy",
"LXMLWebScrapingStrategy",
@@ -173,6 +181,10 @@ __all__ = [
"CompilationResult",
"ValidationResult",
"ErrorDetail",
# Browser Adapters
"BrowserAdapter",
"PlaywrightAdapter",
"UndetectedAdapter",
"LinkPreviewConfig"
]

View File

@@ -18,17 +18,24 @@ from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy, LXMLWebScrapingStrategy
from .content_scraping_strategy import ContentScrapingStrategy, LXMLWebScrapingStrategy
from .deep_crawling import DeepCrawlStrategy
from .cache_context import CacheMode
from .proxy_strategy import ProxyRotationStrategy
from typing import Union, List
from typing import Union, List, Callable
import inspect
from typing import Any, Dict, Optional
from enum import Enum
# Type alias for URL matching
UrlMatcher = Union[str, Callable[[str], bool], List[Union[str, Callable[[str], bool]]]]
class MatchMode(Enum):
OR = "or"
AND = "and"
# from .proxy_strategy import ProxyConfig
@@ -383,6 +390,8 @@ class BrowserConfig:
light_mode (bool): Disables certain background features for performance gains. Default: False.
extra_args (list): Additional command-line arguments passed to the browser.
Default: [].
enable_stealth (bool): If True, applies playwright-stealth to bypass basic bot detection.
Cannot be used with use_undetected browser mode. Default: False.
"""
def __init__(
@@ -423,6 +432,7 @@ class BrowserConfig:
extra_args: list = None,
debugging_port: int = 9222,
host: str = "localhost",
enable_stealth: bool = False,
):
self.browser_type = browser_type
self.headless = headless
@@ -438,6 +448,10 @@ class BrowserConfig:
self.chrome_channel = ""
self.proxy = proxy
self.proxy_config = proxy_config
if isinstance(self.proxy_config, dict):
self.proxy_config = ProxyConfig.from_dict(self.proxy_config)
if isinstance(self.proxy_config, str):
self.proxy_config = ProxyConfig.from_string(self.proxy_config)
self.viewport_width = viewport_width
@@ -463,6 +477,7 @@ class BrowserConfig:
self.verbose = verbose
self.debugging_port = debugging_port
self.host = host
self.enable_stealth = enable_stealth
fa_user_agenr_generator = ValidUAGenerator()
if self.user_agent_mode == "random":
@@ -494,6 +509,13 @@ class BrowserConfig:
# If persistent context is requested, ensure managed browser is enabled
if self.use_persistent_context:
self.use_managed_browser = True
# Validate stealth configuration
if self.enable_stealth and self.use_managed_browser and self.browser_mode == "builtin":
raise ValueError(
"enable_stealth cannot be used with browser_mode='builtin'. "
"Stealth mode requires a dedicated browser instance."
)
@staticmethod
def from_kwargs(kwargs: dict) -> "BrowserConfig":
@@ -530,6 +552,7 @@ class BrowserConfig:
extra_args=kwargs.get("extra_args", []),
debugging_port=kwargs.get("debugging_port", 9222),
host=kwargs.get("host", "localhost"),
enable_stealth=kwargs.get("enable_stealth", False),
)
def to_dict(self):
@@ -564,6 +587,7 @@ class BrowserConfig:
"verbose": self.verbose,
"debugging_port": self.debugging_port,
"host": self.host,
"enable_stealth": self.enable_stealth,
}
@@ -862,7 +886,7 @@ class CrawlerRunConfig():
parser_type (str): Type of parser to use for HTML parsing.
Default: "lxml".
scraping_strategy (ContentScrapingStrategy): Scraping strategy to use.
Default: WebScrapingStrategy.
Default: LXMLWebScrapingStrategy.
proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
If None, no additional proxy config. Default: None.
@@ -1113,6 +1137,9 @@ class CrawlerRunConfig():
link_preview_config: Union[LinkPreviewConfig, Dict[str, Any]] = None,
# Virtual Scroll Parameters
virtual_scroll_config: Union[VirtualScrollConfig, Dict[str, Any]] = None,
# URL Matching Parameters
url_matcher: Optional[UrlMatcher] = None,
match_mode: MatchMode = MatchMode.OR,
# Experimental Parameters
experimental: Dict[str, Any] = None,
):
@@ -1136,6 +1163,11 @@ class CrawlerRunConfig():
self.parser_type = parser_type
self.scraping_strategy = scraping_strategy or LXMLWebScrapingStrategy()
self.proxy_config = proxy_config
if isinstance(proxy_config, dict):
self.proxy_config = ProxyConfig.from_dict(proxy_config)
if isinstance(proxy_config, str):
self.proxy_config = ProxyConfig.from_string(proxy_config)
self.proxy_rotation_strategy = proxy_rotation_strategy
# Browser Location and Identity Parameters
@@ -1266,6 +1298,10 @@ class CrawlerRunConfig():
else:
raise ValueError("virtual_scroll_config must be VirtualScrollConfig object or dict")
# URL Matching Parameters
self.url_matcher = url_matcher
self.match_mode = match_mode
# Experimental Parameters
self.experimental = experimental or {}
@@ -1321,6 +1357,51 @@ class CrawlerRunConfig():
if "compilation error" not in str(e).lower():
raise ValueError(f"Failed to compile C4A script: {str(e)}")
raise
def is_match(self, url: str) -> bool:
"""Check if this config matches the given URL.
Args:
url: The URL to check against this config's matcher
Returns:
bool: True if this config should be used for the URL or if no matcher is set.
"""
if self.url_matcher is None:
return True
if callable(self.url_matcher):
# Single function matcher
return self.url_matcher(url)
elif isinstance(self.url_matcher, str):
# Single pattern string
from fnmatch import fnmatch
return fnmatch(url, self.url_matcher)
elif isinstance(self.url_matcher, list):
# List of mixed matchers
if not self.url_matcher: # Empty list
return False
results = []
for matcher in self.url_matcher:
if callable(matcher):
results.append(matcher(url))
elif isinstance(matcher, str):
from fnmatch import fnmatch
results.append(fnmatch(url, matcher))
else:
# Skip invalid matchers
continue
# Apply match mode logic
if self.match_mode == MatchMode.OR:
return any(results) if results else False
else: # AND mode
return all(results) if results else False
return False
def __getattr__(self, name):
@@ -1443,6 +1524,9 @@ class CrawlerRunConfig():
# Link Extraction Parameters
link_preview_config=kwargs.get("link_preview_config"),
url=kwargs.get("url"),
# URL Matching Parameters
url_matcher=kwargs.get("url_matcher"),
match_mode=kwargs.get("match_mode", MatchMode.OR),
# Experimental Parameters
experimental=kwargs.get("experimental"),
)
@@ -1540,6 +1624,8 @@ class CrawlerRunConfig():
"deep_crawl_strategy": self.deep_crawl_strategy,
"link_preview_config": self.link_preview_config.to_dict() if self.link_preview_config else None,
"url": self.url,
"url_matcher": self.url_matcher,
"match_mode": self.match_mode,
"experimental": self.experimental,
}

File diff suppressed because it is too large Load Diff

View File

@@ -21,6 +21,7 @@ from .async_logger import AsyncLogger
from .ssl_certificate import SSLCertificate
from .user_agent_generator import ValidUAGenerator
from .browser_manager import BrowserManager
from .browser_adapter import BrowserAdapter, PlaywrightAdapter, UndetectedAdapter
import aiofiles
import aiohttp
@@ -71,7 +72,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
def __init__(
self, browser_config: BrowserConfig = None, logger: AsyncLogger = None, **kwargs
self, browser_config: BrowserConfig = None, logger: AsyncLogger = None, browser_adapter: BrowserAdapter = None, **kwargs
):
"""
Initialize the AsyncPlaywrightCrawlerStrategy with a browser configuration.
@@ -80,11 +81,16 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
browser_config (BrowserConfig): Configuration object containing browser settings.
If None, will be created from kwargs for backwards compatibility.
logger: Logger instance for recording events and errors.
browser_adapter (BrowserAdapter): Browser adapter for handling browser-specific operations.
If None, defaults to PlaywrightAdapter.
**kwargs: Additional arguments for backwards compatibility and extending functionality.
"""
# Initialize browser config, either from provided object or kwargs
self.browser_config = browser_config or BrowserConfig.from_kwargs(kwargs)
self.logger = logger
# Initialize browser adapter
self.adapter = browser_adapter or PlaywrightAdapter()
# Initialize session management
self._downloaded_files = []
@@ -104,7 +110,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Initialize browser manager with config
self.browser_manager = BrowserManager(
browser_config=self.browser_config, logger=self.logger
browser_config=self.browser_config,
logger=self.logger,
use_undetected=isinstance(self.adapter, UndetectedAdapter)
)
async def __aenter__(self):
@@ -322,7 +330,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
try:
result = await page.evaluate(wrapper_js)
result = await self.adapter.evaluate(page, wrapper_js)
return result
except Exception as e:
if "Error evaluating condition" in str(e):
@@ -367,7 +375,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Replace the iframe with a div containing the extracted content
_iframe = iframe_content.replace("`", "\\`")
await page.evaluate(
await self.adapter.evaluate(page,
f"""
() => {{
const iframe = document.getElementById('iframe-{i}');
@@ -628,91 +636,16 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
page.on("requestfailed", handle_request_failed_capture)
# Console Message Capturing
handle_console = None
handle_error = None
if config.capture_console_messages:
def handle_console_capture(msg):
try:
message_type = "unknown"
try:
message_type = msg.type
except:
pass
message_text = "unknown"
try:
message_text = msg.text
except:
pass
# Basic console message with minimal content
entry = {
"type": message_type,
"text": message_text,
"timestamp": time.time()
}
captured_console.append(entry)
except Exception as e:
if self.logger:
self.logger.warning(f"Error capturing console message: {e}", tag="CAPTURE")
# Still add something to the list even on error
captured_console.append({
"type": "console_capture_error",
"error": str(e),
"timestamp": time.time()
})
def handle_pageerror_capture(err):
try:
error_message = "Unknown error"
try:
error_message = err.message
except:
pass
error_stack = ""
try:
error_stack = err.stack
except:
pass
captured_console.append({
"type": "error",
"text": error_message,
"stack": error_stack,
"timestamp": time.time()
})
except Exception as e:
if self.logger:
self.logger.warning(f"Error capturing page error: {e}", tag="CAPTURE")
captured_console.append({
"type": "pageerror_capture_error",
"error": str(e),
"timestamp": time.time()
})
# Add event listeners directly
page.on("console", handle_console_capture)
page.on("pageerror", handle_pageerror_capture)
# Set up console capture using adapter
handle_console = await self.adapter.setup_console_capture(page, captured_console)
handle_error = await self.adapter.setup_error_capture(page, captured_console)
# Set up console logging if requested
if config.log_console:
def log_consol(
msg, console_log_type="debug"
): # Corrected the parameter syntax
if console_log_type == "error":
self.logger.error(
message=f"Console error: {msg}", # Use f-string for variable interpolation
tag="CONSOLE"
)
elif console_log_type == "debug":
self.logger.debug(
message=f"Console: {msg}", # Use f-string for variable interpolation
tag="CONSOLE"
)
page.on("console", log_consol)
page.on("pageerror", lambda e: log_consol(e, "error"))
# Note: For undetected browsers, console logging won't work directly
# but captured messages can still be logged after retrieval
try:
# Get SSL certificate information if requested and URL is HTTPS
@@ -998,7 +931,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await page.wait_for_load_state("domcontentloaded", timeout=5)
except PlaywrightTimeoutError:
pass
await page.evaluate(update_image_dimensions_js)
await self.adapter.evaluate(page, update_image_dimensions_js)
except Exception as e:
self.logger.error(
message="Error updating image dimensions: {error}",
@@ -1027,7 +960,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
for selector in selectors:
try:
content = await page.evaluate(
content = await self.adapter.evaluate(page,
f"""Array.from(document.querySelectorAll("{selector}"))
.map(el => el.outerHTML)
.join('')"""
@@ -1085,6 +1018,11 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await asyncio.sleep(delay)
return await page.content()
# For undetected browsers, retrieve console messages before returning
if config.capture_console_messages and hasattr(self.adapter, 'retrieve_console_messages'):
final_messages = await self.adapter.retrieve_console_messages(page)
captured_console.extend(final_messages)
# Return complete response
return AsyncCrawlResponse(
html=html,
@@ -1123,8 +1061,13 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
page.remove_listener("response", handle_response_capture)
page.remove_listener("requestfailed", handle_request_failed_capture)
if config.capture_console_messages:
page.remove_listener("console", handle_console_capture)
page.remove_listener("pageerror", handle_pageerror_capture)
# Retrieve any final console messages for undetected browsers
if hasattr(self.adapter, 'retrieve_console_messages'):
final_messages = await self.adapter.retrieve_console_messages(page)
captured_console.extend(final_messages)
# Clean up console capture
await self.adapter.cleanup_console_capture(page, handle_console, handle_error)
# Close the page
await page.close()
@@ -1354,7 +1297,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
# Execute virtual scroll capture
result = await page.evaluate(virtual_scroll_js, config.to_dict())
result = await self.adapter.evaluate(page, virtual_scroll_js, config.to_dict())
if result.get("replaced", False):
self.logger.success(
@@ -1438,7 +1381,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
remove_overlays_js = load_js_script("remove_overlay_elements")
try:
await page.evaluate(
await self.adapter.evaluate(page,
f"""
(() => {{
try {{
@@ -1843,7 +1786,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.
# """
result = await page.evaluate(
result = await self.adapter.evaluate(page,
f"""
(async () => {{
try {{
@@ -1965,7 +1908,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
for script in scripts:
try:
# Execute the script and wait for network idle
result = await page.evaluate(
result = await self.adapter.evaluate(page,
f"""
(() => {{
return new Promise((resolve) => {{
@@ -2049,7 +1992,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Returns:
Boolean indicating visibility
"""
return await page.evaluate(
return await self.adapter.evaluate(page,
"""
() => {
const element = document.body;
@@ -2090,7 +2033,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Dict containing scroll status and position information
"""
try:
result = await page.evaluate(
result = await self.adapter.evaluate(page,
f"""() => {{
try {{
const startX = window.scrollX;
@@ -2147,7 +2090,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Returns:
Dict containing width and height of the page
"""
return await page.evaluate(
return await self.adapter.evaluate(page,
"""
() => {
const {scrollWidth, scrollHeight} = document.documentElement;
@@ -2167,7 +2110,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
bool: True if page needs scrolling
"""
try:
need_scroll = await page.evaluate(
need_scroll = await self.adapter.evaluate(page,
"""
() => {
const scrollHeight = document.documentElement.scrollHeight;
@@ -2186,265 +2129,3 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return True # Default to scrolling if check fails
####################################################################################################
# HTTP Crawler Strategy
####################################################################################################
class HTTPCrawlerError(Exception):
"""Base error class for HTTP crawler specific exceptions"""
pass
class ConnectionTimeoutError(HTTPCrawlerError):
"""Raised when connection timeout occurs"""
pass
class HTTPStatusError(HTTPCrawlerError):
"""Raised for unexpected status codes"""
def __init__(self, status_code: int, message: str):
self.status_code = status_code
super().__init__(f"HTTP {status_code}: {message}")
class AsyncHTTPCrawlerStrategy(AsyncCrawlerStrategy):
"""
Fast, lightweight HTTP-only crawler strategy optimized for memory efficiency.
"""
__slots__ = ('logger', 'max_connections', 'dns_cache_ttl', 'chunk_size', '_session', 'hooks', 'browser_config')
DEFAULT_TIMEOUT: Final[int] = 30
DEFAULT_CHUNK_SIZE: Final[int] = 64 * 1024
DEFAULT_MAX_CONNECTIONS: Final[int] = min(32, (os.cpu_count() or 1) * 4)
DEFAULT_DNS_CACHE_TTL: Final[int] = 300
VALID_SCHEMES: Final = frozenset({'http', 'https', 'file', 'raw'})
_BASE_HEADERS: Final = MappingProxyType({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def __init__(
self,
browser_config: Optional[HTTPCrawlerConfig] = None,
logger: Optional[AsyncLogger] = None,
max_connections: int = DEFAULT_MAX_CONNECTIONS,
dns_cache_ttl: int = DEFAULT_DNS_CACHE_TTL,
chunk_size: int = DEFAULT_CHUNK_SIZE
):
"""Initialize the HTTP crawler with config"""
self.browser_config = browser_config or HTTPCrawlerConfig()
self.logger = logger
self.max_connections = max_connections
self.dns_cache_ttl = dns_cache_ttl
self.chunk_size = chunk_size
self._session: Optional[aiohttp.ClientSession] = None
self.hooks = {
k: partial(self._execute_hook, k)
for k in ('before_request', 'after_request', 'on_error')
}
# Set default hooks
self.set_hook('before_request', lambda *args, **kwargs: None)
self.set_hook('after_request', lambda *args, **kwargs: None)
self.set_hook('on_error', lambda *args, **kwargs: None)
async def __aenter__(self) -> AsyncHTTPCrawlerStrategy:
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
@contextlib.asynccontextmanager
async def _session_context(self):
try:
if not self._session:
await self.start()
yield self._session
finally:
pass
def set_hook(self, hook_type: str, hook_func: Callable) -> None:
if hook_type in self.hooks:
self.hooks[hook_type] = partial(self._execute_hook, hook_type, hook_func)
else:
raise ValueError(f"Invalid hook type: {hook_type}")
async def _execute_hook(
self,
hook_type: str,
hook_func: Callable,
*args: Any,
**kwargs: Any
) -> Any:
if asyncio.iscoroutinefunction(hook_func):
return await hook_func(*args, **kwargs)
return hook_func(*args, **kwargs)
async def start(self) -> None:
if not self._session:
connector = aiohttp.TCPConnector(
limit=self.max_connections,
ttl_dns_cache=self.dns_cache_ttl,
use_dns_cache=True,
force_close=False
)
self._session = aiohttp.ClientSession(
headers=dict(self._BASE_HEADERS),
connector=connector,
timeout=ClientTimeout(total=self.DEFAULT_TIMEOUT)
)
async def close(self) -> None:
if self._session and not self._session.closed:
try:
await asyncio.wait_for(self._session.close(), timeout=5.0)
except asyncio.TimeoutError:
if self.logger:
self.logger.warning(
message="Session cleanup timed out",
tag="CLEANUP"
)
finally:
self._session = None
async def _stream_file(self, path: str) -> AsyncGenerator[memoryview, None]:
async with aiofiles.open(path, mode='rb') as f:
while chunk := await f.read(self.chunk_size):
yield memoryview(chunk)
async def _handle_file(self, path: str) -> AsyncCrawlResponse:
if not os.path.exists(path):
raise FileNotFoundError(f"Local file not found: {path}")
chunks = []
async for chunk in self._stream_file(path):
chunks.append(chunk.tobytes().decode('utf-8', errors='replace'))
return AsyncCrawlResponse(
html=''.join(chunks),
response_headers={},
status_code=200
)
async def _handle_raw(self, content: str) -> AsyncCrawlResponse:
return AsyncCrawlResponse(
html=content,
response_headers={},
status_code=200
)
async def _handle_http(
self,
url: str,
config: CrawlerRunConfig
) -> AsyncCrawlResponse:
async with self._session_context() as session:
timeout = ClientTimeout(
total=config.page_timeout or self.DEFAULT_TIMEOUT,
connect=10,
sock_read=30
)
headers = dict(self._BASE_HEADERS)
if self.browser_config.headers:
headers.update(self.browser_config.headers)
request_kwargs = {
'timeout': timeout,
'allow_redirects': self.browser_config.follow_redirects,
'ssl': self.browser_config.verify_ssl,
'headers': headers
}
if self.browser_config.method == "POST":
if self.browser_config.data:
request_kwargs['data'] = self.browser_config.data
if self.browser_config.json:
request_kwargs['json'] = self.browser_config.json
await self.hooks['before_request'](url, request_kwargs)
try:
async with session.request(self.browser_config.method, url, **request_kwargs) as response:
content = memoryview(await response.read())
if not (200 <= response.status < 300):
raise HTTPStatusError(
response.status,
f"Unexpected status code for {url}"
)
encoding = response.charset
if not encoding:
encoding = chardet.detect(content.tobytes())['encoding'] or 'utf-8'
result = AsyncCrawlResponse(
html=content.tobytes().decode(encoding, errors='replace'),
response_headers=dict(response.headers),
status_code=response.status,
redirected_url=str(response.url)
)
await self.hooks['after_request'](result)
return result
except aiohttp.ServerTimeoutError as e:
await self.hooks['on_error'](e)
raise ConnectionTimeoutError(f"Request timed out: {str(e)}")
except aiohttp.ClientConnectorError as e:
await self.hooks['on_error'](e)
raise ConnectionError(f"Connection failed: {str(e)}")
except aiohttp.ClientError as e:
await self.hooks['on_error'](e)
raise HTTPCrawlerError(f"HTTP client error: {str(e)}")
except asyncio.exceptions.TimeoutError as e:
await self.hooks['on_error'](e)
raise ConnectionTimeoutError(f"Request timed out: {str(e)}")
except Exception as e:
await self.hooks['on_error'](e)
raise HTTPCrawlerError(f"HTTP request failed: {str(e)}")
async def crawl(
self,
url: str,
config: Optional[CrawlerRunConfig] = None,
**kwargs
) -> AsyncCrawlResponse:
config = config or CrawlerRunConfig.from_kwargs(kwargs)
parsed = urlparse(url)
scheme = parsed.scheme.rstrip('/')
if scheme not in self.VALID_SCHEMES:
raise ValueError(f"Unsupported URL scheme: {scheme}")
try:
if scheme == 'file':
return await self._handle_file(parsed.path)
elif scheme == 'raw':
return await self._handle_raw(parsed.path)
else: # http or https
return await self._handle_http(url, config)
except Exception as e:
if self.logger:
self.logger.error(
message="Crawl failed: {error}",
tag="CRAWL",
params={"error": str(e), "url": url}
)
raise

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional, List, Tuple
from typing import Dict, Optional, List, Tuple, Union
from .async_configs import CrawlerRunConfig
from .models import (
CrawlResult,
@@ -22,6 +22,8 @@ from urllib.parse import urlparse
import random
from abc import ABC, abstractmethod
from .memory_utils import get_true_memory_usage_percent
class RateLimiter:
def __init__(
@@ -96,11 +98,37 @@ class BaseDispatcher(ABC):
self.rate_limiter = rate_limiter
self.monitor = monitor
def select_config(self, url: str, configs: Union[CrawlerRunConfig, List[CrawlerRunConfig]]) -> Optional[CrawlerRunConfig]:
"""Select the appropriate config for a given URL.
Args:
url: The URL to match against
configs: Single config or list of configs to choose from
Returns:
The matching config, or None if no match found
"""
# Single config - return as is
if isinstance(configs, CrawlerRunConfig):
return configs
# Empty list - return None
if not configs:
return None
# Find first matching config
for config in configs:
if config.is_match(url):
return config
# No match found - return None to indicate URL should be skipped
return None
@abstractmethod
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
task_id: str,
monitor: Optional[CrawlerMonitor] = None,
) -> CrawlerTaskResult:
@@ -111,7 +139,7 @@ class BaseDispatcher(ABC):
self,
urls: List[str],
crawler: AsyncWebCrawler, # noqa: F821
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
monitor: Optional[CrawlerMonitor] = None,
) -> List[CrawlerTaskResult]:
pass
@@ -147,7 +175,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
async def _memory_monitor_task(self):
"""Background task to continuously monitor memory usage and update state"""
while True:
self.current_memory_percent = psutil.virtual_memory().percent
self.current_memory_percent = get_true_memory_usage_percent()
# Enter memory pressure mode if we cross the threshold
if self.current_memory_percent >= self.memory_threshold_percent:
@@ -200,7 +228,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
task_id: str,
retry_count: int = 0,
) -> CrawlerTaskResult:
@@ -208,6 +236,37 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
error_message = ""
memory_usage = peak_memory = 0.0
# Select appropriate config for this URL
selected_config = self.select_config(url, config)
# If no config matches, return failed result
if selected_config is None:
error_message = f"No matching configuration found for URL: {url}"
if self.monitor:
self.monitor.update_task(
task_id,
status=CrawlStatus.FAILED,
error_message=error_message
)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=CrawlResult(
url=url,
html="",
metadata={"status": "no_config_match"},
success=False,
error_message=error_message
),
memory_usage=0,
peak_memory=0,
start_time=start_time,
end_time=time.time(),
error_message=error_message,
retry_count=retry_count
)
# Get starting memory for accurate measurement
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
@@ -257,8 +316,8 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
retry_count=retry_count + 1
)
# Execute the crawl
result = await self.crawler.arun(url, config=config, session_id=task_id)
# Execute the crawl with selected config
result = await self.crawler.arun(url, config=selected_config, session_id=task_id)
# Measure memory usage
end_memory = process.memory_info().rss / (1024 * 1024)
@@ -316,7 +375,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self,
urls: List[str],
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
) -> List[CrawlerTaskResult]:
self.crawler = crawler
@@ -470,7 +529,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self,
urls: List[str],
crawler: AsyncWebCrawler,
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
) -> AsyncGenerator[CrawlerTaskResult, None]:
self.crawler = crawler
@@ -572,7 +631,7 @@ class SemaphoreDispatcher(BaseDispatcher):
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
task_id: str,
semaphore: asyncio.Semaphore = None,
) -> CrawlerTaskResult:
@@ -580,6 +639,36 @@ class SemaphoreDispatcher(BaseDispatcher):
error_message = ""
memory_usage = peak_memory = 0.0
# Select appropriate config for this URL
selected_config = self.select_config(url, config)
# If no config matches, return failed result
if selected_config is None:
error_message = f"No matching configuration found for URL: {url}"
if self.monitor:
self.monitor.update_task(
task_id,
status=CrawlStatus.FAILED,
error_message=error_message
)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=CrawlResult(
url=url,
html="",
metadata={"status": "no_config_match"},
success=False,
error_message=error_message
),
memory_usage=0,
peak_memory=0,
start_time=start_time,
end_time=time.time(),
error_message=error_message
)
try:
if self.monitor:
self.monitor.update_task(
@@ -592,7 +681,7 @@ class SemaphoreDispatcher(BaseDispatcher):
async with semaphore:
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
result = await self.crawler.arun(url, config=config, session_id=task_id)
result = await self.crawler.arun(url, config=selected_config, session_id=task_id)
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
@@ -654,7 +743,7 @@ class SemaphoreDispatcher(BaseDispatcher):
self,
crawler: AsyncWebCrawler, # noqa: F821
urls: List[str],
config: CrawlerRunConfig,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
) -> List[CrawlerTaskResult]:
self.crawler = crawler
if self.monitor:

View File

@@ -829,7 +829,7 @@ class AsyncUrlSeeder:
async def _iter_sitemap(self, url: str):
try:
r = await self.client.get(url, timeout=15)
r = await self.client.get(url, timeout=15, follow_redirects=True)
r.raise_for_status()
except httpx.HTTPStatusError as e:
self._log("warning", "Failed to fetch sitemap {url}: HTTP {status_code}",

View File

@@ -653,7 +653,7 @@ class AsyncWebCrawler:
async def arun_many(
self,
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
config: Optional[Union[CrawlerRunConfig, List[CrawlerRunConfig]]] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
# word_count_threshold=MIN_WORD_THRESHOLD,
@@ -674,7 +674,9 @@ class AsyncWebCrawler:
Args:
urls: List of URLs to crawl
config: Configuration object controlling crawl behavior for all URLs
config: Configuration object(s) controlling crawl behavior. Can be:
- Single CrawlerRunConfig: Used for all URLs
- List[CrawlerRunConfig]: Configs with url_matcher for URL-specific settings
dispatcher: The dispatcher strategy instance to use. Defaults to MemoryAdaptiveDispatcher
[other parameters maintained for backwards compatibility]
@@ -739,7 +741,11 @@ class AsyncWebCrawler:
or task_result.result
)
stream = config.stream
# Handle stream setting - use first config's stream setting if config is a list
if isinstance(config, list):
stream = config[0].stream if config else False
else:
stream = config.stream
if stream:

293
crawl4ai/browser_adapter.py Normal file
View File

@@ -0,0 +1,293 @@
# browser_adapter.py
"""
Browser adapter for Crawl4AI to support both Playwright and undetected browsers
with minimal changes to existing codebase.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Callable
import time
import json
# Import both, but use conditionally
try:
from playwright.async_api import Page
except ImportError:
Page = Any
try:
from patchright.async_api import Page as UndetectedPage
except ImportError:
UndetectedPage = Any
class BrowserAdapter(ABC):
"""Abstract adapter for browser-specific operations"""
@abstractmethod
async def evaluate(self, page: Page, expression: str, arg: Any = None) -> Any:
"""Execute JavaScript in the page"""
pass
@abstractmethod
async def setup_console_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console message capturing, returns handler function if needed"""
pass
@abstractmethod
async def setup_error_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capturing, returns handler function if needed"""
pass
@abstractmethod
async def retrieve_console_messages(self, page: Page) -> List[Dict]:
"""Retrieve captured console messages (for undetected browsers)"""
pass
@abstractmethod
async def cleanup_console_capture(self, page: Page, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Clean up console event listeners"""
pass
@abstractmethod
def get_imports(self) -> tuple:
"""Get the appropriate imports for this adapter"""
pass
class PlaywrightAdapter(BrowserAdapter):
"""Adapter for standard Playwright"""
async def evaluate(self, page: Page, expression: str, arg: Any = None) -> Any:
"""Standard Playwright evaluate"""
if arg is not None:
return await page.evaluate(expression, arg)
return await page.evaluate(expression)
async def setup_console_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console capture using Playwright's event system"""
def handle_console_capture(msg):
try:
message_type = "unknown"
try:
message_type = msg.type
except:
pass
message_text = "unknown"
try:
message_text = msg.text
except:
pass
entry = {
"type": message_type,
"text": message_text,
"timestamp": time.time()
}
captured_console.append(entry)
except Exception as e:
captured_console.append({
"type": "console_capture_error",
"error": str(e),
"timestamp": time.time()
})
page.on("console", handle_console_capture)
return handle_console_capture
async def setup_error_capture(self, page: Page, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capture using Playwright's event system"""
def handle_pageerror_capture(err):
try:
error_message = "Unknown error"
try:
error_message = err.message
except:
pass
error_stack = ""
try:
error_stack = err.stack
except:
pass
captured_console.append({
"type": "error",
"text": error_message,
"stack": error_stack,
"timestamp": time.time()
})
except Exception as e:
captured_console.append({
"type": "pageerror_capture_error",
"error": str(e),
"timestamp": time.time()
})
page.on("pageerror", handle_pageerror_capture)
return handle_pageerror_capture
async def retrieve_console_messages(self, page: Page) -> List[Dict]:
"""Not needed for Playwright - messages are captured via events"""
return []
async def cleanup_console_capture(self, page: Page, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Remove event listeners"""
if handle_console:
page.remove_listener("console", handle_console)
if handle_error:
page.remove_listener("pageerror", handle_error)
def get_imports(self) -> tuple:
"""Return Playwright imports"""
from playwright.async_api import Page, Error
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
return Page, Error, PlaywrightTimeoutError
class UndetectedAdapter(BrowserAdapter):
"""Adapter for undetected browser automation with stealth features"""
def __init__(self):
self._console_script_injected = {}
async def evaluate(self, page: UndetectedPage, expression: str, arg: Any = None) -> Any:
"""Undetected browser evaluate with isolated context"""
# For most evaluations, use isolated context for stealth
# Only use non-isolated when we need to access our injected console capture
isolated = not (
"__console" in expression or
"__captured" in expression or
"__error" in expression or
"window.__" in expression
)
if arg is not None:
return await page.evaluate(expression, arg, isolated_context=isolated)
return await page.evaluate(expression, isolated_context=isolated)
async def setup_console_capture(self, page: UndetectedPage, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup console capture using JavaScript injection for undetected browsers"""
if not self._console_script_injected.get(page, False):
await page.add_init_script("""
// Initialize console capture
window.__capturedConsole = [];
window.__capturedErrors = [];
// Store original console methods
const originalConsole = {};
['log', 'info', 'warn', 'error', 'debug'].forEach(method => {
originalConsole[method] = console[method];
console[method] = function(...args) {
try {
window.__capturedConsole.push({
type: method,
text: args.map(arg => {
try {
if (typeof arg === 'object') {
return JSON.stringify(arg);
}
return String(arg);
} catch (e) {
return '[Object]';
}
}).join(' '),
timestamp: Date.now()
});
} catch (e) {
// Fail silently to avoid detection
}
// Call original method
originalConsole[method].apply(console, args);
};
});
""")
self._console_script_injected[page] = True
return None # No handler function needed for undetected browser
async def setup_error_capture(self, page: UndetectedPage, captured_console: List[Dict]) -> Optional[Callable]:
"""Setup error capture using JavaScript injection for undetected browsers"""
if not self._console_script_injected.get(page, False):
await page.add_init_script("""
// Capture errors
window.addEventListener('error', (event) => {
try {
window.__capturedErrors.push({
type: 'error',
text: event.message,
stack: event.error ? event.error.stack : '',
filename: event.filename,
lineno: event.lineno,
colno: event.colno,
timestamp: Date.now()
});
} catch (e) {
// Fail silently
}
});
// Capture unhandled promise rejections
window.addEventListener('unhandledrejection', (event) => {
try {
window.__capturedErrors.push({
type: 'unhandledrejection',
text: event.reason ? String(event.reason) : 'Unhandled Promise Rejection',
stack: event.reason && event.reason.stack ? event.reason.stack : '',
timestamp: Date.now()
});
} catch (e) {
// Fail silently
}
});
""")
self._console_script_injected[page] = True
return None # No handler function needed for undetected browser
async def retrieve_console_messages(self, page: UndetectedPage) -> List[Dict]:
"""Retrieve captured console messages and errors from the page"""
messages = []
try:
# Get console messages
console_messages = await page.evaluate(
"() => { const msgs = window.__capturedConsole || []; window.__capturedConsole = []; return msgs; }",
isolated_context=False
)
messages.extend(console_messages)
# Get errors
errors = await page.evaluate(
"() => { const errs = window.__capturedErrors || []; window.__capturedErrors = []; return errs; }",
isolated_context=False
)
messages.extend(errors)
# Convert timestamps from JS to Python format
for msg in messages:
if 'timestamp' in msg and isinstance(msg['timestamp'], (int, float)):
msg['timestamp'] = msg['timestamp'] / 1000.0 # Convert from ms to seconds
except Exception:
# If retrieval fails, return empty list
pass
return messages
async def cleanup_console_capture(self, page: UndetectedPage, handle_console: Optional[Callable], handle_error: Optional[Callable]):
"""Clean up for undetected browser - retrieve final messages"""
# For undetected browser, we don't have event listeners to remove
# but we should retrieve any final messages
final_messages = await self.retrieve_console_messages(page)
return final_messages
def get_imports(self) -> tuple:
"""Return undetected browser imports"""
from patchright.async_api import Page, Error
from patchright.async_api import TimeoutError as PlaywrightTimeoutError
return Page, Error, PlaywrightTimeoutError

View File

@@ -573,21 +573,26 @@ class BrowserManager:
_playwright_instance = None
@classmethod
async def get_playwright(cls):
from playwright.async_api import async_playwright
async def get_playwright(cls, use_undetected: bool = False):
if use_undetected:
from patchright.async_api import async_playwright
else:
from playwright.async_api import async_playwright
cls._playwright_instance = await async_playwright().start()
return cls._playwright_instance
def __init__(self, browser_config: BrowserConfig, logger=None):
def __init__(self, browser_config: BrowserConfig, logger=None, use_undetected: bool = False):
"""
Initialize the BrowserManager with a browser configuration.
Args:
browser_config (BrowserConfig): Configuration object containing all browser settings
logger: Logger instance for recording events and errors
use_undetected (bool): Whether to use undetected browser (Patchright)
"""
self.config: BrowserConfig = browser_config
self.logger = logger
self.use_undetected = use_undetected
# Browser state
self.browser = None
@@ -601,7 +606,11 @@ class BrowserManager:
# Keep track of contexts by a "config signature," so each unique config reuses a single context
self.contexts_by_config = {}
self._contexts_lock = asyncio.Lock()
self._contexts_lock = asyncio.Lock()
# Stealth-related attributes
self._stealth_instance = None
self._stealth_cm = None
# Initialize ManagedBrowser if needed
if self.config.use_managed_browser:
@@ -630,9 +639,21 @@ class BrowserManager:
if self.playwright is not None:
await self.close()
from playwright.async_api import async_playwright
if self.use_undetected:
from patchright.async_api import async_playwright
else:
from playwright.async_api import async_playwright
self.playwright = await async_playwright().start()
# Initialize playwright with or without stealth
if self.config.enable_stealth and not self.use_undetected:
# Import stealth only when needed
from playwright_stealth import Stealth
# Use the recommended stealth wrapper approach
self._stealth_instance = Stealth()
self._stealth_cm = self._stealth_instance.use_async(async_playwright())
self.playwright = await self._stealth_cm.__aenter__()
else:
self.playwright = await async_playwright().start()
if self.config.cdp_url or self.config.use_managed_browser:
self.config.use_managed_browser = True
@@ -1094,5 +1115,19 @@ class BrowserManager:
self.managed_browser = None
if self.playwright:
await self.playwright.stop()
# Handle stealth context manager cleanup if it exists
if hasattr(self, '_stealth_cm') and self._stealth_cm is not None:
try:
await self._stealth_cm.__aexit__(None, None, None)
except Exception as e:
if self.logger:
self.logger.error(
message="Error closing stealth context: {error}",
tag="ERROR",
params={"error": str(e)}
)
self._stealth_cm = None
self._stealth_instance = None
else:
await self.playwright.stop()
self.playwright = None

View File

@@ -98,20 +98,20 @@ class ContentScrapingStrategy(ABC):
pass
class WebScrapingStrategy(ContentScrapingStrategy):
class LXMLWebScrapingStrategy(ContentScrapingStrategy):
"""
Class for web content scraping. Perhaps the most important class.
How it works:
1. Extract content from HTML using BeautifulSoup.
2. Clean the extracted content using a content cleaning strategy.
3. Filter the cleaned content using a content filtering strategy.
4. Generate markdown content from the filtered content.
5. Return the markdown content.
LXML-based implementation for fast web content scraping.
This is the primary scraping strategy in Crawl4AI, providing high-performance
HTML parsing and content extraction using the lxml library.
Note: WebScrapingStrategy is now an alias for this class to maintain
backward compatibility.
"""
def __init__(self, logger=None):
self.logger = logger
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
def _log(self, level, message, tag="SCRAPE", **kwargs):
"""Helper method to safely use logger."""
@@ -132,7 +132,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
ScrapingResult: A structured result containing the scraped content.
"""
actual_url = kwargs.get("redirected_url", url)
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
raw_result = self._scrap(actual_url, html, **kwargs)
if raw_result is None:
return ScrapingResult(
cleaned_html="",
@@ -196,376 +196,9 @@ class WebScrapingStrategy(ContentScrapingStrategy):
Returns:
ScrapingResult: A structured result containing the scraped content.
"""
return await asyncio.to_thread(self._scrap, url, html, **kwargs)
return await asyncio.to_thread(self.scrap, url, html, **kwargs)
def is_data_table(self, table: Tag, **kwargs) -> bool:
"""
Determine if a table element is a data table (not a layout table).
Args:
table (Tag): BeautifulSoup Tag representing a table element
**kwargs: Additional keyword arguments including table_score_threshold
Returns:
bool: True if the table is a data table, False otherwise
"""
score = 0
# Check for thead and tbody
has_thead = len(table.select('thead')) > 0
has_tbody = len(table.select('tbody')) > 0
if has_thead:
score += 2
if has_tbody:
score += 1
# Check for th elements
th_count = len(table.select('th'))
if th_count > 0:
score += 2
if has_thead or len(table.select('tr:first-child th')) > 0:
score += 1
# Check for nested tables
if len(table.select('table')) > 0:
score -= 3
# Role attribute check
role = table.get('role', '').lower()
if role in {'presentation', 'none'}:
score -= 3
# Column consistency
rows = table.select('tr')
if not rows:
return False
col_counts = [len(row.select('td, th')) for row in rows]
avg_cols = sum(col_counts) / len(col_counts)
variance = sum((c - avg_cols)**2 for c in col_counts) / len(col_counts)
if variance < 1:
score += 2
# Caption and summary
if table.select('caption'):
score += 2
if table.has_attr('summary') and table['summary']:
score += 1
# Text density
total_text = sum(len(cell.get_text().strip()) for row in rows for cell in row.select('td, th'))
total_tags = sum(1 for _ in table.descendants if isinstance(_, Tag))
text_ratio = total_text / (total_tags + 1e-5)
if text_ratio > 20:
score += 3
elif text_ratio > 10:
score += 2
# Data attributes
data_attrs = sum(1 for attr in table.attrs if attr.startswith('data-'))
score += data_attrs * 0.5
# Size check
if avg_cols >= 2 and len(rows) >= 2:
score += 2
threshold = kwargs.get('table_score_threshold', 7)
return score >= threshold
def extract_table_data(self, table: Tag) -> dict:
"""
Extract structured data from a table element.
Args:
table (Tag): BeautifulSoup Tag representing a table element
Returns:
dict: Dictionary containing table data (headers, rows, caption, summary)
"""
caption_elem = table.select_one('caption')
caption = caption_elem.get_text().strip() if caption_elem else ""
summary = table.get('summary', '').strip()
# Extract headers with colspan handling
headers = []
thead_rows = table.select('thead tr')
if thead_rows:
header_cells = thead_rows[0].select('th')
for cell in header_cells:
text = cell.get_text().strip()
colspan = int(cell.get('colspan', 1))
headers.extend([text] * colspan)
else:
first_row = table.select('tr:first-child')
if first_row:
for cell in first_row[0].select('th, td'):
text = cell.get_text().strip()
colspan = int(cell.get('colspan', 1))
headers.extend([text] * colspan)
# Extract rows with colspan handling
rows = []
all_rows = table.select('tr')
thead = table.select_one('thead')
tbody_rows = []
if thead:
thead_rows = thead.select('tr')
tbody_rows = [row for row in all_rows if row not in thead_rows]
else:
if all_rows and all_rows[0].select('th'):
tbody_rows = all_rows[1:]
else:
tbody_rows = all_rows
for row in tbody_rows:
# for row in table.select('tr:not(:has(ancestor::thead))'):
row_data = []
for cell in row.select('td'):
text = cell.get_text().strip()
colspan = int(cell.get('colspan', 1))
row_data.extend([text] * colspan)
if row_data:
rows.append(row_data)
# Align rows with headers
max_columns = len(headers) if headers else (max(len(row) for row in rows) if rows else 0)
aligned_rows = []
for row in rows:
aligned = row[:max_columns] + [''] * (max_columns - len(row))
aligned_rows.append(aligned)
if not headers:
headers = [f"Column {i+1}" for i in range(max_columns)]
return {
"headers": headers,
"rows": aligned_rows,
"caption": caption,
"summary": summary,
}
def flatten_nested_elements(self, node):
"""
Flatten nested elements in a HTML tree.
Args:
node (Tag): The root node of the HTML tree.
Returns:
Tag: The flattened HTML tree.
"""
if isinstance(node, NavigableString):
return node
if (
len(node.contents) == 1
and isinstance(node.contents[0], Tag)
and node.contents[0].name == node.name
):
return self.flatten_nested_elements(node.contents[0])
node.contents = [self.flatten_nested_elements(child) for child in node.contents]
return node
def find_closest_parent_with_useful_text(self, tag, **kwargs):
"""
Find the closest parent with useful text.
Args:
tag (Tag): The starting tag to search from.
**kwargs: Additional keyword arguments.
Returns:
Tag: The closest parent with useful text, or None if not found.
"""
image_description_min_word_threshold = kwargs.get(
"image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
)
current_tag = tag
while current_tag:
current_tag = current_tag.parent
# Get the text content of the parent tag
if current_tag:
text_content = current_tag.get_text(separator=" ", strip=True)
# Check if the text content has at least word_count_threshold
if len(text_content.split()) >= image_description_min_word_threshold:
return text_content
return None
def remove_unwanted_attributes(
self, element, important_attrs, keep_data_attributes=False
):
"""
Remove unwanted attributes from an HTML element.
Args:
element (Tag): The HTML element to remove attributes from.
important_attrs (list): List of important attributes to keep.
keep_data_attributes (bool): Whether to keep data attributes.
Returns:
None
"""
attrs_to_remove = []
for attr in element.attrs:
if attr not in important_attrs:
if keep_data_attributes:
if not attr.startswith("data-"):
attrs_to_remove.append(attr)
else:
attrs_to_remove.append(attr)
for attr in attrs_to_remove:
del element[attr]
def process_image(self, img, url, index, total_images, **kwargs):
"""
Process an image element.
How it works:
1. Check if the image has valid display and inside undesired html elements.
2. Score an image for it's usefulness.
3. Extract image file metadata to extract size and extension.
4. Generate a dictionary with the processed image information.
5. Return the processed image information.
Args:
img (Tag): The image element to process.
url (str): The URL of the page containing the image.
index (int): The index of the image in the list of images.
total_images (int): The total number of images in the list.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the processed image information.
"""
# parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
# if ' ' in u else None}
# for u in [f"http{p}" for p in s.split("http") if p]]
# Constants for checks
classes_to_check = frozenset(["button", "icon", "logo"])
tags_to_check = frozenset(["button", "input"])
image_formats = frozenset(["jpg", "jpeg", "png", "webp", "avif", "gif"])
# Pre-fetch commonly used attributes
style = img.get("style", "")
alt = img.get("alt", "")
src = img.get("src", "")
data_src = img.get("data-src", "")
srcset = img.get("srcset", "")
data_srcset = img.get("data-srcset", "")
width = img.get("width")
height = img.get("height")
parent = img.parent
parent_classes = parent.get("class", [])
# Quick validation checks
if (
"display:none" in style
or parent.name in tags_to_check
or any(c in cls for c in parent_classes for cls in classes_to_check)
or any(c in src for c in classes_to_check)
or any(c in alt for c in classes_to_check)
):
return None
# Quick score calculation
score = 0
if width and width.isdigit():
width_val = int(width)
score += 1 if width_val > 150 else 0
if height and height.isdigit():
height_val = int(height)
score += 1 if height_val > 150 else 0
if alt:
score += 1
score += index / total_images < 0.5
# image_format = ''
# if "data:image/" in src:
# image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0]
# else:
# image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0]
# if image_format in ('jpg', 'png', 'webp', 'avif'):
# score += 1
# Check for image format in all possible sources
def has_image_format(url):
return any(fmt in url.lower() for fmt in image_formats)
# Score for having proper image sources
if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]):
score += 1
if srcset or data_srcset:
score += 1
if img.find_parent("picture"):
score += 1
# Detect format from any available source
detected_format = None
for url in [src, data_src, srcset, data_srcset]:
if url:
format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
if format_matches:
detected_format = format_matches[0]
break
if score <= kwargs.get("image_score_threshold", IMAGE_SCORE_THRESHOLD):
return None
# Use set for deduplication
unique_urls = set()
image_variants = []
# Generate a unique group ID for this set of variants
group_id = index
# Base image info template
base_info = {
"alt": alt,
"desc": self.find_closest_parent_with_useful_text(img, **kwargs),
"score": score,
"type": "image",
"group_id": group_id, # Group ID for this set of variants
"format": detected_format,
}
# Inline function for adding variants
def add_variant(src, width=None):
if src and not src.startswith("data:") and src not in unique_urls:
unique_urls.add(src)
image_variants.append({**base_info, "src": src, "width": width})
# Process all sources
add_variant(src)
add_variant(data_src)
# Handle srcset and data-srcset in one pass
for attr in ("srcset", "data-srcset"):
if value := img.get(attr):
for source in parse_srcset(value):
add_variant(source["url"], source["width"])
# Quick picture element check
if picture := img.find_parent("picture"):
for source in picture.find_all("source"):
if srcset := source.get("srcset"):
for src in parse_srcset(srcset):
add_variant(src["url"], src["width"])
# Framework-specific attributes in one pass
for attr, value in img.attrs.items():
if (
attr.startswith("data-")
and ("src" in attr or "srcset" in attr)
and "http" in value
):
add_variant(value)
return image_variants if image_variants else None
def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
def process_element(self, url, element: lhtml.HtmlElement, **kwargs) -> Dict[str, Any]:
"""
Process an HTML element.
@@ -577,7 +210,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
Args:
url (str): The URL of the page containing the element.
element (Tag): The HTML element to process.
element (lhtml.HtmlElement): The HTML element to process.
**kwargs: Additional keyword arguments.
Returns:
@@ -595,514 +228,6 @@ class WebScrapingStrategy(ContentScrapingStrategy):
"external_links_dict": external_links_dict,
}
def _process_element(
self,
url,
element: PageElement,
media: Dict[str, Any],
internal_links_dict: Dict[str, Any],
external_links_dict: Dict[str, Any],
**kwargs,
) -> bool:
"""
Process an HTML element.
"""
try:
if isinstance(element, NavigableString):
if isinstance(element, Comment):
element.extract()
return False
# if element.name == 'img':
# process_image(element, url, 0, 1)
# return True
base_domain = kwargs.get("base_domain", get_base_domain(url))
if element.name in ["script", "style", "link", "meta", "noscript"]:
element.decompose()
return False
keep_element = False
# Special case for table elements - always preserve structure
if element.name in ["tr", "td", "th"]:
keep_element = True
exclude_domains = kwargs.get("exclude_domains", [])
# exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS))
# exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
# exclude_social_media_domains = list(set(exclude_social_media_domains))
try:
if element.name == "a" and element.get("href"):
href = element.get("href", "").strip()
if not href: # Skip empty hrefs
return False
# url_base = url.split("/")[2]
# Normalize the URL
try:
normalized_href = normalize_url(href, url)
except ValueError:
# logging.warning(f"Invalid URL format: {href}, Error: {str(e)}")
return False
link_data = {
"href": normalized_href,
"text": element.get_text().strip(),
"title": element.get("title", "").strip(),
"base_domain": base_domain,
}
is_external = is_external_url(normalized_href, base_domain)
keep_element = True
# Handle external link exclusions
if is_external:
link_base_domain = get_base_domain(normalized_href)
link_data["base_domain"] = link_base_domain
if kwargs.get("exclude_external_links", False):
element.decompose()
return False
# elif kwargs.get('exclude_social_media_links', False):
# if link_base_domain in exclude_social_media_domains:
# element.decompose()
# return False
# if any(domain in normalized_href.lower() for domain in exclude_social_media_domains):
# element.decompose()
# return False
elif exclude_domains:
if link_base_domain in exclude_domains:
element.decompose()
return False
# if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])):
# element.decompose()
# return False
if is_external:
if normalized_href not in external_links_dict:
external_links_dict[normalized_href] = link_data
else:
if kwargs.get("exclude_internal_links", False):
element.decompose()
return False
if normalized_href not in internal_links_dict:
internal_links_dict[normalized_href] = link_data
except Exception as e:
raise Exception(f"Error processing links: {str(e)}")
try:
if element.name == "img":
potential_sources = [
"src",
"data-src",
"srcset" "data-lazy-src",
"data-original",
]
src = element.get("src", "")
while not src and potential_sources:
src = element.get(potential_sources.pop(0), "")
if not src:
element.decompose()
return False
# If it is srcset pick up the first image
if "srcset" in element.attrs:
src = element.attrs["srcset"].split(",")[0].split(" ")[0]
# If image src is internal, then skip
if not is_external_url(src, base_domain):
return True
image_src_base_domain = get_base_domain(src)
# Check flag if we should remove external images
if kwargs.get("exclude_external_images", False):
# Handle relative URLs (which are always from the same domain)
if not src.startswith('http') and not src.startswith('//'):
return True # Keep relative URLs
# For absolute URLs, compare the base domains using the existing function
src_base_domain = get_base_domain(src)
url_base_domain = get_base_domain(url)
# If the domains don't match and both are valid, the image is external
if src_base_domain and url_base_domain and src_base_domain != url_base_domain:
element.decompose()
return False
# if kwargs.get('exclude_social_media_links', False):
# if image_src_base_domain in exclude_social_media_domains:
# element.decompose()
# return False
# src_url_base = src.split('/')[2]
# url_base = url.split('/')[2]
# if any(domain in src for domain in exclude_social_media_domains):
# element.decompose()
# return False
# Handle exclude domains
if exclude_domains:
if image_src_base_domain in exclude_domains:
element.decompose()
return False
# if any(domain in src for domain in kwargs.get('exclude_domains', [])):
# element.decompose()
# return False
return True # Always keep image elements
except Exception:
raise "Error processing images"
# Check if flag to remove all forms is set
if kwargs.get("remove_forms", False) and element.name == "form":
element.decompose()
return False
if element.name in ["video", "audio"]:
media[f"{element.name}s"].append(
{
"src": element.get("src"),
"alt": element.get("alt"),
"type": element.name,
"description": self.find_closest_parent_with_useful_text(
element, **kwargs
),
}
)
source_tags = element.find_all("source")
for source_tag in source_tags:
media[f"{element.name}s"].append(
{
"src": source_tag.get("src"),
"alt": element.get("alt"),
"type": element.name,
"description": self.find_closest_parent_with_useful_text(
element, **kwargs
),
}
)
return True # Always keep video and audio elements
if element.name in ONLY_TEXT_ELIGIBLE_TAGS:
if kwargs.get("only_text", False):
element.replace_with(element.get_text())
try:
self.remove_unwanted_attributes(
element, IMPORTANT_ATTRS + kwargs.get("keep_attrs", []) , kwargs.get("keep_data_attributes", False)
)
except Exception as e:
# print('Error removing unwanted attributes:', str(e))
self._log(
"error",
message="Error removing unwanted attributes: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
# Process children
for child in list(element.children):
if isinstance(child, NavigableString) and not isinstance(
child, Comment
):
if len(child.strip()) > 0:
keep_element = True
else:
if self._process_element(
url,
child,
media,
internal_links_dict,
external_links_dict,
**kwargs,
):
keep_element = True
# Check word count
word_count_threshold = kwargs.get(
"word_count_threshold", MIN_WORD_THRESHOLD
)
if not keep_element:
word_count = len(element.get_text(strip=True).split())
keep_element = word_count >= word_count_threshold
if not keep_element:
element.decompose()
return keep_element
except Exception as e:
# print('Error processing element:', str(e))
self._log(
"error",
message="Error processing element: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
return False
def _scrap(
self,
url: str,
html: str,
word_count_threshold: int = MIN_WORD_THRESHOLD,
css_selector: str = None,
target_elements: List[str] = None,
**kwargs,
) -> Dict[str, Any]:
"""
Extract content from HTML using BeautifulSoup.
Args:
url (str): The URL of the page to scrape.
html (str): The HTML content of the page to scrape.
word_count_threshold (int): The minimum word count threshold for content extraction.
css_selector (str): The CSS selector to use for content extraction.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the extracted content.
"""
success = True
if not html:
return None
parser_type = kwargs.get("parser", "lxml")
soup = BeautifulSoup(html, parser_type)
body = soup.body
if body is None:
raise Exception("'<body>' tag is not found in fetched html. Consider adding wait_for=\"css:body\" to wait for body tag to be loaded into DOM.")
base_domain = get_base_domain(url)
# Early removal of all images if exclude_all_images is set
# This happens before any processing to minimize memory usage
if kwargs.get("exclude_all_images", False):
for img in body.find_all('img'):
img.decompose()
try:
meta = extract_metadata("", soup)
except Exception as e:
self._log(
"error",
message="Error extracting metadata: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
meta = {}
# Handle tag-based removal first - faster than CSS selection
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
if excluded_tags:
for element in body.find_all(lambda tag: tag.name in excluded_tags):
element.extract()
# Handle CSS selector-based removal
excluded_selector = kwargs.get("excluded_selector", "")
if excluded_selector:
is_single_selector = (
"," not in excluded_selector and " " not in excluded_selector
)
if is_single_selector:
while element := body.select_one(excluded_selector):
element.extract()
else:
for element in body.select(excluded_selector):
element.extract()
content_element = None
if target_elements:
try:
for_content_targeted_element = []
for target_element in target_elements:
for_content_targeted_element.extend(body.select(target_element))
content_element = soup.new_tag("div")
for el in for_content_targeted_element:
content_element.append(copy.deepcopy(el))
except Exception as e:
self._log("error", f"Error with target element detection: {str(e)}", "SCRAPE")
return None
else:
content_element = body
kwargs["exclude_social_media_domains"] = set(
kwargs.get("exclude_social_media_domains", []) + SOCIAL_MEDIA_DOMAINS
)
kwargs["exclude_domains"] = set(kwargs.get("exclude_domains", []))
if kwargs.get("exclude_social_media_links", False):
kwargs["exclude_domains"] = kwargs["exclude_domains"].union(
kwargs["exclude_social_media_domains"]
)
result_obj = self.process_element(
url,
body,
word_count_threshold=word_count_threshold,
base_domain=base_domain,
**kwargs,
)
links = {"internal": [], "external": []}
media = result_obj["media"]
internal_links_dict = result_obj["internal_links_dict"]
external_links_dict = result_obj["external_links_dict"]
# Update the links dictionary with unique links
links["internal"] = list(internal_links_dict.values())
links["external"] = list(external_links_dict.values())
# Extract head content for links if configured
link_preview_config = kwargs.get("link_preview_config")
if link_preview_config is not None:
try:
import asyncio
from .link_preview import LinkPreview
from .models import Links, Link
verbose = link_preview_config.verbose
if verbose:
self._log("info", "Starting link head extraction for {internal} internal and {external} external links",
params={"internal": len(links["internal"]), "external": len(links["external"])}, tag="LINK_EXTRACT")
# Convert dict links to Link objects
internal_links = [Link(**link_data) for link_data in links["internal"]]
external_links = [Link(**link_data) for link_data in links["external"]]
links_obj = Links(internal=internal_links, external=external_links)
# Create a config object for LinkPreview
class TempCrawlerRunConfig:
def __init__(self, link_config, score_links):
self.link_preview_config = link_config
self.score_links = score_links
config = TempCrawlerRunConfig(link_preview_config, kwargs.get("score_links", False))
# Extract head content (run async operation in sync context)
async def extract_links():
async with LinkPreview(self.logger) as extractor:
return await extractor.extract_link_heads(links_obj, config)
# Run the async operation
try:
# Check if we're already in an async context
loop = asyncio.get_running_loop()
# If we're in an async context, we need to run in a thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, extract_links())
updated_links = future.result()
except RuntimeError:
# No running loop, we can use asyncio.run directly
updated_links = asyncio.run(extract_links())
# Convert back to dict format
links["internal"] = [link.dict() for link in updated_links.internal]
links["external"] = [link.dict() for link in updated_links.external]
if verbose:
successful_internal = len([l for l in updated_links.internal if l.head_extraction_status == "valid"])
successful_external = len([l for l in updated_links.external if l.head_extraction_status == "valid"])
self._log("info", "Link head extraction completed: {internal_success}/{internal_total} internal, {external_success}/{external_total} external",
params={
"internal_success": successful_internal,
"internal_total": len(updated_links.internal),
"external_success": successful_external,
"external_total": len(updated_links.external)
}, tag="LINK_EXTRACT")
else:
self._log("info", "Link head extraction completed successfully", tag="LINK_EXTRACT")
except Exception as e:
self._log("error", f"Link head extraction failed: {str(e)}", tag="LINK_EXTRACT")
# Continue with original links if extraction fails
# # Process images using ThreadPoolExecutor
imgs = body.find_all("img")
media["images"] = [
img
for result in (
self.process_image(img, url, i, len(imgs), **kwargs)
for i, img in enumerate(imgs)
)
if result is not None
for img in result
]
# Process tables if not excluded
excluded_tags = set(kwargs.get("excluded_tags", []) or [])
if 'table' not in excluded_tags:
tables = body.find_all('table')
for table in tables:
if self.is_data_table(table, **kwargs):
table_data = self.extract_table_data(table)
media["tables"].append(table_data)
body = self.flatten_nested_elements(body)
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
for img in imgs:
src = img.get("src", "")
if base64_pattern.match(src):
# Replace base64 data with empty string
img["src"] = base64_pattern.sub("", src)
str_body = ""
try:
str_body = content_element.encode_contents().decode("utf-8")
except Exception:
# Reset body to the original HTML
success = False
body = BeautifulSoup(html, "html.parser")
# Create a new div with a special ID
error_div = body.new_tag("div", id="crawl4ai_error_message")
error_div.string = """
Crawl4AI Error: This page is not fully supported.
Possible reasons:
1. The page may have restrictions that prevent crawling.
2. The page might not be fully loaded.
Suggestions:
- Try calling the crawl function with these parameters:
magic=True,
- Set headless=False to visualize what's happening on the page.
If the issue persists, please check the page's structure and any potential anti-crawling measures.
"""
# Append the error div to the body
body.append(error_div)
str_body = body.encode_contents().decode("utf-8")
print(
"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details."
)
self._log(
"error",
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.",
tag="SCRAPE",
)
cleaned_html = str_body.replace("\n\n", "\n").replace(" ", " ")
return {
"cleaned_html": cleaned_html,
"success": success,
"media": media,
"links": links,
"metadata": meta,
}
class LXMLWebScrapingStrategy(WebScrapingStrategy):
def __init__(self, logger=None):
super().__init__(logger)
self.DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
self.BASE64_PATTERN = re.compile(r'data:image/[^;]+;base64,([^"]+)')
def _process_element(
self,
url: str,
@@ -1862,3 +987,7 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
"links": {"internal": [], "external": []},
"metadata": {},
}
# Backward compatibility alias
WebScrapingStrategy = LXMLWebScrapingStrategy

View File

@@ -119,6 +119,32 @@ def install_playwright():
logger.warning(
f"Please run '{sys.executable} -m playwright install --with-deps' manually after the installation."
)
# Install Patchright browsers for undetected browser support
logger.info("Installing Patchright browsers for undetected mode...", tag="INIT")
try:
subprocess.check_call(
[
sys.executable,
"-m",
"patchright",
"install",
"--with-deps",
"--force",
"chromium",
]
)
logger.success(
"Patchright installation completed successfully.", tag="COMPLETE"
)
except subprocess.CalledProcessError:
logger.warning(
f"Please run '{sys.executable} -m patchright install --with-deps' manually after the installation."
)
except Exception:
logger.warning(
f"Please run '{sys.executable} -m patchright install --with-deps' manually after the installation."
)
def run_migration():

View File

@@ -11,7 +11,7 @@ from .extraction_strategy import *
from .crawler_strategy import *
from typing import List
from concurrent.futures import ThreadPoolExecutor
from .content_scraping_strategy import WebScrapingStrategy
from ..content_scraping_strategy import LXMLWebScrapingStrategy as WebScrapingStrategy
from .config import *
import warnings
import json

79
crawl4ai/memory_utils.py Normal file
View File

@@ -0,0 +1,79 @@
import psutil
import platform
import subprocess
from typing import Tuple
def get_true_available_memory_gb() -> float:
"""Get truly available memory including inactive pages (cross-platform)"""
vm = psutil.virtual_memory()
if platform.system() == 'Darwin': # macOS
# On macOS, we need to include inactive memory too
try:
# Use vm_stat to get accurate values
result = subprocess.run(['vm_stat'], capture_output=True, text=True)
lines = result.stdout.split('\n')
page_size = 16384 # macOS page size
pages = {}
for line in lines:
if 'Pages free:' in line:
pages['free'] = int(line.split()[-1].rstrip('.'))
elif 'Pages inactive:' in line:
pages['inactive'] = int(line.split()[-1].rstrip('.'))
elif 'Pages speculative:' in line:
pages['speculative'] = int(line.split()[-1].rstrip('.'))
elif 'Pages purgeable:' in line:
pages['purgeable'] = int(line.split()[-1].rstrip('.'))
# Calculate total available (free + inactive + speculative + purgeable)
total_available_pages = (
pages.get('free', 0) +
pages.get('inactive', 0) +
pages.get('speculative', 0) +
pages.get('purgeable', 0)
)
available_gb = (total_available_pages * page_size) / (1024**3)
return available_gb
except:
# Fallback to psutil
return vm.available / (1024**3)
else:
# For Windows and Linux, psutil.available is accurate
return vm.available / (1024**3)
def get_true_memory_usage_percent() -> float:
"""
Get memory usage percentage that accounts for platform differences.
Returns:
float: Memory usage percentage (0-100)
"""
vm = psutil.virtual_memory()
total_gb = vm.total / (1024**3)
available_gb = get_true_available_memory_gb()
# Calculate used percentage based on truly available memory
used_percent = 100.0 * (total_gb - available_gb) / total_gb
# Ensure it's within valid range
return max(0.0, min(100.0, used_percent))
def get_memory_stats() -> Tuple[float, float, float]:
"""
Get comprehensive memory statistics.
Returns:
Tuple[float, float, float]: (used_percent, available_gb, total_gb)
"""
vm = psutil.virtual_memory()
total_gb = vm.total / (1024**3)
available_gb = get_true_available_memory_gb()
used_percent = get_true_memory_usage_percent()
return used_percent, available_gb, total_gb

View File

@@ -1056,7 +1056,7 @@ Your output must:
</output_requirements>
"""
GENERATE_SCRIPT_PROMPT = """You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
GENERATE_SCRIPT_PROMPT = r"""You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
Your scripts run **before the crawl** to handle dynamic content, user interactions, and other obstacles. You are a master of two tools: raw **JavaScript** and the high-level **Crawl4ai Script (c4a)**.

View File

@@ -23,8 +23,9 @@ SeedingConfig = Union['SeedingConfigType']
# Content scraping types
ContentScrapingStrategy = Union['ContentScrapingStrategyType']
WebScrapingStrategy = Union['WebScrapingStrategyType']
LXMLWebScrapingStrategy = Union['LXMLWebScrapingStrategyType']
# Backward compatibility alias
WebScrapingStrategy = Union['LXMLWebScrapingStrategyType']
# Proxy types
ProxyRotationStrategy = Union['ProxyRotationStrategyType']
@@ -114,7 +115,6 @@ if TYPE_CHECKING:
# Content scraping imports
from .content_scraping_strategy import (
ContentScrapingStrategy as ContentScrapingStrategyType,
WebScrapingStrategy as WebScrapingStrategyType,
LXMLWebScrapingStrategy as LXMLWebScrapingStrategyType,
)

View File

@@ -1517,8 +1517,29 @@ def extract_metadata_using_lxml(html, doc=None):
head = head[0]
# Title - using XPath
# title = head.xpath(".//title/text()")
# metadata["title"] = title[0].strip() if title else None
# === Title Extraction - New Approach ===
# Attempt to extract <title> using XPath
title = head.xpath(".//title/text()")
metadata["title"] = title[0].strip() if title else None
title = title[0] if title else None
# Fallback: Use .find() in case XPath fails due to malformed HTML
if not title:
title_el = doc.find(".//title")
title = title_el.text if title_el is not None else None
# Final fallback: Use OpenGraph or Twitter title if <title> is missing or empty
if not title:
title_candidates = (
doc.xpath("//meta[@property='og:title']/@content") or
doc.xpath("//meta[@name='twitter:title']/@content")
)
title = title_candidates[0] if title_candidates else None
# Strip and assign title
metadata["title"] = title.strip() if title else None
# Meta description - using XPath with multiple attribute conditions
description = head.xpath('.//meta[@name="description"]/@content')

View File

@@ -5,4 +5,9 @@ ANTHROPIC_API_KEY=your_anthropic_key_here
GROQ_API_KEY=your_groq_key_here
TOGETHER_API_KEY=your_together_key_here
MISTRAL_API_KEY=your_mistral_key_here
GEMINI_API_TOKEN=your_gemini_key_here
GEMINI_API_TOKEN=your_gemini_key_here
# Optional: Override the default LLM provider
# Examples: "openai/gpt-4", "anthropic/claude-3-opus", "deepseek/chat", etc.
# If not set, uses the provider specified in config.yml (default: openai/gpt-4o-mini)
# LLM_PROVIDER=anthropic/claude-3-opus

View File

@@ -154,6 +154,29 @@ cp deploy/docker/.llm.env.example .llm.env
# Now edit .llm.env and add your API keys
```
**Flexible LLM Provider Configuration:**
The Docker setup now supports flexible LLM provider configuration through three methods:
1. **Environment Variable** (Highest Priority): Set `LLM_PROVIDER` to override the default
```bash
export LLM_PROVIDER="anthropic/claude-3-opus"
# Or in your .llm.env file:
# LLM_PROVIDER=anthropic/claude-3-opus
```
2. **API Request Parameter**: Specify provider per request
```json
{
"url": "https://example.com",
"provider": "groq/mixtral-8x7b"
}
```
3. **Config File Default**: Falls back to `config.yml` (default: `openai/gpt-4o-mini`)
The system automatically selects the appropriate API key based on the provider.
#### 3. Build and Run with Compose
The `docker-compose.yml` file in the project root provides a simplified approach that automatically handles architecture detection using buildx.
@@ -668,7 +691,7 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini"
provider: "openai/gpt-4o-mini" # Can be overridden by LLM_PROVIDER env var
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored

View File

@@ -40,7 +40,9 @@ from utils import (
get_base_url,
is_task_id,
should_cleanup_task,
decode_redis_hash
decode_redis_hash,
get_llm_api_key,
validate_llm_provider
)
import psutil, time
@@ -89,10 +91,12 @@ async def handle_llm_qa(
Answer:"""
# api_token=os.environ.get(config["llm"].get("api_key_env", ""))
response = perform_completion_with_backoff(
provider=config["llm"]["provider"],
prompt_with_variables=prompt,
api_token=os.environ.get(config["llm"].get("api_key_env", ""))
api_token=get_llm_api_key(config)
)
return response.choices[0].message.content
@@ -110,19 +114,23 @@ async def process_llm_extraction(
url: str,
instruction: str,
schema: Optional[str] = None,
cache: str = "0"
cache: str = "0",
provider: Optional[str] = None
) -> None:
"""Process LLM extraction in background."""
try:
# If config['llm'] has api_key then ignore the api_key_env
api_key = ""
if "api_key" in config["llm"]:
api_key = config["llm"]["api_key"]
else:
api_key = os.environ.get(config["llm"].get("api_key_env", None), "")
# Validate provider
is_valid, error_msg = validate_llm_provider(config, provider)
if not is_valid:
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.FAILED,
"error": error_msg
})
return
api_key = get_llm_api_key(config, provider)
llm_strategy = LLMExtractionStrategy(
llm_config=LLMConfig(
provider=config["llm"]["provider"],
provider=provider or config["llm"]["provider"],
api_token=api_key
),
instruction=instruction,
@@ -169,10 +177,19 @@ async def handle_markdown_request(
filter_type: FilterType,
query: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None
config: Optional[dict] = None,
provider: Optional[str] = None
) -> str:
"""Handle markdown generation requests."""
try:
# Validate provider if using LLM filter
if filter_type == FilterType.LLM:
is_valid, error_msg = validate_llm_provider(config, provider)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=error_msg
)
decoded_url = unquote(url)
if not decoded_url.startswith(('http://', 'https://')):
decoded_url = 'https://' + decoded_url
@@ -185,8 +202,8 @@ async def handle_markdown_request(
FilterType.BM25: BM25ContentFilter(user_query=query or ""),
FilterType.LLM: LLMContentFilter(
llm_config=LLMConfig(
provider=config["llm"]["provider"],
api_token=os.environ.get(config["llm"].get("api_key_env", None), ""),
provider=provider or config["llm"]["provider"],
api_token=get_llm_api_key(config, provider),
),
instruction=query or "Extract main content"
)
@@ -230,7 +247,8 @@ async def handle_llm_request(
query: Optional[str] = None,
schema: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None
config: Optional[dict] = None,
provider: Optional[str] = None
) -> JSONResponse:
"""Handle LLM extraction requests."""
base_url = get_base_url(request)
@@ -260,7 +278,8 @@ async def handle_llm_request(
schema,
cache,
base_url,
config
config,
provider
)
except Exception as e:
@@ -304,7 +323,8 @@ async def create_new_task(
schema: Optional[str],
cache: str,
base_url: str,
config: dict
config: dict,
provider: Optional[str] = None
) -> JSONResponse:
"""Create and initialize a new task."""
decoded_url = unquote(input_path)
@@ -328,7 +348,8 @@ async def create_new_task(
decoded_url,
query,
schema,
cache
cache,
provider
)
return JSONResponse({

View File

@@ -36,6 +36,7 @@ class LlmJobPayload(BaseModel):
q: str
schema: Optional[str] = None
cache: bool = False
provider: Optional[str] = None
class CrawlJobPayload(BaseModel):
@@ -61,6 +62,7 @@ async def llm_job_enqueue(
schema=payload.schema,
cache=payload.cache,
config=_config,
provider=payload.provider,
)

View File

@@ -15,6 +15,7 @@ class MarkdownRequest(BaseModel):
f: FilterType = Field(FilterType.FIT, description="Contentfilter strategy: fit, raw, bm25, or llm")
q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters")
c: Optional[str] = Field("0", description="Cachebust / revision counter")
provider: Optional[str] = Field(None, description="LLM provider override (e.g., 'anthropic/claude-3-opus')")
class RawCode(BaseModel):

View File

@@ -241,7 +241,7 @@ async def get_markdown(
raise HTTPException(
400, "URL must be absolute and start with http/https")
markdown = await handle_markdown_request(
body.url, body.f, body.q, body.c, config
body.url, body.f, body.q, body.c, config, body.provider
)
return JSONResponse({
"url": body.url,

View File

@@ -1,6 +1,7 @@
import dns.resolver
import logging
import yaml
import os
from datetime import datetime
from enum import Enum
from pathlib import Path
@@ -19,10 +20,24 @@ class FilterType(str, Enum):
LLM = "llm"
def load_config() -> Dict:
"""Load and return application configuration."""
"""Load and return application configuration with environment variable overrides."""
config_path = Path(__file__).parent / "config.yml"
with open(config_path, "r") as config_file:
return yaml.safe_load(config_file)
config = yaml.safe_load(config_file)
# Override LLM provider from environment if set
llm_provider = os.environ.get("LLM_PROVIDER")
if llm_provider:
config["llm"]["provider"] = llm_provider
logging.info(f"LLM provider overridden from environment: {llm_provider}")
# Also support direct API key from environment if the provider-specific key isn't set
llm_api_key = os.environ.get("LLM_API_KEY")
if llm_api_key and "api_key" not in config["llm"]:
config["llm"]["api_key"] = llm_api_key
logging.info("LLM API key loaded from LLM_API_KEY environment variable")
return config
def setup_logging(config: Dict) -> None:
"""Configure application logging."""
@@ -56,6 +71,52 @@ def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> str:
"""Get the appropriate API key based on the LLM provider.
Args:
config: The application configuration dictionary
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The API key for the provider, or empty string if not found
"""
# Use provided provider or fall back to config
if not provider:
provider = config["llm"]["provider"]
# Check if direct API key is configured
if "api_key" in config["llm"]:
return config["llm"]["api_key"]
# Fall back to the configured api_key_env if no match
return os.environ.get(config["llm"].get("api_key_env", ""), "")
def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple[bool, str]:
"""Validate that the LLM provider has an associated API key.
Args:
config: The application configuration dictionary
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
Tuple of (is_valid, error_message)
"""
# Use provided provider or fall back to config
if not provider:
provider = config["llm"]["provider"]
# Get the API key for this provider
api_key = get_llm_api_key(config, provider)
if not api_key:
return False, f"No API key found for provider '{provider}'. Please set the appropriate environment variable."
return True, ""
def verify_email_domain(email: str) -> bool:
try:
domain = email.split('@')[1]

View File

@@ -14,6 +14,7 @@ x-base-config: &base-config
- TOGETHER_API_KEY=${TOGETHER_API_KEY:-}
- MISTRAL_API_KEY=${MISTRAL_API_KEY:-}
- GEMINI_API_TOKEN=${GEMINI_API_TOKEN:-}
- LLM_PROVIDER=${LLM_PROVIDER:-} # Optional: Override default provider (e.g., "anthropic/claude-3-opus")
volumes:
- /dev/shm:/dev/shm # Chromium performance
deploy:

View File

@@ -3,8 +3,8 @@ C4A-Script API Usage Examples
Shows how to use the new Result-based API in various scenarios
"""
from c4a_compile import compile, validate, compile_file
from c4a_result import CompilationResult, ValidationResult
from crawl4ai.script.c4a_compile import compile, validate, compile_file
from crawl4ai.script.c4a_result import CompilationResult, ValidationResult
import json

View File

@@ -3,7 +3,7 @@ C4A-Script Hello World
A concise example showing how to use the C4A-Script compiler
"""
from c4a_compile import compile
from crawl4ai.script.c4a_compile import compile
# Define your C4A-Script
script = """

View File

@@ -3,7 +3,7 @@ C4A-Script Hello World - Error Example
Shows how error handling works
"""
from c4a_compile import compile
from crawl4ai.script.c4a_compile import compile
# Define a script with an error (missing THEN)
script = """

View File

@@ -0,0 +1,303 @@
"""
🎯 Multi-Config URL Matching Demo
=================================
Learn how to use different crawler configurations for different URL patterns
in a single crawl batch with Crawl4AI's multi-config feature.
Part 1: Understanding URL Matching (Pattern Testing)
Part 2: Practical Example with Real Crawling
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
MatchMode
)
from crawl4ai.processors.pdf import PDFContentScrapingStrategy
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
def print_section(title):
"""Print a formatted section header"""
print(f"\n{'=' * 60}")
print(f"{title}")
print(f"{'=' * 60}\n")
def test_url_matching(config, test_urls, config_name):
"""Test URL matching for a config and show results"""
print(f"Config: {config_name}")
print(f"Matcher: {config.url_matcher}")
if hasattr(config, 'match_mode'):
print(f"Mode: {config.match_mode.value}")
print("-" * 40)
for url in test_urls:
matches = config.is_match(url)
symbol = "" if matches else ""
print(f"{symbol} {url}")
print()
# ==============================================================================
# PART 1: Understanding URL Matching
# ==============================================================================
def demo_part1_pattern_matching():
"""Part 1: Learn how URL matching works without crawling"""
print_section("PART 1: Understanding URL Matching")
print("Let's explore different ways to match URLs with configs.\n")
# Test URLs we'll use throughout
test_urls = [
"https://example.com/report.pdf",
"https://example.com/data.json",
"https://example.com/blog/post-1",
"https://example.com/article/news",
"https://api.example.com/v1/users",
"https://example.com/about"
]
# 1.1 Simple String Pattern
print("1.1 Simple String Pattern Matching")
print("-" * 40)
pdf_config = CrawlerRunConfig(
url_matcher="*.pdf"
)
test_url_matching(pdf_config, test_urls, "PDF Config")
# 1.2 Multiple String Patterns
print("1.2 Multiple String Patterns (OR logic)")
print("-" * 40)
blog_config = CrawlerRunConfig(
url_matcher=["*/blog/*", "*/article/*", "*/news/*"],
match_mode=MatchMode.OR # This is default, shown for clarity
)
test_url_matching(blog_config, test_urls, "Blog/Article Config")
# 1.3 Single Function Matcher
print("1.3 Function-based Matching")
print("-" * 40)
api_config = CrawlerRunConfig(
url_matcher=lambda url: 'api' in url or url.endswith('.json')
)
test_url_matching(api_config, test_urls, "API Config")
# 1.4 List of Functions
print("1.4 Multiple Functions with AND Logic")
print("-" * 40)
# Must be HTTPS AND contain 'api' AND have version number
secure_api_config = CrawlerRunConfig(
url_matcher=[
lambda url: url.startswith('https://'),
lambda url: 'api' in url,
lambda url: '/v' in url # Version indicator
],
match_mode=MatchMode.AND
)
test_url_matching(secure_api_config, test_urls, "Secure API Config")
# 1.5 Mixed: String and Function Together
print("1.5 Mixed Patterns: String + Function")
print("-" * 40)
# Match JSON files OR any API endpoint
json_or_api_config = CrawlerRunConfig(
url_matcher=[
"*.json", # String pattern
lambda url: 'api' in url # Function
],
match_mode=MatchMode.OR
)
test_url_matching(json_or_api_config, test_urls, "JSON or API Config")
# 1.6 Complex: Multiple Strings + Multiple Functions
print("1.6 Complex Matcher: Mixed Types with AND Logic")
print("-" * 40)
# Must be: HTTPS AND (.com domain) AND (blog OR article) AND NOT a PDF
complex_config = CrawlerRunConfig(
url_matcher=[
lambda url: url.startswith('https://'), # Function: HTTPS check
"*.com/*", # String: .com domain
lambda url: any(pattern in url for pattern in ['/blog/', '/article/']), # Function: Blog OR article
lambda url: not url.endswith('.pdf') # Function: Not PDF
],
match_mode=MatchMode.AND
)
test_url_matching(complex_config, test_urls, "Complex Mixed Config")
print("\n✅ Key Takeaway: First matching config wins when passed to arun_many()!")
# ==============================================================================
# PART 2: Practical Multi-URL Crawling
# ==============================================================================
async def demo_part2_practical_crawling():
"""Part 2: Real-world example with different content types"""
print_section("PART 2: Practical Multi-URL Crawling")
print("Now let's see multi-config in action with real URLs.\n")
# Create specialized configs for different content types
configs = [
# Config 1: PDF documents - only match files ending with .pdf
CrawlerRunConfig(
url_matcher="*.pdf",
scraping_strategy=PDFContentScrapingStrategy()
),
# Config 2: Blog/article pages with content filtering
CrawlerRunConfig(
url_matcher=["*/blog/*", "*/article/*", "*python.org*"],
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(threshold=0.48)
)
),
# Config 3: Dynamic pages requiring JavaScript
CrawlerRunConfig(
url_matcher=lambda url: 'github.com' in url,
js_code="window.scrollTo(0, 500);" # Scroll to load content
),
# Config 4: Mixed matcher - API endpoints (string OR function)
CrawlerRunConfig(
url_matcher=[
"*.json", # String pattern for JSON files
lambda url: 'api' in url or 'httpbin.org' in url # Function for API endpoints
],
match_mode=MatchMode.OR,
),
# Config 5: Complex matcher - Secure documentation sites
CrawlerRunConfig(
url_matcher=[
lambda url: url.startswith('https://'), # Must be HTTPS
"*.org/*", # String: .org domain
lambda url: any(doc in url for doc in ['docs', 'documentation', 'reference']), # Has docs
lambda url: not url.endswith(('.pdf', '.json')) # Not PDF or JSON
],
match_mode=MatchMode.AND,
# wait_for="css:.content, css:article" # Wait for content to load
),
# Default config for everything else
# CrawlerRunConfig() # No url_matcher means it matches everything (use it as fallback)
]
# URLs to crawl - each will use a different config
urls = [
"https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", # → PDF config
"https://blog.python.org/", # → Blog config with content filter
"https://github.com/microsoft/playwright", # → JS config
"https://httpbin.org/json", # → Mixed matcher config (API)
"https://docs.python.org/3/reference/", # → Complex matcher config
"https://www.w3schools.com/", # → Default config, if you uncomment the default config line above, if not you will see `Error: No matching configuration`
]
print("URLs to crawl:")
for i, url in enumerate(urls, 1):
print(f"{i}. {url}")
print("\nCrawling with appropriate config for each URL...\n")
async with AsyncWebCrawler() as crawler:
results = await crawler.arun_many(
urls=urls,
config=configs
)
# Display results
print("Results:")
print("-" * 60)
for result in results:
if result.success:
# Determine which config was used
config_type = "Default"
if result.url.endswith('.pdf'):
config_type = "PDF Strategy"
elif any(pattern in result.url for pattern in ['blog', 'python.org']) and 'docs' not in result.url:
config_type = "Blog + Content Filter"
elif 'github.com' in result.url:
config_type = "JavaScript Enabled"
elif 'httpbin.org' in result.url or result.url.endswith('.json'):
config_type = "Mixed Matcher (API)"
elif 'docs.python.org' in result.url:
config_type = "Complex Matcher (Secure Docs)"
print(f"\n{result.url}")
print(f" Config used: {config_type}")
print(f" Content size: {len(result.markdown)} chars")
# Show if we have fit_markdown (from content filter)
if hasattr(result.markdown, 'fit_markdown') and result.markdown.fit_markdown:
print(f" Fit markdown size: {len(result.markdown.fit_markdown)} chars")
reduction = (1 - len(result.markdown.fit_markdown) / len(result.markdown)) * 100
print(f" Content reduced by: {reduction:.1f}%")
# Show extracted data if using extraction strategy
if hasattr(result, 'extracted_content') and result.extracted_content:
print(f" Extracted data: {str(result.extracted_content)[:100]}...")
else:
print(f"\n{result.url}")
print(f" Error: {result.error_message}")
print("\n" + "=" * 60)
print("✅ Multi-config crawling complete!")
print("\nBenefits demonstrated:")
print("- PDFs handled with specialized scraper")
print("- Blog content filtered for relevance")
print("- JavaScript executed only where needed")
print("- Mixed matchers (string + function) for flexible matching")
print("- Complex matchers for precise URL targeting")
print("- Each URL got optimal configuration automatically!")
async def main():
"""Run both parts of the demo"""
print("""
🎯 Multi-Config URL Matching Demo
=================================
Learn how Crawl4AI can use different configurations
for different URLs in a single batch.
""")
# Part 1: Pattern matching
demo_part1_pattern_matching()
print("\nPress Enter to continue to Part 2...")
try:
input()
except EOFError:
# Running in non-interactive mode, skip input
pass
# Part 2: Practical crawling
await demo_part2_practical_crawling()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,57 @@
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
DefaultMarkdownGenerator,
PruningContentFilter,
CrawlResult,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def main():
# Create browser config
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create the crawler strategy with the undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Configure the crawl
crawler_config = CrawlerRunConfig(
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter()
),
capture_console_messages=True, # Enable console capture to test adapter
)
# Test on a site that typically detects bots
print("Testing undetected adapter...")
result: CrawlResult = await crawler.arun(
url="https://www.helloworld.org",
config=crawler_config
)
print(f"Status: {result.status_code}")
print(f"Success: {result.success}")
print(f"Console messages captured: {len(result.console_messages or [])}")
print(f"Markdown content (first 500 chars):\n{result.markdown.raw_markdown[:500]}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,5 +1,6 @@
import time, re
from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
# WebScrapingStrategy is now an alias for LXMLWebScrapingStrategy
import time
import functools
from collections import defaultdict
@@ -57,7 +58,7 @@ methods_to_profile = [
# Apply decorators to both strategies
for strategy, name in [(WebScrapingStrategy, "Original"), (LXMLWebScrapingStrategy, "LXML")]:
for strategy, name in [(LXMLWebScrapingStrategy, "LXML")]:
for method in methods_to_profile:
apply_decorators(strategy, method, name)
@@ -85,7 +86,7 @@ def generate_large_html(n_elements=1000):
def test_scraping():
# Initialize both scrapers
original_scraper = WebScrapingStrategy()
original_scraper = LXMLWebScrapingStrategy()
selected_scraper = LXMLWebScrapingStrategy()
# Generate test HTML

View File

@@ -0,0 +1,59 @@
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Example 1: Stealth Mode
async def stealth_mode_example():
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun("https://example.com")
return result.html[:500]
# Example 2: Undetected Browser
async def undetected_browser_example():
browser_config = BrowserConfig(
headless=False
)
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun("https://example.com")
return result.html[:500]
# Example 3: Both Combined
async def combined_example():
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun("https://example.com")
return result.html[:500]
# Run examples
if __name__ == "__main__":
asyncio.run(stealth_mode_example())
asyncio.run(undetected_browser_example())
asyncio.run(combined_example())

View File

@@ -0,0 +1,522 @@
"""
Stealth Mode Example with Crawl4AI
This example demonstrates how to use the stealth mode feature to bypass basic bot detection.
The stealth mode uses playwright-stealth to modify browser fingerprints and behaviors
that are commonly used to detect automated browsers.
Key features demonstrated:
1. Comparing crawling with and without stealth mode
2. Testing against bot detection sites
3. Accessing sites that block automated browsers
4. Best practices for stealth crawling
"""
import asyncio
import json
from typing import Dict, Any
from colorama import Fore, Style, init
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
# Initialize colorama for colored output
init()
# Create a logger for better output
logger = AsyncLogger(verbose=True)
async def test_bot_detection(use_stealth: bool = False) -> Dict[str, Any]:
"""Test against a bot detection service"""
logger.info(
f"Testing bot detection with stealth={'ON' if use_stealth else 'OFF'}",
tag="STEALTH"
)
# Configure browser with or without stealth
browser_config = BrowserConfig(
headless=False, # Use False to see the browser in action
enable_stealth=use_stealth,
viewport_width=1280,
viewport_height=800
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# JavaScript to extract bot detection results
detection_script = """
// Comprehensive bot detection checks
(() => {
const detectionResults = {
// Basic WebDriver detection
webdriver: navigator.webdriver,
// Chrome specific
chrome: !!window.chrome,
chromeRuntime: !!window.chrome?.runtime,
// Automation indicators
automationControlled: navigator.webdriver,
// Permissions API
permissionsPresent: !!navigator.permissions?.query,
// Plugins
pluginsLength: navigator.plugins.length,
pluginsArray: Array.from(navigator.plugins).map(p => p.name),
// Languages
languages: navigator.languages,
language: navigator.language,
// User agent
userAgent: navigator.userAgent,
// Screen and window properties
screen: {
width: screen.width,
height: screen.height,
availWidth: screen.availWidth,
availHeight: screen.availHeight,
colorDepth: screen.colorDepth,
pixelDepth: screen.pixelDepth
},
// WebGL vendor
webglVendor: (() => {
try {
const canvas = document.createElement('canvas');
const gl = canvas.getContext('webgl') || canvas.getContext('experimental-webgl');
const ext = gl.getExtension('WEBGL_debug_renderer_info');
return gl.getParameter(ext.UNMASKED_VENDOR_WEBGL);
} catch (e) {
return 'Error';
}
})(),
// Platform
platform: navigator.platform,
// Hardware concurrency
hardwareConcurrency: navigator.hardwareConcurrency,
// Device memory
deviceMemory: navigator.deviceMemory,
// Connection
connection: navigator.connection?.effectiveType
};
// Log results for console capture
console.log('DETECTION_RESULTS:', JSON.stringify(detectionResults, null, 2));
// Return results
return detectionResults;
})();
"""
# Crawl bot detection test page
config = CrawlerRunConfig(
js_code=detection_script,
capture_console_messages=True,
wait_until="networkidle",
delay_before_return_html=2.0 # Give time for all checks to complete
)
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=config
)
if result.success:
# Extract detection results from console
detection_data = None
for msg in result.console_messages or []:
if "DETECTION_RESULTS:" in msg.get("text", ""):
try:
json_str = msg["text"].replace("DETECTION_RESULTS:", "").strip()
detection_data = json.loads(json_str)
except:
pass
# Also try to get from JavaScript execution result
if not detection_data and result.js_execution_result:
detection_data = result.js_execution_result
return {
"success": True,
"url": result.url,
"detection_data": detection_data,
"page_title": result.metadata.get("title", ""),
"stealth_enabled": use_stealth
}
else:
return {
"success": False,
"error": result.error_message,
"stealth_enabled": use_stealth
}
async def test_cloudflare_site(use_stealth: bool = False) -> Dict[str, Any]:
"""Test accessing a Cloudflare-protected site"""
logger.info(
f"Testing Cloudflare site with stealth={'ON' if use_stealth else 'OFF'}",
tag="STEALTH"
)
browser_config = BrowserConfig(
headless=True, # Cloudflare detection works better in headless mode with stealth
enable_stealth=use_stealth,
viewport_width=1920,
viewport_height=1080
)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
wait_until="networkidle",
page_timeout=30000, # 30 seconds
delay_before_return_html=3.0
)
# Test on a site that often shows Cloudflare challenges
result = await crawler.arun(
url="https://nowsecure.nl",
config=config
)
# Check if we hit Cloudflare challenge
cloudflare_detected = False
if result.html:
cloudflare_indicators = [
"Checking your browser",
"Just a moment",
"cf-browser-verification",
"cf-challenge",
"ray ID"
]
cloudflare_detected = any(indicator in result.html for indicator in cloudflare_indicators)
return {
"success": result.success,
"url": result.url,
"cloudflare_challenge": cloudflare_detected,
"status_code": result.status_code,
"page_title": result.metadata.get("title", "") if result.metadata else "",
"stealth_enabled": use_stealth,
"html_snippet": result.html[:500] if result.html else ""
}
async def test_anti_bot_site(use_stealth: bool = False) -> Dict[str, Any]:
"""Test against sites with anti-bot measures"""
logger.info(
f"Testing anti-bot site with stealth={'ON' if use_stealth else 'OFF'}",
tag="STEALTH"
)
browser_config = BrowserConfig(
headless=False,
enable_stealth=use_stealth,
# Additional browser arguments that help with stealth
extra_args=[
"--disable-blink-features=AutomationControlled",
"--disable-features=site-per-process"
] if not use_stealth else [] # These are automatically applied with stealth
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Some sites check for specific behaviors
behavior_script = """
(async () => {
// Simulate human-like behavior
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
// Random mouse movement
const moveX = Math.random() * 100;
const moveY = Math.random() * 100;
// Simulate reading time
await sleep(1000 + Math.random() * 2000);
// Scroll slightly
window.scrollBy(0, 100 + Math.random() * 200);
console.log('Human behavior simulation complete');
return true;
})()
"""
config = CrawlerRunConfig(
js_code=behavior_script,
wait_until="networkidle",
delay_before_return_html=5.0, # Longer delay to appear more human
capture_console_messages=True
)
# Test on a site that implements anti-bot measures
result = await crawler.arun(
url="https://www.g2.com/",
config=config
)
# Check for common anti-bot blocks
blocked_indicators = [
"Access Denied",
"403 Forbidden",
"Security Check",
"Verify you are human",
"captcha",
"challenge"
]
blocked = False
if result.html:
blocked = any(indicator.lower() in result.html.lower() for indicator in blocked_indicators)
return {
"success": result.success and not blocked,
"url": result.url,
"blocked": blocked,
"status_code": result.status_code,
"page_title": result.metadata.get("title", "") if result.metadata else "",
"stealth_enabled": use_stealth
}
async def compare_results():
"""Run all tests with and without stealth mode and compare results"""
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Crawl4AI Stealth Mode Comparison{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")
# Test 1: Bot Detection
print(f"{Fore.YELLOW}1. Bot Detection Test (bot.sannysoft.com){Style.RESET_ALL}")
print("-" * 40)
# Without stealth
regular_detection = await test_bot_detection(use_stealth=False)
if regular_detection["success"] and regular_detection["detection_data"]:
print(f"{Fore.RED}Without Stealth:{Style.RESET_ALL}")
data = regular_detection["detection_data"]
print(f" • WebDriver detected: {data.get('webdriver', 'Unknown')}")
print(f" • Chrome: {data.get('chrome', 'Unknown')}")
print(f" • Languages: {data.get('languages', 'Unknown')}")
print(f" • Plugins: {data.get('pluginsLength', 'Unknown')}")
print(f" • User Agent: {data.get('userAgent', 'Unknown')[:60]}...")
# With stealth
stealth_detection = await test_bot_detection(use_stealth=True)
if stealth_detection["success"] and stealth_detection["detection_data"]:
print(f"\n{Fore.GREEN}With Stealth:{Style.RESET_ALL}")
data = stealth_detection["detection_data"]
print(f" • WebDriver detected: {data.get('webdriver', 'Unknown')}")
print(f" • Chrome: {data.get('chrome', 'Unknown')}")
print(f" • Languages: {data.get('languages', 'Unknown')}")
print(f" • Plugins: {data.get('pluginsLength', 'Unknown')}")
print(f" • User Agent: {data.get('userAgent', 'Unknown')[:60]}...")
# Test 2: Cloudflare Site
print(f"\n\n{Fore.YELLOW}2. Cloudflare Protected Site Test{Style.RESET_ALL}")
print("-" * 40)
# Without stealth
regular_cf = await test_cloudflare_site(use_stealth=False)
print(f"{Fore.RED}Without Stealth:{Style.RESET_ALL}")
print(f" • Success: {regular_cf['success']}")
print(f" • Cloudflare Challenge: {regular_cf['cloudflare_challenge']}")
print(f" • Status Code: {regular_cf['status_code']}")
print(f" • Page Title: {regular_cf['page_title']}")
# With stealth
stealth_cf = await test_cloudflare_site(use_stealth=True)
print(f"\n{Fore.GREEN}With Stealth:{Style.RESET_ALL}")
print(f" • Success: {stealth_cf['success']}")
print(f" • Cloudflare Challenge: {stealth_cf['cloudflare_challenge']}")
print(f" • Status Code: {stealth_cf['status_code']}")
print(f" • Page Title: {stealth_cf['page_title']}")
# Test 3: Anti-bot Site
print(f"\n\n{Fore.YELLOW}3. Anti-Bot Site Test{Style.RESET_ALL}")
print("-" * 40)
# Without stealth
regular_antibot = await test_anti_bot_site(use_stealth=False)
print(f"{Fore.RED}Without Stealth:{Style.RESET_ALL}")
print(f" • Success: {regular_antibot['success']}")
print(f" • Blocked: {regular_antibot['blocked']}")
print(f" • Status Code: {regular_antibot['status_code']}")
print(f" • Page Title: {regular_antibot['page_title']}")
# With stealth
stealth_antibot = await test_anti_bot_site(use_stealth=True)
print(f"\n{Fore.GREEN}With Stealth:{Style.RESET_ALL}")
print(f" • Success: {stealth_antibot['success']}")
print(f" • Blocked: {stealth_antibot['blocked']}")
print(f" • Status Code: {stealth_antibot['status_code']}")
print(f" • Page Title: {stealth_antibot['page_title']}")
# Summary
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Summary:{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"\nStealth mode helps bypass basic bot detection by:")
print(f" • Hiding webdriver property")
print(f" • Modifying browser fingerprints")
print(f" • Adjusting navigator properties")
print(f" • Emulating real browser plugin behavior")
print(f"\n{Fore.YELLOW}Note:{Style.RESET_ALL} Stealth mode is not a silver bullet.")
print(f"Advanced anti-bot systems may still detect automation.")
print(f"Always respect robots.txt and website terms of service.")
async def stealth_best_practices():
"""Demonstrate best practices for using stealth mode"""
print(f"\n\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Stealth Mode Best Practices{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")
# Best Practice 1: Combine with realistic behavior
print(f"{Fore.YELLOW}1. Combine with Realistic Behavior:{Style.RESET_ALL}")
browser_config = BrowserConfig(
headless=False,
enable_stealth=True,
viewport_width=1920,
viewport_height=1080
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Simulate human-like behavior
human_behavior_script = """
(async () => {
// Wait random time between actions
const randomWait = () => Math.random() * 2000 + 1000;
// Simulate reading
await new Promise(resolve => setTimeout(resolve, randomWait()));
// Smooth scroll
const smoothScroll = async () => {
const totalHeight = document.body.scrollHeight;
const viewHeight = window.innerHeight;
let currentPosition = 0;
while (currentPosition < totalHeight - viewHeight) {
const scrollAmount = Math.random() * 300 + 100;
window.scrollBy({
top: scrollAmount,
behavior: 'smooth'
});
currentPosition += scrollAmount;
await new Promise(resolve => setTimeout(resolve, randomWait()));
}
};
await smoothScroll();
console.log('Human-like behavior simulation completed');
return true;
})()
"""
config = CrawlerRunConfig(
js_code=human_behavior_script,
wait_until="networkidle",
delay_before_return_html=3.0,
capture_console_messages=True
)
result = await crawler.arun(
url="https://example.com",
config=config
)
print(f" ✓ Simulated human-like scrolling and reading patterns")
print(f" ✓ Added random delays between actions")
print(f" ✓ Result: {result.success}")
# Best Practice 2: Use appropriate viewport and user agent
print(f"\n{Fore.YELLOW}2. Use Realistic Viewport and User Agent:{Style.RESET_ALL}")
# Get a realistic user agent
from crawl4ai.user_agent_generator import UserAgentGenerator
ua_generator = UserAgentGenerator()
browser_config = BrowserConfig(
headless=True,
enable_stealth=True,
viewport_width=1920,
viewport_height=1080,
user_agent=ua_generator.generate(device_type="desktop", browser_type="chrome")
)
print(f" ✓ Using realistic viewport: 1920x1080")
print(f" ✓ Using current Chrome user agent")
print(f" ✓ Stealth mode will ensure consistency")
# Best Practice 3: Manage request rate
print(f"\n{Fore.YELLOW}3. Manage Request Rate:{Style.RESET_ALL}")
print(f" ✓ Add delays between requests")
print(f" ✓ Randomize timing patterns")
print(f" ✓ Respect robots.txt")
# Best Practice 4: Session management
print(f"\n{Fore.YELLOW}4. Use Session Management:{Style.RESET_ALL}")
browser_config = BrowserConfig(
headless=False,
enable_stealth=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Create a session for multiple requests
session_id = "stealth_session_1"
config = CrawlerRunConfig(
session_id=session_id,
wait_until="domcontentloaded"
)
# First request
result1 = await crawler.arun(
url="https://example.com",
config=config
)
# Subsequent request reuses the same browser context
result2 = await crawler.arun(
url="https://example.com/about",
config=config
)
print(f" ✓ Reused browser session for multiple requests")
print(f" ✓ Maintains cookies and state between requests")
print(f" ✓ More efficient and realistic browsing pattern")
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
async def main():
"""Run all examples"""
# Run comparison tests
await compare_results()
# Show best practices
await stealth_best_practices()
print(f"\n{Fore.GREEN}Examples completed!{Style.RESET_ALL}")
print(f"\n{Fore.YELLOW}Remember:{Style.RESET_ALL}")
print(f"• Stealth mode helps with basic bot detection")
print(f"• Always respect website terms of service")
print(f"• Consider rate limiting and ethical scraping practices")
print(f"• For advanced protection, consider additional measures")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,215 @@
"""
Quick Start: Using Stealth Mode in Crawl4AI
This example shows practical use cases for the stealth mode feature.
Stealth mode helps bypass basic bot detection mechanisms.
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def example_1_basic_stealth():
"""Example 1: Basic stealth mode usage"""
print("\n=== Example 1: Basic Stealth Mode ===")
# Enable stealth mode in browser config
browser_config = BrowserConfig(
enable_stealth=True, # This is the key parameter
headless=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
print(f"✓ Crawled {result.url} successfully")
print(f"✓ Title: {result.metadata.get('title', 'N/A')}")
async def example_2_stealth_with_screenshot():
"""Example 2: Stealth mode with screenshot to show detection results"""
print("\n=== Example 2: Stealth Mode Visual Verification ===")
browser_config = BrowserConfig(
enable_stealth=True,
headless=False # Set to False to see the browser
)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
screenshot=True,
wait_until="networkidle"
)
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=config
)
if result.success:
print(f"✓ Successfully crawled bot detection site")
print(f"✓ With stealth enabled, many detection tests should show as passed")
if result.screenshot:
# Save screenshot for verification
import base64
with open("stealth_detection_results.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print(f"✓ Screenshot saved as 'stealth_detection_results.png'")
print(f" Check the screenshot to see detection results!")
async def example_3_stealth_for_protected_sites():
"""Example 3: Using stealth for sites with bot protection"""
print("\n=== Example 3: Stealth for Protected Sites ===")
browser_config = BrowserConfig(
enable_stealth=True,
headless=True,
viewport_width=1920,
viewport_height=1080
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Add human-like behavior
config = CrawlerRunConfig(
wait_until="networkidle",
delay_before_return_html=2.0, # Wait 2 seconds
js_code="""
// Simulate human-like scrolling
window.scrollTo({
top: document.body.scrollHeight / 2,
behavior: 'smooth'
});
"""
)
# Try accessing a site that might have bot protection
result = await crawler.arun(
url="https://www.g2.com/products/slack/reviews",
config=config
)
if result.success:
print(f"✓ Successfully accessed protected site")
print(f"✓ Retrieved {len(result.html)} characters of HTML")
else:
print(f"✗ Failed to access site: {result.error_message}")
async def example_4_stealth_with_sessions():
"""Example 4: Stealth mode with session management"""
print("\n=== Example 4: Stealth + Session Management ===")
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
session_id = "my_stealth_session"
# First request - establish session
config = CrawlerRunConfig(
session_id=session_id,
wait_until="domcontentloaded"
)
result1 = await crawler.arun(
url="https://news.ycombinator.com",
config=config
)
print(f"✓ First request completed: {result1.url}")
# Second request - reuse session
await asyncio.sleep(2) # Brief delay between requests
result2 = await crawler.arun(
url="https://news.ycombinator.com/best",
config=config
)
print(f"✓ Second request completed: {result2.url}")
print(f"✓ Session reused, maintaining cookies and state")
async def example_5_stealth_comparison():
"""Example 5: Compare results with and without stealth using screenshots"""
print("\n=== Example 5: Stealth Mode Comparison ===")
test_url = "https://bot.sannysoft.com"
# First test WITHOUT stealth
print("\nWithout stealth:")
regular_config = BrowserConfig(
enable_stealth=False,
headless=True
)
async with AsyncWebCrawler(config=regular_config) as crawler:
config = CrawlerRunConfig(
screenshot=True,
wait_until="networkidle"
)
result = await crawler.arun(url=test_url, config=config)
if result.success and result.screenshot:
import base64
with open("comparison_without_stealth.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print(f" ✓ Screenshot saved: comparison_without_stealth.png")
print(f" Many tests will show as FAILED (red)")
# Then test WITH stealth
print("\nWith stealth:")
stealth_config = BrowserConfig(
enable_stealth=True,
headless=True
)
async with AsyncWebCrawler(config=stealth_config) as crawler:
config = CrawlerRunConfig(
screenshot=True,
wait_until="networkidle"
)
result = await crawler.arun(url=test_url, config=config)
if result.success and result.screenshot:
import base64
with open("comparison_with_stealth.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print(f" ✓ Screenshot saved: comparison_with_stealth.png")
print(f" More tests should show as PASSED (green)")
print("\nCompare the two screenshots to see the difference!")
async def main():
"""Run all examples"""
print("Crawl4AI Stealth Mode Examples")
print("==============================")
# Run basic example
await example_1_basic_stealth()
# Run screenshot verification example
await example_2_stealth_with_screenshot()
# Run protected site example
await example_3_stealth_for_protected_sites()
# Run session example
await example_4_stealth_with_sessions()
# Run comparison example
await example_5_stealth_comparison()
print("\n" + "="*50)
print("Tips for using stealth mode effectively:")
print("- Use realistic viewport sizes (1920x1080, 1366x768)")
print("- Add delays between requests to appear more human")
print("- Combine with session management for better results")
print("- Remember: stealth mode is for legitimate scraping only")
print("="*50)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,62 @@
"""
Simple test to verify stealth mode is working
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def test_stealth():
"""Test stealth mode effectiveness"""
# Test WITHOUT stealth
print("=== WITHOUT Stealth ===")
config1 = BrowserConfig(
headless=False,
enable_stealth=False
)
async with AsyncWebCrawler(config=config1) as crawler:
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=CrawlerRunConfig(
wait_until="networkidle",
screenshot=True
)
)
print(f"Success: {result.success}")
# Take screenshot
if result.screenshot:
with open("without_stealth.png", "wb") as f:
import base64
f.write(base64.b64decode(result.screenshot))
print("Screenshot saved: without_stealth.png")
# Test WITH stealth
print("\n=== WITH Stealth ===")
config2 = BrowserConfig(
headless=False,
enable_stealth=True
)
async with AsyncWebCrawler(config=config2) as crawler:
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=CrawlerRunConfig(
wait_until="networkidle",
screenshot=True
)
)
print(f"Success: {result.success}")
# Take screenshot
if result.screenshot:
with open("with_stealth.png", "wb") as f:
import base64
f.write(base64.b64decode(result.screenshot))
print("Screenshot saved: with_stealth.png")
print("\nCheck the screenshots to see the difference in bot detection results!")
if __name__ == "__main__":
asyncio.run(test_stealth())

View File

@@ -0,0 +1,74 @@
"""
Basic Undetected Browser Test
Simple example to test if undetected mode works
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig
async def test_regular_mode():
"""Test with regular browser"""
print("Testing Regular Browser Mode...")
browser_config = BrowserConfig(
headless=False,
verbose=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://www.example.com")
print(f"Regular Mode - Success: {result.success}")
print(f"Regular Mode - Status: {result.status_code}")
print(f"Regular Mode - Content length: {len(result.markdown.raw_markdown)}")
print(f"Regular Mode - First 100 chars: {result.markdown.raw_markdown[:100]}...")
return result.success
async def test_undetected_mode():
"""Test with undetected browser"""
print("\nTesting Undetected Browser Mode...")
from crawl4ai import UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
browser_config = BrowserConfig(
headless=False,
verbose=True
)
# Create undetected adapter
undetected_adapter = UndetectedAdapter()
# Create strategy with undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
result = await crawler.arun(url="https://www.example.com")
print(f"Undetected Mode - Success: {result.success}")
print(f"Undetected Mode - Status: {result.status_code}")
print(f"Undetected Mode - Content length: {len(result.markdown.raw_markdown)}")
print(f"Undetected Mode - First 100 chars: {result.markdown.raw_markdown[:100]}...")
return result.success
async def main():
"""Run both tests"""
print("🤖 Crawl4AI Basic Adapter Test\n")
# Test regular mode
regular_success = await test_regular_mode()
# Test undetected mode
undetected_success = await test_undetected_mode()
# Summary
print("\n" + "="*50)
print("Summary:")
print(f"Regular Mode: {'✅ Success' if regular_success else '❌ Failed'}")
print(f"Undetected Mode: {'✅ Success' if undetected_success else '❌ Failed'}")
print("="*50)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,155 @@
"""
Bot Detection Test - Compare Regular vs Undetected
Tests browser fingerprinting differences at bot.sannysoft.com
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter,
CrawlResult
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Bot detection test site
TEST_URL = "https://bot.sannysoft.com"
def analyze_bot_detection(result: CrawlResult) -> dict:
"""Analyze bot detection results from the page"""
detections = {
"webdriver": False,
"headless": False,
"automation": False,
"user_agent": False,
"total_tests": 0,
"failed_tests": 0
}
if not result.success or not result.html:
return detections
# Look for specific test results in the HTML
html_lower = result.html.lower()
# Check for common bot indicators
if "webdriver" in html_lower and ("fail" in html_lower or "true" in html_lower):
detections["webdriver"] = True
detections["failed_tests"] += 1
if "headless" in html_lower and ("fail" in html_lower or "true" in html_lower):
detections["headless"] = True
detections["failed_tests"] += 1
if "automation" in html_lower and "detected" in html_lower:
detections["automation"] = True
detections["failed_tests"] += 1
# Count total tests (approximate)
detections["total_tests"] = html_lower.count("test") + html_lower.count("check")
return detections
async def test_browser_mode(adapter_name: str, adapter=None):
"""Test a browser mode and return results"""
print(f"\n{'='*60}")
print(f"Testing: {adapter_name}")
print(f"{'='*60}")
browser_config = BrowserConfig(
headless=False, # Run in headed mode for better results
verbose=True,
viewport_width=1920,
viewport_height=1080,
)
if adapter:
# Use undetected mode
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
crawler = AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
)
else:
# Use regular mode
crawler = AsyncWebCrawler(config=browser_config)
async with crawler:
config = CrawlerRunConfig(
delay_before_return_html=3.0, # Let detection scripts run
wait_for_images=True,
screenshot=True,
simulate_user=False, # Don't simulate for accurate detection
)
result = await crawler.arun(url=TEST_URL, config=config)
print(f"\n✓ Success: {result.success}")
print(f"✓ Status Code: {result.status_code}")
if result.success:
# Analyze detection results
detections = analyze_bot_detection(result)
print(f"\n🔍 Bot Detection Analysis:")
print(f" - WebDriver Detected: {'❌ Yes' if detections['webdriver'] else '✅ No'}")
print(f" - Headless Detected: {'❌ Yes' if detections['headless'] else '✅ No'}")
print(f" - Automation Detected: {'❌ Yes' if detections['automation'] else '✅ No'}")
print(f" - Failed Tests: {detections['failed_tests']}")
# Show some content
if result.markdown.raw_markdown:
print(f"\nContent preview:")
lines = result.markdown.raw_markdown.split('\n')
for line in lines[:20]: # Show first 20 lines
if any(keyword in line.lower() for keyword in ['test', 'pass', 'fail', 'yes', 'no']):
print(f" {line.strip()}")
return result, detections if result.success else {}
async def main():
"""Run the comparison"""
print("🤖 Crawl4AI - Bot Detection Test")
print(f"Testing at: {TEST_URL}")
print("This site runs various browser fingerprinting tests\n")
# Test regular browser
regular_result, regular_detections = await test_browser_mode("Regular Browser")
# Small delay
await asyncio.sleep(2)
# Test undetected browser
undetected_adapter = UndetectedAdapter()
undetected_result, undetected_detections = await test_browser_mode(
"Undetected Browser",
undetected_adapter
)
# Summary comparison
print(f"\n{'='*60}")
print("COMPARISON SUMMARY")
print(f"{'='*60}")
print(f"\n{'Test':<25} {'Regular':<15} {'Undetected':<15}")
print(f"{'-'*55}")
if regular_detections and undetected_detections:
print(f"{'WebDriver Detection':<25} {'❌ Detected' if regular_detections['webdriver'] else '✅ Passed':<15} {'❌ Detected' if undetected_detections['webdriver'] else '✅ Passed':<15}")
print(f"{'Headless Detection':<25} {'❌ Detected' if regular_detections['headless'] else '✅ Passed':<15} {'❌ Detected' if undetected_detections['headless'] else '✅ Passed':<15}")
print(f"{'Automation Detection':<25} {'❌ Detected' if regular_detections['automation'] else '✅ Passed':<15} {'❌ Detected' if undetected_detections['automation'] else '✅ Passed':<15}")
print(f"{'Failed Tests':<25} {regular_detections['failed_tests']:<15} {undetected_detections['failed_tests']:<15}")
print(f"\n{'='*60}")
if undetected_detections.get('failed_tests', 0) < regular_detections.get('failed_tests', 1):
print("✅ Undetected browser performed better at evading detection!")
else:
print(" Both browsers had similar detection results")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,164 @@
"""
Undetected Browser Test - Cloudflare Protected Site
Tests the difference between regular and undetected modes on a Cloudflare-protected site
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Test URL with Cloudflare protection
TEST_URL = "https://nowsecure.nl"
async def test_regular_browser():
"""Test with regular browser - likely to be blocked"""
print("=" * 60)
print("Testing with Regular Browser")
print("=" * 60)
browser_config = BrowserConfig(
headless=False,
verbose=True,
viewport_width=1920,
viewport_height=1080,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
delay_before_return_html=2.0,
simulate_user=True,
magic=True, # Try with magic mode too
)
result = await crawler.arun(url=TEST_URL, config=config)
print(f"\n✓ Success: {result.success}")
print(f"✓ Status Code: {result.status_code}")
print(f"✓ HTML Length: {len(result.html)}")
# Check for Cloudflare challenge
if result.html:
cf_indicators = [
"Checking your browser",
"Please stand by",
"cloudflare",
"cf-browser-verification",
"Access denied",
"Ray ID"
]
detected = False
for indicator in cf_indicators:
if indicator.lower() in result.html.lower():
print(f"⚠️ Cloudflare Challenge Detected: '{indicator}' found")
detected = True
break
if not detected and len(result.markdown.raw_markdown) > 100:
print("✅ Successfully bypassed Cloudflare!")
print(f"Content preview: {result.markdown.raw_markdown[:200]}...")
elif not detected:
print("⚠️ Page loaded but content seems minimal")
return result
async def test_undetected_browser():
"""Test with undetected browser - should bypass Cloudflare"""
print("\n" + "=" * 60)
print("Testing with Undetected Browser")
print("=" * 60)
browser_config = BrowserConfig(
headless=False, # Headless is easier to detect
verbose=True,
viewport_width=1920,
viewport_height=1080,
)
# Create undetected adapter
undetected_adapter = UndetectedAdapter()
# Create strategy with undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
config = CrawlerRunConfig(
delay_before_return_html=2.0,
simulate_user=True,
)
result = await crawler.arun(url=TEST_URL, config=config)
print(f"\n✓ Success: {result.success}")
print(f"✓ Status Code: {result.status_code}")
print(f"✓ HTML Length: {len(result.html)}")
# Check for Cloudflare challenge
if result.html:
cf_indicators = [
"Checking your browser",
"Please stand by",
"cloudflare",
"cf-browser-verification",
"Access denied",
"Ray ID"
]
detected = False
for indicator in cf_indicators:
if indicator.lower() in result.html.lower():
print(f"⚠️ Cloudflare Challenge Detected: '{indicator}' found")
detected = True
break
if not detected and len(result.markdown.raw_markdown) > 100:
print("✅ Successfully bypassed Cloudflare!")
print(f"Content preview: {result.markdown.raw_markdown[:200]}...")
elif not detected:
print("⚠️ Page loaded but content seems minimal")
return result
async def main():
"""Compare regular vs undetected browser"""
print("🤖 Crawl4AI - Cloudflare Bypass Test")
print(f"Testing URL: {TEST_URL}\n")
# Test regular browser
regular_result = await test_regular_browser()
# Small delay
await asyncio.sleep(2)
# Test undetected browser
undetected_result = await test_undetected_browser()
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
print(f"Regular Browser:")
print(f" - Success: {regular_result.success}")
print(f" - Content Length: {len(regular_result.markdown.raw_markdown) if regular_result.markdown else 0}")
print(f"\nUndetected Browser:")
print(f" - Success: {undetected_result.success}")
print(f" - Content Length: {len(undetected_result.markdown.raw_markdown) if undetected_result.markdown else 0}")
if undetected_result.success and len(undetected_result.markdown.raw_markdown) > len(regular_result.markdown.raw_markdown):
print("\n✅ Undetected browser successfully bypassed protection!")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,184 @@
"""
Undetected vs Regular Browser Comparison
This example demonstrates the difference between regular and undetected browser modes
when accessing sites with bot detection services.
Based on tested anti-bot services:
- Cloudflare
- Kasada
- Akamai
- DataDome
- Bet365
- And others
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
PlaywrightAdapter,
UndetectedAdapter,
CrawlResult
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Test URLs for various bot detection services
TEST_SITES = {
"Cloudflare Protected": "https://nowsecure.nl",
# "Bot Detection Test": "https://bot.sannysoft.com",
# "Fingerprint Test": "https://fingerprint.com/products/bot-detection",
# "Browser Scan": "https://browserscan.net",
# "CreepJS": "https://abrahamjuliot.github.io/creepjs",
}
async def test_with_adapter(url: str, adapter_name: str, adapter):
"""Test a URL with a specific adapter"""
browser_config = BrowserConfig(
headless=False, # Better for avoiding detection
viewport_width=1920,
viewport_height=1080,
verbose=True,
)
# Create the crawler strategy with the adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
print(f"\n{'='*60}")
print(f"Testing with {adapter_name} adapter")
print(f"URL: {url}")
print(f"{'='*60}")
try:
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
crawler_config = CrawlerRunConfig(
delay_before_return_html=3.0, # Give page time to load
wait_for_images=True,
screenshot=True,
simulate_user=True, # Add user simulation
)
result: CrawlResult = await crawler.arun(
url=url,
config=crawler_config
)
# Check results
print(f"✓ Status Code: {result.status_code}")
print(f"✓ Success: {result.success}")
print(f"✓ HTML Length: {len(result.html)}")
print(f"✓ Markdown Length: {len(result.markdown.raw_markdown)}")
# Check for common bot detection indicators
detection_indicators = [
"Access denied",
"Please verify you are human",
"Checking your browser",
"Enable JavaScript",
"captcha",
"403 Forbidden",
"Bot detection",
"Security check"
]
content_lower = result.markdown.raw_markdown.lower()
detected = False
for indicator in detection_indicators:
if indicator.lower() in content_lower:
print(f"⚠️ Possible detection: Found '{indicator}'")
detected = True
break
if not detected:
print("✅ No obvious bot detection triggered!")
# Show first 200 chars of content
print(f"Content preview: {result.markdown.raw_markdown[:200]}...")
return result.success and not detected
except Exception as e:
print(f"❌ Error: {str(e)}")
return False
async def compare_adapters(url: str, site_name: str):
"""Compare regular and undetected adapters on the same URL"""
print(f"\n{'#'*60}")
print(f"# Testing: {site_name}")
print(f"{'#'*60}")
# Test with regular adapter
regular_adapter = PlaywrightAdapter()
regular_success = await test_with_adapter(url, "Regular", regular_adapter)
# Small delay between tests
await asyncio.sleep(2)
# Test with undetected adapter
undetected_adapter = UndetectedAdapter()
undetected_success = await test_with_adapter(url, "Undetected", undetected_adapter)
# Summary
print(f"\n{'='*60}")
print(f"Summary for {site_name}:")
print(f"Regular Adapter: {'✅ Passed' if regular_success else '❌ Blocked/Detected'}")
print(f"Undetected Adapter: {'✅ Passed' if undetected_success else '❌ Blocked/Detected'}")
print(f"{'='*60}")
return regular_success, undetected_success
async def main():
"""Run comparison tests on multiple sites"""
print("🤖 Crawl4AI Browser Adapter Comparison")
print("Testing regular vs undetected browser modes\n")
results = {}
# Test each site
for site_name, url in TEST_SITES.items():
regular, undetected = await compare_adapters(url, site_name)
results[site_name] = {
"regular": regular,
"undetected": undetected
}
# Delay between different sites
await asyncio.sleep(3)
# Final summary
print(f"\n{'#'*60}")
print("# FINAL RESULTS")
print(f"{'#'*60}")
print(f"{'Site':<30} {'Regular':<15} {'Undetected':<15}")
print(f"{'-'*60}")
for site, result in results.items():
regular_status = "✅ Passed" if result["regular"] else "❌ Blocked"
undetected_status = "✅ Passed" if result["undetected"] else "❌ Blocked"
print(f"{site:<30} {regular_status:<15} {undetected_status:<15}")
# Calculate success rates
regular_success = sum(1 for r in results.values() if r["regular"])
undetected_success = sum(1 for r in results.values() if r["undetected"])
total = len(results)
print(f"\n{'='*60}")
print(f"Success Rates:")
print(f"Regular Adapter: {regular_success}/{total} ({regular_success/total*100:.1f}%)")
print(f"Undetected Adapter: {undetected_success}/{total} ({undetected_success/total*100:.1f}%)")
print(f"{'='*60}")
if __name__ == "__main__":
# Note: This example may take a while to run as it tests multiple sites
# You can comment out sites in TEST_SITES to run faster tests
asyncio.run(main())

View File

@@ -0,0 +1,118 @@
"""
Simple Undetected Browser Demo
Demonstrates the basic usage of undetected browser mode
"""
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def crawl_with_regular_browser(url: str):
"""Crawl with regular browser"""
print("\n[Regular Browser Mode]")
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
delay_before_return_html=2.0
)
)
print(f"Success: {result.success}")
print(f"Status: {result.status_code}")
print(f"Content length: {len(result.markdown.raw_markdown)}")
# Check for bot detection keywords
content = result.markdown.raw_markdown.lower()
if any(word in content for word in ["cloudflare", "checking your browser", "please wait"]):
print("⚠️ Bot detection triggered!")
else:
print("✅ Page loaded successfully")
return result
async def crawl_with_undetected_browser(url: str):
"""Crawl with undetected browser"""
print("\n[Undetected Browser Mode]")
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create undetected adapter and strategy
undetected_adapter = UndetectedAdapter()
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
delay_before_return_html=2.0
)
)
print(f"Success: {result.success}")
print(f"Status: {result.status_code}")
print(f"Content length: {len(result.markdown.raw_markdown)}")
# Check for bot detection keywords
content = result.markdown.raw_markdown.lower()
if any(word in content for word in ["cloudflare", "checking your browser", "please wait"]):
print("⚠️ Bot detection triggered!")
else:
print("✅ Page loaded successfully")
return result
async def main():
"""Demo comparing regular vs undetected modes"""
print("🤖 Crawl4AI Undetected Browser Demo")
print("="*50)
# Test URLs - you can change these
test_urls = [
"https://www.example.com", # Simple site
"https://httpbin.org/headers", # Shows request headers
]
for url in test_urls:
print(f"\n📍 Testing URL: {url}")
# Test with regular browser
regular_result = await crawl_with_regular_browser(url)
# Small delay
await asyncio.sleep(2)
# Test with undetected browser
undetected_result = await crawl_with_undetected_browser(url)
# Compare results
print(f"\n📊 Comparison for {url}:")
print(f"Regular browser content: {len(regular_result.markdown.raw_markdown)} chars")
print(f"Undetected browser content: {len(undetected_result.markdown.raw_markdown)} chars")
if url == "https://httpbin.org/headers":
# Show headers for comparison
print("\nHeaders seen by server:")
print("Regular:", regular_result.markdown.raw_markdown[:500])
print("\nUndetected:", undetected_result.markdown.raw_markdown[:500])
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -358,9 +358,77 @@ if __name__ == "__main__":
---
---
## 7. Anti-Bot Features (Stealth Mode & Undetected Browser)
Crawl4AI provides two powerful features to bypass bot detection:
### 7.1 Stealth Mode
Stealth mode uses playwright-stealth to modify browser fingerprints and behaviors. Enable it with a simple flag:
```python
browser_config = BrowserConfig(
enable_stealth=True, # Activates stealth mode
headless=False
)
```
**When to use**: Sites with basic bot detection (checking navigator.webdriver, plugins, etc.)
### 7.2 Undetected Browser
For advanced bot detection, use the undetected browser adapter:
```python
from crawl4ai import UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Create undetected adapter
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(crawler_strategy=strategy, config=browser_config) as crawler:
# Your crawling code
```
**When to use**: Sites with sophisticated bot detection (Cloudflare, DataDome, etc.)
### 7.3 Combining Both
For maximum evasion, combine stealth mode with undetected browser:
```python
browser_config = BrowserConfig(
enable_stealth=True, # Enable stealth
headless=False
)
adapter = UndetectedAdapter() # Use undetected browser
```
### Choosing the Right Approach
| Detection Level | Recommended Approach |
|----------------|---------------------|
| No protection | Regular browser |
| Basic checks | Regular + Stealth mode |
| Advanced protection | Undetected browser |
| Maximum evasion | Undetected + Stealth mode |
**Best Practice**: Start with regular browser + stealth mode. Only use undetected browser if needed, as it may be slightly slower.
See [Undetected Browser Mode](undetected-browser.md) for detailed examples.
---
## Conclusion & Next Steps
Youve now explored several **advanced** features:
You've now explored several **advanced** features:
- **Proxy Usage**
- **PDF & Screenshot** capturing for large or critical pages
@@ -368,7 +436,10 @@ Youve now explored several **advanced** features:
- **Custom Headers** for language or specialized requests
- **Session Persistence** via storage state
- **Robots.txt Compliance**
- **Anti-Bot Features** (Stealth Mode & Undetected Browser)
With these power tools, you can build robust scraping workflows that mimic real user behavior, handle secure sites, capture detailed snapshots, and manage sessions across multiple runs—streamlining your entire data collection pipeline.
With these power tools, you can build robust scraping workflows that mimic real user behavior, handle secure sites, capture detailed snapshots, manage sessions across multiple runs, and bypass bot detection—streamlining your entire data collection pipeline.
**Last Updated**: 2025-01-01
**Note**: In future versions, we may enable stealth mode and undetected browser by default. For now, users should explicitly enable these features when needed.
**Last Updated**: 2025-01-17

View File

@@ -404,7 +404,182 @@ for result in results:
print(f"Duration: {dr.end_time - dr.start_time}")
```
## 6. Summary
## 6. URL-Specific Configurations
When crawling diverse content types, you often need different configurations for different URLs. For example:
- PDFs need specialized extraction
- Blog pages benefit from content filtering
- Dynamic sites need JavaScript execution
- API endpoints need JSON parsing
### 6.1 Basic URL Pattern Matching
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, MatchMode
from crawl4ai.processors.pdf import PDFContentScrapingStrategy
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
async def crawl_mixed_content():
# Configure different strategies for different content
configs = [
# PDF files - specialized extraction
CrawlerRunConfig(
url_matcher="*.pdf",
scraping_strategy=PDFContentScrapingStrategy()
),
# Blog/article pages - content filtering
CrawlerRunConfig(
url_matcher=["*/blog/*", "*/article/*"],
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(threshold=0.48)
)
),
# Dynamic pages - JavaScript execution
CrawlerRunConfig(
url_matcher=lambda url: 'github.com' in url,
js_code="window.scrollTo(0, 500);"
),
# API endpoints - JSON extraction
CrawlerRunConfig(
url_matcher=lambda url: 'api' in url or url.endswith('.json'),
# Custome settings for JSON extraction
),
# Default config for everything else
CrawlerRunConfig() # No url_matcher means it matches ALL URLs (fallback)
]
# Mixed URLs
urls = [
"https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf",
"https://blog.python.org/",
"https://github.com/microsoft/playwright",
"https://httpbin.org/json",
"https://example.com/"
]
async with AsyncWebCrawler() as crawler:
results = await crawler.arun_many(
urls=urls,
config=configs # Pass list of configs
)
for result in results:
print(f"{result.url}: {len(result.markdown)} chars")
```
### 6.2 Advanced Pattern Matching
**Important**: A `CrawlerRunConfig` without `url_matcher` (or with `url_matcher=None`) matches ALL URLs. This makes it perfect as a default/fallback configuration.
The `url_matcher` parameter supports three types of patterns:
#### Glob Patterns (Strings)
```python
# Simple patterns
"*.pdf" # Any PDF file
"*/api/*" # Any URL with /api/ in path
"https://*.example.com/*" # Subdomain matching
"*://example.com/blog/*" # Any protocol
```
#### Custom Functions
```python
# Complex logic with lambdas
lambda url: url.startswith('https://') and 'secure' in url
lambda url: len(url) > 50 and url.count('/') > 5
lambda url: any(domain in url for domain in ['api.', 'data.', 'feed.'])
```
#### Mixed Lists with AND/OR Logic
```python
# Combine multiple conditions
CrawlerRunConfig(
url_matcher=[
"https://*", # Must be HTTPS
lambda url: 'internal' in url, # Must contain 'internal'
lambda url: not url.endswith('.pdf') # Must not be PDF
],
match_mode=MatchMode.AND # ALL conditions must match
)
```
### 6.3 Practical Example: News Site Crawler
```python
async def crawl_news_site():
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=70.0,
rate_limiter=RateLimiter(base_delay=(1.0, 2.0))
)
configs = [
# Homepage - light extraction
CrawlerRunConfig(
url_matcher=lambda url: url.rstrip('/') == 'https://news.ycombinator.com',
css_selector="nav, .headline",
extraction_strategy=None
),
# Article pages - full extraction
CrawlerRunConfig(
url_matcher="*/article/*",
extraction_strategy=CosineStrategy(
semantic_filter="article content",
word_count_threshold=100
),
screenshot=True,
excluded_tags=["nav", "aside", "footer"]
),
# Author pages - metadata focus
CrawlerRunConfig(
url_matcher="*/author/*",
extraction_strategy=JsonCssExtractionStrategy({
"name": "h1.author-name",
"bio": ".author-bio",
"articles": "article.post-card h2"
})
),
# Everything else
CrawlerRunConfig()
]
async with AsyncWebCrawler() as crawler:
results = await crawler.arun_many(
urls=news_urls,
config=configs,
dispatcher=dispatcher
)
```
### 6.4 Best Practices
1. **Order Matters**: Configs are evaluated in order - put specific patterns before general ones
2. **Default Config Behavior**:
- A config without `url_matcher` matches ALL URLs
- Always include a default config as the last item if you want to handle all URLs
- Without a default config, unmatched URLs will fail with "No matching configuration found"
3. **Test Your Patterns**: Use the config's `is_match()` method to test patterns:
```python
config = CrawlerRunConfig(url_matcher="*.pdf")
print(config.is_match("https://example.com/doc.pdf")) # True
default_config = CrawlerRunConfig() # No url_matcher
print(default_config.is_match("https://any-url.com")) # True - matches everything!
```
4. **Optimize for Performance**:
- Disable JS for static content
- Skip screenshots for data APIs
- Use appropriate extraction strategies
## 7. Summary
1.**Two Dispatcher Types**:

View File

@@ -0,0 +1,394 @@
# Undetected Browser Mode
## Overview
Crawl4AI offers two powerful anti-bot features to help you access websites with bot detection:
1. **Stealth Mode** - Uses playwright-stealth to modify browser fingerprints and behaviors
2. **Undetected Browser Mode** - Advanced browser adapter with deep-level patches for sophisticated bot detection
This guide covers both features and helps you choose the right approach for your needs.
## Anti-Bot Features Comparison
| Feature | Regular Browser | Stealth Mode | Undetected Browser |
|---------|----------------|--------------|-------------------|
| WebDriver Detection | ❌ | ✅ | ✅ |
| Navigator Properties | ❌ | ✅ | ✅ |
| Plugin Emulation | ❌ | ✅ | ✅ |
| CDP Detection | ❌ | Partial | ✅ |
| Deep Browser Patches | ❌ | ❌ | ✅ |
| Performance Impact | None | Minimal | Moderate |
| Setup Complexity | None | None | Minimal |
## When to Use Each Approach
### Use Regular Browser + Stealth Mode When:
- Sites have basic bot detection (checking navigator.webdriver, plugins, etc.)
- You need good performance with basic protection
- Sites check for common automation indicators
### Use Undetected Browser When:
- Sites employ sophisticated bot detection services (Cloudflare, DataDome, etc.)
- Stealth mode alone isn't sufficient
- You're willing to trade some performance for better evasion
### Best Practice: Progressive Enhancement
1. **Start with**: Regular browser + Stealth mode
2. **If blocked**: Switch to Undetected browser
3. **If still blocked**: Combine Undetected browser + Stealth mode
## Stealth Mode
Stealth mode is the simpler anti-bot solution that works with both regular and undetected browsers:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig
# Enable stealth mode with regular browser
browser_config = BrowserConfig(
enable_stealth=True, # Simple flag to enable
headless=False # Better for avoiding detection
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun("https://example.com")
```
### What Stealth Mode Does:
- Removes `navigator.webdriver` flag
- Modifies browser fingerprints
- Emulates realistic plugin behavior
- Adjusts navigator properties
- Fixes common automation leaks
## Undetected Browser Mode
For sites with sophisticated bot detection that stealth mode can't bypass, use the undetected browser adapter:
### Key Features
- **Drop-in Replacement**: Uses the same API as regular browser mode
- **Enhanced Stealth**: Built-in patches to evade common detection methods
- **Browser Adapter Pattern**: Seamlessly switch between regular and undetected modes
- **Automatic Installation**: `crawl4ai-setup` installs all necessary browser dependencies
### Quick Start
```python
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def main():
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create browser config
browser_config = BrowserConfig(
headless=False, # Headless mode can be detected easier
verbose=True,
)
# Create the crawler strategy with undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Your crawling code here
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig()
)
print(result.markdown[:500])
asyncio.run(main())
```
## Combining Both Features
For maximum evasion, combine stealth mode with undetected browser:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig, UndetectedAdapter
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Create browser config with stealth enabled
browser_config = BrowserConfig(
enable_stealth=True, # Enable stealth mode
headless=False
)
# Create undetected adapter
adapter = UndetectedAdapter()
# Create strategy with both features
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun("https://protected-site.com")
```
## Examples
### Example 1: Basic Stealth Mode
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
async def test_stealth_mode():
# Simple stealth mode configuration
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://bot.sannysoft.com",
config=CrawlerRunConfig(screenshot=True)
)
if result.success:
print("✓ Successfully accessed bot detection test site")
# Save screenshot to verify detection results
if result.screenshot:
import base64
with open("stealth_test.png", "wb") as f:
f.write(base64.b64decode(result.screenshot))
print("✓ Screenshot saved - check for green (passed) tests")
asyncio.run(test_stealth_mode())
```
### Example 2: Undetected Browser Mode
```python
import asyncio
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
UndetectedAdapter
)
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
async def main():
# Create browser config
browser_config = BrowserConfig(
headless=False,
verbose=True,
)
# Create the undetected adapter
undetected_adapter = UndetectedAdapter()
# Create the crawler strategy with the undetected adapter
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=undetected_adapter
)
# Create the crawler with our custom strategy
async with AsyncWebCrawler(
crawler_strategy=crawler_strategy,
config=browser_config
) as crawler:
# Configure the crawl
crawler_config = CrawlerRunConfig(
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter()
),
capture_console_messages=True, # Test adapter console capture
)
# Test on a site that typically detects bots
print("Testing undetected adapter...")
result: CrawlResult = await crawler.arun(
url="https://www.helloworld.org",
config=crawler_config
)
print(f"Status: {result.status_code}")
print(f"Success: {result.success}")
print(f"Console messages captured: {len(result.console_messages or [])}")
print(f"Markdown content (first 500 chars):\n{result.markdown.raw_markdown[:500]}")
if __name__ == "__main__":
asyncio.run(main())
```
## Browser Adapter Pattern
The undetected browser support is implemented using an adapter pattern, allowing seamless switching between different browser implementations:
```python
# Regular browser adapter (default)
from crawl4ai import PlaywrightAdapter
regular_adapter = PlaywrightAdapter()
# Undetected browser adapter
from crawl4ai import UndetectedAdapter
undetected_adapter = UndetectedAdapter()
```
The adapter handles:
- JavaScript execution
- Console message capture
- Error handling
- Browser-specific optimizations
## Best Practices
1. **Avoid Headless Mode**: Detection is easier in headless mode
```python
browser_config = BrowserConfig(headless=False)
```
2. **Use Reasonable Delays**: Don't rush through pages
```python
crawler_config = CrawlerRunConfig(
wait_time=3.0, # Wait 3 seconds after page load
delay_before_return_html=2.0 # Additional delay
)
```
3. **Rotate User Agents**: You can customize user agents
```python
browser_config = BrowserConfig(
headers={"User-Agent": "your-user-agent"}
)
```
4. **Handle Failures Gracefully**: Some sites may still detect and block
```python
if not result.success:
print(f"Crawl failed: {result.error_message}")
```
## Advanced Usage Tips
### Progressive Detection Handling
```python
async def crawl_with_progressive_evasion(url):
# Step 1: Try regular browser with stealth
browser_config = BrowserConfig(
enable_stealth=True,
headless=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url)
if result.success and "Access Denied" not in result.html:
return result
# Step 2: If blocked, try undetected browser
print("Regular + stealth blocked, trying undetected browser...")
adapter = UndetectedAdapter()
strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
browser_adapter=adapter
)
async with AsyncWebCrawler(
crawler_strategy=strategy,
config=browser_config
) as crawler:
result = await crawler.arun(url)
return result
```
## Installation
The undetected browser dependencies are automatically installed when you run:
```bash
crawl4ai-setup
```
This command installs all necessary browser dependencies for both regular and undetected modes.
## Limitations
- **Performance**: Slightly slower than regular mode due to additional patches
- **Headless Detection**: Some sites can still detect headless mode
- **Resource Usage**: May use more resources than regular mode
- **Not 100% Guaranteed**: Advanced anti-bot services are constantly evolving
## Troubleshooting
### Browser Not Found
Run the setup command:
```bash
crawl4ai-setup
```
### Detection Still Occurring
Try combining with other features:
```python
crawler_config = CrawlerRunConfig(
simulate_user=True, # Add user simulation
magic=True, # Enable magic mode
wait_time=5.0, # Longer waits
)
```
### Performance Issues
If experiencing slow performance:
```python
# Use selective undetected mode only for protected sites
if is_protected_site(url):
adapter = UndetectedAdapter()
else:
adapter = PlaywrightAdapter() # Default adapter
```
## Future Plans
**Note**: In future versions of Crawl4AI, we may enable stealth mode and undetected browser by default to provide better out-of-the-box success rates. For now, users should explicitly enable these features when needed.
## Conclusion
Crawl4AI provides flexible anti-bot solutions:
1. **Start Simple**: Use regular browser + stealth mode for most sites
2. **Escalate if Needed**: Switch to undetected browser for sophisticated protection
3. **Combine for Maximum Effect**: Use both features together when facing the toughest challenges
Remember:
- Always respect robots.txt and website terms of service
- Use appropriate delays to avoid overwhelming servers
- Consider the performance trade-offs of each approach
- Test progressively to find the minimum necessary evasion level
## See Also
- [Advanced Features](advanced-features.md) - Overview of all advanced features
- [Proxy & Security](proxy-security.md) - Using proxies with anti-bot features
- [Session Management](session-management.md) - Maintaining sessions across requests
- [Identity Based Crawling](identity-based-crawling.md) - Additional anti-detection strategies

View File

@@ -7,7 +7,7 @@
```python
async def arun_many(
urls: Union[List[str], List[Any]],
config: Optional[CrawlerRunConfig] = None,
config: Optional[Union[CrawlerRunConfig, List[CrawlerRunConfig]]] = None,
dispatcher: Optional[BaseDispatcher] = None,
...
) -> Union[List[CrawlResult], AsyncGenerator[CrawlResult, None]]:
@@ -15,7 +15,9 @@ async def arun_many(
Crawl multiple URLs concurrently or in batches.
:param urls: A list of URLs (or tasks) to crawl.
:param config: (Optional) A default `CrawlerRunConfig` applying to each crawl.
:param config: (Optional) Either:
- A single `CrawlerRunConfig` applying to all URLs
- A list of `CrawlerRunConfig` objects with url_matcher patterns
:param dispatcher: (Optional) A concurrency controller (e.g. MemoryAdaptiveDispatcher).
...
:return: Either a list of `CrawlResult` objects, or an async generator if streaming is enabled.
@@ -95,10 +97,70 @@ results = await crawler.arun_many(
)
```
### URL-Specific Configurations
Instead of using one config for all URLs, provide a list of configs with `url_matcher` patterns:
```python
from crawl4ai import CrawlerRunConfig, MatchMode
from crawl4ai.processors.pdf import PDFContentScrapingStrategy
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# PDF files - specialized extraction
pdf_config = CrawlerRunConfig(
url_matcher="*.pdf",
scraping_strategy=PDFContentScrapingStrategy()
)
# Blog/article pages - content filtering
blog_config = CrawlerRunConfig(
url_matcher=["*/blog/*", "*/article/*", "*python.org*"],
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(threshold=0.48)
)
)
# Dynamic pages - JavaScript execution
github_config = CrawlerRunConfig(
url_matcher=lambda url: 'github.com' in url,
js_code="window.scrollTo(0, 500);"
)
# API endpoints - JSON extraction
api_config = CrawlerRunConfig(
url_matcher=lambda url: 'api' in url or url.endswith('.json'),
# Custome settings for JSON extraction
)
# Default fallback config
default_config = CrawlerRunConfig() # No url_matcher means it never matches except as fallback
# Pass the list of configs - first match wins!
results = await crawler.arun_many(
urls=[
"https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", # → pdf_config
"https://blog.python.org/", # → blog_config
"https://github.com/microsoft/playwright", # → github_config
"https://httpbin.org/json", # → api_config
"https://example.com/" # → default_config
],
config=[pdf_config, blog_config, github_config, api_config, default_config]
)
```
**URL Matching Features**:
- **String patterns**: `"*.pdf"`, `"*/blog/*"`, `"*python.org*"`
- **Function matchers**: `lambda url: 'api' in url`
- **Mixed patterns**: Combine strings and functions with `MatchMode.OR` or `MatchMode.AND`
- **First match wins**: Configs are evaluated in order
**Key Points**:
- Each URL is processed by the same or separate sessions, depending on the dispatchers strategy.
- `dispatch_result` in each `CrawlResult` (if using concurrency) can hold memory and timing info. 
- If you need to handle authentication or session IDs, pass them in each individual task or within your run config.
- **Important**: Always include a default config (without `url_matcher`) as the last item if you want to handle all URLs. Otherwise, unmatched URLs will fail.
### Return Value

View File

@@ -208,6 +208,71 @@ config = CrawlerRunConfig(
See [Virtual Scroll documentation](../../advanced/virtual-scroll.md) for detailed examples.
---
### I) **URL Matching Configuration**
| **Parameter** | **Type / Default** | **What It Does** |
|------------------------|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------|
| **`url_matcher`** | `UrlMatcher` (None) | Pattern(s) to match URLs against. Can be: string (glob), function, or list of mixed types. **None means match ALL URLs** |
| **`match_mode`** | `MatchMode` (MatchMode.OR) | How to combine multiple matchers in a list: `MatchMode.OR` (any match) or `MatchMode.AND` (all must match) |
The `url_matcher` parameter enables URL-specific configurations when used with `arun_many()`:
```python
from crawl4ai import CrawlerRunConfig, MatchMode
from crawl4ai.processors.pdf import PDFContentScrapingStrategy
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
# Simple string pattern (glob-style)
pdf_config = CrawlerRunConfig(
url_matcher="*.pdf",
scraping_strategy=PDFContentScrapingStrategy()
)
# Multiple patterns with OR logic (default)
blog_config = CrawlerRunConfig(
url_matcher=["*/blog/*", "*/article/*", "*/news/*"],
match_mode=MatchMode.OR # Any pattern matches
)
# Function matcher
api_config = CrawlerRunConfig(
url_matcher=lambda url: 'api' in url or url.endswith('.json'),
# Other settings like extraction_strategy
)
# Mixed: String + Function with AND logic
complex_config = CrawlerRunConfig(
url_matcher=[
lambda url: url.startswith('https://'), # Must be HTTPS
"*.org/*", # Must be .org domain
lambda url: 'docs' in url # Must contain 'docs'
],
match_mode=MatchMode.AND # ALL conditions must match
)
# Combined patterns and functions with AND logic
secure_docs = CrawlerRunConfig(
url_matcher=["https://*", lambda url: '.doc' in url],
match_mode=MatchMode.AND # Must be HTTPS AND contain .doc
)
# Default config - matches ALL URLs
default_config = CrawlerRunConfig() # No url_matcher = matches everything
```
**UrlMatcher Types:**
- **None (default)**: When `url_matcher` is None or not set, the config matches ALL URLs
- **String patterns**: Glob-style patterns like `"*.pdf"`, `"*/api/*"`, `"https://*.example.com/*"`
- **Functions**: `lambda url: bool` - Custom logic for complex matching
- **Lists**: Mix strings and functions, combined with `MatchMode.OR` or `MatchMode.AND`
**Important Behavior:**
- When passing a list of configs to `arun_many()`, URLs are matched against each config's `url_matcher` in order. First match wins!
- If no config matches a URL and there's no default config (one without `url_matcher`), the URL will fail with "No matching configuration found"
- Always include a default config as the last item if you want to handle all URLs
---## 2.2 Helper Methods
Both `BrowserConfig` and `CrawlerRunConfig` provide a `clone()` method to create modified copies:

View File

@@ -0,0 +1,43 @@
# 🛠️ Crawl4AI v0.7.1: Minor Cleanup Update
*July 17, 2025 • 2 min read*
---
A small maintenance release that removes unused code and improves documentation.
## 🎯 What's Changed
- **Removed unused StealthConfig** from `crawl4ai/browser_manager.py`
- **Updated documentation** with better examples and parameter explanations
- **Fixed virtual scroll configuration** examples in docs
## 🧹 Code Cleanup
Removed unused `StealthConfig` import and configuration that wasn't being used anywhere in the codebase. The project uses its own custom stealth implementation through JavaScript injection instead.
```python
# Removed unused code:
from playwright_stealth import StealthConfig
stealth_config = StealthConfig(...) # This was never used
```
## 📖 Documentation Updates
- Fixed adaptive crawling parameter examples
- Updated session management documentation
- Corrected virtual scroll configuration examples
## 🚀 Installation
```bash
pip install crawl4ai==0.7.1
```
No breaking changes - upgrade directly from v0.7.0.
---
Questions? Issues?
- GitHub: [github.com/unclecode/crawl4ai](https://github.com/unclecode/crawl4ai)
- Discord: [discord.gg/crawl4ai](https://discord.gg/jP8KfhDhyN)

View File

@@ -0,0 +1,98 @@
# 🚀 Crawl4AI v0.7.2: CI/CD & Dependency Optimization Update
*July 25, 2025 • 3 min read*
---
This release introduces automated CI/CD pipelines for seamless releases and optimizes dependencies for a lighter, more efficient package.
## 🎯 What's New
### 🔄 Automated Release Pipeline
- **GitHub Actions CI/CD**: Automated PyPI and Docker Hub releases on tag push
- **Multi-platform Docker images**: Support for both AMD64 and ARM64 architectures
- **Version consistency checks**: Ensures tag, package, and Docker versions align
- **Automated release notes**: GitHub releases created automatically
### 📦 Dependency Optimization
- **Moved sentence-transformers to optional dependencies**: Significantly reduces default installation size
- **Lighter Docker images**: Optimized Dockerfile for faster builds and smaller images
- **Better dependency management**: Core vs. optional dependencies clearly separated
## 🏗️ CI/CD Pipeline
The new automated release process ensures consistent, reliable releases:
```yaml
# Trigger releases with a simple tag
git tag v0.7.2
git push origin v0.7.2
# Automatically:
# ✅ Validates version consistency
# ✅ Builds and publishes to PyPI
# ✅ Builds multi-platform Docker images
# ✅ Pushes to Docker Hub with proper tags
# ✅ Creates GitHub release
```
## 💾 Lighter Installation
Default installation is now significantly smaller:
```bash
# Core installation (smaller, faster)
pip install crawl4ai==0.7.2
# With ML features (includes sentence-transformers)
pip install crawl4ai[transformer]==0.7.2
# Full installation
pip install crawl4ai[all]==0.7.2
```
## 🐳 Docker Improvements
Enhanced Docker support with multi-platform images:
```bash
# Pull the latest version
docker pull unclecode/crawl4ai:0.7.2
docker pull unclecode/crawl4ai:latest
# Available tags:
# - unclecode/crawl4ai:0.7.2 (specific version)
# - unclecode/crawl4ai:0.7 (minor version)
# - unclecode/crawl4ai:0 (major version)
# - unclecode/crawl4ai:latest
```
## 🔧 Technical Details
### Dependency Changes
- `sentence-transformers` moved from required to optional dependencies
- Reduces default installation by ~500MB
- No impact on functionality when transformer features aren't needed
### CI/CD Configuration
- GitHub Actions workflows for automated releases
- Version validation before publishing
- Parallel PyPI and Docker Hub deployments
- Automatic tagging strategy for Docker images
## 🚀 Installation
```bash
pip install crawl4ai==0.7.2
```
No breaking changes - direct upgrade from v0.7.0 or v0.7.1.
---
Questions? Issues?
- GitHub: [github.com/unclecode/crawl4ai](https://github.com/unclecode/crawl4ai)
- Discord: [discord.gg/crawl4ai](https://discord.gg/jP8KfhDhyN)
- Twitter: [@unclecode](https://x.com/unclecode)
*P.S. The new CI/CD pipeline will make future releases faster and more reliable. Thanks for your patience as we improve our release process!*

View File

@@ -29,6 +29,7 @@ class BrowserConfig:
text_mode=False,
light_mode=False,
extra_args=None,
enable_stealth=False,
# ... other advanced parameters omitted here
):
...
@@ -84,6 +85,11 @@ class BrowserConfig:
- Additional flags for the underlying browser.
- E.g. `["--disable-extensions"]`.
11. **`enable_stealth`**:
- If `True`, enables stealth mode using playwright-stealth.
- Modifies browser fingerprints to avoid basic bot detection.
- Default is `False`. Recommended for sites with bot protection.
### Helper Methods
Both configuration classes provide a `clone()` method to create modified copies:
@@ -209,7 +215,13 @@ class CrawlerRunConfig:
- The maximum number of concurrent crawl sessions.
- Helps prevent overwhelming the system.
14. **`display_mode`**:
14. **`url_matcher`** & **`match_mode`**:
- Enable URL-specific configurations when used with `arun_many()`.
- Set `url_matcher` to patterns (glob, function, or list) to match specific URLs.
- Use `match_mode` (OR/AND) to control how multiple patterns combine.
- See [URL-Specific Configurations](../api/arun_many.md#url-specific-configurations) for examples.
15. **`display_mode`**:
- The display mode for progress information (`DETAILED`, `BRIEF`, etc.).
- Affects how much information is printed during the crawl.

View File

@@ -350,15 +350,22 @@ if __name__ == "__main__":
## 6. Scraping Modes
Crawl4AI provides two different scraping strategies for HTML content processing: `WebScrapingStrategy` (BeautifulSoup-based, default) and `LXMLWebScrapingStrategy` (LXML-based). The LXML strategy offers significantly better performance, especially for large HTML documents.
Crawl4AI uses `LXMLWebScrapingStrategy` (LXML-based) as the default scraping strategy for HTML content processing. This strategy offers excellent performance, especially for large HTML documents.
**Note:** For backward compatibility, `WebScrapingStrategy` is still available as an alias for `LXMLWebScrapingStrategy`.
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, LXMLWebScrapingStrategy
async def main():
config = CrawlerRunConfig(
scraping_strategy=LXMLWebScrapingStrategy() # Faster alternative to default BeautifulSoup
# Default configuration already uses LXMLWebScrapingStrategy
config = CrawlerRunConfig()
# Or explicitly specify it if desired
config_explicit = CrawlerRunConfig(
scraping_strategy=LXMLWebScrapingStrategy()
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
@@ -417,21 +424,20 @@ class CustomScrapingStrategy(ContentScrapingStrategy):
### Performance Considerations
The LXML strategy can be up to 10-20x faster than BeautifulSoup strategy, particularly when processing large HTML documents. However, please note:
The LXML strategy provides excellent performance, particularly when processing large HTML documents, offering up to 10-20x faster processing compared to BeautifulSoup-based approaches.
1. LXML strategy is currently experimental
2. In some edge cases, the parsing results might differ slightly from BeautifulSoup
3. If you encounter any inconsistencies between LXML and BeautifulSoup results, please [raise an issue](https://github.com/codeium/crawl4ai/issues) with a reproducible example
Benefits of LXML strategy:
- Fast processing of large HTML documents (especially >100KB)
- Efficient memory usage
- Good handling of well-formed HTML
- Robust table detection and extraction
Choose LXML strategy when:
- Processing large HTML documents (recommended for >100KB)
- Performance is critical
- Working with well-formed HTML
### Backward Compatibility
Stick to BeautifulSoup strategy (default) when:
- Maximum compatibility is needed
- Working with malformed HTML
- Exact parsing behavior is critical
For users upgrading from earlier versions:
- `WebScrapingStrategy` is now an alias for `LXMLWebScrapingStrategy`
- Existing code using `WebScrapingStrategy` will continue to work without modification
- No changes are required to your existing code
---

View File

@@ -19,13 +19,15 @@ class MarkdownGenerationResult(BaseModel):
class CrawlResult(BaseModel):
url: str
html: str
fit_html: Optional[str] = None
success: bool
cleaned_html: Optional[str] = None
media: Dict[str, List[Dict]] = {}
links: Dict[str, List[Dict]] = {}
downloaded_files: Optional[List[str]] = None
js_execution_result: Optional[Dict[str, Any]] = None
screenshot: Optional[str] = None
pdf : Optional[bytes] = None
pdf: Optional[bytes] = None
mhtml: Optional[str] = None
markdown: Optional[Union[str, MarkdownGenerationResult]] = None
extracted_content: Optional[str] = None
@@ -35,6 +37,12 @@ class CrawlResult(BaseModel):
response_headers: Optional[dict] = None
status_code: Optional[int] = None
ssl_certificate: Optional[SSLCertificate] = None
dispatch_result: Optional[DispatchResult] = None
redirected_url: Optional[str] = None
network_requests: Optional[List[Dict[str, Any]]] = None
console_messages: Optional[List[Dict[str, Any]]] = None
tables: List[Dict] = Field(default_factory=list)
class Config:
arbitrary_types_allowed = True
```
@@ -45,11 +53,13 @@ class CrawlResult(BaseModel):
|-------------------------------------------|-----------------------------------------------------------------------------------------------------|
| **url (`str`)** | The final or actual URL crawled (in case of redirects). |
| **html (`str`)** | Original, unmodified page HTML. Good for debugging or custom processing. |
| **fit_html (`Optional[str]`)** | Preprocessed HTML optimized for extraction and content filtering. |
| **success (`bool`)** | `True` if the crawl completed without major errors, else `False`. |
| **cleaned_html (`Optional[str]`)** | Sanitized HTML with scripts/styles removed; can exclude tags if configured via `excluded_tags` etc. |
| **media (`Dict[str, List[Dict]]`)** | Extracted media info (images, audio, etc.), each with attributes like `src`, `alt`, `score`, etc. |
| **links (`Dict[str, List[Dict]]`)** | Extracted link data, split by `internal` and `external`. Each link usually has `href`, `text`, etc. |
| **downloaded_files (`Optional[List[str]]`)** | If `accept_downloads=True` in `BrowserConfig`, this lists the filepaths of saved downloads. |
| **js_execution_result (`Optional[Dict[str, Any]]`)** | Results from JavaScript execution during crawling. |
| **screenshot (`Optional[str]`)** | Screenshot of the page (base64-encoded) if `screenshot=True`. |
| **pdf (`Optional[bytes]`)** | PDF of the page if `pdf=True`. |
| **mhtml (`Optional[str]`)** | MHTML snapshot of the page if `capture_mhtml=True`. Contains the full page with all resources. |
@@ -61,6 +71,11 @@ class CrawlResult(BaseModel):
| **response_headers (`Optional[dict]`)** | HTTP response headers, if captured. |
| **status_code (`Optional[int]`)** | HTTP status code (e.g., 200 for OK). |
| **ssl_certificate (`Optional[SSLCertificate]`)** | SSL certificate info if `fetch_ssl_certificate=True`. |
| **dispatch_result (`Optional[DispatchResult]`)** | Additional concurrency and resource usage information when crawling URLs in parallel. |
| **redirected_url (`Optional[str]`)** | The URL after any redirects (different from `url` which is the final URL). |
| **network_requests (`Optional[List[Dict[str, Any]]]`)** | List of network requests, responses, and failures captured during the crawl if `capture_network_requests=True`. |
| **console_messages (`Optional[List[Dict[str, Any]]]`)** | List of browser console messages captured during the crawl if `capture_console_messages=True`. |
| **tables (`List[Dict]`)** | Table data extracted from HTML tables with structure `[{headers, rows, caption, summary}]`. |
---
@@ -172,7 +187,7 @@ Here:
---
## 5. More Fields: Links, Media, and More
## 5. More Fields: Links, Media, Tables and More
### 5.1 `links`
@@ -192,7 +207,77 @@ for img in images:
print("Image URL:", img["src"], "Alt:", img.get("alt"))
```
### 5.3 `screenshot`, `pdf`, and `mhtml`
### 5.3 `tables`
The `tables` field contains structured data extracted from HTML tables found on the crawled page. Tables are analyzed based on various criteria to determine if they are actual data tables (as opposed to layout tables), including:
- Presence of thead and tbody sections
- Use of th elements for headers
- Column consistency
- Text density
- And other factors
Tables that score above the threshold (default: 7) are extracted and stored in result.tables.
### Accessing Table data:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
async def main():
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://www.w3schools.com/html/html_tables.asp",
config=CrawlerRunConfig(
table_score_threshold=7 # Minimum score for table detection
)
)
if result.success and result.tables:
print(f"Found {len(result.tables)} tables")
for i, table in enumerate(result.tables):
print(f"\nTable {i+1}:")
print(f"Caption: {table.get('caption', 'No caption')}")
print(f"Headers: {table['headers']}")
print(f"Rows: {len(table['rows'])}")
# Print first few rows as example
for j, row in enumerate(table['rows'][:3]):
print(f" Row {j+1}: {row}")
if __name__ == "__main__":
asyncio.run(main())
```
### Configuring Table Extraction:
You can adjust the sensitivity of the table detection algorithm with:
```python
config = CrawlerRunConfig(
table_score_threshold=5 # Lower value = more tables detected (default: 7)
)
```
Each extracted table contains:
- `headers`: Column header names
- `rows`: List of rows, each containing cell values
- `caption`: Table caption text (if available)
- `summary`: Table summary attribute (if specified)
### Table Extraction Tips
- Not all HTML tables are extracted - only those detected as "data tables" vs. layout tables.
- Tables with inconsistent cell counts, nested tables, or those used purely for layout may be skipped.
- If you're missing tables, try adjusting the `table_score_threshold` to a lower value (default is 7).
The table detection algorithm scores tables based on features like consistent columns, presence of headers, text density, and more. Tables scoring above the threshold are considered data tables worth extracting.
### 5.4 `screenshot`, `pdf`, and `mhtml`
If you set `screenshot=True`, `pdf=True`, or `capture_mhtml=True` in **`CrawlerRunConfig`**, then:
@@ -213,7 +298,7 @@ if result.mhtml:
The MHTML (MIME HTML) format is particularly useful as it captures the entire web page including all of its resources (CSS, images, scripts, etc.) in a single file, making it perfect for archiving or offline viewing.
### 5.4 `ssl_certificate`
### 5.5 `ssl_certificate`
If `fetch_ssl_certificate=True`, `result.ssl_certificate` holds details about the sites SSL cert, such as issuer, validity dates, etc.

View File

@@ -154,6 +154,30 @@ cp deploy/docker/.llm.env.example .llm.env
# Now edit .llm.env and add your API keys
```
**Flexible LLM Provider Configuration:**
The Docker setup now supports flexible LLM provider configuration through three methods:
1. **Environment Variable** (Highest Priority): Set `LLM_PROVIDER` to override the default
```bash
export LLM_PROVIDER="anthropic/claude-3-opus"
# Or in your .llm.env file:
# LLM_PROVIDER=anthropic/claude-3-opus
```
2. **API Request Parameter**: Specify provider per request
```json
{
"url": "https://example.com",
"f": "llm",
"provider": "groq/mixtral-8x7b"
}
```
3. **Config File Default**: Falls back to `config.yml` (default: `openai/gpt-4o-mini`)
The system automatically selects the appropriate API key based on the configured `api_key_env` in the config file.
#### 3. Build and Run with Compose
The `docker-compose.yml` file in the project root provides a simplified approach that automatically handles architecture detection using buildx.
@@ -668,7 +692,7 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini"
provider: "openai/gpt-4o-mini" # Can be overridden by LLM_PROVIDER env var
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored

View File

@@ -28,11 +28,8 @@ This page provides a comprehensive list of example scripts that demonstrate vari
| Example | Description | Link |
|---------|-------------|------|
| Deep Crawling | An extensive tutorial on deep crawling capabilities, demonstrating BFS and BestFirst strategies, stream vs. non-stream execution, filters, scorers, and advanced configurations. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/deepcrawl_example.py) |
<<<<<<< HEAD
| Virtual Scroll | Comprehensive examples for handling virtualized scrolling on sites like Twitter, Instagram. Demonstrates different scrolling scenarios with local test server. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/virtual_scroll_example.py) |
=======
| Adaptive Crawling | Demonstrates intelligent crawling that automatically determines when sufficient information has been gathered. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/adaptive_crawling/) |
>>>>>>> feature/progressive-crawling
| Dispatcher | Shows how to use the crawl dispatcher for advanced workload management. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/dispatcher_example.py) |
| Storage State | Tutorial on managing browser storage state for persistence. | [View Guide](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/storage_state_tutorial.md) |
| Network Console Capture | Demonstrates how to capture and analyze network requests and console logs. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/network_console_capture_example.py) |
@@ -57,6 +54,16 @@ This page provides a comprehensive list of example scripts that demonstrate vari
| Crypto Analysis | Demonstrates how to crawl and analyze cryptocurrency data. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/crypto_analysis_example.py) |
| SERP API | Demonstrates using Crawl4AI with search engine result pages. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/serp_api_project_11_feb.py) |
## Anti-Bot & Stealth Features
| Example | Description | Link |
|---------|-------------|------|
| Stealth Mode Quick Start | Five practical examples showing how to use stealth mode for bypassing basic bot detection. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/stealth_mode_quick_start.py) |
| Stealth Mode Comprehensive | Comprehensive demonstration of stealth mode features with bot detection testing and comparisons. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/stealth_mode_example.py) |
| Undetected Browser | Simple example showing how to use the undetected browser adapter. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/hello_world_undetected.py) |
| Undetected Browser Demo | Basic demo comparing regular and undetected browser modes. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/undetected_simple_demo.py) |
| Undetected Tests | Advanced tests comparing regular vs undetected browsers on various bot detection services. | [View Folder](https://github.com/unclecode/crawl4ai/tree/main/docs/examples/undetectability/) |
## Customization & Security
| Example | Description | Link |
@@ -117,4 +124,4 @@ Some examples may require:
## Contributing New Examples
If you've created an interesting example that demonstrates a unique use case or feature of Crawl4AI, we encourage you to contribute it to our examples collection. Please see our [contribution guidelines](https://github.com/unclecode/crawl4ai/blob/main/CONTRIBUTORS.md) for more information.
If you've created an interesting example that demonstrates a unique use case or feature of Crawl4AI, we encourage you to contribute it to our examples collection. Please see our [contribution guidelines](https://github.com/unclecode/crawl4ai/blob/main/CONTRIBUTORS.md) for more information.

View File

@@ -18,7 +18,7 @@ crawl4ai-setup
```
**What does it do?**
- Installs or updates required Playwright browsers (Chromium, Firefox, etc.)
- Installs or updates required browser dependencies for both regular and undetected modes
- Performs OS-level checks (e.g., missing libs on Linux)
- Confirms your environment is ready to crawl

View File

@@ -520,7 +520,8 @@ This approach is handy when you still want external links but need to block cert
### 4.1 Accessing `result.media`
By default, Crawl4AI collects images, audio, video URLs, and data tables it finds on the page. These are stored in `result.media`, a dictionary keyed by media type (e.g., `images`, `videos`, `audio`, `tables`).
By default, Crawl4AI collects images, audio and video URLs it finds on the page. These are stored in `result.media`, a dictionary keyed by media type (e.g., `images`, `videos`, `audio`).
**Note: Tables have been moved from `result.media["tables"]` to the new `result.tables` format for better organization and direct access.**
**Basic Example**:
@@ -534,14 +535,6 @@ if result.success:
print(f" Alt text: {img.get('alt', '')}")
print(f" Score: {img.get('score')}")
print(f" Description: {img.get('desc', '')}\n")
# Get tables
tables = result.media.get("tables", [])
print(f"Found {len(tables)} data tables in total.")
for i, table in enumerate(tables):
print(f"[Table {i}] Caption: {table.get('caption', 'No caption')}")
print(f" Columns: {len(table.get('headers', []))}")
print(f" Rows: {len(table.get('rows', []))}")
```
**Structure Example**:
@@ -568,19 +561,6 @@ result.media = {
"audio": [
# Similar structure but with audio-specific fields
],
"tables": [
{
"headers": ["Name", "Age", "Location"],
"rows": [
["John Doe", "34", "New York"],
["Jane Smith", "28", "San Francisco"],
["Alex Johnson", "42", "Chicago"]
],
"caption": "Employee Directory",
"summary": "Directory of company employees"
},
# More tables if present
]
}
```
@@ -608,53 +588,7 @@ crawler_cfg = CrawlerRunConfig(
This setting attempts to discard images from outside the primary domain, keeping only those from the site youre crawling.
### 3.3 Working with Tables
Crawl4AI can detect and extract structured data from HTML tables. Tables are analyzed based on various criteria to determine if they are actual data tables (as opposed to layout tables), including:
- Presence of thead and tbody sections
- Use of th elements for headers
- Column consistency
- Text density
- And other factors
Tables that score above the threshold (default: 7) are extracted and stored in `result.media.tables`.
**Accessing Table Data**:
```python
if result.success:
tables = result.media.get("tables", [])
print(f"Found {len(tables)} data tables on the page")
if tables:
# Access the first table
first_table = tables[0]
print(f"Table caption: {first_table.get('caption', 'No caption')}")
print(f"Headers: {first_table.get('headers', [])}")
# Print the first 3 rows
for i, row in enumerate(first_table.get('rows', [])[:3]):
print(f"Row {i+1}: {row}")
```
**Configuring Table Extraction**:
You can adjust the sensitivity of the table detection algorithm with:
```python
crawler_cfg = CrawlerRunConfig(
table_score_threshold=5 # Lower value = more tables detected (default: 7)
)
```
Each extracted table contains:
- `headers`: Column header names
- `rows`: List of rows, each containing cell values
- `caption`: Table caption text (if available)
- `summary`: Table summary attribute (if specified)
### 3.4 Additional Media Config
### 4.3 Additional Media Config
- **`screenshot`**: Set to `True` if you want a full-page screenshot stored as `base64` in `result.screenshot`.
- **`pdf`**: Set to `True` if you want a PDF version of the page in `result.pdf`.
@@ -695,7 +629,7 @@ The MHTML format is particularly useful because:
---
## 4. Putting It All Together: Link & Media Filtering
## 5. Putting It All Together: Link & Media Filtering
Heres a combined example demonstrating how to filter out external links, skip certain domains, and exclude external images:
@@ -743,7 +677,7 @@ if __name__ == "__main__":
---
## 5. Common Pitfalls & Tips
## 6. Common Pitfalls & Tips
1. **Conflicting Flags**:
- `exclude_external_links=True` but then also specifying `exclude_social_media_links=True` is typically fine, but understand that the first setting already discards *all* external links. The second becomes somewhat redundant.
@@ -762,10 +696,3 @@ if __name__ == "__main__":
---
**Thats it for Link & Media Analysis!** Youre now equipped to filter out unwanted sites and zero in on the images and videos that matter for your project.
### Table Extraction Tips
- Not all HTML tables are extracted - only those detected as "data tables" vs. layout tables.
- Tables with inconsistent cell counts, nested tables, or those used purely for layout may be skipped.
- If you're missing tables, try adjusting the `table_score_threshold` to a lower value (default is 7).
The table detection algorithm scores tables based on features like consistent columns, presence of headers, text density, and more. Tables scoring above the threshold are considered data tables worth extracting.

View File

@@ -0,0 +1,92 @@
# WebScrapingStrategy Migration Guide
## Overview
Crawl4AI has simplified its content scraping architecture. The BeautifulSoup-based `WebScrapingStrategy` has been deprecated in favor of the faster LXML-based implementation. However, **no action is required** - your existing code will continue to work.
## What Changed?
1. **`WebScrapingStrategy` is now an alias** for `LXMLWebScrapingStrategy`
2. **The BeautifulSoup implementation has been removed** (~1000 lines of redundant code)
3. **`LXMLWebScrapingStrategy` inherits directly** from `ContentScrapingStrategy`
4. **Performance remains optimal** with LXML as the sole implementation
## Backward Compatibility
**Your existing code continues to work without any changes:**
```python
# This still works perfectly
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, WebScrapingStrategy
config = CrawlerRunConfig(
scraping_strategy=WebScrapingStrategy() # Works as before
)
```
## Migration Options
You have three options:
### Option 1: Do Nothing (Recommended)
Your code will continue to work. `WebScrapingStrategy` is permanently aliased to `LXMLWebScrapingStrategy`.
### Option 2: Update Imports (Optional)
For clarity, you can update your imports:
```python
# Old (still works)
from crawl4ai import WebScrapingStrategy
strategy = WebScrapingStrategy()
# New (more explicit)
from crawl4ai import LXMLWebScrapingStrategy
strategy = LXMLWebScrapingStrategy()
```
### Option 3: Use Default Configuration
Since `LXMLWebScrapingStrategy` is the default, you can omit the strategy parameter:
```python
# Simplest approach - uses LXMLWebScrapingStrategy by default
config = CrawlerRunConfig()
```
## Type Hints
If you use type hints, both work:
```python
from crawl4ai import WebScrapingStrategy, LXMLWebScrapingStrategy
def process_with_strategy(strategy: WebScrapingStrategy) -> None:
# Works with both WebScrapingStrategy and LXMLWebScrapingStrategy
pass
# Both are valid
process_with_strategy(WebScrapingStrategy())
process_with_strategy(LXMLWebScrapingStrategy())
```
## Subclassing
If you've subclassed `WebScrapingStrategy`, it continues to work:
```python
class MyCustomStrategy(WebScrapingStrategy):
def __init__(self):
super().__init__()
# Your custom code
```
## Performance Benefits
By consolidating to LXML:
- **10-20x faster** HTML parsing for large documents
- **Lower memory usage**
- **Consistent behavior** across all use cases
- **Simplified maintenance** and bug fixes
## Summary
This change simplifies Crawl4AI's internals while maintaining 100% backward compatibility. Your existing code continues to work, and you get better performance automatically.

View File

@@ -45,6 +45,7 @@ nav:
- "Lazy Loading": "advanced/lazy-loading.md"
- "Hooks & Auth": "advanced/hooks-auth.md"
- "Proxy & Security": "advanced/proxy-security.md"
- "Undetected Browser": "advanced/undetected-browser.md"
- "Session Management": "advanced/session-management.md"
- "Multi-URL Crawling": "advanced/multi-url-crawling.md"
- "Crawl Dispatcher": "advanced/crawl-dispatcher.md"

View File

@@ -13,34 +13,34 @@ authors = [
{name = "Unclecode", email = "unclecode@kidocode.com"}
]
dependencies = [
"aiofiles>=24.1.0",
"aiohttp>=3.11.11",
"aiosqlite~=0.20",
"anyio>=4.0.0",
"lxml~=5.3",
"litellm>=1.53.1",
"numpy>=1.26.0,<3",
"pillow>=10.4",
"playwright>=1.49.0",
"patchright>=1.49.0",
"python-dotenv~=1.0",
"requests~=2.26",
"beautifulsoup4~=4.12",
"tf-playwright-stealth>=1.1.0",
"xxhash~=3.4",
"rank-bm25~=0.2",
"aiofiles>=24.1.0",
"snowballstemmer~=2.2",
"pydantic>=2.10",
"pyOpenSSL>=24.3.0",
"psutil>=6.1.1",
"PyYAML>=6.0",
"nltk>=3.9.1",
"playwright",
"rich>=13.9.4",
"cssselect>=1.2.0",
"httpx>=0.27.2",
"httpx[http2]>=0.27.2",
"fake-useragent>=2.0.3",
"click>=8.1.7",
"pyperclip>=1.8.2",
"chardet>=5.2.0",
"aiohttp>=3.11.11",
"brotli>=1.1.0",
"humanize>=4.10.0",
"lark>=1.2.2",

View File

@@ -1,26 +1,29 @@
# Note: These requirements are also specified in pyproject.toml
# This file is kept for development environment setup and compatibility
aiofiles>=24.1.0
aiohttp>=3.11.11
aiosqlite~=0.20
anyio>=4.0.0
lxml~=5.3
litellm>=1.53.1
numpy>=1.26.0,<3
pillow>=10.4
playwright>=1.49.0
patchright>=1.49.0
python-dotenv~=1.0
requests~=2.26
beautifulsoup4~=4.12
tf-playwright-stealth>=1.1.0
xxhash~=3.4
rank-bm25~=0.2
aiofiles>=24.1.0
colorama~=0.4
snowballstemmer~=2.2
pydantic>=2.10
pyOpenSSL>=24.3.0
psutil>=6.1.1
PyYAML>=6.0
nltk>=3.9.1
rich>=13.9.4
cssselect>=1.2.0
chardet>=5.2.0
brotli>=1.1.0
httpx[http2]>=0.27.2

View File

@@ -12,11 +12,8 @@ parent_dir = os.path.dirname(
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import WebScrapingStrategy
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy as WebScrapingStrategyCurrent,
)
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
# This test compares the same strategy with itself now since WebScrapingStrategy is deprecated
@dataclass
@@ -32,8 +29,8 @@ class TestResult:
class StrategyTester:
def __init__(self):
self.new_scraper = WebScrapingStrategy()
self.current_scraper = WebScrapingStrategyCurrent()
self.new_scraper = LXMLWebScrapingStrategy()
self.current_scraper = LXMLWebScrapingStrategy() # Same strategy now
with open(__location__ + "/sample_wikipedia.html", "r", encoding="utf-8") as f:
self.WIKI_HTML = f.read()
self.results = {"new": [], "current": []}

344
tests/check_dependencies.py Executable file
View File

@@ -0,0 +1,344 @@
#!/usr/bin/env python3
"""
Dependency checker for Crawl4AI
Analyzes imports in the codebase and shows which files use them
"""
import ast
import os
import sys
from pathlib import Path
from typing import Set, Dict, List, Tuple
from collections import defaultdict
import re
import toml
# Standard library modules to ignore
STDLIB_MODULES = {
'abc', 'argparse', 'asyncio', 'base64', 'collections', 'concurrent', 'contextlib',
'copy', 'datetime', 'decimal', 'email', 'enum', 'functools', 'glob', 'hashlib',
'http', 'importlib', 'io', 'itertools', 'json', 'logging', 'math', 'mimetypes',
'multiprocessing', 'os', 'pathlib', 'pickle', 'platform', 'pprint', 'random',
're', 'shutil', 'signal', 'socket', 'sqlite3', 'string', 'subprocess', 'sys',
'tempfile', 'threading', 'time', 'traceback', 'typing', 'unittest', 'urllib',
'uuid', 'warnings', 'weakref', 'xml', 'zipfile', 'dataclasses', 'secrets',
'statistics', 'textwrap', 'queue', 'csv', 'gzip', 'tarfile', 'configparser',
'inspect', 'operator', 'struct', 'binascii', 'codecs', 'locale', 'gc',
'atexit', 'builtins', 'html', 'errno', 'fcntl', 'pwd', 'grp', 'resource',
'termios', 'tty', 'pty', 'select', 'selectors', 'ssl', 'zlib', 'bz2',
'lzma', 'types', 'copy', 'pydoc', 'profile', 'cProfile', 'timeit',
'trace', 'doctest', 'pdb', 'contextvars', 'dataclasses', 'graphlib',
'zoneinfo', 'tomllib', 'cgi', 'wsgiref', 'fileinput', 'linecache',
'tokenize', 'tabnanny', 'compileall', 'dis', 'pickletools', 'formatter',
'__future__', 'array', 'ctypes', 'heapq', 'bisect', 'array', 'weakref',
'types', 'copy', 'pprint', 'repr', 'numbers', 'cmath', 'fractions',
'statistics', 'itertools', 'functools', 'operator', 'pathlib', 'fileinput',
'stat', 'filecmp', 'tempfile', 'glob', 'fnmatch', 'linecache', 'shutil',
'pickle', 'copyreg', 'shelve', 'marshal', 'dbm', 'sqlite3', 'zlib', 'gzip',
'bz2', 'lzma', 'zipfile', 'tarfile', 'configparser', 'netrc', 'xdrlib',
'plistlib', 'hashlib', 'hmac', 'secrets', 'os', 'io', 'time', 'argparse',
'getopt', 'logging', 'getpass', 'curses', 'platform', 'errno', 'ctypes',
'threading', 'multiprocessing', 'concurrent', 'subprocess', 'sched', 'queue',
'contextvars', 'asyncio', 'socket', 'ssl', 'email', 'json', 'mailcap',
'mailbox', 'mimetypes', 'base64', 'binhex', 'binascii', 'quopri', 'uu',
'html', 'xml', 'webbrowser', 'cgi', 'cgitb', 'wsgiref', 'urllib', 'http',
'ftplib', 'poplib', 'imaplib', 'nntplib', 'smtplib', 'smtpd', 'telnetlib',
'uuid', 'socketserver', 'xmlrpc', 'ipaddress', 'audioop', 'aifc', 'sunau',
'wave', 'chunk', 'colorsys', 'imghdr', 'sndhdr', 'ossaudiodev', 'gettext',
'locale', 'turtle', 'cmd', 'shlex', 'tkinter', 'typing', 'pydoc', 'doctest',
'unittest', 'test', '2to3', 'distutils', 'venv', 'ensurepip', 'zipapp',
'py_compile', 'compileall', 'dis', 'pickletools', 'pdb', 'timeit', 'trace',
'tracemalloc', 'warnings', 'faulthandler', 'pdb', 'dataclasses', 'cgi',
'cgitb', 'chunk', 'crypt', 'imghdr', 'mailcap', 'nis', 'nntplib', 'optparse',
'ossaudiodev', 'pipes', 'smtpd', 'sndhdr', 'spwd', 'sunau', 'telnetlib',
'uu', 'xdrlib', 'msilib', 'pstats', 'rlcompleter', 'tkinter', 'ast'
}
# Known package name mappings (import name -> package name)
PACKAGE_MAPPINGS = {
'bs4': 'beautifulsoup4',
'PIL': 'pillow',
'cv2': 'opencv-python',
'sklearn': 'scikit-learn',
'yaml': 'PyYAML',
'OpenSSL': 'pyOpenSSL',
'sqlalchemy': 'SQLAlchemy',
'playwright': 'playwright',
'patchright': 'patchright',
'dotenv': 'python-dotenv',
'fake_useragent': 'fake-useragent',
'playwright_stealth': 'tf-playwright-stealth',
'sentence_transformers': 'sentence-transformers',
'rank_bm25': 'rank-bm25',
'snowballstemmer': 'snowballstemmer',
'PyPDF2': 'PyPDF2',
'pdf2image': 'pdf2image',
}
class ImportVisitor(ast.NodeVisitor):
"""AST visitor to extract imports from Python files"""
def __init__(self):
self.imports = {} # Changed to dict to store line numbers
self.from_imports = {}
def visit_Import(self, node):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name not in self.imports:
self.imports[module_name] = []
self.imports[module_name].append(node.lineno)
def visit_ImportFrom(self, node):
if node.module and node.level == 0: # absolute imports only
module_name = node.module.split('.')[0]
if module_name not in self.from_imports:
self.from_imports[module_name] = []
self.from_imports[module_name].append(node.lineno)
def extract_imports_from_file(filepath: Path) -> Dict[str, List[int]]:
"""Extract all imports from a Python file with line numbers"""
all_imports = {}
try:
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
visitor = ImportVisitor()
visitor.visit(tree)
# Merge imports and from_imports
for module, lines in visitor.imports.items():
if module not in all_imports:
all_imports[module] = []
all_imports[module].extend(lines)
for module, lines in visitor.from_imports.items():
if module not in all_imports:
all_imports[module] = []
all_imports[module].extend(lines)
except Exception as e:
# Silently skip files that can't be parsed
pass
return all_imports
def get_codebase_imports_with_files(root_dir: Path) -> Dict[str, List[Tuple[str, List[int]]]]:
"""Get all imports from the crawl4ai library and docs folders with file locations and line numbers"""
import_to_files = defaultdict(list)
# Only scan crawl4ai library folder and docs folder
target_dirs = [
root_dir / 'crawl4ai',
root_dir / 'docs'
]
for target_dir in target_dirs:
if not target_dir.exists():
continue
for py_file in target_dir.rglob('*.py'):
# Skip __pycache__ directories
if '__pycache__' in py_file.parts:
continue
# Skip setup.py and similar files
if py_file.name in ['setup.py', 'setup.cfg', 'conf.py']:
continue
imports = extract_imports_from_file(py_file)
# Map each import to the file and line numbers
for imp, line_numbers in imports.items():
relative_path = py_file.relative_to(root_dir)
import_to_files[imp].append((str(relative_path), sorted(line_numbers)))
return dict(import_to_files)
def get_declared_dependencies() -> Set[str]:
"""Get declared dependencies from pyproject.toml and requirements.txt"""
declared = set()
# Read from pyproject.toml
if Path('pyproject.toml').exists():
with open('pyproject.toml', 'r') as f:
data = toml.load(f)
# Get main dependencies
deps = data.get('project', {}).get('dependencies', [])
for dep in deps:
# Parse dependency string (e.g., "numpy>=1.26.0,<3")
match = re.match(r'^([a-zA-Z0-9_-]+)', dep)
if match:
pkg_name = match.group(1).lower()
declared.add(pkg_name)
# Get optional dependencies
optional = data.get('project', {}).get('optional-dependencies', {})
for group, deps in optional.items():
for dep in deps:
match = re.match(r'^([a-zA-Z0-9_-]+)', dep)
if match:
pkg_name = match.group(1).lower()
declared.add(pkg_name)
# Also check requirements.txt as backup
if Path('requirements.txt').exists():
with open('requirements.txt', 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
match = re.match(r'^([a-zA-Z0-9_-]+)', line)
if match:
pkg_name = match.group(1).lower()
declared.add(pkg_name)
return declared
def normalize_package_name(name: str) -> str:
"""Normalize package name for comparison"""
# Handle known mappings first
if name in PACKAGE_MAPPINGS:
return PACKAGE_MAPPINGS[name].lower()
# Basic normalization
return name.lower().replace('_', '-')
def check_missing_dependencies():
"""Main function to check for missing dependencies"""
print("🔍 Analyzing crawl4ai library and docs folders...\n")
# Get all imports with their file locations
root_dir = Path('.')
import_to_files = get_codebase_imports_with_files(root_dir)
# Get declared dependencies
declared_deps = get_declared_dependencies()
# Normalize declared dependencies
normalized_declared = {normalize_package_name(dep) for dep in declared_deps}
# Categorize imports
external_imports = {}
local_imports = {}
# Known local packages
local_packages = {'crawl4ai'}
for imp, file_info in import_to_files.items():
# Skip standard library
if imp in STDLIB_MODULES:
continue
# Check if it's a local import
if any(imp.startswith(local) for local in local_packages):
local_imports[imp] = file_info
else:
external_imports[imp] = file_info
# Check which external imports are not declared
not_declared = {}
declared_imports = {}
for imp, file_info in external_imports.items():
normalized_imp = normalize_package_name(imp)
# Check if import is covered by declared dependencies
found = False
for declared in normalized_declared:
if normalized_imp == declared or normalized_imp.startswith(declared + '.') or declared.startswith(normalized_imp):
found = True
break
if found:
declared_imports[imp] = file_info
else:
not_declared[imp] = file_info
# Print results
print(f"📊 Summary:")
print(f" - Total unique imports: {len(import_to_files)}")
print(f" - External imports: {len(external_imports)}")
print(f" - Declared dependencies: {len(declared_deps)}")
print(f" - External imports NOT in dependencies: {len(not_declared)}\n")
if not_declared:
print("❌ External imports NOT declared in pyproject.toml or requirements.txt:\n")
# Sort by import name
for imp in sorted(not_declared.keys()):
file_info = not_declared[imp]
print(f" 📦 {imp}")
if imp in PACKAGE_MAPPINGS:
print(f" → Package name: {PACKAGE_MAPPINGS[imp]}")
# Show up to 3 files that use this import
for i, (file_path, line_numbers) in enumerate(file_info[:3]):
# Format line numbers for clickable output
if len(line_numbers) == 1:
print(f" - {file_path}:{line_numbers[0]}")
else:
# Show first few line numbers
line_str = ','.join(str(ln) for ln in line_numbers[:3])
if len(line_numbers) > 3:
line_str += f"... ({len(line_numbers)} imports)"
print(f" - {file_path}: lines {line_str}")
if len(file_info) > 3:
print(f" ... and {len(file_info) - 3} more files")
print()
# Check for potentially unused dependencies
print("\n🔎 Checking declared dependencies usage...\n")
# Get all used external packages
used_packages = set()
for imp in external_imports.keys():
normalized = normalize_package_name(imp)
used_packages.add(normalized)
# Find unused
unused = []
for dep in declared_deps:
normalized_dep = normalize_package_name(dep)
# Check if any import uses this dependency
found_usage = False
for used in used_packages:
if used == normalized_dep or used.startswith(normalized_dep) or normalized_dep.startswith(used):
found_usage = True
break
if not found_usage:
# Some packages are commonly unused directly
indirect_deps = {'wheel', 'setuptools', 'pip', 'colorama', 'certifi', 'packaging', 'urllib3'}
if normalized_dep not in indirect_deps:
unused.append(dep)
if unused:
print("⚠️ Declared dependencies with NO imports found:")
for dep in sorted(unused):
print(f" - {dep}")
print("\n Note: These might be used indirectly or by other dependencies")
else:
print("✅ All declared dependencies have corresponding imports")
print("\n" + "="*60)
print("💡 How to use this report:")
print(" 1. Check each ❌ import to see if it's legitimate")
print(" 2. If legitimate, add the package to pyproject.toml")
print(" 3. If it's an internal module or typo, fix the import")
print(" 4. Review unused dependencies - remove if truly not needed")
print("="*60)
if __name__ == '__main__':
check_missing_dependencies()

View File

@@ -0,0 +1,582 @@
"""
Comprehensive test suite for ProxyConfig in different forms:
1. String form (ip:port:username:password)
2. Dict form (dictionary with keys)
3. Object form (ProxyConfig instance)
4. Environment variable form (from env vars)
Tests cover all possible scenarios and edge cases using pytest.
"""
import asyncio
import os
import pytest
import tempfile
from unittest.mock import patch
from crawl4ai import AsyncWebCrawler, BrowserConfig
from crawl4ai.async_configs import CrawlerRunConfig, ProxyConfig
from crawl4ai.cache_context import CacheMode
class TestProxyConfig:
"""Comprehensive test suite for ProxyConfig functionality."""
# Test data for different scenarios
# get free proxy server from from webshare.io https://www.webshare.io/?referral_code=3sqog0y1fvsl
TEST_PROXY_DATA = {
"server": "",
"username": "",
"password": "",
"ip": ""
}
def setup_method(self):
"""Setup for each test method."""
self.test_url = "https://httpbin.org/ip" # Use httpbin for testing
# ==================== OBJECT FORM TESTS ====================
def test_proxy_config_object_creation_basic(self):
"""Test basic ProxyConfig object creation."""
proxy = ProxyConfig(server="127.0.0.1:8080")
assert proxy.server == "127.0.0.1:8080"
assert proxy.username is None
assert proxy.password is None
assert proxy.ip == "127.0.0.1" # Should auto-extract IP
def test_proxy_config_object_creation_full(self):
"""Test ProxyConfig object creation with all parameters."""
proxy = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password'],
ip=self.TEST_PROXY_DATA['ip']
)
assert proxy.server == f"http://{self.TEST_PROXY_DATA['server']}"
assert proxy.username == self.TEST_PROXY_DATA['username']
assert proxy.password == self.TEST_PROXY_DATA['password']
assert proxy.ip == self.TEST_PROXY_DATA['ip']
def test_proxy_config_object_ip_extraction(self):
"""Test automatic IP extraction from server URL."""
test_cases = [
("http://192.168.1.1:8080", "192.168.1.1"),
("https://10.0.0.1:3128", "10.0.0.1"),
("192.168.1.100:8080", "192.168.1.100"),
("proxy.example.com:8080", "proxy.example.com"),
]
for server, expected_ip in test_cases:
proxy = ProxyConfig(server=server)
assert proxy.ip == expected_ip, f"Failed for server: {server}"
def test_proxy_config_object_invalid_server(self):
"""Test ProxyConfig with invalid server formats."""
# Should not raise exception but may not extract IP properly
proxy = ProxyConfig(server="invalid-format")
assert proxy.server == "invalid-format"
# IP extraction might fail but object should still be created
# ==================== DICT FORM TESTS ====================
def test_proxy_config_from_dict_basic(self):
"""Test creating ProxyConfig from basic dictionary."""
proxy_dict = {"server": "127.0.0.1:8080"}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == "127.0.0.1:8080"
assert proxy.username is None
assert proxy.password is None
def test_proxy_config_from_dict_full(self):
"""Test creating ProxyConfig from complete dictionary."""
proxy_dict = {
"server": f"http://{self.TEST_PROXY_DATA['server']}",
"username": self.TEST_PROXY_DATA['username'],
"password": self.TEST_PROXY_DATA['password'],
"ip": self.TEST_PROXY_DATA['ip']
}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == proxy_dict["server"]
assert proxy.username == proxy_dict["username"]
assert proxy.password == proxy_dict["password"]
assert proxy.ip == proxy_dict["ip"]
def test_proxy_config_from_dict_missing_keys(self):
"""Test creating ProxyConfig from dictionary with missing keys."""
proxy_dict = {"server": "127.0.0.1:8080", "username": "user"}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == "127.0.0.1:8080"
assert proxy.username == "user"
assert proxy.password is None
assert proxy.ip == "127.0.0.1" # Should auto-extract
def test_proxy_config_from_dict_empty(self):
"""Test creating ProxyConfig from empty dictionary."""
proxy_dict = {}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server is None
assert proxy.username is None
assert proxy.password is None
assert proxy.ip is None
def test_proxy_config_from_dict_none_values(self):
"""Test creating ProxyConfig from dictionary with None values."""
proxy_dict = {
"server": "127.0.0.1:8080",
"username": None,
"password": None,
"ip": None
}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == "127.0.0.1:8080"
assert proxy.username is None
assert proxy.password is None
assert proxy.ip == "127.0.0.1" # Should auto-extract despite None
# ==================== STRING FORM TESTS ====================
def test_proxy_config_from_string_full_format(self):
"""Test creating ProxyConfig from full string format (ip:port:username:password)."""
proxy_str = f"{self.TEST_PROXY_DATA['ip']}:6114:{self.TEST_PROXY_DATA['username']}:{self.TEST_PROXY_DATA['password']}"
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == f"http://{self.TEST_PROXY_DATA['ip']}:6114"
assert proxy.username == self.TEST_PROXY_DATA['username']
assert proxy.password == self.TEST_PROXY_DATA['password']
assert proxy.ip == self.TEST_PROXY_DATA['ip']
def test_proxy_config_from_string_ip_port_only(self):
"""Test creating ProxyConfig from string with only ip:port."""
proxy_str = "192.168.1.1:8080"
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == "http://192.168.1.1:8080"
assert proxy.username is None
assert proxy.password is None
assert proxy.ip == "192.168.1.1"
def test_proxy_config_from_string_invalid_format(self):
"""Test creating ProxyConfig from invalid string formats."""
invalid_formats = [
"invalid",
"ip:port:user", # Missing password (3 parts)
"ip:port:user:pass:extra", # Too many parts (5 parts)
"",
"::", # Empty parts but 3 total (invalid)
"::::", # Empty parts but 5 total (invalid)
]
for proxy_str in invalid_formats:
with pytest.raises(ValueError, match="Invalid proxy string format"):
ProxyConfig.from_string(proxy_str)
def test_proxy_config_from_string_edge_cases_that_work(self):
"""Test string formats that should work but might be edge cases."""
# These cases actually work as valid formats
edge_cases = [
(":", "http://:", ""), # ip:port format with empty values
(":::", "http://:", ""), # ip:port:user:pass format with empty values
]
for proxy_str, expected_server, expected_ip in edge_cases:
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == expected_server
assert proxy.ip == expected_ip
def test_proxy_config_from_string_edge_cases(self):
"""Test string parsing edge cases."""
# Test with different port numbers
proxy_str = "10.0.0.1:3128:user:pass"
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == "http://10.0.0.1:3128"
# Test with special characters in credentials
proxy_str = "10.0.0.1:8080:user@domain:pass:word"
with pytest.raises(ValueError): # Should fail due to extra colon in password
ProxyConfig.from_string(proxy_str)
# ==================== ENVIRONMENT VARIABLE TESTS ====================
def test_proxy_config_from_env_single_proxy(self):
"""Test loading single proxy from environment variable."""
proxy_str = f"{self.TEST_PROXY_DATA['ip']}:6114:{self.TEST_PROXY_DATA['username']}:{self.TEST_PROXY_DATA['password']}"
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 1
proxy = proxies[0]
assert proxy.ip == self.TEST_PROXY_DATA['ip']
assert proxy.username == self.TEST_PROXY_DATA['username']
assert proxy.password == self.TEST_PROXY_DATA['password']
def test_proxy_config_from_env_multiple_proxies(self):
"""Test loading multiple proxies from environment variable."""
proxy_list = [
"192.168.1.1:8080:user1:pass1",
"192.168.1.2:8080:user2:pass2",
"10.0.0.1:3128" # No auth
]
proxy_str = ",".join(proxy_list)
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 3
# Check first proxy
assert proxies[0].ip == "192.168.1.1"
assert proxies[0].username == "user1"
assert proxies[0].password == "pass1"
# Check second proxy
assert proxies[1].ip == "192.168.1.2"
assert proxies[1].username == "user2"
assert proxies[1].password == "pass2"
# Check third proxy (no auth)
assert proxies[2].ip == "10.0.0.1"
assert proxies[2].username is None
assert proxies[2].password is None
def test_proxy_config_from_env_empty_var(self):
"""Test loading from empty environment variable."""
with patch.dict(os.environ, {'TEST_PROXIES': ''}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 0
def test_proxy_config_from_env_missing_var(self):
"""Test loading from missing environment variable."""
# Ensure the env var doesn't exist
with patch.dict(os.environ, {}, clear=True):
proxies = ProxyConfig.from_env('NON_EXISTENT_VAR')
assert len(proxies) == 0
def test_proxy_config_from_env_with_empty_entries(self):
"""Test loading proxies with empty entries in the list."""
proxy_str = "192.168.1.1:8080:user:pass,,10.0.0.1:3128,"
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 2 # Empty entries should be skipped
assert proxies[0].ip == "192.168.1.1"
assert proxies[1].ip == "10.0.0.1"
def test_proxy_config_from_env_with_invalid_entries(self):
"""Test loading proxies with some invalid entries."""
proxy_str = "192.168.1.1:8080:user:pass,invalid_proxy,10.0.0.1:3128"
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
# Should handle errors gracefully and return valid proxies
proxies = ProxyConfig.from_env('TEST_PROXIES')
# Depending on implementation, might return partial list or empty
# This tests error handling
assert isinstance(proxies, list)
# ==================== SERIALIZATION TESTS ====================
def test_proxy_config_to_dict(self):
"""Test converting ProxyConfig to dictionary."""
proxy = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password'],
ip=self.TEST_PROXY_DATA['ip']
)
result_dict = proxy.to_dict()
expected = {
"server": f"http://{self.TEST_PROXY_DATA['server']}",
"username": self.TEST_PROXY_DATA['username'],
"password": self.TEST_PROXY_DATA['password'],
"ip": self.TEST_PROXY_DATA['ip']
}
assert result_dict == expected
def test_proxy_config_clone(self):
"""Test cloning ProxyConfig with modifications."""
original = ProxyConfig(
server="http://127.0.0.1:8080",
username="user",
password="pass"
)
# Clone with modifications
cloned = original.clone(username="new_user", password="new_pass")
# Original should be unchanged
assert original.username == "user"
assert original.password == "pass"
# Clone should have new values
assert cloned.username == "new_user"
assert cloned.password == "new_pass"
assert cloned.server == original.server # Unchanged value
def test_proxy_config_roundtrip_serialization(self):
"""Test that ProxyConfig can be serialized and deserialized without loss."""
original = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password'],
ip=self.TEST_PROXY_DATA['ip']
)
# Serialize to dict and back
serialized = original.to_dict()
deserialized = ProxyConfig.from_dict(serialized)
assert deserialized.server == original.server
assert deserialized.username == original.username
assert deserialized.password == original.password
assert deserialized.ip == original.ip
# ==================== INTEGRATION TESTS ====================
@pytest.mark.asyncio
async def test_crawler_with_proxy_config_object(self):
"""Test AsyncWebCrawler with ProxyConfig object."""
proxy_config = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password']
)
browser_config = BrowserConfig(headless=True)
# Test that the crawler accepts the ProxyConfig object without errors
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
# Note: This might fail due to actual proxy connection, but should not fail due to config issues
result = await crawler.arun(
url=self.test_url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config,
page_timeout=10000 # Short timeout for testing
)
)
# If we get here, proxy config was accepted
assert result is not None
except Exception as e:
# We expect connection errors with test proxies, but not config errors
error_msg = str(e).lower()
assert "attribute" not in error_msg, f"Config error: {e}"
assert "proxy_config" not in error_msg, f"Proxy config error: {e}"
@pytest.mark.asyncio
async def test_crawler_with_proxy_config_dict(self):
"""Test AsyncWebCrawler with ProxyConfig from dictionary."""
proxy_dict = {
"server": f"http://{self.TEST_PROXY_DATA['server']}",
"username": self.TEST_PROXY_DATA['username'],
"password": self.TEST_PROXY_DATA['password']
}
proxy_config = ProxyConfig.from_dict(proxy_dict)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(
url=self.test_url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config,
page_timeout=10000
)
)
assert result is not None
except Exception as e:
error_msg = str(e).lower()
assert "attribute" not in error_msg, f"Config error: {e}"
@pytest.mark.asyncio
async def test_crawler_with_proxy_config_from_string(self):
"""Test AsyncWebCrawler with ProxyConfig from string."""
proxy_str = f"{self.TEST_PROXY_DATA['ip']}:6114:{self.TEST_PROXY_DATA['username']}:{self.TEST_PROXY_DATA['password']}"
proxy_config = ProxyConfig.from_string(proxy_str)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(
url=self.test_url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config,
page_timeout=10000
)
)
assert result is not None
except Exception as e:
error_msg = str(e).lower()
assert "attribute" not in error_msg, f"Config error: {e}"
# ==================== EDGE CASES AND ERROR HANDLING ====================
def test_proxy_config_with_none_server(self):
"""Test ProxyConfig behavior with None server."""
proxy = ProxyConfig(server=None)
assert proxy.server is None
assert proxy.ip is None # Should not crash
def test_proxy_config_with_empty_string_server(self):
"""Test ProxyConfig behavior with empty string server."""
proxy = ProxyConfig(server="")
assert proxy.server == ""
assert proxy.ip is None or proxy.ip == ""
def test_proxy_config_special_characters_in_credentials(self):
"""Test ProxyConfig with special characters in username/password."""
special_chars_tests = [
("user@domain.com", "pass!@#$%"),
("user_123", "p@ssw0rd"),
("user-test", "pass-word"),
]
for username, password in special_chars_tests:
proxy = ProxyConfig(
server="http://127.0.0.1:8080",
username=username,
password=password
)
assert proxy.username == username
assert proxy.password == password
def test_proxy_config_unicode_handling(self):
"""Test ProxyConfig with unicode characters."""
proxy = ProxyConfig(
server="http://127.0.0.1:8080",
username="ユーザー", # Japanese characters
password="пароль" # Cyrillic characters
)
assert proxy.username == "ユーザー"
assert proxy.password == "пароль"
# ==================== PERFORMANCE TESTS ====================
def test_proxy_config_creation_performance(self):
"""Test that ProxyConfig creation is reasonably fast."""
import time
start_time = time.time()
for i in range(1000):
proxy = ProxyConfig(
server=f"http://192.168.1.{i % 255}:8080",
username=f"user{i}",
password=f"pass{i}"
)
end_time = time.time()
# Should be able to create 1000 configs in less than 1 second
assert (end_time - start_time) < 1.0
def test_proxy_config_from_env_performance(self):
"""Test that loading many proxies from env is reasonably fast."""
import time
# Create a large list of proxy strings
proxy_list = [f"192.168.1.{i}:8080:user{i}:pass{i}" for i in range(100)]
proxy_str = ",".join(proxy_list)
with patch.dict(os.environ, {'PERF_TEST_PROXIES': proxy_str}):
start_time = time.time()
proxies = ProxyConfig.from_env('PERF_TEST_PROXIES')
end_time = time.time()
assert len(proxies) == 100
# Should be able to parse 100 proxies in less than 1 second
assert (end_time - start_time) < 1.0
# ==================== STANDALONE TEST FUNCTIONS ====================
@pytest.mark.asyncio
async def test_dict_proxy():
"""Original test function for dict proxy - kept for backward compatibility."""
proxy_config = {
"server": "23.95.150.145:6114",
"username": "cfyswbwn",
"password": "1gs266hoqysi"
}
proxy_config_obj = ProxyConfig.from_dict(proxy_config)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(url="https://httpbin.org/ip", config=CrawlerRunConfig(
stream=False,
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config_obj,
page_timeout=10000
))
print("Dict proxy test passed!")
print(result.markdown[:200] if result and result.markdown else "No result")
except Exception as e:
print(f"Dict proxy test error (expected): {e}")
@pytest.mark.asyncio
async def test_string_proxy():
"""Test function for string proxy format."""
proxy_str = "23.95.150.145:6114:cfyswbwn:1gs266hoqysi"
proxy_config_obj = ProxyConfig.from_string(proxy_str)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(url="https://httpbin.org/ip", config=CrawlerRunConfig(
stream=False,
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config_obj,
page_timeout=10000
))
print("String proxy test passed!")
print(result.markdown[:200] if result and result.markdown else "No result")
except Exception as e:
print(f"String proxy test error (expected): {e}")
@pytest.mark.asyncio
async def test_env_proxy():
"""Test function for environment variable proxy."""
# Set environment variable
os.environ['TEST_PROXIES'] = "23.95.150.145:6114:cfyswbwn:1gs266hoqysi"
proxies = ProxyConfig.from_env('TEST_PROXIES')
if proxies:
proxy_config_obj = proxies[0] # Use first proxy
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(url="https://httpbin.org/ip", config=CrawlerRunConfig(
stream=False,
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config_obj,
page_timeout=10000
))
print("Environment proxy test passed!")
print(result.markdown[:200] if result and result.markdown else "No result")
except Exception as e:
print(f"Environment proxy test error (expected): {e}")
else:
print("No proxies loaded from environment")
if __name__ == "__main__":
print("Running comprehensive ProxyConfig tests...")
print("=" * 50)
# Run the standalone test functions
print("\n1. Testing dict proxy format...")
asyncio.run(test_dict_proxy())
print("\n2. Testing string proxy format...")
asyncio.run(test_string_proxy())
print("\n3. Testing environment variable proxy format...")
asyncio.run(test_env_proxy())
print("\n" + "=" * 50)
print("To run the full pytest suite, use: pytest " + __file__)
print("=" * 50)

42
tests/test_arun_many.py Normal file
View File

@@ -0,0 +1,42 @@
"""
Test example for multiple crawler configs feature
"""
import asyncio
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
from crawl4ai.processors.pdf import PDFContentScrapingStrategy
async def test_run_many():
default_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
# scraping_strategy=PDFContentScrapingStrategy()
)
test_urls = [
# "https://blog.python.org/", # Blog URL
"https://www.python.org/", # Generic HTTPS page
"https://www.kidocode.com/", # Generic HTTPS page
"https://www.example.com/", # Generic HTTPS page
# "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf",
]
async with AsyncWebCrawler() as crawler:
# Single config - traditional usage still works
print("Test 1: Single config (backwards compatible)")
result = await crawler.arun_many(
urls=test_urls[:2],
config=default_config
)
print(f"Crawled {len(result)} URLs with single config\n")
for item in result:
print(f" {item.url} -> {item.status_code}")
if __name__ == "__main__":
asyncio.run(test_run_many())

View File

@@ -0,0 +1,131 @@
"""
Test only the config matching logic without running crawler
"""
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from crawl4ai.async_configs import CrawlerRunConfig, MatchMode
def test_all_matching_scenarios():
print("Testing CrawlerRunConfig.is_match() method")
print("=" * 50)
# Test 1: Single string pattern
print("\n1. Single string pattern (glob style)")
config = CrawlerRunConfig(
url_matcher="*.pdf",
# For example we can set this => scraping_strategy=PDFContentScrapingStrategy()
)
test_urls = [
("https://example.com/file.pdf", True),
("https://example.com/doc.PDF", False), # Case sensitive
("https://example.com/file.txt", False),
("file.pdf", True),
]
for url, expected in test_urls:
result = config.is_match(url)
status = "" if result == expected else ""
print(f" {status} {url} -> {result}")
# Test 2: List of patterns with OR
print("\n2. List of patterns with OR (default)")
config = CrawlerRunConfig(
url_matcher=["*/article/*", "*/blog/*", "*.html"],
match_mode=MatchMode.OR
)
test_urls = [
("https://example.com/article/news", True),
("https://example.com/blog/post", True),
("https://example.com/page.html", True),
("https://example.com/page.php", False),
]
for url, expected in test_urls:
result = config.is_match(url)
status = "" if result == expected else ""
print(f" {status} {url} -> {result}")
# Test 3: Custom function
print("\n3. Custom function matcher")
config = CrawlerRunConfig(
url_matcher=lambda url: 'api' in url and (url.endswith('.json') or url.endswith('.xml'))
)
test_urls = [
("https://api.example.com/data.json", True),
("https://api.example.com/data.xml", True),
("https://api.example.com/data.html", False),
("https://example.com/data.json", False), # No 'api'
]
for url, expected in test_urls:
result = config.is_match(url)
status = "" if result == expected else ""
print(f" {status} {url} -> {result}")
# Test 4: Mixed list with AND
print("\n4. Mixed patterns and functions with AND")
config = CrawlerRunConfig(
url_matcher=[
"https://*", # Must be HTTPS
lambda url: '.com' in url, # Must have .com
lambda url: len(url) < 50 # Must be short
],
match_mode=MatchMode.AND
)
test_urls = [
("https://example.com/page", True),
("http://example.com/page", False), # Not HTTPS
("https://example.org/page", False), # No .com
("https://example.com/" + "x" * 50, False), # Too long
]
for url, expected in test_urls:
result = config.is_match(url)
status = "" if result == expected else ""
print(f" {status} {url} -> {result}")
# Test 5: Complex real-world scenario
print("\n5. Complex pattern combinations")
config = CrawlerRunConfig(
url_matcher=[
"*/api/v[0-9]/*", # API versioned endpoints
lambda url: 'graphql' in url, # GraphQL endpoints
"*.json" # JSON files
],
match_mode=MatchMode.OR
)
test_urls = [
("https://example.com/api/v1/users", True),
("https://example.com/api/v2/posts", True),
("https://example.com/graphql", True),
("https://example.com/data.json", True),
("https://example.com/api/users", False), # No version
]
for url, expected in test_urls:
result = config.is_match(url)
status = "" if result == expected else ""
print(f" {status} {url} -> {result}")
# Test 6: Edge cases
print("\n6. Edge cases")
# No matcher
config = CrawlerRunConfig()
result = config.is_match("https://example.com")
print(f" {'' if not result else ''} No matcher -> {result}")
# Empty list
config = CrawlerRunConfig(url_matcher=[])
result = config.is_match("https://example.com")
print(f" {'' if not result else ''} Empty list -> {result}")
# None in list (should be skipped)
config = CrawlerRunConfig(url_matcher=["*.pdf", None, "*.doc"])
result = config.is_match("test.pdf")
print(f" {'' if result else ''} List with None -> {result}")
print("\n" + "=" * 50)
print("All matching tests completed!")
if __name__ == "__main__":
test_all_matching_scenarios()

View File

@@ -0,0 +1,87 @@
"""
Test config selection logic in dispatchers
"""
import asyncio
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from crawl4ai.async_configs import CrawlerRunConfig, MatchMode
from crawl4ai.async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher
class TestDispatcher(BaseDispatcher):
"""Simple test dispatcher to verify config selection"""
async def crawl_url(self, url, config, task_id, **kwargs):
# Just return which config was selected
selected = self.select_config(url, config)
return {"url": url, "config_id": id(selected)}
async def run_urls(self, urls, crawler, config):
results = []
for url in urls:
result = await self.crawl_url(url, config, "test")
results.append(result)
return results
async def test_dispatcher_config_selection():
print("Testing dispatcher config selection")
print("=" * 50)
# Create test configs with different matchers
pdf_config = CrawlerRunConfig(url_matcher="*.pdf")
api_config = CrawlerRunConfig(url_matcher=lambda url: 'api' in url)
default_config = CrawlerRunConfig() # No matcher
configs = [pdf_config, api_config, default_config]
# Create test dispatcher
dispatcher = TestDispatcher()
# Test single config
print("\nTest 1: Single config")
result = await dispatcher.crawl_url("https://example.com/file.pdf", pdf_config, "test1")
assert result["config_id"] == id(pdf_config)
print("✓ Single config works")
# Test config list selection
print("\nTest 2: Config list selection")
test_cases = [
("https://example.com/file.pdf", id(pdf_config)),
("https://api.example.com/data", id(api_config)),
("https://example.com/page", id(configs[0])), # No match, uses first
]
for url, expected_id in test_cases:
result = await dispatcher.crawl_url(url, configs, "test")
assert result["config_id"] == expected_id, f"URL {url} got wrong config"
print(f"{url} -> correct config selected")
# Test with MemoryAdaptiveDispatcher
print("\nTest 3: MemoryAdaptiveDispatcher config selection")
mem_dispatcher = MemoryAdaptiveDispatcher()
# Test select_config method directly
selected = mem_dispatcher.select_config("https://example.com/doc.pdf", configs)
assert selected == pdf_config
print("✓ MemoryAdaptiveDispatcher.select_config works")
# Test empty config list
print("\nTest 4: Edge cases")
selected = mem_dispatcher.select_config("https://example.com", [])
assert isinstance(selected, CrawlerRunConfig) # Should return default
print("✓ Empty config list returns default config")
# Test None config
selected = mem_dispatcher.select_config("https://example.com", None)
assert isinstance(selected, CrawlerRunConfig) # Should return default
print("✓ None config returns default config")
print("\n" + "=" * 50)
print("All dispatcher tests passed! ✓")
if __name__ == "__main__":
asyncio.run(test_dispatcher_config_selection())

View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""Test script to verify Docker API with LLM provider configuration."""
import requests
import json
import time
BASE_URL = "http://localhost:11235"
def test_health():
"""Test health endpoint."""
print("1. Testing health endpoint...")
response = requests.get(f"{BASE_URL}/health")
print(f" Status: {response.status_code}")
print(f" Response: {response.json()}")
print()
def test_schema():
"""Test schema endpoint to see configuration."""
print("2. Testing schema endpoint...")
response = requests.get(f"{BASE_URL}/schema")
print(f" Status: {response.status_code}")
# Print only browser config to keep output concise
print(f" Browser config keys: {list(response.json().get('browser', {}).keys())[:5]}...")
print()
def test_markdown_with_llm_filter():
"""Test markdown endpoint with LLM filter (should use configured provider)."""
print("3. Testing markdown endpoint with LLM filter...")
print(" This should use the Groq provider from LLM_PROVIDER env var")
# Note: This will fail with dummy API keys, but we can see if it tries to use Groq
payload = {
"url": "https://httpbin.org/html",
"f": "llm",
"q": "Extract the main content"
}
response = requests.post(f"{BASE_URL}/md", json=payload)
print(f" Status: {response.status_code}")
if response.status_code != 200:
print(f" Error: {response.text[:200]}...")
else:
print(f" Success! Markdown length: {len(response.json().get('markdown', ''))} chars")
print()
def test_markdown_with_provider_override():
"""Test markdown endpoint with provider override in request."""
print("4. Testing markdown endpoint with provider override...")
print(" This should use OpenAI provider from request parameter")
payload = {
"url": "https://httpbin.org/html",
"f": "llm",
"q": "Extract the main content",
"provider": "openai/gpt-4" # Override to use OpenAI
}
response = requests.post(f"{BASE_URL}/md", json=payload)
print(f" Status: {response.status_code}")
if response.status_code != 200:
print(f" Error: {response.text[:200]}...")
else:
print(f" Success! Markdown length: {len(response.json().get('markdown', ''))} chars")
print()
def test_simple_crawl():
"""Test simple crawl without LLM."""
print("5. Testing simple crawl (no LLM required)...")
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {
"type": "BrowserConfig",
"params": {"headless": True}
},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {"cache_mode": "bypass"}
}
}
response = requests.post(f"{BASE_URL}/crawl", json=payload)
print(f" Status: {response.status_code}")
if response.status_code == 200:
result = response.json()
print(f" Success: {result.get('success')}")
print(f" Results count: {len(result.get('results', []))}")
if result.get('results'):
print(f" First result success: {result['results'][0].get('success')}")
else:
print(f" Error: {response.text[:200]}...")
print()
def test_playground():
"""Test if playground is accessible."""
print("6. Testing playground interface...")
response = requests.get(f"{BASE_URL}/playground")
print(f" Status: {response.status_code}")
print(f" Content-Type: {response.headers.get('content-type')}")
print()
if __name__ == "__main__":
print("=== Crawl4AI Docker API Tests ===\n")
print(f"Testing API at {BASE_URL}\n")
# Wait a bit for server to be fully ready
time.sleep(2)
test_health()
test_schema()
test_simple_crawl()
test_playground()
print("\nTesting LLM functionality (these may fail with dummy API keys):\n")
test_markdown_with_llm_filter()
test_markdown_with_provider_override()
print("\nTests completed!")

71
tests/test_memory_macos.py Executable file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""Test script to verify macOS memory calculation accuracy."""
import psutil
import platform
import time
from crawl4ai.memory_utils import get_true_memory_usage_percent, get_memory_stats, get_true_available_memory_gb
def test_memory_calculation():
"""Test and compare memory calculations."""
print(f"Platform: {platform.system()}")
print(f"Python version: {platform.python_version()}")
print("-" * 60)
# Get psutil's view
vm = psutil.virtual_memory()
psutil_percent = vm.percent
psutil_available_gb = vm.available / (1024**3)
total_gb = vm.total / (1024**3)
# Get our corrected view
true_percent = get_true_memory_usage_percent()
true_available_gb = get_true_available_memory_gb()
true_percent_calc, available_calc, total_calc = get_memory_stats()
print("Memory Statistics Comparison:")
print(f"Total Memory: {total_gb:.2f} GB")
print()
print("PSUtil (Standard) Calculation:")
print(f" - Memory Used: {psutil_percent:.1f}%")
print(f" - Available: {psutil_available_gb:.2f} GB")
print()
print("Platform-Aware Calculation:")
print(f" - Memory Used: {true_percent:.1f}%")
print(f" - Available: {true_available_gb:.2f} GB")
print(f" - Difference: {true_available_gb - psutil_available_gb:.2f} GB of reclaimable memory")
print()
# Show the impact on dispatcher behavior
print("Impact on MemoryAdaptiveDispatcher:")
thresholds = {
"Normal": 90.0,
"Critical": 95.0,
"Recovery": 85.0
}
for name, threshold in thresholds.items():
psutil_triggered = psutil_percent >= threshold
true_triggered = true_percent >= threshold
print(f" - {name} Threshold ({threshold}%):")
print(f" PSUtil: {'TRIGGERED' if psutil_triggered else 'OK'}")
print(f" Platform-Aware: {'TRIGGERED' if true_triggered else 'OK'}")
if psutil_triggered != true_triggered:
print(f" → Difference: Platform-aware prevents false {'pressure' if psutil_triggered else 'recovery'}")
print()
# Monitor for a few seconds
print("Monitoring memory for 10 seconds...")
for i in range(10):
vm = psutil.virtual_memory()
true_pct = get_true_memory_usage_percent()
print(f" {i+1}s - PSUtil: {vm.percent:.1f}% | Platform-Aware: {true_pct:.1f}%", end="\r")
time.sleep(1)
print("\n")
if __name__ == "__main__":
test_memory_calculation()

117
tests/test_multi_config.py Normal file
View File

@@ -0,0 +1,117 @@
"""
Test example for multiple crawler configs feature
"""
import asyncio
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, MatchMode, CacheMode
async def test_multi_config():
# Create different configs for different URL patterns
# Config for PDF files
pdf_config = CrawlerRunConfig(
url_matcher="*.pdf",
)
# Config for articles (using multiple patterns with OR logic)
article_config = CrawlerRunConfig(
url_matcher=["*/news/*", "*blog*", "*/article/*"],
match_mode=MatchMode.OR,
screenshot=True,
)
# Config using custom matcher function
api_config = CrawlerRunConfig(
url_matcher=lambda url: 'api' in url or 'json' in url,
)
# Config combining patterns and functions with AND logic
secure_docs_config = CrawlerRunConfig(
url_matcher=[
"*.doc*", # Matches .doc, .docx
lambda url: url.startswith('https://') # Must be HTTPS
],
match_mode=MatchMode.AND,
)
# Default config (no url_matcher means it won't match anything unless it's the fallback)
default_config = CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
)
# List of configs - order matters! First match wins
configs = [
pdf_config,
article_config,
api_config,
secure_docs_config,
default_config # Fallback
]
# Test URLs - using real URLs that exist
test_urls = [
"https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", # Real PDF
"https://www.bbc.com/news/articles/c5y3e3glnldo", # News article
"https://blog.python.org/", # Blog URL
"https://api.github.com/users/github", # GitHub API (returns JSON)
"https://httpbin.org/json", # API endpoint that returns JSON
"https://www.python.org/", # Generic HTTPS page
"http://info.cern.ch/", # HTTP (not HTTPS) page
"https://example.com/", # → Default config
]
# Test the matching logic
print("Config matching test:")
print("-" * 50)
for url in test_urls:
for i, config in enumerate(configs):
if config.is_match(url):
print(f"{url} -> Config {i} matches")
break
else:
print(f"{url} -> No match, will use fallback (first config)")
print("\n" + "=" * 50 + "\n")
# Now test with actual crawler
async with AsyncWebCrawler() as crawler:
# Single config - traditional usage still works
print("Test 1: Single config (backwards compatible)")
result = await crawler.arun_many(
urls=["https://www.python.org/"],
config=default_config
)
print(f"Crawled {len(result)} URLs with single config\n")
# Multiple configs - new feature
print("Test 2: Multiple configs")
# Just test with 2 URLs to avoid timeout
results = await crawler.arun_many(
urls=test_urls[:2], # Just test first 2 URLs
config=configs # Pass list of configs
)
print(f"Crawled {len(results)} URLs with multiple configs")
# Using custom matcher inline
print("\nTest 3: Inline custom matcher")
custom_config = CrawlerRunConfig(
url_matcher=lambda url: len(url) > 50 and 'python' in url.lower(),
verbose=False
)
results = await crawler.arun_many(
urls=[
"https://docs.python.org/3/library/asyncio.html", # Long URL with 'python'
"https://python.org/", # Short URL with 'python' - won't match
"https://www.google.com/" # No 'python' - won't match
],
config=[custom_config, default_config]
)
print(f"Crawled {len(results)} URLs with custom matcher")
if __name__ == "__main__":
asyncio.run(test_multi_config())