Add anti-bot detection, retry, and fallback system
Automatically detect when crawls are blocked by anti-bot systems (Akamai, Cloudflare, PerimeterX, DataDome, Imperva, etc.) and escalate through configurable retry and fallback strategies. New features on CrawlerRunConfig: - max_retries: retry rounds when blocking is detected - fallback_proxy_configs: list of fallback proxies tried each round - fallback_fetch_function: async last-resort function returning raw HTML New field on ProxyConfig: - is_fallback: skip proxy on first attempt, activate only when blocked Escalation chain per round: main proxy → fallback proxies in order. After all rounds: fallback_fetch_function as last resort. Detection uses tiered heuristics — structural HTML markers (high confidence) trigger on any page, generic patterns only on short error pages to avoid false positives.
This commit is contained in:
164
crawl4ai/antibot_detector.py
Normal file
164
crawl4ai/antibot_detector.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""
|
||||
Anti-bot detection heuristics for crawl results.
|
||||
|
||||
Examines HTTP status codes and HTML content patterns to determine
|
||||
if a crawl was blocked by anti-bot protection.
|
||||
|
||||
Detection is layered: high-confidence structural markers trigger alone,
|
||||
while generic patterns require corroborating signals (status code + short page)
|
||||
to avoid false positives.
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Tier 1: High-confidence structural markers (single signal sufficient)
# These are unique to block pages and virtually never appear in real content.
# ---------------------------------------------------------------------------
# Each entry pairs a compiled regex with a human-readable block reason;
# `is_blocked` returns the reason string of the first pattern that matches.
_TIER1_PATTERNS = [
    # Akamai — full reference pattern: Reference #18.2d351ab8.1557333295.a4e16ab
    (re.compile(r"Reference\s*#\s*[\d]+\.[0-9a-f]+\.\d+\.[0-9a-f]+", re.IGNORECASE),
     "Akamai block (Reference #)"),
    # Akamai — "Pardon Our Interruption" challenge page
    (re.compile(r"Pardon\s+Our\s+Interruption", re.IGNORECASE),
     "Akamai challenge (Pardon Our Interruption)"),
    # Cloudflare — challenge form with anti-bot token
    (re.compile(r'challenge-form.*?__cf_chl_f_tk=', re.IGNORECASE | re.DOTALL),
     "Cloudflare challenge form"),
    # Cloudflare — error code spans (1020 Access Denied, 1010, 1012, 1015)
    (re.compile(r'<span\s+class="cf-error-code">\d{4}</span>', re.IGNORECASE),
     "Cloudflare firewall block"),
    # Cloudflare — IUAM challenge script
    (re.compile(r'/cdn-cgi/challenge-platform/\S+orchestrate', re.IGNORECASE),
     "Cloudflare JS challenge"),
    # PerimeterX / HUMAN — block page with app ID assignment (not prose mentions)
    (re.compile(r"window\._pxAppId\s*=", re.IGNORECASE),
     "PerimeterX block"),
    # PerimeterX — captcha CDN
    (re.compile(r"captcha\.px-cdn\.net", re.IGNORECASE),
     "PerimeterX captcha"),
    # DataDome — captcha delivery domain (structural, not the word "datadome")
    (re.compile(r"captcha-delivery\.com", re.IGNORECASE),
     "DataDome captcha"),
    # Imperva/Incapsula — resource iframe
    (re.compile(r"_Incapsula_Resource", re.IGNORECASE),
     "Imperva/Incapsula block"),
    # Imperva/Incapsula — incident ID
    (re.compile(r"Incapsula\s+incident\s+ID", re.IGNORECASE),
     "Imperva/Incapsula incident"),
    # Sucuri firewall
    (re.compile(r"Sucuri\s+WebSite\s+Firewall", re.IGNORECASE),
     "Sucuri firewall block"),
    # Kasada
    (re.compile(r"KPSDK\.scriptStart\s*=\s*KPSDK\.now\(\)", re.IGNORECASE),
     "Kasada challenge"),
]

# ---------------------------------------------------------------------------
# Tier 2: Medium-confidence patterns — only match on SHORT pages (< 10KB)
# These terms appear in real content (articles, login forms, security blogs)
# so we require the page to be small to avoid false positives.
# ---------------------------------------------------------------------------
# Same (compiled regex, reason) shape as _TIER1_PATTERNS; consulted by
# `is_blocked` only when the page is under _TIER2_MAX_SIZE.
_TIER2_PATTERNS = [
    # Akamai / generic — "Access Denied" (extremely common on legit 403s too)
    (re.compile(r"Access\s+Denied", re.IGNORECASE),
     "Access Denied on short page"),
    # Cloudflare — "Just a moment" / "Checking your browser"
    (re.compile(r"Checking\s+your\s+browser", re.IGNORECASE),
     "Cloudflare browser check"),
    (re.compile(r"<title>\s*Just\s+a\s+moment", re.IGNORECASE),
     "Cloudflare interstitial"),
    # CAPTCHA on a block page (not a login form — login forms are big pages)
    (re.compile(r'class=["\']g-recaptcha["\']', re.IGNORECASE),
     "reCAPTCHA on block page"),
    (re.compile(r'class=["\']h-captcha["\']', re.IGNORECASE),
     "hCaptcha on block page"),
    # PerimeterX block page title
    (re.compile(r"Access\s+to\s+This\s+Page\s+Has\s+Been\s+Blocked", re.IGNORECASE),
     "PerimeterX block page"),
    # Generic block phrases (only on short pages to avoid matching articles)
    (re.compile(r"blocked\s+by\s+security", re.IGNORECASE),
     "Blocked by security"),
    (re.compile(r"Request\s+unsuccessful", re.IGNORECASE),
     "Request unsuccessful (Imperva)"),
]

_TIER2_MAX_SIZE = 10000  # Only check tier 2 patterns on pages under 10KB

# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
_BLOCK_PAGE_MAX_SIZE = 5000  # 403 + short page = likely block
_EMPTY_CONTENT_THRESHOLD = 100  # 200 + near-empty = JS-blocked render
|
||||
|
||||
|
||||
def _looks_like_data(html: str) -> bool:
|
||||
"""Check if content looks like a JSON/XML API response (not an HTML block page)."""
|
||||
stripped = html.strip()
|
||||
if not stripped:
|
||||
return False
|
||||
return stripped[0] in ('{', '[', '<' ) and not stripped.startswith('<html') and not stripped.startswith('<!') and not stripped.startswith('<HTML')
|
||||
|
||||
|
||||
def is_blocked(
    status_code: Optional[int],
    html: str,
    error_message: Optional[str] = None,
) -> Tuple[bool, str]:
    """
    Decide whether a crawl result was served by an anti-bot system.

    Layered heuristics keep false positives low:
      - Tier 1 structural markers fire on any page size.
      - Tier 2 generic phrases fire only on short pages (< 10KB).
      - Status-code signals require corroborating content evidence.

    Args:
        status_code: HTTP status code of the response, if known.
        html: Raw HTML body of the response.
        error_message: Error message from the crawl result, if any
            (accepted for API symmetry; not consulted by the heuristics).

    Returns:
        Tuple of (is_blocked, reason); reason is "" when not blocked.
    """
    body = html or ""
    size = len(body)

    # Rate limiting is unambiguous regardless of content.
    if status_code == 429:
        return True, "HTTP 429 Too Many Requests"

    # High-confidence structural markers — scan only the first 15KB.
    head = body[:15000]
    if head:
        for rx, label in _TIER1_PATTERNS:
            if rx.search(head):
                return True, label

    # Short 403 page with no tier 1 hit: near-empty body is almost certainly
    # a block; otherwise consult tier 2. JSON/XML bodies are exempt — a short
    # 403 from an API is usually a legitimate auth error.
    if status_code == 403 and size < _BLOCK_PAGE_MAX_SIZE and not _looks_like_data(body):
        if size < _EMPTY_CONTENT_THRESHOLD:
            return True, f"HTTP 403 with near-empty response ({size} bytes)"
        for rx, label in _TIER2_PATTERNS:
            if rx.search(head):
                return True, f"{label} (HTTP 403, {size} bytes)"

    # Any error status paired with a short page: consult tier 2.
    if status_code and status_code >= 400 and size < _TIER2_MAX_SIZE:
        for rx, label in _TIER2_PATTERNS:
            if rx.search(head):
                return True, f"{label} (HTTP {status_code}, {size} bytes)"

    # HTTP 200 with effectively no content: likely a JS-gated empty render.
    if status_code == 200:
        trimmed = body.strip()
        if len(trimmed) < _EMPTY_CONTENT_THRESHOLD and not _looks_like_data(body):
            return True, f"Near-empty content ({len(trimmed)} bytes) with HTTP 200"

    return False, ""
|
||||
@@ -30,7 +30,7 @@ from .cache_context import CacheMode
|
||||
from .proxy_strategy import ProxyRotationStrategy
|
||||
|
||||
import inspect
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union
|
||||
from enum import Enum
|
||||
|
||||
# Type alias for URL matching
|
||||
@@ -353,19 +353,23 @@ class ProxyConfig:
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
ip: Optional[str] = None,
|
||||
is_fallback: bool = False,
|
||||
):
|
||||
"""Configuration class for a single proxy.
|
||||
|
||||
|
||||
Args:
|
||||
server: Proxy server URL (e.g., "http://127.0.0.1:8080")
|
||||
username: Optional username for proxy authentication
|
||||
password: Optional password for proxy authentication
|
||||
ip: Optional IP address for verification purposes
|
||||
is_fallback: If True, proxy is only used when anti-bot blocking is
|
||||
detected. If False (default), proxy is used on every request.
|
||||
"""
|
||||
self.server = server
|
||||
self.username = username
|
||||
self.password = password
|
||||
|
||||
self.is_fallback = is_fallback
|
||||
|
||||
# Extract IP from server if not explicitly provided
|
||||
self.ip = ip or self._extract_ip_from_server()
|
||||
|
||||
@@ -425,7 +429,8 @@ class ProxyConfig:
|
||||
server=proxy_dict.get("server"),
|
||||
username=proxy_dict.get("username"),
|
||||
password=proxy_dict.get("password"),
|
||||
ip=proxy_dict.get("ip")
|
||||
ip=proxy_dict.get("ip"),
|
||||
is_fallback=proxy_dict.get("is_fallback", False),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@@ -455,7 +460,8 @@ class ProxyConfig:
|
||||
"server": self.server,
|
||||
"username": self.username,
|
||||
"password": self.password,
|
||||
"ip": self.ip
|
||||
"ip": self.ip,
|
||||
"is_fallback": self.is_fallback,
|
||||
}
|
||||
|
||||
def clone(self, **kwargs) -> "ProxyConfig":
|
||||
@@ -1470,6 +1476,10 @@ class CrawlerRunConfig():
|
||||
match_mode: MatchMode = MatchMode.OR,
|
||||
# Experimental Parameters
|
||||
experimental: Dict[str, Any] = None,
|
||||
# Anti-Bot Retry Parameters
|
||||
max_retries: int = 0,
|
||||
fallback_proxy_configs: Optional[List["ProxyConfig"]] = None,
|
||||
fallback_fetch_function: Optional[Callable[[str], Awaitable[str]]] = None,
|
||||
):
|
||||
# TODO: Planning to set properties dynamically based on the __init__ signature
|
||||
self.url = url
|
||||
@@ -1652,7 +1662,12 @@ class CrawlerRunConfig():
|
||||
|
||||
# Experimental Parameters
|
||||
self.experimental = experimental or {}
|
||||
|
||||
|
||||
# Anti-Bot Retry Parameters
|
||||
self.max_retries = max_retries
|
||||
self.fallback_proxy_configs = fallback_proxy_configs or []
|
||||
self.fallback_fetch_function = fallback_fetch_function
|
||||
|
||||
# Compile C4A scripts if provided
|
||||
if self.c4a_script and not self.js_code:
|
||||
self._compile_c4a_script()
|
||||
@@ -1887,6 +1902,8 @@ class CrawlerRunConfig():
|
||||
"url_matcher": self.url_matcher,
|
||||
"match_mode": self.match_mode,
|
||||
"experimental": self.experimental,
|
||||
"max_retries": self.max_retries,
|
||||
"fallback_proxy_configs": [p.to_dict() for p in self.fallback_proxy_configs] if self.fallback_proxy_configs else [],
|
||||
}
|
||||
|
||||
def clone(self, **kwargs):
|
||||
|
||||
@@ -51,6 +51,7 @@ from .utils import (
|
||||
compute_head_fingerprint,
|
||||
)
|
||||
from .cache_validator import CacheValidator, CacheValidationResult
|
||||
from .antibot_detector import is_blocked
|
||||
|
||||
|
||||
class AsyncWebCrawler:
|
||||
@@ -373,13 +374,9 @@ class AsyncWebCrawler:
|
||||
|
||||
# Fetch fresh content if needed
|
||||
if not cached_result or not html:
|
||||
t1 = time.perf_counter()
|
||||
from urllib.parse import urlparse
|
||||
|
||||
if config.user_agent:
|
||||
self.crawler_strategy.update_user_agent(
|
||||
config.user_agent)
|
||||
|
||||
# Check robots.txt if enabled
|
||||
# Check robots.txt if enabled (once, before any attempts)
|
||||
if config and config.check_robots_txt:
|
||||
if not await self.robots_parser.can_fetch(
|
||||
url, self.browser_config.user_agent
|
||||
@@ -395,74 +392,191 @@ class AsyncWebCrawler:
|
||||
},
|
||||
)
|
||||
|
||||
##############################
|
||||
# Call CrawlerStrategy.crawl #
|
||||
##############################
|
||||
async_response = await self.crawler_strategy.crawl(
|
||||
url,
|
||||
config=config, # Pass the entire config object
|
||||
)
|
||||
# --- Anti-bot retry setup ---
|
||||
_fallback_proxy = None
|
||||
if (config.proxy_config
|
||||
and getattr(config.proxy_config, "is_fallback", False)):
|
||||
_fallback_proxy = config.proxy_config
|
||||
config.proxy_config = None
|
||||
|
||||
html = sanitize_input_encode(async_response.html)
|
||||
screenshot_data = async_response.screenshot
|
||||
pdf_data = async_response.pdf_data
|
||||
js_execution_result = async_response.js_execution_result
|
||||
_max_attempts = 1 + getattr(config, "max_retries", 0)
|
||||
_fallback_proxies = getattr(config, "fallback_proxy_configs", None) or []
|
||||
_proxy_activated = False
|
||||
_block_reason = ""
|
||||
_done = False
|
||||
crawl_result = None
|
||||
|
||||
t2 = time.perf_counter()
|
||||
self.logger.url_status(
|
||||
url=cache_context.display_url,
|
||||
success=bool(html),
|
||||
timing=t2 - t1,
|
||||
tag="FETCH",
|
||||
)
|
||||
for _attempt in range(_max_attempts):
|
||||
if _done:
|
||||
break
|
||||
|
||||
###############################################################
|
||||
# Process the HTML content, Call CrawlerStrategy.process_html #
|
||||
###############################################################
|
||||
from urllib.parse import urlparse
|
||||
crawl_result: CrawlResult = await self.aprocess_html(
|
||||
url=url,
|
||||
html=html,
|
||||
extracted_content=extracted_content,
|
||||
config=config, # Pass the config object instead of individual parameters
|
||||
screenshot_data=screenshot_data,
|
||||
pdf_data=pdf_data,
|
||||
verbose=config.verbose,
|
||||
is_raw_html=True if url.startswith("raw:") else False,
|
||||
redirected_url=async_response.redirected_url,
|
||||
original_scheme=urlparse(url).scheme,
|
||||
**kwargs,
|
||||
)
|
||||
if _attempt > 0:
|
||||
self.logger.warning(
|
||||
message="Anti-bot retry {attempt}/{max_retries} for {url} — {reason}",
|
||||
tag="ANTIBOT",
|
||||
params={
|
||||
"attempt": _attempt,
|
||||
"max_retries": config.max_retries,
|
||||
"url": url[:80],
|
||||
"reason": _block_reason,
|
||||
},
|
||||
)
|
||||
# Activate is_fallback proxy on first retry
|
||||
if _fallback_proxy and not _proxy_activated:
|
||||
config.proxy_config = _fallback_proxy
|
||||
_proxy_activated = True
|
||||
self.logger.info(
|
||||
message="Activating fallback proxy: {proxy}",
|
||||
tag="ANTIBOT",
|
||||
params={"proxy": _fallback_proxy.server},
|
||||
)
|
||||
|
||||
crawl_result.status_code = async_response.status_code
|
||||
# For raw: URLs, don't fall back to the raw HTML string as redirected_url
|
||||
is_raw_url = url.startswith("raw:") or url.startswith("raw://")
|
||||
crawl_result.redirected_url = async_response.redirected_url or (None if is_raw_url else url)
|
||||
crawl_result.redirected_status_code = async_response.redirected_status_code
|
||||
crawl_result.response_headers = async_response.response_headers
|
||||
crawl_result.downloaded_files = async_response.downloaded_files
|
||||
crawl_result.js_execution_result = js_execution_result
|
||||
crawl_result.mhtml = async_response.mhtml_data
|
||||
crawl_result.ssl_certificate = async_response.ssl_certificate
|
||||
# Add captured network and console data if available
|
||||
crawl_result.network_requests = async_response.network_requests
|
||||
crawl_result.console_messages = async_response.console_messages
|
||||
# Build list of proxies to try this round:
|
||||
# current config.proxy_config first, then each fallback proxy
|
||||
_proxies_this_round = [config.proxy_config] # main (may be None)
|
||||
_proxies_this_round.extend(_fallback_proxies)
|
||||
|
||||
crawl_result.success = bool(html)
|
||||
crawl_result.session_id = getattr(
|
||||
config, "session_id", None)
|
||||
crawl_result.cache_status = "miss"
|
||||
for _p_idx, _proxy in enumerate(_proxies_this_round):
|
||||
_is_fallback_proxy = _p_idx > 0
|
||||
if _is_fallback_proxy:
|
||||
self.logger.info(
|
||||
message="Trying fallback proxy {idx}/{total}: {proxy}",
|
||||
tag="ANTIBOT",
|
||||
params={
|
||||
"idx": _p_idx,
|
||||
"total": len(_fallback_proxies),
|
||||
"proxy": _proxy.server,
|
||||
},
|
||||
)
|
||||
|
||||
# Temporarily set the proxy for this attempt
|
||||
_saved_proxy = config.proxy_config
|
||||
if _is_fallback_proxy:
|
||||
config.proxy_config = _proxy
|
||||
|
||||
try:
|
||||
t1 = time.perf_counter()
|
||||
|
||||
if config.user_agent:
|
||||
self.crawler_strategy.update_user_agent(
|
||||
config.user_agent)
|
||||
|
||||
async_response = await self.crawler_strategy.crawl(
|
||||
url, config=config)
|
||||
|
||||
html = sanitize_input_encode(async_response.html)
|
||||
screenshot_data = async_response.screenshot
|
||||
pdf_data = async_response.pdf_data
|
||||
js_execution_result = async_response.js_execution_result
|
||||
|
||||
self.logger.url_status(
|
||||
url=cache_context.display_url,
|
||||
success=bool(html),
|
||||
timing=time.perf_counter() - t1,
|
||||
tag="FETCH",
|
||||
)
|
||||
|
||||
crawl_result = await self.aprocess_html(
|
||||
url=url, html=html,
|
||||
extracted_content=extracted_content,
|
||||
config=config,
|
||||
screenshot_data=screenshot_data,
|
||||
pdf_data=pdf_data,
|
||||
verbose=config.verbose,
|
||||
is_raw_html=True if url.startswith("raw:") else False,
|
||||
redirected_url=async_response.redirected_url,
|
||||
original_scheme=urlparse(url).scheme,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
crawl_result.status_code = async_response.status_code
|
||||
is_raw_url = url.startswith("raw:") or url.startswith("raw://")
|
||||
crawl_result.redirected_url = async_response.redirected_url or (None if is_raw_url else url)
|
||||
crawl_result.redirected_status_code = async_response.redirected_status_code
|
||||
crawl_result.response_headers = async_response.response_headers
|
||||
crawl_result.downloaded_files = async_response.downloaded_files
|
||||
crawl_result.js_execution_result = js_execution_result
|
||||
crawl_result.mhtml = async_response.mhtml_data
|
||||
crawl_result.ssl_certificate = async_response.ssl_certificate
|
||||
crawl_result.network_requests = async_response.network_requests
|
||||
crawl_result.console_messages = async_response.console_messages
|
||||
crawl_result.success = bool(html)
|
||||
crawl_result.session_id = getattr(config, "session_id", None)
|
||||
crawl_result.cache_status = "miss"
|
||||
|
||||
# Check if blocked
|
||||
_blocked, _block_reason = is_blocked(
|
||||
async_response.status_code, html)
|
||||
if not _blocked:
|
||||
_done = True
|
||||
break # Success — exit proxy loop
|
||||
|
||||
except Exception as _crawl_err:
|
||||
if _is_fallback_proxy:
|
||||
self.logger.error_status(
|
||||
url=url,
|
||||
error=f"Fallback proxy {_proxy.server} failed: {_crawl_err}",
|
||||
tag="ANTIBOT",
|
||||
)
|
||||
_block_reason = str(_crawl_err)
|
||||
else:
|
||||
raise # Let main proxy errors propagate normally
|
||||
finally:
|
||||
if _is_fallback_proxy:
|
||||
config.proxy_config = _saved_proxy
|
||||
|
||||
# --- Restore stashed is_fallback proxy for config integrity ---
|
||||
if _fallback_proxy and not _proxy_activated:
|
||||
config.proxy_config = _fallback_proxy
|
||||
|
||||
# --- Fallback fetch function (last resort after all retries+proxies exhausted) ---
|
||||
if (crawl_result
|
||||
and getattr(config, "fallback_fetch_function", None)):
|
||||
_blocked, _ = is_blocked(
|
||||
crawl_result.status_code, crawl_result.html or "")
|
||||
if _blocked:
|
||||
self.logger.warning(
|
||||
message="All retries exhausted, invoking fallback_fetch_function for {url}",
|
||||
tag="ANTIBOT",
|
||||
params={"url": url[:80]},
|
||||
)
|
||||
try:
|
||||
_fallback_html = await config.fallback_fetch_function(url)
|
||||
if _fallback_html:
|
||||
crawl_result = await self.aprocess_html(
|
||||
url=url,
|
||||
html=sanitize_input_encode(_fallback_html),
|
||||
extracted_content=extracted_content,
|
||||
config=config,
|
||||
screenshot_data=None,
|
||||
pdf_data=None,
|
||||
verbose=config.verbose,
|
||||
is_raw_html=True,
|
||||
redirected_url=url,
|
||||
original_scheme=urlparse(url).scheme,
|
||||
**kwargs,
|
||||
)
|
||||
crawl_result.success = True
|
||||
crawl_result.status_code = 200
|
||||
crawl_result.session_id = getattr(config, "session_id", None)
|
||||
crawl_result.cache_status = "miss"
|
||||
except Exception as _fallback_err:
|
||||
self.logger.error_status(
|
||||
url=url,
|
||||
error=f"Fallback fetch failed: {_fallback_err}",
|
||||
tag="ANTIBOT",
|
||||
)
|
||||
|
||||
# Compute head fingerprint for cache validation
|
||||
if html:
|
||||
head_end = html.lower().find('</head>')
|
||||
if crawl_result and crawl_result.html:
|
||||
head_end = crawl_result.html.lower().find('</head>')
|
||||
if head_end != -1:
|
||||
head_html = html[:head_end + 7]
|
||||
head_html = crawl_result.html[:head_end + 7]
|
||||
crawl_result.head_fingerprint = compute_head_fingerprint(head_html)
|
||||
|
||||
self.logger.url_status(
|
||||
url=cache_context.display_url,
|
||||
success=crawl_result.success,
|
||||
success=crawl_result.success if crawl_result else False,
|
||||
timing=time.perf_counter() - start_time,
|
||||
tag="COMPLETE",
|
||||
)
|
||||
|
||||
235
docs/md_v2/advanced/anti-bot-and-fallback.md
Normal file
235
docs/md_v2/advanced/anti-bot-and-fallback.md
Normal file
@@ -0,0 +1,235 @@
|
||||
# Anti-Bot Detection & Fallback
|
||||
|
||||
When crawling sites protected by anti-bot systems (Akamai, Cloudflare, PerimeterX, DataDome, Imperva, etc.), requests often get blocked with CAPTCHAs, 403 responses, or empty pages. Crawl4AI provides a layered retry and fallback system that automatically detects blocking and escalates through multiple strategies until content is retrieved.
|
||||
|
||||
## How Detection Works
|
||||
|
||||
After each crawl attempt, Crawl4AI inspects the HTTP status code and HTML content for known anti-bot signals:
|
||||
|
||||
- **HTTP 403/429** with short or empty response bodies
|
||||
- **Challenge pages** — Cloudflare "Just a moment", Akamai "Access Denied", PerimeterX block pages
|
||||
- **CAPTCHA injection** — reCAPTCHA, hCaptcha, or vendor-specific challenges on otherwise empty pages
|
||||
- **Firewall blocks** — Imperva/Incapsula resource iframes, Sucuri firewall pages, Cloudflare error codes
|
||||
|
||||
Detection is tiered to minimize false positives: high-confidence structural HTML markers (specific element IDs, script sources, form actions) trigger on any page, while generic keyword patterns are only consulted on short error pages. A normal full-size page that happens to mention "CAPTCHA" or "Cloudflare" in its content will not be flagged.
|
||||
|
||||
## Configuration Options
|
||||
|
||||
All anti-bot retry options live on `CrawlerRunConfig`:
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|---|---|---|---|
|
||||
| `max_retries` | `int` | `0` | Number of retry rounds when blocking is detected. `0` = no retries. |
|
||||
| `fallback_proxy_configs` | `list[ProxyConfig]` | `[]` | List of fallback proxies tried in order within each retry round. |
|
||||
| `fallback_fetch_function` | `async (str) -> str` | `None` | Async function called as last resort. Takes URL, returns raw HTML. |
|
||||
|
||||
And on `ProxyConfig`:
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|---|---|---|---|
|
||||
| `is_fallback` | `bool` | `False` | When `True`, this proxy is skipped on the first attempt and only activated after blocking is detected. |
|
||||
|
||||
## Escalation Chain
|
||||
|
||||
Each retry round tries the main proxy first, then each fallback proxy in order. If all rounds are exhausted and the page is still blocked, the fallback fetch function is called as a last resort.
|
||||
|
||||
```
|
||||
For each round (1 + max_retries rounds):
|
||||
1. Try with main proxy_config (or no proxy if is_fallback=True on first round)
|
||||
2. If blocked → try fallback_proxy_configs[0]
|
||||
3. If blocked → try fallback_proxy_configs[1]
|
||||
4. ... continue through all fallback proxies
|
||||
5. If any attempt succeeds → done
|
||||
|
||||
If all rounds exhausted and still blocked:
|
||||
6. Call fallback_fetch_function(url) → process returned HTML
|
||||
```
|
||||
|
||||
Worst-case attempts before the fetch function: `(1 + max_retries) x (1 + len(fallback_proxy_configs))`
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Simple Retry (No Proxy)
|
||||
|
||||
Retry the crawl up to 3 times when blocking is detected. Useful when blocks are intermittent or IP-based.
|
||||
|
||||
```python
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
|
||||
|
||||
async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://example.com",
|
||||
config=CrawlerRunConfig(max_retries=3),
|
||||
)
|
||||
```
|
||||
|
||||
### Proxy as Fallback Only
|
||||
|
||||
Use `is_fallback=True` to skip the proxy on the first attempt. If the site doesn't block you, no proxy credits are consumed. If it does, the proxy activates on retry.
|
||||
|
||||
```python
|
||||
from crawl4ai.async_configs import ProxyConfig
|
||||
|
||||
config = CrawlerRunConfig(
|
||||
max_retries=2,
|
||||
proxy_config=ProxyConfig(
|
||||
server="http://proxy.example.com:8080",
|
||||
username="user",
|
||||
password="pass",
|
||||
is_fallback=True, # Only used when blocking is detected
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
### Fallback Proxy List
|
||||
|
||||
Try a cheaper proxy first, escalate to a premium proxy if it fails. Both are tried within each retry round.
|
||||
|
||||
```python
|
||||
config = CrawlerRunConfig(
|
||||
max_retries=2,
|
||||
proxy_config=ProxyConfig(
|
||||
server="http://datacenter-proxy.example.com:8080",
|
||||
username="user",
|
||||
password="pass",
|
||||
),
|
||||
fallback_proxy_configs=[
|
||||
ProxyConfig(
|
||||
server="http://residential-proxy.example.com:9090",
|
||||
username="user",
|
||||
password="pass",
|
||||
),
|
||||
],
|
||||
)
|
||||
```
|
||||
|
||||
With this setup, each round tries the datacenter proxy first, then the residential proxy. With `max_retries=2`, worst case is 3 rounds x 2 proxies = 6 attempts.
|
||||
|
||||
### Fallback Fetch Function
|
||||
|
||||
When all browser-based attempts fail, call a custom async function as a last resort. This function receives the URL and must return raw HTML as a string. The returned HTML is processed through the normal pipeline (markdown generation, extraction, etc.).
|
||||
|
||||
This is useful when you have access to a scraping API, a pre-fetched cache, or any other source of HTML.
|
||||
|
||||
```python
|
||||
import aiohttp
|
||||
|
||||
async def my_scraping_api(url: str) -> str:
|
||||
"""Fetch HTML via an external scraping API."""
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
"https://api.my-scraping-service.com/fetch",
|
||||
params={"url": url, "format": "html"},
|
||||
headers={"Authorization": "Bearer MY_TOKEN"},
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
return await resp.text()
|
||||
raise RuntimeError(f"API error: {resp.status}")
|
||||
|
||||
config = CrawlerRunConfig(
|
||||
max_retries=1,
|
||||
fallback_fetch_function=my_scraping_api,
|
||||
)
|
||||
```
|
||||
|
||||
The function can do anything — call an API, read from a database, return cached HTML, or make a simple HTTP request with a different library. Crawl4AI does not care how the HTML is obtained.
|
||||
|
||||
### Full Escalation (All Features Combined)
|
||||
|
||||
This example combines every layer: stealth mode, a fallback proxy that only activates when blocked, a list of escalation proxies tried each round, retries, and a final fetch function.
|
||||
|
||||
```python
|
||||
import aiohttp
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig
|
||||
|
||||
# Last-resort: fetch HTML via an external service
|
||||
async def external_fetch(url: str) -> str:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
"https://api.my-service.com/scrape",
|
||||
json={"url": url, "render_js": True},
|
||||
headers={"Authorization": "Bearer MY_TOKEN"},
|
||||
) as resp:
|
||||
return await resp.text()
|
||||
|
||||
browser_config = BrowserConfig(
|
||||
headless=True,
|
||||
enable_stealth=True,
|
||||
)
|
||||
|
||||
crawl_config = CrawlerRunConfig(
|
||||
magic=True,
|
||||
wait_until="load",
|
||||
max_retries=2,
|
||||
|
||||
# Primary proxy — is_fallback=True means first attempt runs without it
|
||||
proxy_config=ProxyConfig(
|
||||
server="http://datacenter-proxy.example.com:8080",
|
||||
username="user",
|
||||
password="pass",
|
||||
is_fallback=True,
|
||||
),
|
||||
|
||||
# Fallback proxies — tried in order after main proxy fails each round
|
||||
fallback_proxy_configs=[
|
||||
ProxyConfig(
|
||||
server="http://residential-proxy.example.com:9090",
|
||||
username="user",
|
||||
password="pass",
|
||||
),
|
||||
],
|
||||
|
||||
# Last resort — called after all retries and proxies are exhausted
|
||||
fallback_fetch_function=external_fetch,
|
||||
)
|
||||
|
||||
async with AsyncWebCrawler(config=browser_config) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://protected-site.com/products",
|
||||
config=crawl_config,
|
||||
)
|
||||
|
||||
if result.success:
|
||||
print(f"Got {len(result.markdown.raw_markdown)} chars of markdown")
|
||||
else:
|
||||
print(f"All attempts failed: {result.error_message}")
|
||||
```
|
||||
|
||||
**What happens step by step:**
|
||||
|
||||
| Round | Attempt | What runs |
|
||||
|---|---|---|
|
||||
| 1 | 1 | No proxy (is_fallback skips it) — blocked |
|
||||
| 1 | 2 | Residential fallback proxy — blocked (bad IP) |
|
||||
| 2 | 1 | Datacenter proxy activated — blocked |
|
||||
| 2 | 2 | Residential fallback proxy — blocked |
|
||||
| 3 | 1 | Datacenter proxy — blocked |
|
||||
| 3 | 2 | Residential fallback proxy — blocked |
|
||||
| - | - | `external_fetch(url)` called — returns HTML |
|
||||
|
||||
That's up to 6 browser attempts + 1 function call before giving up.
|
||||
|
||||
## Tips
|
||||
|
||||
- **Start with `max_retries=0`** and a `fallback_fetch_function` if you just want a safety net without burning time on retries.
|
||||
- **Use `is_fallback=True`** on your proxy to avoid consuming proxy credits on sites that don't need them.
|
||||
- **Order fallback proxies cheapest-first** — datacenter proxies before residential, residential before premium.
|
||||
- **Combine with stealth mode** — `BrowserConfig(enable_stealth=True)` and `CrawlerRunConfig(magic=True)` reduce the chance of being blocked in the first place.
|
||||
- **`wait_until="load"`** is important for anti-bot sites — the default `domcontentloaded` can return before the anti-bot sensor finishes.
|
||||
- **You don't need a primary proxy to use fallback proxies.** If you skip `proxy_config` and only pass `fallback_proxy_configs`, the first attempt each round runs with no proxy. This is useful when you want to try direct access first and only escalate to proxies if blocked:
|
||||
```python
|
||||
config = CrawlerRunConfig(
|
||||
max_retries=1,
|
||||
fallback_proxy_configs=[proxy_A, proxy_B],
|
||||
)
|
||||
# Round 1: no proxy → proxy_A → proxy_B
|
||||
# Round 2: no proxy → proxy_A → proxy_B
|
||||
```
|
||||
|
||||
## See Also
|
||||
|
||||
- [Proxy & Security](proxy-security.md) — Proxy setup, authentication, and rotation
|
||||
- [Undetected Browser](undetected-browser.md) — Stealth mode and browser fingerprint evasion
|
||||
- [Session Management](session-management.md) — Maintaining sessions across requests
|
||||
@@ -302,3 +302,7 @@ def safe_proxy_repr(proxy: ProxyConfig):
|
||||
- Ensure `ProxyConfig.from_env()` actually loaded entries (`len(proxies) > 0`).
|
||||
- Attach `proxy_rotation_strategy` to `CrawlerRunConfig`.
|
||||
- Validate the proxy definitions you pass into the strategy.
|
||||
|
||||
## See Also
|
||||
|
||||
- [Anti-Bot Detection & Fallback](anti-bot-and-fallback.md) — Automatic retry with proxy escalation and fallback functions when anti-bot blocking is detected
|
||||
|
||||
@@ -391,4 +391,5 @@ Remember:
|
||||
- [Advanced Features](advanced-features.md) - Overview of all advanced features
|
||||
- [Proxy & Security](proxy-security.md) - Using proxies with anti-bot features
|
||||
- [Session Management](session-management.md) - Maintaining sessions across requests
|
||||
- [Identity Based Crawling](identity-based-crawling.md) - Additional anti-detection strategies
|
||||
- [Identity Based Crawling](identity-based-crawling.md) - Additional anti-detection strategies
|
||||
- [Anti-Bot Detection & Fallback](anti-bot-and-fallback.md) - Automatic retry and proxy escalation when blocking is detected
|
||||
@@ -109,8 +109,11 @@ We group them by category.
|
||||
| **`timezone_id`** | `str or None` (None) | Browser's timezone (e.g., "America/New_York", "Europe/Paris"). |
|
||||
| **`geolocation`** | `GeolocationConfig or None` (None) | GPS coordinates configuration. Use `GeolocationConfig(latitude=..., longitude=..., accuracy=...)`. |
|
||||
| **`fetch_ssl_certificate`** | `bool` (False) | If `True`, fetches and includes SSL certificate information in the result. |
|
||||
| **`proxy_config`** | `ProxyConfig or dict or None` (None) | Proxy configuration for this specific crawl. Can override browser-level proxy settings. |
|
||||
| **`proxy_config`** | `ProxyConfig or dict or None` (None) | Proxy configuration for this specific crawl. Can override browser-level proxy settings. Set `is_fallback=True` on the ProxyConfig to only use the proxy when anti-bot blocking is detected. |
|
||||
| **`proxy_rotation_strategy`** | `ProxyRotationStrategy` (None) | Strategy for rotating proxies during crawl operations. |
|
||||
| **`max_retries`** | `int` (0) | Number of retry rounds when anti-bot blocking is detected. Each round tries the main proxy and all fallback proxies. |
|
||||
| **`fallback_proxy_configs`** | `list[ProxyConfig]` ([]) | List of fallback proxies tried in order within each retry round after the main proxy fails. |
|
||||
| **`fallback_fetch_function`** | `async (str) -> str or None` (None) | Async function called as last resort after all retries are exhausted. Takes URL, returns raw HTML. See [Anti-Bot & Fallback](../advanced/anti-bot-and-fallback.md). |
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -275,17 +275,22 @@ class CrawlerRunConfig:
|
||||
- **`geolocation`**: GPS coordinates via `GeolocationConfig(latitude=48.8566, longitude=2.3522)`
|
||||
- See [Identity Based Crawling](../advanced/identity-based-crawling.md#7-locale-timezone-and-geolocation-control)
|
||||
|
||||
10.⠀**Proxy Configuration**:
|
||||
- **`proxy_config`**: Proxy server configuration (ProxyConfig object or dict) e.g. {"server": "...", "username": "...", "password": "..."}
|
||||
10.⠀**Proxy Configuration**:
|
||||
- **`proxy_config`**: Proxy server configuration (ProxyConfig object or dict) e.g. {"server": "...", "username": "...", "password": "..."}. Set `is_fallback=True` to only use the proxy when anti-bot blocking is detected.
|
||||
- **`proxy_rotation_strategy`**: Strategy for rotating proxies during crawls
|
||||
|
||||
11.⠀**Page Interaction Parameters**:
|
||||
11.⠀**Anti-Bot Retry & Fallback** (see [Anti-Bot & Fallback](../advanced/anti-bot-and-fallback.md)):
|
||||
- **`max_retries`**: Number of retry rounds when blocking is detected (default: 0)
|
||||
- **`fallback_proxy_configs`**: List of fallback proxies tried in order within each retry round
|
||||
- **`fallback_fetch_function`**: Async function called as last resort — takes URL, returns raw HTML
|
||||
|
||||
12.⠀**Page Interaction Parameters**:
|
||||
- **`scan_full_page`**: If `True`, scroll through the entire page to load all content
|
||||
- **`wait_until`**: Condition to wait for when navigating (e.g., "domcontentloaded", "networkidle")
|
||||
- **`page_timeout`**: Timeout in milliseconds for page operations (default: 60000)
|
||||
- **`delay_before_return_html`**: Delay in seconds before retrieving final HTML.
|
||||
|
||||
12.⠀**`url_matcher`** & **`match_mode`**:
|
||||
13.⠀**`url_matcher`** & **`match_mode`**:
|
||||
- Enable URL-specific configurations when used with `arun_many()`.
|
||||
- Set `url_matcher` to patterns (glob, function, or list) to match specific URLs.
|
||||
- Use `match_mode` (OR/AND) to control how multiple patterns combine.
|
||||
|
||||
@@ -47,6 +47,7 @@ nav:
|
||||
- "Lazy Loading": "advanced/lazy-loading.md"
|
||||
- "Hooks & Auth": "advanced/hooks-auth.md"
|
||||
- "Proxy & Security": "advanced/proxy-security.md"
|
||||
- "Anti-Bot & Fallback": "advanced/anti-bot-and-fallback.md"
|
||||
- "Undetected Browser": "advanced/undetected-browser.md"
|
||||
- "Session Management": "advanced/session-management.md"
|
||||
- "Multi-URL Crawling": "advanced/multi-url-crawling.md"
|
||||
|
||||
315
tests/proxy/test_antibot_detector.py
Normal file
315
tests/proxy/test_antibot_detector.py
Normal file
@@ -0,0 +1,315 @@
|
||||
"""
|
||||
Unit tests for antibot_detector.is_blocked().
|
||||
|
||||
Tests are organized into:
|
||||
- TRUE POSITIVES: Real block pages that MUST be detected
|
||||
- TRUE NEGATIVES: Legitimate pages that MUST NOT be flagged
|
||||
- EDGE CASES: Boundary conditions
|
||||
"""
|
||||
|
||||
import sys, os
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
|
||||
|
||||
from crawl4ai.antibot_detector import is_blocked
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def check(name, result, expected_blocked, expected_substr=None):
|
||||
global PASS, FAIL
|
||||
blocked, reason = result
|
||||
ok = blocked == expected_blocked
|
||||
if expected_substr and blocked:
|
||||
ok = ok and expected_substr.lower() in reason.lower()
|
||||
status = "PASS" if ok else "FAIL"
|
||||
if not ok:
|
||||
FAIL += 1
|
||||
print(f" {status}: {name}")
|
||||
print(f" got blocked={blocked}, reason={reason!r}")
|
||||
print(f" expected blocked={expected_blocked}" +
|
||||
(f", substr={expected_substr!r}" if expected_substr else ""))
|
||||
else:
|
||||
PASS += 1
|
||||
if blocked:
|
||||
print(f" {status}: {name} -> {reason}")
|
||||
else:
|
||||
print(f" {status}: {name} -> not blocked")
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# TRUE POSITIVES — real block pages that MUST be detected
|
||||
# =========================================================================
|
||||
print("\n=== TRUE POSITIVES (must detect as blocked) ===\n")
|
||||
|
||||
# --- Akamai ---
|
||||
check("Akamai Reference #",
|
||||
is_blocked(403, '<html><body>Access Denied\nYour request was blocked.\nReference #18.2d351ab8.1557333295.a4e16ab</body></html>'),
|
||||
True, "Akamai")
|
||||
|
||||
check("Akamai Pardon Our Interruption",
|
||||
is_blocked(403, '<html><head><title>Pardon Our Interruption</title></head><body><p>Please verify you are human</p></body></html>'),
|
||||
True, "Pardon")
|
||||
|
||||
check("Akamai 403 short Access Denied",
|
||||
is_blocked(403, '<html><body><h1>Access Denied</h1></body></html>'),
|
||||
True) # Detected via near-empty 403 or Access Denied pattern
|
||||
|
||||
# --- Cloudflare ---
|
||||
check("Cloudflare challenge form",
|
||||
is_blocked(403, '''<html><body>
|
||||
<form id="challenge-form" action="/cdn-cgi/l/chk_jschl?__cf_chl_f_tk=abc123">
|
||||
<input type="hidden" name="jschl_vc" value="test"/>
|
||||
</form></body></html>'''),
|
||||
True, "Cloudflare challenge")
|
||||
|
||||
check("Cloudflare error 1020",
|
||||
is_blocked(403, '''<html><body>
|
||||
<div class="cf-wrapper"><span class="cf-error-code">1020</span></div>
|
||||
<p>Access denied</p></body></html>'''),
|
||||
True, "Cloudflare firewall")
|
||||
|
||||
check("Cloudflare IUAM script",
|
||||
is_blocked(403, '<html><script src="/cdn-cgi/challenge-platform/h/g/orchestrate/jsch/v1"></script></html>'),
|
||||
True, "Cloudflare JS challenge")
|
||||
|
||||
check("Cloudflare Just a moment",
|
||||
is_blocked(403, '<html><head><title>Just a moment...</title></head><body>Checking your browser</body></html>'),
|
||||
True) # Detected via near-empty 403 or Cloudflare pattern
|
||||
|
||||
check("Cloudflare Checking your browser (short 503)",
|
||||
is_blocked(503, '<html><body>Checking your browser before accessing the site.</body></html>'),
|
||||
True, "Cloudflare browser check")
|
||||
|
||||
# --- PerimeterX ---
|
||||
check("PerimeterX block page",
|
||||
is_blocked(403, '''<html><head><title>Access to This Page Has Been Blocked</title></head>
|
||||
<body><div id="px-captcha"></div>
|
||||
<script>window._pxAppId = 'PX12345';</script></body></html>'''),
|
||||
True, "PerimeterX")
|
||||
|
||||
check("PerimeterX captcha CDN",
|
||||
is_blocked(403, '<html><body><script src="https://captcha.px-cdn.net/PX12345/captcha.js"></script></body></html>'),
|
||||
True, "PerimeterX captcha")
|
||||
|
||||
# --- DataDome ---
|
||||
check("DataDome captcha delivery",
|
||||
is_blocked(403, '''<html><body><script>
|
||||
var dd = {'rt':'i','cid':'AHrlq...','host':'geo.captcha-delivery.com'};
|
||||
</script></body></html>'''),
|
||||
True, "DataDome")
|
||||
|
||||
# --- Imperva/Incapsula ---
|
||||
check("Imperva Incapsula Resource",
|
||||
is_blocked(403, '<html><body><iframe src="/_Incapsula_Resource?incident_id=123&sess_id=abc"></iframe></body></html>'),
|
||||
True, "Imperva")
|
||||
|
||||
check("Imperva incident ID",
|
||||
is_blocked(200, '<html><body>Request unsuccessful. Incapsula incident ID: 12345-67890</body></html>'),
|
||||
True, "Incapsula incident")
|
||||
|
||||
# --- Sucuri ---
|
||||
check("Sucuri firewall",
|
||||
is_blocked(403, '<html><body><h1>Sucuri WebSite Firewall - Access Denied</h1></body></html>'),
|
||||
True, "Sucuri")
|
||||
|
||||
# --- Kasada ---
|
||||
check("Kasada challenge",
|
||||
is_blocked(403, '<html><script>KPSDK.scriptStart = KPSDK.now();</script></html>'),
|
||||
True, "Kasada")
|
||||
|
||||
# --- HTTP 429 ---
|
||||
check("HTTP 429 rate limit",
|
||||
is_blocked(429, '<html><body>Rate limit exceeded</body></html>'),
|
||||
True, "429")
|
||||
|
||||
check("HTTP 429 empty body",
|
||||
is_blocked(429, ''),
|
||||
True, "429")
|
||||
|
||||
# --- Empty 200 ---
|
||||
check("HTTP 200 empty page",
|
||||
is_blocked(200, ''),
|
||||
True, "empty")
|
||||
|
||||
check("HTTP 200 whitespace only",
|
||||
is_blocked(200, ' \n\n '),
|
||||
True, "empty")
|
||||
|
||||
# --- 403 near-empty ---
|
||||
check("HTTP 403 near-empty (10 bytes)",
|
||||
is_blocked(403, '<html></html>'),
|
||||
True, "403")
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# TRUE NEGATIVES — legitimate pages that MUST NOT be flagged
|
||||
# =========================================================================
|
||||
print("\n=== TRUE NEGATIVES (must NOT detect as blocked) ===\n")
|
||||
|
||||
# --- Normal pages ---
|
||||
check("Normal 200 page (example.com size)",
|
||||
is_blocked(200, '<html><head><title>Example</title></head><body>' + 'x' * 500 + '</body></html>'),
|
||||
False)
|
||||
|
||||
check("Normal 200 large page",
|
||||
is_blocked(200, '<html><body>' + '<p>Some content here.</p>\n' * 5000 + '</body></html>'),
|
||||
False)
|
||||
|
||||
# --- Security articles (false positive trap!) ---
|
||||
check("Article about bot detection (large page)",
|
||||
is_blocked(200, '<html><head><title>How to Detect Bots</title></head><body>' +
|
||||
'<h1>How to Detect Bots on Your Website</h1>' +
|
||||
'<p>Anti-bot solutions like DataDome, PerimeterX, and Cloudflare ' +
|
||||
'help detect and block bot traffic. When a bot is detected, ' +
|
||||
'services show a CAPTCHA or Access Denied page. ' +
|
||||
'Common signals include blocked by security warnings.</p>' +
|
||||
'<p>The g-recaptcha and h-captcha widgets are used for challenges.</p>' +
|
||||
'<p>' + 'More article content. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
check("DataDome marketing page (large)",
|
||||
is_blocked(200, '<html><body><h1>DataDome Bot Protection</h1>' +
|
||||
'<p>DataDome protects websites from bot attacks. ' +
|
||||
'Our solution detects automated traffic using advanced fingerprinting. ' +
|
||||
'Competitors like PerimeterX use window._pxAppId for tracking.</p>' +
|
||||
'<p>' + 'Marketing content. ' * 1000 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
|
||||
# --- Login pages with CAPTCHA (not a block!) ---
|
||||
check("Login page with reCAPTCHA (large page)",
|
||||
is_blocked(200, '<html><head><title>Sign In</title></head><body>' +
|
||||
'<nav>Home | Products | Contact</nav>' +
|
||||
'<form action="/login" method="POST">' +
|
||||
'<input name="email" type="email"/>' +
|
||||
'<input name="password" type="password"/>' +
|
||||
'<div class="g-recaptcha" data-sitekey="abc123"></div>' +
|
||||
'<button type="submit">Sign In</button>' +
|
||||
'</form>' +
|
||||
'<footer>Copyright 2024</footer>' +
|
||||
'<p>' + 'Page content. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
check("Signup page with hCaptcha (large page)",
|
||||
is_blocked(200, '<html><body>' +
|
||||
'<h1>Create Account</h1>' +
|
||||
'<form><div class="h-captcha" data-sitekey="xyz"></div></form>' +
|
||||
'<p>' + 'Registration info. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
# --- Legitimate 403 pages (not anti-bot) ---
|
||||
check("Apache directory listing denied (403, large-ish)",
|
||||
is_blocked(403, '<html><head><title>403 Forbidden</title></head><body>' +
|
||||
'<h1>Forbidden</h1>' +
|
||||
'<p>You don\'t have permission to access this resource on this server.</p>' +
|
||||
'<hr><address>Apache/2.4.41 (Ubuntu) Server at example.com Port 80</address>' +
|
||||
'<p>' + 'Server info. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
check("Nginx 403 (large page)",
|
||||
is_blocked(403, '<html><head><title>403 Forbidden</title></head><body>' +
|
||||
'<center><h1>403 Forbidden</h1></center>' +
|
||||
'<hr><center>nginx/1.18.0</center>' +
|
||||
'<p>' + 'Content. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
check("API 403 auth required (JSON)",
|
||||
is_blocked(403, '{"error": "Forbidden", "message": "Invalid API key", "code": 403}'),
|
||||
False)
|
||||
|
||||
# --- Cloudflare-served normal pages (not blocked!) ---
|
||||
check("Cloudflare-served normal page with footer",
|
||||
is_blocked(200, '<html><body>' +
|
||||
'<h1>Welcome to Our Site</h1>' +
|
||||
'<p>This is a normal page served through Cloudflare CDN.</p>' +
|
||||
'<footer>Performance & security by Cloudflare</footer>' +
|
||||
'<p>' + 'Normal content. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
# --- Small but legitimate pages ---
|
||||
check("Small valid 200 page (150 bytes)",
|
||||
is_blocked(200, '<html><head><title>OK</title></head><body><p>Your request was processed successfully.</p></body></html>'),
|
||||
False)
|
||||
|
||||
check("Small JSON 200 response",
|
||||
is_blocked(200, '{"status": "ok", "data": {"id": 123, "name": "test"}, "timestamp": "2024-01-01T00:00:00Z"}'),
|
||||
False)
|
||||
|
||||
check("Redirect page 200",
|
||||
is_blocked(200, '<html><head><meta http-equiv="refresh" content="0;url=/dashboard"></head><body>Redirecting...</body></html>'),
|
||||
False)
|
||||
|
||||
# --- 503 legitimate server errors ---
|
||||
check("Legitimate 503 maintenance (large)",
|
||||
is_blocked(503, '<html><body><h1>Service Temporarily Unavailable</h1>' +
|
||||
'<p>We are performing scheduled maintenance. Please try again later.</p>' +
|
||||
'<p>' + 'Maintenance info. ' * 500 + '</p>' +
|
||||
'</body></html>'),
|
||||
False)
|
||||
|
||||
# --- 200 with short but real content ---
|
||||
check("Short thank you page (200, 120 bytes)",
|
||||
is_blocked(200, '<html><body><h1>Thank You!</h1><p>Your order has been placed. Confirmation email sent.</p></body></html>'),
|
||||
False)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# EDGE CASES
|
||||
# =========================================================================
|
||||
print("\n=== EDGE CASES ===\n")
|
||||
|
||||
check("None status code + empty html",
|
||||
is_blocked(None, ''),
|
||||
False)
|
||||
|
||||
check("None status code + block content",
|
||||
is_blocked(None, '<html><body>Reference #18.2d351ab8.1557333295.a4e16ab</body></html>'),
|
||||
True, "Akamai")
|
||||
|
||||
check("200 + tier1 pattern (Imperva deceptive 200)",
|
||||
is_blocked(200, '<html><body>Request unsuccessful. Incapsula incident ID: 555-999</body></html>'),
|
||||
True, "Incapsula")
|
||||
|
||||
check("403 + 4999 bytes (just under threshold)",
|
||||
is_blocked(403, '<html><body>Access Denied' + 'x' * 4950 + '</body></html>'),
|
||||
True, "Access Denied")
|
||||
|
||||
check("403 + 5001 bytes (just over threshold, no tier2 match)",
|
||||
is_blocked(403, '<html><body>Some error page' + 'x' * 4960 + '</body></html>'),
|
||||
False)
|
||||
|
||||
check("403 + 9999 bytes with generic block text",
|
||||
is_blocked(403, '<html><body>blocked by security' + 'x' * 9950 + '</body></html>'),
|
||||
True, "Blocked by security")
|
||||
|
||||
check("403 + 10001 bytes with generic block text (too big for tier2)",
|
||||
is_blocked(403, '<html><body>blocked by security' + 'x' * 9970 + '</body></html>'),
|
||||
False)
|
||||
|
||||
check("200 + whitespace-padded but 89 bytes content (above threshold for meaningful)",
|
||||
is_blocked(200, ' ' * 10 + 'x' * 89 + ' ' * 10),
|
||||
True, "empty")
|
||||
|
||||
check("200 + exactly 100 bytes stripped (at threshold)",
|
||||
is_blocked(200, 'x' * 100),
|
||||
False)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# SUMMARY
|
||||
# =========================================================================
|
||||
print(f"\n{'=' * 60}")
|
||||
print(f"RESULTS: {PASS} passed, {FAIL} failed out of {PASS + FAIL} tests")
|
||||
print(f"{'=' * 60}")
|
||||
if FAIL > 0:
|
||||
print("SOME TESTS FAILED!")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("ALL TESTS PASSED!")
|
||||
Reference in New Issue
Block a user