diff --git a/crawl4ai/antibot_detector.py b/crawl4ai/antibot_detector.py
new file mode 100644
index 00000000..e2831e46
--- /dev/null
+++ b/crawl4ai/antibot_detector.py
@@ -0,0 +1,164 @@
+"""
+Anti-bot detection heuristics for crawl results.
+
+Examines HTTP status codes and HTML content patterns to determine
+if a crawl was blocked by anti-bot protection.
+
+Detection is layered: high-confidence structural markers trigger alone,
+while generic patterns require corroborating signals (status code + short page)
+to avoid false positives.
+"""
+
+import re
+from typing import Optional, Tuple
+
+
+# ---------------------------------------------------------------------------
+# Tier 1: High-confidence structural markers (single signal sufficient)
+# These are unique to block pages and virtually never appear in real content.
+# ---------------------------------------------------------------------------
+_TIER1_PATTERNS = [
+ # Akamai — full reference pattern: Reference #18.2d351ab8.1557333295.a4e16ab
+ (re.compile(r"Reference\s*#\s*[\d]+\.[0-9a-f]+\.\d+\.[0-9a-f]+", re.IGNORECASE),
+ "Akamai block (Reference #)"),
+ # Akamai — "Pardon Our Interruption" challenge page
+ (re.compile(r"Pardon\s+Our\s+Interruption", re.IGNORECASE),
+ "Akamai challenge (Pardon Our Interruption)"),
+ # Cloudflare — challenge form with anti-bot token
+ (re.compile(r'challenge-form.*?__cf_chl_f_tk=', re.IGNORECASE | re.DOTALL),
+ "Cloudflare challenge form"),
+ # Cloudflare — error code spans (1020 Access Denied, 1010, 1012, 1015)
+    (re.compile(r'<span[^>]*class=["\']cf-error-code["\'][^>]*>\s*\d{4}', re.IGNORECASE),
+     "Cloudflare firewall block"),
+ # Cloudflare — IUAM challenge script
+ (re.compile(r'/cdn-cgi/challenge-platform/\S+orchestrate', re.IGNORECASE),
+ "Cloudflare JS challenge"),
+ # PerimeterX / HUMAN — block page with app ID assignment (not prose mentions)
+ (re.compile(r"window\._pxAppId\s*=", re.IGNORECASE),
+ "PerimeterX block"),
+ # PerimeterX — captcha CDN
+ (re.compile(r"captcha\.px-cdn\.net", re.IGNORECASE),
+ "PerimeterX captcha"),
+ # DataDome — captcha delivery domain (structural, not the word "datadome")
+ (re.compile(r"captcha-delivery\.com", re.IGNORECASE),
+ "DataDome captcha"),
+ # Imperva/Incapsula — resource iframe
+ (re.compile(r"_Incapsula_Resource", re.IGNORECASE),
+ "Imperva/Incapsula block"),
+ # Imperva/Incapsula — incident ID
+ (re.compile(r"Incapsula\s+incident\s+ID", re.IGNORECASE),
+ "Imperva/Incapsula incident"),
+ # Sucuri firewall
+ (re.compile(r"Sucuri\s+WebSite\s+Firewall", re.IGNORECASE),
+ "Sucuri firewall block"),
+ # Kasada
+ (re.compile(r"KPSDK\.scriptStart\s*=\s*KPSDK\.now\(\)", re.IGNORECASE),
+ "Kasada challenge"),
+]
+
+# ---------------------------------------------------------------------------
+# Tier 2: Medium-confidence patterns — only match on SHORT pages (< 10KB)
+# These terms appear in real content (articles, login forms, security blogs)
+# so we require the page to be small to avoid false positives.
+# ---------------------------------------------------------------------------
+_TIER2_PATTERNS = [
+ # Akamai / generic — "Access Denied" (extremely common on legit 403s too)
+ (re.compile(r"Access\s+Denied", re.IGNORECASE),
+ "Access Denied on short page"),
+ # Cloudflare — "Just a moment" / "Checking your browser"
+ (re.compile(r"Checking\s+your\s+browser", re.IGNORECASE),
+ "Cloudflare browser check"),
+    (re.compile(r"<title>\s*Just\s+a\s+moment", re.IGNORECASE),
+ "Cloudflare interstitial"),
+ # CAPTCHA on a block page (not a login form — login forms are big pages)
+ (re.compile(r'class=["\']g-recaptcha["\']', re.IGNORECASE),
+ "reCAPTCHA on block page"),
+ (re.compile(r'class=["\']h-captcha["\']', re.IGNORECASE),
+ "hCaptcha on block page"),
+ # PerimeterX block page title
+ (re.compile(r"Access\s+to\s+This\s+Page\s+Has\s+Been\s+Blocked", re.IGNORECASE),
+ "PerimeterX block page"),
+ # Generic block phrases (only on short pages to avoid matching articles)
+ (re.compile(r"blocked\s+by\s+security", re.IGNORECASE),
+ "Blocked by security"),
+ (re.compile(r"Request\s+unsuccessful", re.IGNORECASE),
+ "Request unsuccessful (Imperva)"),
+]
+
+_TIER2_MAX_SIZE = 10000 # Only check tier 2 patterns on pages under 10KB
+
+# ---------------------------------------------------------------------------
+# Thresholds
+# ---------------------------------------------------------------------------
+_BLOCK_PAGE_MAX_SIZE = 5000 # 403 + short page = likely block
+_EMPTY_CONTENT_THRESHOLD = 100 # 200 + near-empty = JS-blocked render
+
+
+def _looks_like_data(html: str) -> bool:
+ """Check if content looks like a JSON/XML API response (not an HTML block page)."""
+ stripped = html.strip()
+ if not stripped:
+ return False
+    return stripped[0] in ('{', '[', '<') and not stripped.lower().startswith(('<html', '<!doctype'))
+
+
+def is_blocked(
+    status_code: Optional[int],
+    html: str,
+    error_message: Optional[str] = None,
+) -> Tuple[bool, str]:
+ """
+ Detect if a crawl result indicates anti-bot blocking.
+
+ Uses layered detection to maximize coverage while minimizing false positives:
+ - Tier 1 patterns (structural markers) trigger on any page size
+ - Tier 2 patterns (generic terms) only trigger on short pages (< 10KB)
+ - Status-code checks require corroborating content signals
+
+ Args:
+ status_code: HTTP status code from the response.
+ html: Raw HTML content from the response.
+ error_message: Error message from the crawl result, if any.
+
+ Returns:
+ Tuple of (is_blocked, reason). reason is empty string when not blocked.
+ """
+ html = html or ""
+ html_len = len(html)
+
+ # --- HTTP 429 is always rate limiting ---
+ if status_code == 429:
+ return True, "HTTP 429 Too Many Requests"
+
+ # --- Check first 15KB for tier 1 patterns (high confidence, any page size) ---
+ snippet = html[:15000]
+ if snippet:
+ for pattern, reason in _TIER1_PATTERNS:
+ if pattern.search(snippet):
+ return True, reason
+
+ # --- HTTP 403 + short page (no tier 1 match = check tier 2) ---
+ if status_code == 403 and html_len < _BLOCK_PAGE_MAX_SIZE:
+ # Skip JSON/XML API responses — short 403 from APIs are legit auth errors
+ if not _looks_like_data(html):
+ # Short 403 with almost no content is very likely a block
+ if html_len < _EMPTY_CONTENT_THRESHOLD:
+ return True, f"HTTP 403 with near-empty response ({html_len} bytes)"
+ # Check tier 2 patterns on 403 short pages
+ for pattern, reason in _TIER2_PATTERNS:
+ if pattern.search(snippet):
+ return True, f"{reason} (HTTP 403, {html_len} bytes)"
+
+ # --- Tier 2 patterns on any error status + short page ---
+ if status_code and status_code >= 400 and html_len < _TIER2_MAX_SIZE:
+ for pattern, reason in _TIER2_PATTERNS:
+ if pattern.search(snippet):
+ return True, f"{reason} (HTTP {status_code}, {html_len} bytes)"
+
+ # --- HTTP 200 + near-empty content (JS-rendered empty page) ---
+ if status_code == 200:
+ stripped = html.strip()
+ if len(stripped) < _EMPTY_CONTENT_THRESHOLD and not _looks_like_data(html):
+ return True, f"Near-empty content ({len(stripped)} bytes) with HTTP 200"
+
+ return False, ""
diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py
index b6397a4d..1c3c21b6 100644
--- a/crawl4ai/async_configs.py
+++ b/crawl4ai/async_configs.py
@@ -30,7 +30,7 @@ from .cache_context import CacheMode
from .proxy_strategy import ProxyRotationStrategy
import inspect
-from typing import Any, Callable, Dict, List, Optional, Union
+from typing import Any, Awaitable, Callable, Dict, List, Optional, Union
from enum import Enum
# Type alias for URL matching
@@ -353,19 +353,23 @@ class ProxyConfig:
username: Optional[str] = None,
password: Optional[str] = None,
ip: Optional[str] = None,
+ is_fallback: bool = False,
):
"""Configuration class for a single proxy.
-
+
Args:
server: Proxy server URL (e.g., "http://127.0.0.1:8080")
username: Optional username for proxy authentication
password: Optional password for proxy authentication
ip: Optional IP address for verification purposes
+ is_fallback: If True, proxy is only used when anti-bot blocking is
+ detected. If False (default), proxy is used on every request.
"""
self.server = server
self.username = username
self.password = password
-
+ self.is_fallback = is_fallback
+
# Extract IP from server if not explicitly provided
self.ip = ip or self._extract_ip_from_server()
@@ -425,7 +429,8 @@ class ProxyConfig:
server=proxy_dict.get("server"),
username=proxy_dict.get("username"),
password=proxy_dict.get("password"),
- ip=proxy_dict.get("ip")
+ ip=proxy_dict.get("ip"),
+ is_fallback=proxy_dict.get("is_fallback", False),
)
@staticmethod
@@ -455,7 +460,8 @@ class ProxyConfig:
"server": self.server,
"username": self.username,
"password": self.password,
- "ip": self.ip
+ "ip": self.ip,
+ "is_fallback": self.is_fallback,
}
def clone(self, **kwargs) -> "ProxyConfig":
@@ -1470,6 +1476,10 @@ class CrawlerRunConfig():
match_mode: MatchMode = MatchMode.OR,
# Experimental Parameters
experimental: Dict[str, Any] = None,
+ # Anti-Bot Retry Parameters
+ max_retries: int = 0,
+ fallback_proxy_configs: Optional[List["ProxyConfig"]] = None,
+ fallback_fetch_function: Optional[Callable[[str], Awaitable[str]]] = None,
):
# TODO: Planning to set properties dynamically based on the __init__ signature
self.url = url
@@ -1652,7 +1662,12 @@ class CrawlerRunConfig():
# Experimental Parameters
self.experimental = experimental or {}
-
+
+ # Anti-Bot Retry Parameters
+ self.max_retries = max_retries
+ self.fallback_proxy_configs = fallback_proxy_configs or []
+ self.fallback_fetch_function = fallback_fetch_function
+
# Compile C4A scripts if provided
if self.c4a_script and not self.js_code:
self._compile_c4a_script()
@@ -1887,6 +1902,8 @@ class CrawlerRunConfig():
"url_matcher": self.url_matcher,
"match_mode": self.match_mode,
"experimental": self.experimental,
+ "max_retries": self.max_retries,
+ "fallback_proxy_configs": [p.to_dict() for p in self.fallback_proxy_configs] if self.fallback_proxy_configs else [],
}
def clone(self, **kwargs):
diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index 70473b04..b6e3fa0f 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -51,6 +51,7 @@ from .utils import (
compute_head_fingerprint,
)
from .cache_validator import CacheValidator, CacheValidationResult
+from .antibot_detector import is_blocked
class AsyncWebCrawler:
@@ -373,13 +374,9 @@ class AsyncWebCrawler:
# Fetch fresh content if needed
if not cached_result or not html:
- t1 = time.perf_counter()
+ from urllib.parse import urlparse
- if config.user_agent:
- self.crawler_strategy.update_user_agent(
- config.user_agent)
-
- # Check robots.txt if enabled
+ # Check robots.txt if enabled (once, before any attempts)
if config and config.check_robots_txt:
if not await self.robots_parser.can_fetch(
url, self.browser_config.user_agent
@@ -395,74 +392,191 @@ class AsyncWebCrawler:
},
)
- ##############################
- # Call CrawlerStrategy.crawl #
- ##############################
- async_response = await self.crawler_strategy.crawl(
- url,
- config=config, # Pass the entire config object
- )
+ # --- Anti-bot retry setup ---
+ _fallback_proxy = None
+ if (config.proxy_config
+ and getattr(config.proxy_config, "is_fallback", False)):
+ _fallback_proxy = config.proxy_config
+ config.proxy_config = None
- html = sanitize_input_encode(async_response.html)
- screenshot_data = async_response.screenshot
- pdf_data = async_response.pdf_data
- js_execution_result = async_response.js_execution_result
+ _max_attempts = 1 + getattr(config, "max_retries", 0)
+ _fallback_proxies = getattr(config, "fallback_proxy_configs", None) or []
+ _proxy_activated = False
+ _block_reason = ""
+ _done = False
+ crawl_result = None
- t2 = time.perf_counter()
- self.logger.url_status(
- url=cache_context.display_url,
- success=bool(html),
- timing=t2 - t1,
- tag="FETCH",
- )
+ for _attempt in range(_max_attempts):
+ if _done:
+ break
- ###############################################################
- # Process the HTML content, Call CrawlerStrategy.process_html #
- ###############################################################
- from urllib.parse import urlparse
- crawl_result: CrawlResult = await self.aprocess_html(
- url=url,
- html=html,
- extracted_content=extracted_content,
- config=config, # Pass the config object instead of individual parameters
- screenshot_data=screenshot_data,
- pdf_data=pdf_data,
- verbose=config.verbose,
- is_raw_html=True if url.startswith("raw:") else False,
- redirected_url=async_response.redirected_url,
- original_scheme=urlparse(url).scheme,
- **kwargs,
- )
+ if _attempt > 0:
+ self.logger.warning(
+ message="Anti-bot retry {attempt}/{max_retries} for {url} — {reason}",
+ tag="ANTIBOT",
+ params={
+ "attempt": _attempt,
+ "max_retries": config.max_retries,
+ "url": url[:80],
+ "reason": _block_reason,
+ },
+ )
+ # Activate is_fallback proxy on first retry
+ if _fallback_proxy and not _proxy_activated:
+ config.proxy_config = _fallback_proxy
+ _proxy_activated = True
+ self.logger.info(
+ message="Activating fallback proxy: {proxy}",
+ tag="ANTIBOT",
+ params={"proxy": _fallback_proxy.server},
+ )
- crawl_result.status_code = async_response.status_code
- # For raw: URLs, don't fall back to the raw HTML string as redirected_url
- is_raw_url = url.startswith("raw:") or url.startswith("raw://")
- crawl_result.redirected_url = async_response.redirected_url or (None if is_raw_url else url)
- crawl_result.redirected_status_code = async_response.redirected_status_code
- crawl_result.response_headers = async_response.response_headers
- crawl_result.downloaded_files = async_response.downloaded_files
- crawl_result.js_execution_result = js_execution_result
- crawl_result.mhtml = async_response.mhtml_data
- crawl_result.ssl_certificate = async_response.ssl_certificate
- # Add captured network and console data if available
- crawl_result.network_requests = async_response.network_requests
- crawl_result.console_messages = async_response.console_messages
+ # Build list of proxies to try this round:
+ # current config.proxy_config first, then each fallback proxy
+ _proxies_this_round = [config.proxy_config] # main (may be None)
+ _proxies_this_round.extend(_fallback_proxies)
- crawl_result.success = bool(html)
- crawl_result.session_id = getattr(
- config, "session_id", None)
- crawl_result.cache_status = "miss"
+ for _p_idx, _proxy in enumerate(_proxies_this_round):
+ _is_fallback_proxy = _p_idx > 0
+ if _is_fallback_proxy:
+ self.logger.info(
+ message="Trying fallback proxy {idx}/{total}: {proxy}",
+ tag="ANTIBOT",
+ params={
+ "idx": _p_idx,
+ "total": len(_fallback_proxies),
+ "proxy": _proxy.server,
+ },
+ )
+
+ # Temporarily set the proxy for this attempt
+ _saved_proxy = config.proxy_config
+ if _is_fallback_proxy:
+ config.proxy_config = _proxy
+
+ try:
+ t1 = time.perf_counter()
+
+ if config.user_agent:
+ self.crawler_strategy.update_user_agent(
+ config.user_agent)
+
+ async_response = await self.crawler_strategy.crawl(
+ url, config=config)
+
+ html = sanitize_input_encode(async_response.html)
+ screenshot_data = async_response.screenshot
+ pdf_data = async_response.pdf_data
+ js_execution_result = async_response.js_execution_result
+
+ self.logger.url_status(
+ url=cache_context.display_url,
+ success=bool(html),
+ timing=time.perf_counter() - t1,
+ tag="FETCH",
+ )
+
+ crawl_result = await self.aprocess_html(
+ url=url, html=html,
+ extracted_content=extracted_content,
+ config=config,
+ screenshot_data=screenshot_data,
+ pdf_data=pdf_data,
+ verbose=config.verbose,
+ is_raw_html=True if url.startswith("raw:") else False,
+ redirected_url=async_response.redirected_url,
+ original_scheme=urlparse(url).scheme,
+ **kwargs,
+ )
+
+ crawl_result.status_code = async_response.status_code
+ is_raw_url = url.startswith("raw:") or url.startswith("raw://")
+ crawl_result.redirected_url = async_response.redirected_url or (None if is_raw_url else url)
+ crawl_result.redirected_status_code = async_response.redirected_status_code
+ crawl_result.response_headers = async_response.response_headers
+ crawl_result.downloaded_files = async_response.downloaded_files
+ crawl_result.js_execution_result = js_execution_result
+ crawl_result.mhtml = async_response.mhtml_data
+ crawl_result.ssl_certificate = async_response.ssl_certificate
+ crawl_result.network_requests = async_response.network_requests
+ crawl_result.console_messages = async_response.console_messages
+ crawl_result.success = bool(html)
+ crawl_result.session_id = getattr(config, "session_id", None)
+ crawl_result.cache_status = "miss"
+
+ # Check if blocked
+ _blocked, _block_reason = is_blocked(
+ async_response.status_code, html)
+ if not _blocked:
+ _done = True
+ break # Success — exit proxy loop
+
+ except Exception as _crawl_err:
+ if _is_fallback_proxy:
+ self.logger.error_status(
+ url=url,
+ error=f"Fallback proxy {_proxy.server} failed: {_crawl_err}",
+ tag="ANTIBOT",
+ )
+ _block_reason = str(_crawl_err)
+ else:
+ raise # Let main proxy errors propagate normally
+ finally:
+ if _is_fallback_proxy:
+ config.proxy_config = _saved_proxy
+
+ # --- Restore stashed is_fallback proxy for config integrity ---
+ if _fallback_proxy and not _proxy_activated:
+ config.proxy_config = _fallback_proxy
+
+ # --- Fallback fetch function (last resort after all retries+proxies exhausted) ---
+ if (crawl_result
+ and getattr(config, "fallback_fetch_function", None)):
+ _blocked, _ = is_blocked(
+ crawl_result.status_code, crawl_result.html or "")
+ if _blocked:
+ self.logger.warning(
+ message="All retries exhausted, invoking fallback_fetch_function for {url}",
+ tag="ANTIBOT",
+ params={"url": url[:80]},
+ )
+ try:
+ _fallback_html = await config.fallback_fetch_function(url)
+ if _fallback_html:
+ crawl_result = await self.aprocess_html(
+ url=url,
+ html=sanitize_input_encode(_fallback_html),
+ extracted_content=extracted_content,
+ config=config,
+ screenshot_data=None,
+ pdf_data=None,
+ verbose=config.verbose,
+ is_raw_html=True,
+ redirected_url=url,
+ original_scheme=urlparse(url).scheme,
+ **kwargs,
+ )
+ crawl_result.success = True
+ crawl_result.status_code = 200
+ crawl_result.session_id = getattr(config, "session_id", None)
+ crawl_result.cache_status = "miss"
+ except Exception as _fallback_err:
+ self.logger.error_status(
+ url=url,
+ error=f"Fallback fetch failed: {_fallback_err}",
+ tag="ANTIBOT",
+ )
# Compute head fingerprint for cache validation
- if html:
- head_end = html.lower().find('</head>')
+ if crawl_result and crawl_result.html:
+ head_end = crawl_result.html.lower().find('</head>')
if head_end != -1:
- head_html = html[:head_end + 7]
+ head_html = crawl_result.html[:head_end + 7]
crawl_result.head_fingerprint = compute_head_fingerprint(head_html)
self.logger.url_status(
url=cache_context.display_url,
- success=crawl_result.success,
+ success=crawl_result.success if crawl_result else False,
timing=time.perf_counter() - start_time,
tag="COMPLETE",
)
diff --git a/docs/md_v2/advanced/anti-bot-and-fallback.md b/docs/md_v2/advanced/anti-bot-and-fallback.md
new file mode 100644
index 00000000..15e6f72a
--- /dev/null
+++ b/docs/md_v2/advanced/anti-bot-and-fallback.md
@@ -0,0 +1,235 @@
+# Anti-Bot Detection & Fallback
+
+When crawling sites protected by anti-bot systems (Akamai, Cloudflare, PerimeterX, DataDome, Imperva, etc.), requests often get blocked with CAPTCHAs, 403 responses, or empty pages. Crawl4AI provides a layered retry and fallback system that automatically detects blocking and escalates through multiple strategies until content is retrieved.
+
+## How Detection Works
+
+After each crawl attempt, Crawl4AI inspects the HTTP status code and HTML content for known anti-bot signals:
+
+- **HTTP 403/429** with short or empty response bodies
+- **Challenge pages** — Cloudflare "Just a moment", Akamai "Access Denied", PerimeterX block pages
+- **CAPTCHA injection** — reCAPTCHA, hCaptcha, or vendor-specific challenges on otherwise empty pages
+- **Firewall blocks** — Imperva/Incapsula resource iframes, Sucuri firewall pages, Cloudflare error codes
+
+Detection uses structural HTML markers (specific element IDs, script sources, form actions) rather than generic keywords to minimize false positives. A normal page that happens to mention "CAPTCHA" or "Cloudflare" in its content will not be flagged.
+
+## Configuration Options
+
+All anti-bot retry options live on `CrawlerRunConfig`:
+
+| Parameter | Type | Default | Description |
+|---|---|---|---|
+| `max_retries` | `int` | `0` | Number of retry rounds when blocking is detected. `0` = no retries. |
+| `fallback_proxy_configs` | `list[ProxyConfig]` | `[]` | List of fallback proxies tried in order within each retry round. |
+| `fallback_fetch_function` | `async (str) -> str` | `None` | Async function called as last resort. Takes URL, returns raw HTML. |
+
+And on `ProxyConfig`:
+
+| Parameter | Type | Default | Description |
+|---|---|---|---|
+| `is_fallback` | `bool` | `False` | When `True`, this proxy is skipped on the first attempt and only activated after blocking is detected. |
+
+## Escalation Chain
+
+Each retry round tries the main proxy first, then each fallback proxy in order. If all rounds are exhausted and the page is still blocked, the fallback fetch function is called as a last resort.
+
+```
+For each round (1 + max_retries rounds):
+ 1. Try with main proxy_config (or no proxy if is_fallback=True on first round)
+ 2. If blocked → try fallback_proxy_configs[0]
+ 3. If blocked → try fallback_proxy_configs[1]
+ 4. ... continue through all fallback proxies
+ 5. If any attempt succeeds → done
+
+If all rounds exhausted and still blocked:
+ 6. Call fallback_fetch_function(url) → process returned HTML
+```
+
+Worst-case attempts before the fetch function: `(1 + max_retries) x (1 + len(fallback_proxy_configs))`
+
+## Usage Examples
+
+### Simple Retry (No Proxy)
+
+Retry the crawl up to 3 times when blocking is detected. Useful when blocks are intermittent or IP-based.
+
+```python
+from crawl4ai import AsyncWebCrawler
+from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
+
+async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
+ result = await crawler.arun(
+ url="https://example.com",
+ config=CrawlerRunConfig(max_retries=3),
+ )
+```
+
+### Proxy as Fallback Only
+
+Use `is_fallback=True` to skip the proxy on the first attempt. If the site doesn't block you, no proxy credits are consumed. If it does, the proxy activates on retry.
+
+```python
+from crawl4ai.async_configs import ProxyConfig
+
+config = CrawlerRunConfig(
+ max_retries=2,
+ proxy_config=ProxyConfig(
+ server="http://proxy.example.com:8080",
+ username="user",
+ password="pass",
+ is_fallback=True, # Only used when blocking is detected
+ ),
+)
+```
+
+### Fallback Proxy List
+
+Try a cheaper proxy first, escalate to a premium proxy if it fails. Both are tried within each retry round.
+
+```python
+config = CrawlerRunConfig(
+ max_retries=2,
+ proxy_config=ProxyConfig(
+ server="http://datacenter-proxy.example.com:8080",
+ username="user",
+ password="pass",
+ ),
+ fallback_proxy_configs=[
+ ProxyConfig(
+ server="http://residential-proxy.example.com:9090",
+ username="user",
+ password="pass",
+ ),
+ ],
+)
+```
+
+With this setup, each round tries the datacenter proxy first, then the residential proxy. With `max_retries=2`, worst case is 3 rounds x 2 proxies = 6 attempts.
+
+### Fallback Fetch Function
+
+When all browser-based attempts fail, call a custom async function as a last resort. This function receives the URL and must return raw HTML as a string. The returned HTML is processed through the normal pipeline (markdown generation, extraction, etc.).
+
+This is useful when you have access to a scraping API, a pre-fetched cache, or any other source of HTML.
+
+```python
+import aiohttp
+
+async def my_scraping_api(url: str) -> str:
+ """Fetch HTML via an external scraping API."""
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ "https://api.my-scraping-service.com/fetch",
+ params={"url": url, "format": "html"},
+ headers={"Authorization": "Bearer MY_TOKEN"},
+ ) as resp:
+ if resp.status == 200:
+ return await resp.text()
+ raise RuntimeError(f"API error: {resp.status}")
+
+config = CrawlerRunConfig(
+ max_retries=1,
+ fallback_fetch_function=my_scraping_api,
+)
+```
+
+The function can do anything — call an API, read from a database, return cached HTML, or make a simple HTTP request with a different library. Crawl4AI does not care how the HTML is obtained.
+
+### Full Escalation (All Features Combined)
+
+This example combines every layer: stealth mode, a fallback proxy that only activates when blocked, a list of escalation proxies tried each round, retries, and a final fetch function.
+
+```python
+import aiohttp
+from crawl4ai import AsyncWebCrawler
+from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig
+
+# Last-resort: fetch HTML via an external service
+async def external_fetch(url: str) -> str:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.my-service.com/scrape",
+ json={"url": url, "render_js": True},
+ headers={"Authorization": "Bearer MY_TOKEN"},
+ ) as resp:
+ return await resp.text()
+
+browser_config = BrowserConfig(
+ headless=True,
+ enable_stealth=True,
+)
+
+crawl_config = CrawlerRunConfig(
+ magic=True,
+ wait_until="load",
+ max_retries=2,
+
+ # Primary proxy — is_fallback=True means first attempt runs without it
+ proxy_config=ProxyConfig(
+ server="http://datacenter-proxy.example.com:8080",
+ username="user",
+ password="pass",
+ is_fallback=True,
+ ),
+
+ # Fallback proxies — tried in order after main proxy fails each round
+ fallback_proxy_configs=[
+ ProxyConfig(
+ server="http://residential-proxy.example.com:9090",
+ username="user",
+ password="pass",
+ ),
+ ],
+
+ # Last resort — called after all retries and proxies are exhausted
+ fallback_fetch_function=external_fetch,
+)
+
+async with AsyncWebCrawler(config=browser_config) as crawler:
+ result = await crawler.arun(
+ url="https://protected-site.com/products",
+ config=crawl_config,
+ )
+
+ if result.success:
+ print(f"Got {len(result.markdown.raw_markdown)} chars of markdown")
+ else:
+ print(f"All attempts failed: {result.error_message}")
+```
+
+**What happens step by step:**
+
+| Round | Attempt | What runs |
+|---|---|---|
+| 1 | 1 | No proxy (is_fallback skips it) — blocked |
+| 1 | 2 | Residential fallback proxy — blocked (bad IP) |
+| 2 | 1 | Datacenter proxy activated — blocked |
+| 2 | 2 | Residential fallback proxy — blocked |
+| 3 | 1 | Datacenter proxy — blocked |
+| 3 | 2 | Residential fallback proxy — blocked |
+| - | - | `external_fetch(url)` called — returns HTML |
+
+That's up to 6 browser attempts + 1 function call before giving up.
+
+## Tips
+
+- **Start with `max_retries=0`** and a `fallback_fetch_function` if you just want a safety net without burning time on retries.
+- **Use `is_fallback=True`** on your proxy to avoid consuming proxy credits on sites that don't need them.
+- **Order fallback proxies cheapest-first** — datacenter proxies before residential, residential before premium.
+- **Combine with stealth mode** — `BrowserConfig(enable_stealth=True)` and `CrawlerRunConfig(magic=True)` reduce the chance of being blocked in the first place.
+- **`wait_until="load"`** is important for anti-bot sites — the default `domcontentloaded` can return before the anti-bot sensor finishes.
+- **You don't need a primary proxy to use fallback proxies.** If you skip `proxy_config` and only pass `fallback_proxy_configs`, the first attempt each round runs with no proxy. This is useful when you want to try direct access first and only escalate to proxies if blocked:
+ ```python
+ config = CrawlerRunConfig(
+ max_retries=1,
+ fallback_proxy_configs=[proxy_A, proxy_B],
+ )
+ # Round 1: no proxy → proxy_A → proxy_B
+ # Round 2: no proxy → proxy_A → proxy_B
+ ```
+
+## See Also
+
+- [Proxy & Security](proxy-security.md) — Proxy setup, authentication, and rotation
+- [Undetected Browser](undetected-browser.md) — Stealth mode and browser fingerprint evasion
+- [Session Management](session-management.md) — Maintaining sessions across requests
diff --git a/docs/md_v2/advanced/proxy-security.md b/docs/md_v2/advanced/proxy-security.md
index d14e59ff..84a26aca 100644
--- a/docs/md_v2/advanced/proxy-security.md
+++ b/docs/md_v2/advanced/proxy-security.md
@@ -302,3 +302,7 @@ def safe_proxy_repr(proxy: ProxyConfig):
- Ensure `ProxyConfig.from_env()` actually loaded entries (`len(proxies) > 0`).
- Attach `proxy_rotation_strategy` to `CrawlerRunConfig`.
- Validate the proxy definitions you pass into the strategy.
+
+## See Also
+
+- [Anti-Bot Detection & Fallback](anti-bot-and-fallback.md) — Automatic retry with proxy escalation and fallback functions when anti-bot blocking is detected
diff --git a/docs/md_v2/advanced/undetected-browser.md b/docs/md_v2/advanced/undetected-browser.md
index 310701e6..32b73fc5 100644
--- a/docs/md_v2/advanced/undetected-browser.md
+++ b/docs/md_v2/advanced/undetected-browser.md
@@ -391,4 +391,5 @@ Remember:
- [Advanced Features](advanced-features.md) - Overview of all advanced features
- [Proxy & Security](proxy-security.md) - Using proxies with anti-bot features
- [Session Management](session-management.md) - Maintaining sessions across requests
-- [Identity Based Crawling](identity-based-crawling.md) - Additional anti-detection strategies
\ No newline at end of file
+- [Identity Based Crawling](identity-based-crawling.md) - Additional anti-detection strategies
+- [Anti-Bot Detection & Fallback](anti-bot-and-fallback.md) - Automatic retry and proxy escalation when blocking is detected
\ No newline at end of file
diff --git a/docs/md_v2/api/parameters.md b/docs/md_v2/api/parameters.md
index 06a1e751..ef7f16af 100644
--- a/docs/md_v2/api/parameters.md
+++ b/docs/md_v2/api/parameters.md
@@ -109,8 +109,11 @@ We group them by category.
| **`timezone_id`** | `str or None` (None) | Browser's timezone (e.g., "America/New_York", "Europe/Paris"). |
| **`geolocation`** | `GeolocationConfig or None` (None) | GPS coordinates configuration. Use `GeolocationConfig(latitude=..., longitude=..., accuracy=...)`. |
| **`fetch_ssl_certificate`** | `bool` (False) | If `True`, fetches and includes SSL certificate information in the result. |
-| **`proxy_config`** | `ProxyConfig or dict or None` (None) | Proxy configuration for this specific crawl. Can override browser-level proxy settings. |
+| **`proxy_config`** | `ProxyConfig or dict or None` (None) | Proxy configuration for this specific crawl. Can override browser-level proxy settings. Set `is_fallback=True` on the ProxyConfig to only use the proxy when anti-bot blocking is detected. |
| **`proxy_rotation_strategy`** | `ProxyRotationStrategy` (None) | Strategy for rotating proxies during crawl operations. |
+| **`max_retries`** | `int` (0) | Number of retry rounds when anti-bot blocking is detected. Each round tries the main proxy and all fallback proxies. |
+| **`fallback_proxy_configs`** | `list[ProxyConfig]` ([]) | List of fallback proxies tried in order within each retry round after the main proxy fails. |
+| **`fallback_fetch_function`**| `async (str) -> str or None` (None) | Async function called as last resort after all retries are exhausted. Takes URL, returns raw HTML. See [Anti-Bot & Fallback](../advanced/anti-bot-and-fallback.md). |
---
diff --git a/docs/md_v2/core/browser-crawler-config.md b/docs/md_v2/core/browser-crawler-config.md
index 4b35ee7c..e839ecc8 100644
--- a/docs/md_v2/core/browser-crawler-config.md
+++ b/docs/md_v2/core/browser-crawler-config.md
@@ -275,17 +275,22 @@ class CrawlerRunConfig:
- **`geolocation`**: GPS coordinates via `GeolocationConfig(latitude=48.8566, longitude=2.3522)`
- See [Identity Based Crawling](../advanced/identity-based-crawling.md#7-locale-timezone-and-geolocation-control)
-10.⠀**Proxy Configuration**:
- - **`proxy_config`**: Proxy server configuration (ProxyConfig object or dict) e.g. {"server": "...", "username": "...", "password"}
+10.⠀**Proxy Configuration**:
+ - **`proxy_config`**: Proxy server configuration (ProxyConfig object or dict) e.g. {"server": "...", "username": "...", "password"}. Set `is_fallback=True` to only use the proxy when anti-bot blocking is detected.
- **`proxy_rotation_strategy`**: Strategy for rotating proxies during crawls
-11.⠀**Page Interaction Parameters**:
+11.⠀**Anti-Bot Retry & Fallback** (see [Anti-Bot & Fallback](../advanced/anti-bot-and-fallback.md)):
+ - **`max_retries`**: Number of retry rounds when blocking is detected (default: 0)
+ - **`fallback_proxy_configs`**: List of fallback proxies tried in order within each retry round
+ - **`fallback_fetch_function`**: Async function called as last resort — takes URL, returns raw HTML
+
+12.⠀**Page Interaction Parameters**:
- **`scan_full_page`**: If `True`, scroll through the entire page to load all content
- **`wait_until`**: Condition to wait for when navigating (e.g., "domcontentloaded", "networkidle")
- **`page_timeout`**: Timeout in milliseconds for page operations (default: 60000)
- **`delay_before_return_html`**: Delay in seconds before retrieving final HTML.
-12.⠀**`url_matcher`** & **`match_mode`**:
+13.⠀**`url_matcher`** & **`match_mode`**:
- Enable URL-specific configurations when used with `arun_many()`.
- Set `url_matcher` to patterns (glob, function, or list) to match specific URLs.
- Use `match_mode` (OR/AND) to control how multiple patterns combine.
diff --git a/mkdocs.yml b/mkdocs.yml
index baa88a61..c49f2dd6 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -47,6 +47,7 @@ nav:
- "Lazy Loading": "advanced/lazy-loading.md"
- "Hooks & Auth": "advanced/hooks-auth.md"
- "Proxy & Security": "advanced/proxy-security.md"
+ - "Anti-Bot & Fallback": "advanced/anti-bot-and-fallback.md"
- "Undetected Browser": "advanced/undetected-browser.md"
- "Session Management": "advanced/session-management.md"
- "Multi-URL Crawling": "advanced/multi-url-crawling.md"
diff --git a/tests/proxy/test_antibot_detector.py b/tests/proxy/test_antibot_detector.py
new file mode 100644
index 00000000..94fe992f
--- /dev/null
+++ b/tests/proxy/test_antibot_detector.py
@@ -0,0 +1,315 @@
+"""
+Unit tests for antibot_detector.is_blocked().
+
+Tests are organized into:
+ - TRUE POSITIVES: Real block pages that MUST be detected
+ - TRUE NEGATIVES: Legitimate pages that MUST NOT be flagged
+ - EDGE CASES: Boundary conditions
+"""
+
+import sys, os
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
+
+from crawl4ai.antibot_detector import is_blocked
+
+PASS = 0
+FAIL = 0
+
+def check(name, result, expected_blocked, expected_substr=None):
+ global PASS, FAIL
+ blocked, reason = result
+ ok = blocked == expected_blocked
+ if expected_substr and blocked:
+ ok = ok and expected_substr.lower() in reason.lower()
+ status = "PASS" if ok else "FAIL"
+ if not ok:
+ FAIL += 1
+ print(f" {status}: {name}")
+ print(f" got blocked={blocked}, reason={reason!r}")
+ print(f" expected blocked={expected_blocked}" +
+ (f", substr={expected_substr!r}" if expected_substr else ""))
+ else:
+ PASS += 1
+ if blocked:
+ print(f" {status}: {name} -> {reason}")
+ else:
+ print(f" {status}: {name} -> not blocked")
+
+
+# =========================================================================
+# TRUE POSITIVES — real block pages that MUST be detected
+# =========================================================================
+print("\n=== TRUE POSITIVES (must detect as blocked) ===\n")
+
+# --- Akamai ---
+check("Akamai Reference #",
+ is_blocked(403, 'Access Denied\nYour request was blocked.\nReference #18.2d351ab8.1557333295.a4e16ab'),
+ True, "Akamai")
+
+check("Akamai Pardon Our Interruption",
+ is_blocked(403, 'Pardon Our Interruption
Anti-bot solutions like DataDome, PerimeterX, and Cloudflare ' +
+ 'help detect and block bot traffic. When a bot is detected, ' +
+ 'services show a CAPTCHA or Access Denied page. ' +
+ 'Common signals include blocked by security warnings.
' +
+ '
The g-recaptcha and h-captcha widgets are used for challenges.