Fix critical RCE via deserialization and eval() in /crawl endpoint

- Replace raw eval() in _compute_field() with AST-validated
  _safe_eval_expression() that blocks __import__, dunder attribute
  access, and import statements while preserving safe transforms
- Add ALLOWED_DESERIALIZE_TYPES allowlist to from_serializable_dict()
  preventing arbitrary class instantiation from API input
- Update security contact email and add v0.8.1 security fixes to
  SECURITY.md with researcher acknowledgment
- Add 17 security tests covering both fixes
unclecode
2026-01-30 08:46:01 +00:00
parent ad5ebf166a
commit 0104db6de2
4 changed files with 288 additions and 7 deletions
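Reviewer note: a minimal sketch of the pre-fix exploit chain the bullets above describe. The envelope keys ("type"/"params") follow from_serializable_dict() in the diff below; the computed-field schema layout and the exact /crawl request wiring are assumptions for illustration, not verbatim API usage.

# Hypothetical /crawl payload fragment (shape is illustrative). Pre-fix, the
# nested "type" keys let a request name any importable class, and the computed
# field's "expression" went straight into eval() inside _compute_field():
malicious_crawler_config = {
    "type": "JsonCssExtractionStrategy",   # any importable class name was accepted
    "params": {
        "schema": {
            "name": "poc",
            "baseSelector": "body",
            "fields": [{
                "name": "pwned",
                "type": "computed",        # field kind assumed for illustration
                "expression": '__import__("os").popen("id").read()',
            }],
        },
    },
}
# Post-fix: the type must be allowlisted AND the expression must pass AST
# validation, which rejects the __import__ call above before evaluation.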

View File

@@ -24,7 +24,7 @@ Instead, please report via one of these methods:
    - Fill in the details
 2. **Email**
-   - Send details to: aravind@crawl4ai.com and nasrin@crawl4ai.com
+   - Send details to: unclecode@crawl4ai.com (CC: nasrin@crawl4ai.com and aravind@crawl4ai.com)
    - Use subject: `[SECURITY] Brief description`
    - Include:
      - Description of the vulnerability
@@ -98,10 +98,21 @@ When using Crawl4AI as a Python library:
 | CVE-pending-1 | CRITICAL | RCE via hooks `__import__` | Removed from allowed builtins |
 | CVE-pending-2 | HIGH | LFI via `file://` URLs | URL scheme validation added |
 
+### Fixed in v0.8.1
+
+| ID | Severity | Description | Fix |
+|----|----------|-------------|-----|
+| CVE-pending-3 | CRITICAL | RCE via deserialization + `eval()` in `/crawl` endpoint | Allowlisted deserializable types; AST-validated computed field expressions |
+
 See [Security Advisory](https://github.com/unclecode/crawl4ai/security/advisories) for details.
 
 ## Security Features
 
+### v0.8.1+
+- **Deserialization Allowlist**: Only known-safe types can be instantiated via API config
+- **Safe Expression Evaluation**: Computed fields use AST validation (no `__import__`, no dunder access)
+
 ### v0.8.0+
 - **URL Scheme Validation**: Blocks `file://`, `javascript:`, `data:` URLs on API
@@ -115,7 +126,8 @@ See [Security Advisory](https://github.com/unclecode/crawl4ai/security/advisorie
 We thank the following security researchers for responsibly disclosing vulnerabilities:
 
-- **[Neo by ProjectDiscovery](https://projectdiscovery.io/blog/introducing-neo)** - RCE and LFI vulnerabilities (December 2025)
+- **Alec M** — RCE via deserialization in `/crawl` endpoint (January 2026)
+- **[Neo by ProjectDiscovery](https://projectdiscovery.io/blog/introducing-neo)** — RCE and LFI vulnerabilities (December 2025)
 
 ---

View File

@@ -41,6 +41,41 @@ class MatchMode(Enum):
 # from .proxy_strategy import ProxyConfig
 
+# Allowlist of types that can be deserialized via from_serializable_dict().
+# This prevents arbitrary class instantiation from untrusted input (e.g. API requests).
+ALLOWED_DESERIALIZE_TYPES = {
+    # Config classes
+    "BrowserConfig", "CrawlerRunConfig", "HTTPCrawlerConfig",
+    "LLMConfig", "ProxyConfig", "GeolocationConfig",
+    "SeedingConfig", "VirtualScrollConfig", "LinkPreviewConfig",
+    # Extraction strategies
+    "JsonCssExtractionStrategy", "JsonXPathExtractionStrategy",
+    "JsonLxmlExtractionStrategy", "LLMExtractionStrategy",
+    "CosineStrategy", "RegexExtractionStrategy",
+    # Markdown / content
+    "DefaultMarkdownGenerator",
+    "PruningContentFilter", "BM25ContentFilter", "LLMContentFilter",
+    # Scraping
+    "LXMLWebScrapingStrategy",
+    # Chunking
+    "RegexChunking",
+    # Deep crawl
+    "BFSDeepCrawlStrategy", "DFSDeepCrawlStrategy", "BestFirstCrawlingStrategy",
+    # Filters & scorers
+    "FilterChain", "URLPatternFilter", "DomainFilter",
+    "ContentTypeFilter", "URLFilter", "SEOFilter", "ContentRelevanceFilter",
+    "KeywordRelevanceScorer", "URLScorer", "CompositeScorer",
+    "DomainAuthorityScorer", "FreshnessScorer", "PathDepthScorer",
+    # Enums
+    "CacheMode", "MatchMode", "DisplayMode",
+    # Dispatchers
+    "MemoryAdaptiveDispatcher", "SemaphoreDispatcher",
+    # Table extraction
+    "DefaultTableExtraction", "NoTableExtraction",
+    # Proxy
+    "RoundRobinProxyStrategy",
+}
+
 def to_serializable_dict(obj: Any, ignore_default_value : bool = False):
     """
@@ -134,15 +169,21 @@ def from_serializable_dict(data: Any) -> Any:
     if data["type"] == "dict" and "value" in data:
         return {k: from_serializable_dict(v) for k, v in data["value"].items()}
 
+    # Security: only allow known-safe types to be deserialized
+    type_name = data["type"]
+    if type_name not in ALLOWED_DESERIALIZE_TYPES:
+        raise ValueError(
+            f"Deserialization of type '{type_name}' is not allowed. "
+            f"Only allowlisted configuration and strategy types can be deserialized."
+        )
+
     cls = None
-    # If you are receiving an error while trying to convert a dict to an object:
-    # Either add a module to `modules_paths` list, or add the `data["type"]` to the crawl4ai __init__.py file
     module_paths = ["crawl4ai"]
     for module_path in module_paths:
         try:
             mod = importlib.import_module(module_path)
-            if hasattr(mod, data["type"]):
-                cls = getattr(mod, data["type"])
+            if hasattr(mod, type_name):
+                cls = getattr(mod, type_name)
                 break
         except (ImportError, AttributeError):
             continue
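As a quick behavioral check, here is a standalone sketch of the fail-closed gate added above. The abridged allowlist and the check() helper are local to this demo, not part of the patch:

# Abridged local copy of ALLOWED_DESERIALIZE_TYPES for demonstration.
ALLOWED = {"BrowserConfig", "CrawlerRunConfig", "JsonCssExtractionStrategy"}

def check(data: dict) -> None:
    # Mirrors the new guard: reject unknown types before any import/getattr.
    type_name = data["type"]
    if type_name not in ALLOWED:
        raise ValueError(f"Deserialization of type '{type_name}' is not allowed.")

check({"type": "CrawlerRunConfig", "params": {}})       # allowlisted: passes
try:
    check({"type": "AsyncWebCrawler", "params": {}})    # arbitrary class: rejected
except ValueError as e:
    print(e)  # Deserialization of type 'AsyncWebCrawler' is not allowed.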

View File

@@ -1,4 +1,5 @@
 from abc import ABC, abstractmethod
+import ast
 import inspect
 from typing import Any, List, Dict, Optional, Tuple, Pattern, Union
 from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -1001,6 +1002,69 @@ class LLMExtractionStrategy(ExtractionStrategy):
 #######################################################
 # New extraction strategies for JSON-based extraction #
 #######################################################
 
+# Safe builtins allowed in computed field expressions
+_SAFE_EVAL_BUILTINS = {
+    "str": str, "int": int, "float": float, "bool": bool,
+    "len": len, "round": round, "abs": abs, "min": min, "max": max,
+    "sum": sum, "sorted": sorted, "reversed": reversed,
+    "list": list, "dict": dict, "tuple": tuple, "set": set,
+    "enumerate": enumerate, "zip": zip, "map": map, "filter": filter,
+    "any": any, "all": all, "range": range,
+    "True": True, "False": False, "None": None,
+    "isinstance": isinstance, "type": type,
+}
+
+
+def _safe_eval_expression(expression: str, local_vars: dict) -> Any:
+    """
+    Evaluate a computed field expression safely using AST validation.
+
+    Allows simple transforms (math, string methods, attribute access on data)
+    while blocking dangerous operations (__import__, dunder access, etc.).
+
+    Args:
+        expression: The Python expression string to evaluate.
+        local_vars: The local variables (extracted item fields) available to the expression.
+
+    Returns:
+        The result of evaluating the expression.
+
+    Raises:
+        ValueError: If the expression contains disallowed constructs.
+    """
+    try:
+        tree = ast.parse(expression, mode="eval")
+    except SyntaxError as e:
+        raise ValueError(f"Invalid expression syntax: {e}")
+
+    for node in ast.walk(tree):
+        # Block import statements
+        if isinstance(node, (ast.Import, ast.ImportFrom)):
+            raise ValueError("Import statements are not allowed in expressions")
+        # Block attribute access to dunder attributes (e.g., __class__, __globals__)
+        if isinstance(node, ast.Attribute) and node.attr.startswith("_"):
+            raise ValueError(
+                f"Access to private/dunder attribute '{node.attr}' is not allowed"
+            )
+        # Block calls to __import__ or any name starting with _
+        if isinstance(node, ast.Call):
+            func = node.func
+            if isinstance(func, ast.Name) and func.id.startswith("_"):
+                raise ValueError(
+                    f"Calling '{func.id}' is not allowed in expressions"
+                )
+            if isinstance(func, ast.Attribute) and func.attr.startswith("_"):
+                raise ValueError(
+                    f"Calling '{func.attr}' is not allowed in expressions"
+                )
+
+    safe_globals = {"__builtins__": _SAFE_EVAL_BUILTINS}
+    return eval(compile(tree, "<expression>", "eval"), safe_globals, local_vars)
+
+
 class JsonElementExtractionStrategy(ExtractionStrategy):
     """
     Abstract base class for extracting structured JSON from HTML content.
@@ -1236,7 +1300,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
     def _compute_field(self, item, field):
         try:
             if "expression" in field:
-                return eval(field["expression"], {}, item)
+                return _safe_eval_expression(field["expression"], item)
             elif "function" in field:
                 return field["function"](item)
         except Exception as e:
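For intuition, a short demo of the evaluator's contract. It assumes _safe_eval_expression as defined in the diff above is in scope; the item values are made up:

item = {"price": 100.0, "name": "widget"}

# Benign transforms still work: math, safe builtins, string methods.
print(_safe_eval_expression("round(price * 1.1, 2)", item))  # 110.0
print(_safe_eval_expression("name.upper()", item))           # WIDGET

# Each of these is rejected with ValueError before eval() ever runs:
for bad in (
    '__import__("os").popen("id").read()',  # call to a _-prefixed name
    "''.__class__.__mro__",                 # dunder attribute access
    "import os",                            # not a valid eval-mode expression
):
    try:
        _safe_eval_expression(bad, item)
    except ValueError as e:
        print("rejected:", e)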

View File

@@ -160,6 +160,170 @@ class TestHooksEnabled(unittest.TestCase):
         os.environ.pop("CRAWL4AI_HOOKS_ENABLED", None)
 
+
+class TestComputedFieldSafety(unittest.TestCase):
+    """Test that computed field expressions block dangerous operations.
+
+    Mirrors the AST-based _safe_eval_expression() logic from extraction_strategy.py
+    to test without importing heavy crawl4ai dependencies.
+    """
+
+    def setUp(self):
+        """Set up the safe eval function (local copy of the logic)."""
+        import ast
+
+        SAFE_BUILTINS = {
+            "str": str, "int": int, "float": float, "bool": bool,
+            "len": len, "round": round, "abs": abs, "min": min, "max": max,
+            "sum": sum, "sorted": sorted, "reversed": reversed,
+            "list": list, "dict": dict, "tuple": tuple, "set": set,
+            "enumerate": enumerate, "zip": zip, "map": map, "filter": filter,
+            "any": any, "all": all, "range": range,
+            "True": True, "False": False, "None": None,
+            "isinstance": isinstance, "type": type,
+        }
+
+        def safe_eval(expression, local_vars):
+            tree = ast.parse(expression, mode="eval")
+            for node in ast.walk(tree):
+                if isinstance(node, (ast.Import, ast.ImportFrom)):
+                    raise ValueError("Import statements are not allowed")
+                if isinstance(node, ast.Attribute) and node.attr.startswith("_"):
+                    raise ValueError(f"Access to '{node.attr}' is not allowed")
+                if isinstance(node, ast.Call):
+                    func = node.func
+                    if isinstance(func, ast.Name) and func.id.startswith("_"):
+                        raise ValueError(f"Calling '{func.id}' is not allowed")
+                    if isinstance(func, ast.Attribute) and func.attr.startswith("_"):
+                        raise ValueError(f"Calling '{func.attr}' is not allowed")
+            safe_globals = {"__builtins__": SAFE_BUILTINS}
+            return eval(compile(tree, "<expression>", "eval"), safe_globals, local_vars)
+
+        self.safe_eval = safe_eval
+
+    # === SECURITY TESTS: These expressions must be BLOCKED ===
+
+    def test_import_blocked(self):
+        """__import__('os') must be blocked."""
+        with self.assertRaises(ValueError):
+            self.safe_eval("__import__('os').system('id')", {})
+
+    def test_dunder_attribute_blocked(self):
+        """Access to __class__, __globals__, etc. must be blocked."""
+        with self.assertRaises(ValueError):
+            self.safe_eval("''.__class__.__bases__", {})
+
+    def test_dunder_method_call_blocked(self):
+        """Calls to dunder methods must be blocked."""
+        with self.assertRaises(ValueError):
+            self.safe_eval("x.__getattribute__('y')", {"x": {}})
+
+    def test_os_popen_via_import_blocked(self):
+        """The exact POC from the vulnerability report must be blocked."""
+        with self.assertRaises(ValueError):
+            self.safe_eval('__import__("os").popen("id").read()', {})
+
+    # === FUNCTIONALITY TESTS: These expressions must WORK ===
+
+    def test_simple_math(self):
+        """Basic arithmetic on item values must work."""
+        result = self.safe_eval("price * 1.1", {"price": 100})
+        self.assertAlmostEqual(result, 110.0)
+
+    def test_string_method(self):
+        """String methods on item values must work."""
+        result = self.safe_eval("name.upper()", {"name": "hello"})
+        self.assertEqual(result, "HELLO")
+
+    def test_string_concatenation(self):
+        """String concatenation must work."""
+        result = self.safe_eval("first + ' ' + last", {"first": "John", "last": "Doe"})
+        self.assertEqual(result, "John Doe")
+
+    def test_dict_access(self):
+        """Dict-style field access must work."""
+        result = self.safe_eval("a + b", {"a": 10, "b": 20})
+        self.assertEqual(result, 30)
+
+    def test_builtin_functions(self):
+        """Safe builtins like len, str, int must work."""
+        result = self.safe_eval("len(name)", {"name": "hello"})
+        self.assertEqual(result, 5)
+
+    def test_round_function(self):
+        """round() must work for numeric formatting."""
+        result = self.safe_eval("round(price, 2)", {"price": 10.456})
+        self.assertEqual(result, 10.46)
+
+
+class TestDeserializationAllowlist(unittest.TestCase):
+    """Test that the deserialization allowlist blocks non-allowlisted types.
+
+    Tests the allowlist constant directly without importing heavy dependencies.
+    """
+
+    def setUp(self):
+        """Set up the allowlist (local copy of the constant)."""
+        self.allowed_types = {
+            "BrowserConfig", "CrawlerRunConfig", "HTTPCrawlerConfig",
+            "LLMConfig", "ProxyConfig", "GeolocationConfig",
+            "SeedingConfig", "VirtualScrollConfig", "LinkPreviewConfig",
+            "JsonCssExtractionStrategy", "JsonXPathExtractionStrategy",
+            "JsonLxmlExtractionStrategy", "LLMExtractionStrategy",
+            "CosineStrategy", "RegexExtractionStrategy",
+            "DefaultMarkdownGenerator",
+            "PruningContentFilter", "BM25ContentFilter", "LLMContentFilter",
+            "LXMLWebScrapingStrategy",
+            "RegexChunking",
+            "BFSDeepCrawlStrategy", "DFSDeepCrawlStrategy", "BestFirstCrawlingStrategy",
+            "FilterChain", "URLPatternFilter", "DomainFilter",
+            "ContentTypeFilter", "URLFilter", "SEOFilter", "ContentRelevanceFilter",
+            "KeywordRelevanceScorer", "URLScorer", "CompositeScorer",
+            "DomainAuthorityScorer", "FreshnessScorer", "PathDepthScorer",
+            "CacheMode", "MatchMode", "DisplayMode",
+            "MemoryAdaptiveDispatcher", "SemaphoreDispatcher",
+            "DefaultTableExtraction", "NoTableExtraction",
+            "RoundRobinProxyStrategy",
+        }
+
+    # === SECURITY TESTS: Non-allowlisted types must be BLOCKED ===
+
+    def test_arbitrary_class_not_in_allowlist(self):
+        """AsyncWebCrawler must NOT be in the allowlist."""
+        self.assertNotIn("AsyncWebCrawler", self.allowed_types)
+
+    def test_crawler_hub_not_in_allowlist(self):
+        """CrawlerHub must NOT be in the allowlist."""
+        self.assertNotIn("CrawlerHub", self.allowed_types)
+
+    def test_browser_profiler_not_in_allowlist(self):
+        """BrowserProfiler must NOT be in the allowlist."""
+        self.assertNotIn("BrowserProfiler", self.allowed_types)
+
+    def test_docker_client_not_in_allowlist(self):
+        """Crawl4aiDockerClient must NOT be in the allowlist."""
+        self.assertNotIn("Crawl4aiDockerClient", self.allowed_types)
+
+    # === FUNCTIONALITY TESTS: Allowlisted types must be present ===
+
+    def test_allowlist_has_core_config_types(self):
+        """Core config types must be in the allowlist."""
+        required = {"BrowserConfig", "CrawlerRunConfig", "LLMConfig", "ProxyConfig"}
+        self.assertTrue(required.issubset(self.allowed_types))
+
+    def test_allowlist_has_extraction_strategies(self):
+        """Extraction strategy types must be in the allowlist."""
+        required = {
+            "JsonCssExtractionStrategy", "LLMExtractionStrategy",
+            "RegexExtractionStrategy",
+        }
+        self.assertTrue(required.issubset(self.allowed_types))
+
+    def test_allowlist_has_enums(self):
+        """Enum types must be in the allowlist."""
+        required = {"CacheMode", "MatchMode", "DisplayMode"}
+        self.assertTrue(required.issubset(self.allowed_types))
+
+
 if __name__ == '__main__':
     print("=" * 60)
     print("Crawl4AI Security Fixes - Unit Tests")