Compare commits

..

2 Commits

Author SHA1 Message Date
AHMET YILMAZ
8e1362acf5 Fix async generator type mismatch in Docker Client streaming
- Fixed single_result_generator to properly handle async generators from deep crawl strategies
- Added proper __aiter__ checking to distinguish between CrawlResult and async generators
- Await and yield individual results from nested async generators
- Streaming functionality now works correctly for all patterns (SDK, Direct API, Docker Client)
- All 22 comprehensive tests passing with 100% success rate
- Live streaming test confirmed working end-to-end
2025-08-15 15:49:11 +08:00
AHMET YILMAZ
07e9d651fb feat: Comprehensive deep crawl streaming functionality restoration
🚀 Major Achievements:
-  ORJSON Serialization System: Complete implementation with custom handlers
-  Global Deprecated Properties System: DeprecatedPropertiesMixin for automatic exclusion
-  Deep Crawl Streaming: Fully restored with proper CrawlResultContainer handling
-  Docker Client Streaming: Fixed async generator patterns and result type checking
-  Server API Improvements: Correct method selection logic and streaming responses
-  Type Safety: Dict-as-logger detection to prevent crashes

📊 Test Results: 100% success rate on comprehensive test suite (10/10 tests passing)

🔧 Files Modified:
- crawl4ai/models.py: ORJSON + DeprecatedPropertiesMixin implementation
- deploy/docker/api.py: Streaming endpoint fixes + CrawlResultContainer handling
- deploy/docker/server.py: Production imports + ORJSON response handling
- crawl4ai/docker_client.py: Async generator streaming fixes
- crawl4ai/deep_crawling/bfs_strategy.py: Logger type safety
- .gitignore: Development environment cleanup
- tests/test_comprehensive_fixes.py: Rich-based comprehensive test suite

🎯 Impact: Production-ready deep crawl streaming functionality with comprehensive testing coverage
2025-08-15 15:31:36 +08:00
47 changed files with 1804 additions and 7384 deletions

10
.gitignore vendored
View File

@@ -1,6 +1,11 @@
# Scripts folder (private tools)
.scripts/
# Local development CLI (private)
local_dev.py
dev
DEV_CLI_README.md
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -270,4 +275,7 @@ docs/**/data
.codecat/
docs/apps/linkdin/debug*/
docs/apps/linkdin/samples/insights/*
docs/apps/linkdin/samples/insights/*
# Production checklist (local, not for version control)
PRODUCTION_CHECKLIST.md

View File

@@ -1,136 +0,0 @@
# Makefile for Crawl4AI Telemetry Testing
# Usage: make test-telemetry, make test-unit, make test-integration, etc.
.PHONY: help test-all test-telemetry test-unit test-integration test-privacy test-performance test-slow test-coverage test-verbose clean
# Default Python executable
PYTHON := .venv/bin/python
PYTEST := $(PYTHON) -m pytest
help:
@echo "Crawl4AI Telemetry Testing Commands:"
@echo ""
@echo " test-all Run all telemetry tests"
@echo " test-telemetry Run all telemetry tests (same as test-all)"
@echo " test-unit Run unit tests only"
@echo " test-integration Run integration tests only"
@echo " test-privacy Run privacy compliance tests only"
@echo " test-performance Run performance tests only"
@echo " test-slow Run slow tests only"
@echo " test-coverage Run tests with coverage report"
@echo " test-verbose Run tests with verbose output"
@echo " test-specific TEST= Run specific test (e.g., make test-specific TEST=test_telemetry.py::TestTelemetryConfig)"
@echo " clean Clean test artifacts"
@echo ""
@echo "Environment Variables:"
@echo " CRAWL4AI_TELEMETRY_TEST_REAL=1 Enable real telemetry during tests"
@echo " PYTEST_ARGS Additional pytest arguments"
# Run all telemetry tests
test-all test-telemetry:
$(PYTEST) tests/telemetry/ -v
# Run unit tests only
test-unit:
$(PYTEST) tests/telemetry/ -m "unit" -v
# Run integration tests only
test-integration:
$(PYTEST) tests/telemetry/ -m "integration" -v
# Run privacy compliance tests only
test-privacy:
$(PYTEST) tests/telemetry/ -m "privacy" -v
# Run performance tests only
test-performance:
$(PYTEST) tests/telemetry/ -m "performance" -v
# Run slow tests only
test-slow:
$(PYTEST) tests/telemetry/ -m "slow" -v
# Run tests with coverage
test-coverage:
$(PYTEST) tests/telemetry/ --cov=crawl4ai.telemetry --cov-report=html --cov-report=term-missing -v
# Run tests with verbose output
test-verbose:
$(PYTEST) tests/telemetry/ -vvv --tb=long
# Run specific test
test-specific:
$(PYTEST) tests/telemetry/$(TEST) -v
# Run tests excluding slow ones
test-fast:
$(PYTEST) tests/telemetry/ -m "not slow" -v
# Run tests in parallel
test-parallel:
$(PYTEST) tests/telemetry/ -n auto -v
# Clean test artifacts
clean:
rm -rf .pytest_cache/
rm -rf htmlcov/
rm -rf .coverage
find tests/ -name "*.pyc" -delete
find tests/ -name "__pycache__" -type d -exec rm -rf {} +
rm -rf tests/telemetry/__pycache__/
# Lint test files
lint-tests:
$(PYTHON) -m flake8 tests/telemetry/
$(PYTHON) -m pylint tests/telemetry/
# Type check test files
typecheck-tests:
$(PYTHON) -m mypy tests/telemetry/
# Run all quality checks
check-tests: lint-tests typecheck-tests test-unit
# Install test dependencies
install-test-deps:
$(PYTHON) -m pip install pytest pytest-asyncio pytest-mock pytest-cov pytest-xdist
# Setup development environment for testing
setup-dev:
$(PYTHON) -m pip install -e .
$(MAKE) install-test-deps
# Generate test report
test-report:
$(PYTEST) tests/telemetry/ --html=test-report.html --self-contained-html -v
# Run performance benchmarks
benchmark:
$(PYTEST) tests/telemetry/test_privacy_performance.py::TestTelemetryPerformance -v --benchmark-only
# Test different environments
test-docker-env:
CRAWL4AI_DOCKER=true $(PYTEST) tests/telemetry/ -k "docker" -v
test-cli-env:
$(PYTEST) tests/telemetry/ -k "cli" -v
# Validate telemetry implementation
validate:
@echo "Running telemetry validation suite..."
$(MAKE) test-unit
$(MAKE) test-privacy
$(MAKE) test-performance
@echo "Validation complete!"
# Debug failing tests
debug:
$(PYTEST) tests/telemetry/ --pdb -x -v
# Show test markers
show-markers:
$(PYTEST) --markers
# Show test collection (dry run)
show-tests:
$(PYTEST) tests/telemetry/ --collect-only -q

View File

@@ -27,11 +27,9 @@
Crawl4AI turns the web into clean, LLM ready Markdown for RAG, agents, and data pipelines. Fast, controllable, battle tested by a 50k+ star community.
[✨ Check out latest update v0.7.4](#-recent-updates)
[✨ Check out latest update v0.7.3](#-recent-updates)
✨ New in v0.7.4: Revolutionary LLM Table Extraction with intelligent chunking, enhanced concurrency fixes, memory management refactor, and critical stability improvements. [Release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.4.md)
✨ Recent v0.7.3: Undetected Browser Support, Multi-URL Configurations, Memory Monitoring, Enhanced Table Extraction, GitHub Sponsors. [Release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.3.md)
✨ New in v0.7.3: Undetected Browser Support, Multi-URL Configurations, Memory Monitoring, Enhanced Table Extraction, GitHub Sponsors. [Release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.3.md)
<details>
<summary>🤓 <strong>My Personal Story</strong></summary>
@@ -304,9 +302,9 @@ The new Docker implementation includes:
### Getting Started
```bash
# Pull and run the latest release
docker pull unclecode/crawl4ai:latest
docker run -d -p 11235:11235 --name crawl4ai --shm-size=1g unclecode/crawl4ai:latest
# Pull and run the latest release candidate
docker pull unclecode/crawl4ai:0.7.0
docker run -d -p 11235:11235 --name crawl4ai --shm-size=1g unclecode/crawl4ai:0.7.0
# Visit the playground at http://localhost:11235/playground
```
@@ -544,40 +542,6 @@ async def test_news_crawl():
## ✨ Recent Updates
<details>
<summary><strong>Version 0.7.4 Release Highlights - The Intelligent Table Extraction & Performance Update</strong></summary>
- **🚀 LLMTableExtraction**: Revolutionary table extraction with intelligent chunking for massive tables:
```python
from crawl4ai import LLMTableExtraction, LLMConfig
# Configure intelligent table extraction
table_strategy = LLMTableExtraction(
llm_config=LLMConfig(provider="openai/gpt-4.1-mini"),
enable_chunking=True, # Handle massive tables
chunk_token_threshold=5000, # Smart chunking threshold
overlap_threshold=100, # Maintain context between chunks
extraction_type="structured" # Get structured data output
)
config = CrawlerRunConfig(table_extraction_strategy=table_strategy)
result = await crawler.arun("https://complex-tables-site.com", config=config)
# Tables are automatically chunked, processed, and merged
for table in result.tables:
print(f"Extracted table: {len(table['data'])} rows")
```
- **⚡ Dispatcher Bug Fix**: Fixed sequential processing bottleneck in arun_many for fast-completing tasks
- **🧹 Memory Management Refactor**: Consolidated memory utilities into main utils module for cleaner architecture
- **🔧 Browser Manager Fixes**: Resolved race conditions in concurrent page creation with thread-safe locking
- **🔗 Advanced URL Processing**: Better handling of raw:// URLs and base tag link resolution
- **🛡️ Enhanced Proxy Support**: Flexible proxy configuration supporting both dict and string formats
[Full v0.7.4 Release Notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.4.md)
</details>
<details>
<summary><strong>Version 0.7.3 Release Highlights - The Multi-Config Intelligence Update</strong></summary>

View File

@@ -1,190 +0,0 @@
# Crawl4AI Telemetry Testing Implementation
## Overview
This document summarizes the comprehensive testing strategy implementation for Crawl4AI's opt-in telemetry system. The implementation provides thorough test coverage across unit tests, integration tests, privacy compliance tests, and performance tests.
## Implementation Summary
### 📊 Test Statistics
- **Total Tests**: 40 tests
- **Success Rate**: 100% (40/40 passing)
- **Test Categories**: 4 categories (Unit, Integration, Privacy, Performance)
- **Code Coverage**: 51% (625 statements, 308 missing)
### 🗂️ Test Structure
#### 1. **Unit Tests** (`tests/telemetry/test_telemetry.py`)
- `TestTelemetryConfig`: Configuration management and persistence
- `TestEnvironmentDetection`: CLI, Docker, API server environment detection
- `TestTelemetryManager`: Singleton pattern and exception capture
- `TestConsentManager`: Docker default behavior and environment overrides
- `TestPublicAPI`: Public enable/disable/status functions
- `TestIntegration`: Crawler exception capture integration
#### 2. **Integration Tests** (`tests/telemetry/test_integration.py`)
- `TestTelemetryCLI`: CLI command testing (status, enable, disable)
- `TestAsyncWebCrawlerIntegration`: Real crawler integration with decorators
- `TestDockerIntegration`: Docker environment-specific behavior
- `TestTelemetryProviderIntegration`: Sentry provider initialization and fallbacks
#### 3. **Privacy & Performance Tests** (`tests/telemetry/test_privacy_performance.py`)
- `TestTelemetryPrivacy`: Data sanitization and PII protection
- `TestTelemetryPerformance`: Decorator overhead measurement
- `TestTelemetryScalability`: Multiple and concurrent exception handling
#### 4. **Hello World Test** (`tests/telemetry/test_hello_world_telemetry.py`)
- Basic telemetry functionality validation
### 🔧 Testing Infrastructure
#### **Pytest Configuration** (`pytest.ini`)
```ini
[pytest]
testpaths = tests/telemetry
markers =
unit: Unit tests
integration: Integration tests
privacy: Privacy compliance tests
performance: Performance tests
asyncio_mode = auto
```
#### **Test Fixtures** (`tests/conftest.py`)
- `temp_config_dir`: Temporary configuration directory
- `enabled_telemetry_config`: Pre-configured enabled telemetry
- `disabled_telemetry_config`: Pre-configured disabled telemetry
- `mock_sentry_provider`: Mocked Sentry provider for testing
#### **Makefile Targets** (`Makefile.telemetry`)
```makefile
test-all: Run all telemetry tests
test-unit: Run unit tests only
test-integration: Run integration tests only
test-privacy: Run privacy tests only
test-performance: Run performance tests only
test-coverage: Run tests with coverage report
test-watch: Run tests in watch mode
test-parallel: Run tests in parallel
```
## 🎯 Key Features Tested
### Privacy Compliance
- ✅ No URLs captured in telemetry data
- ✅ No content captured in telemetry data
- ✅ No PII (personally identifiable information) captured
- ✅ Sanitized context only (error types, stack traces without content)
### Performance Impact
- ✅ Telemetry decorator overhead < 1ms
- ✅ Async decorator overhead < 1ms
- ✅ Disabled telemetry has minimal performance impact
- ✅ Configuration loading performance acceptable
- ✅ Multiple exception capture scalability
- ✅ Concurrent exception capture handling
### Integration Points
- ✅ CLI command integration (status, enable, disable)
- ✅ AsyncWebCrawler decorator integration
- ✅ Docker environment auto-detection
- ✅ Sentry provider initialization
- ✅ Graceful degradation without Sentry
- ✅ Environment variable overrides
### Core Functionality
- ✅ Configuration persistence and loading
- ✅ Consent management (Docker defaults, user prompts)
- ✅ Environment detection (CLI, Docker, Jupyter, etc.)
- ✅ Singleton pattern for TelemetryManager
- ✅ Exception capture and forwarding
- ✅ Provider abstraction (Sentry, Null)
## 🚀 Usage Examples
### Run All Tests
```bash
make -f Makefile.telemetry test-all
```
### Run Specific Test Categories
```bash
# Unit tests only
make -f Makefile.telemetry test-unit
# Integration tests only
make -f Makefile.telemetry test-integration
# Privacy tests only
make -f Makefile.telemetry test-privacy
# Performance tests only
make -f Makefile.telemetry test-performance
```
### Coverage Report
```bash
make -f Makefile.telemetry test-coverage
```
### Parallel Execution
```bash
make -f Makefile.telemetry test-parallel
```
## 📁 File Structure
```
tests/
├── conftest.py # Shared pytest fixtures
└── telemetry/
├── test_hello_world_telemetry.py # Basic functionality test
├── test_telemetry.py # Unit tests
├── test_integration.py # Integration tests
└── test_privacy_performance.py # Privacy & performance tests
# Configuration
pytest.ini # Pytest configuration with markers
Makefile.telemetry # Convenient test execution targets
```
## 🔍 Test Isolation & Mocking
### Environment Isolation
- Tests run in isolated temporary directories
- Environment variables are properly mocked/isolated
- No interference between test runs
- Clean state for each test
### Mock Strategies
- `unittest.mock` for external dependencies
- Temporary file systems for configuration testing
- Subprocess mocking for CLI command testing
- Time measurement for performance testing
## 📈 Coverage Analysis
Current test coverage: **51%** (625 statements)
### Well-Covered Areas:
- Core configuration management (78%)
- Telemetry initialization (69%)
- Environment detection (64%)
### Areas for Future Enhancement:
- Consent management UI (20% - interactive prompts)
- Sentry provider implementation (25% - network calls)
- Base provider abstractions (49% - error handling paths)
## 🎉 Implementation Success
The comprehensive testing strategy has been **successfully implemented** with:
-**100% test pass rate** (40/40 tests passing)
-**Complete test infrastructure** (fixtures, configuration, targets)
-**Privacy compliance verification** (no PII, URLs, or content captured)
-**Performance validation** (minimal overhead confirmed)
-**Integration testing** (CLI, Docker, AsyncWebCrawler)
-**CI/CD ready** (Makefile targets for automation)
The telemetry system now has robust test coverage ensuring reliability, privacy compliance, and performance characteristics while maintaining comprehensive validation of all core functionality.

View File

@@ -29,12 +29,6 @@ from .extraction_strategy import (
)
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
from .table_extraction import (
TableExtractionStrategy,
DefaultTableExtraction,
NoTableExtraction,
LLMTableExtraction,
)
from .content_filter_strategy import (
PruningContentFilter,
BM25ContentFilter,
@@ -162,9 +156,6 @@ __all__ = [
"ChunkingStrategy",
"RegexChunking",
"DefaultMarkdownGenerator",
"TableExtractionStrategy",
"DefaultTableExtraction",
"NoTableExtraction",
"RelevantContentFilter",
"PruningContentFilter",
"BM25ContentFilter",

View File

@@ -1,7 +1,7 @@
# crawl4ai/__version__.py
# This is the version that will be used for stable releases
__version__ = "0.7.4"
__version__ = "0.7.3"
# For nightly builds, this gets set during build process
__nightly_version__ = None

View File

@@ -20,7 +20,6 @@ from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .content_scraping_strategy import ContentScrapingStrategy, LXMLWebScrapingStrategy
from .deep_crawling import DeepCrawlStrategy
from .table_extraction import TableExtractionStrategy, DefaultTableExtraction
from .cache_context import CacheMode
from .proxy_strategy import ProxyRotationStrategy
@@ -983,8 +982,6 @@ class CrawlerRunConfig():
Default: False.
table_score_threshold (int): Minimum score threshold for processing a table.
Default: 7.
table_extraction (TableExtractionStrategy): Strategy to use for table extraction.
Default: DefaultTableExtraction with table_score_threshold.
# Virtual Scroll Parameters
virtual_scroll_config (VirtualScrollConfig or dict or None): Configuration for handling virtual scroll containers.
@@ -1111,7 +1108,6 @@ class CrawlerRunConfig():
image_description_min_word_threshold: int = IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
image_score_threshold: int = IMAGE_SCORE_THRESHOLD,
table_score_threshold: int = 7,
table_extraction: TableExtractionStrategy = None,
exclude_external_images: bool = False,
exclude_all_images: bool = False,
# Link and Domain Handling Parameters
@@ -1228,12 +1224,6 @@ class CrawlerRunConfig():
self.exclude_external_images = exclude_external_images
self.exclude_all_images = exclude_all_images
self.table_score_threshold = table_score_threshold
# Table extraction strategy (default to DefaultTableExtraction if not specified)
if table_extraction is None:
self.table_extraction = DefaultTableExtraction(table_score_threshold=table_score_threshold)
else:
self.table_extraction = table_extraction
# Link and Domain Handling Parameters
self.exclude_social_media_domains = (
@@ -1505,7 +1495,6 @@ class CrawlerRunConfig():
"image_score_threshold", IMAGE_SCORE_THRESHOLD
),
table_score_threshold=kwargs.get("table_score_threshold", 7),
table_extraction=kwargs.get("table_extraction", None),
exclude_all_images=kwargs.get("exclude_all_images", False),
exclude_external_images=kwargs.get("exclude_external_images", False),
# Link and Domain Handling Parameters
@@ -1614,7 +1603,6 @@ class CrawlerRunConfig():
"image_description_min_word_threshold": self.image_description_min_word_threshold,
"image_score_threshold": self.image_score_threshold,
"table_score_threshold": self.table_score_threshold,
"table_extraction": self.table_extraction,
"exclude_all_images": self.exclude_all_images,
"exclude_external_images": self.exclude_external_images,
"exclude_social_media_domains": self.exclude_social_media_domains,

View File

@@ -824,7 +824,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
except Error:
visibility_info = await self.check_visibility(page)
if self.browser_config.verbose:
if self.browser_config.config.verbose:
self.logger.debug(
message="Body visibility info: {info}",
tag="DEBUG",

View File

@@ -2129,265 +2129,3 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return True # Default to scrolling if check fails
####################################################################################################
# HTTP Crawler Strategy
####################################################################################################
class HTTPCrawlerError(Exception):
"""Base error class for HTTP crawler specific exceptions"""
pass
class ConnectionTimeoutError(HTTPCrawlerError):
"""Raised when connection timeout occurs"""
pass
class HTTPStatusError(HTTPCrawlerError):
"""Raised for unexpected status codes"""
def __init__(self, status_code: int, message: str):
self.status_code = status_code
super().__init__(f"HTTP {status_code}: {message}")
class AsyncHTTPCrawlerStrategy(AsyncCrawlerStrategy):
"""
Fast, lightweight HTTP-only crawler strategy optimized for memory efficiency.
"""
__slots__ = ('logger', 'max_connections', 'dns_cache_ttl', 'chunk_size', '_session', 'hooks', 'browser_config')
DEFAULT_TIMEOUT: Final[int] = 30
DEFAULT_CHUNK_SIZE: Final[int] = 64 * 1024
DEFAULT_MAX_CONNECTIONS: Final[int] = min(32, (os.cpu_count() or 1) * 4)
DEFAULT_DNS_CACHE_TTL: Final[int] = 300
VALID_SCHEMES: Final = frozenset({'http', 'https', 'file', 'raw'})
_BASE_HEADERS: Final = MappingProxyType({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def __init__(
self,
browser_config: Optional[HTTPCrawlerConfig] = None,
logger: Optional[AsyncLogger] = None,
max_connections: int = DEFAULT_MAX_CONNECTIONS,
dns_cache_ttl: int = DEFAULT_DNS_CACHE_TTL,
chunk_size: int = DEFAULT_CHUNK_SIZE
):
"""Initialize the HTTP crawler with config"""
self.browser_config = browser_config or HTTPCrawlerConfig()
self.logger = logger
self.max_connections = max_connections
self.dns_cache_ttl = dns_cache_ttl
self.chunk_size = chunk_size
self._session: Optional[aiohttp.ClientSession] = None
self.hooks = {
k: partial(self._execute_hook, k)
for k in ('before_request', 'after_request', 'on_error')
}
# Set default hooks
self.set_hook('before_request', lambda *args, **kwargs: None)
self.set_hook('after_request', lambda *args, **kwargs: None)
self.set_hook('on_error', lambda *args, **kwargs: None)
async def __aenter__(self) -> AsyncHTTPCrawlerStrategy:
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
@contextlib.asynccontextmanager
async def _session_context(self):
try:
if not self._session:
await self.start()
yield self._session
finally:
pass
def set_hook(self, hook_type: str, hook_func: Callable) -> None:
if hook_type in self.hooks:
self.hooks[hook_type] = partial(self._execute_hook, hook_type, hook_func)
else:
raise ValueError(f"Invalid hook type: {hook_type}")
async def _execute_hook(
self,
hook_type: str,
hook_func: Callable,
*args: Any,
**kwargs: Any
) -> Any:
if asyncio.iscoroutinefunction(hook_func):
return await hook_func(*args, **kwargs)
return hook_func(*args, **kwargs)
async def start(self) -> None:
if not self._session:
connector = aiohttp.TCPConnector(
limit=self.max_connections,
ttl_dns_cache=self.dns_cache_ttl,
use_dns_cache=True,
force_close=False
)
self._session = aiohttp.ClientSession(
headers=dict(self._BASE_HEADERS),
connector=connector,
timeout=ClientTimeout(total=self.DEFAULT_TIMEOUT)
)
async def close(self) -> None:
if self._session and not self._session.closed:
try:
await asyncio.wait_for(self._session.close(), timeout=5.0)
except asyncio.TimeoutError:
if self.logger:
self.logger.warning(
message="Session cleanup timed out",
tag="CLEANUP"
)
finally:
self._session = None
async def _stream_file(self, path: str) -> AsyncGenerator[memoryview, None]:
async with aiofiles.open(path, mode='rb') as f:
while chunk := await f.read(self.chunk_size):
yield memoryview(chunk)
async def _handle_file(self, path: str) -> AsyncCrawlResponse:
if not os.path.exists(path):
raise FileNotFoundError(f"Local file not found: {path}")
chunks = []
async for chunk in self._stream_file(path):
chunks.append(chunk.tobytes().decode('utf-8', errors='replace'))
return AsyncCrawlResponse(
html=''.join(chunks),
response_headers={},
status_code=200
)
async def _handle_raw(self, content: str) -> AsyncCrawlResponse:
return AsyncCrawlResponse(
html=content,
response_headers={},
status_code=200
)
async def _handle_http(
self,
url: str,
config: CrawlerRunConfig
) -> AsyncCrawlResponse:
async with self._session_context() as session:
timeout = ClientTimeout(
total=config.page_timeout or self.DEFAULT_TIMEOUT,
connect=10,
sock_read=30
)
headers = dict(self._BASE_HEADERS)
if self.browser_config.headers:
headers.update(self.browser_config.headers)
request_kwargs = {
'timeout': timeout,
'allow_redirects': self.browser_config.follow_redirects,
'ssl': self.browser_config.verify_ssl,
'headers': headers
}
if self.browser_config.method == "POST":
if self.browser_config.data:
request_kwargs['data'] = self.browser_config.data
if self.browser_config.json:
request_kwargs['json'] = self.browser_config.json
await self.hooks['before_request'](url, request_kwargs)
try:
async with session.request(self.browser_config.method, url, **request_kwargs) as response:
content = memoryview(await response.read())
if not (200 <= response.status < 300):
raise HTTPStatusError(
response.status,
f"Unexpected status code for {url}"
)
encoding = response.charset
if not encoding:
encoding = chardet.detect(content.tobytes())['encoding'] or 'utf-8'
result = AsyncCrawlResponse(
html=content.tobytes().decode(encoding, errors='replace'),
response_headers=dict(response.headers),
status_code=response.status,
redirected_url=str(response.url)
)
await self.hooks['after_request'](result)
return result
except aiohttp.ServerTimeoutError as e:
await self.hooks['on_error'](e)
raise ConnectionTimeoutError(f"Request timed out: {str(e)}")
except aiohttp.ClientConnectorError as e:
await self.hooks['on_error'](e)
raise ConnectionError(f"Connection failed: {str(e)}")
except aiohttp.ClientError as e:
await self.hooks['on_error'](e)
raise HTTPCrawlerError(f"HTTP client error: {str(e)}")
except asyncio.exceptions.TimeoutError as e:
await self.hooks['on_error'](e)
raise ConnectionTimeoutError(f"Request timed out: {str(e)}")
except Exception as e:
await self.hooks['on_error'](e)
raise HTTPCrawlerError(f"HTTP request failed: {str(e)}")
async def crawl(
self,
url: str,
config: Optional[CrawlerRunConfig] = None,
**kwargs
) -> AsyncCrawlResponse:
config = config or CrawlerRunConfig.from_kwargs(kwargs)
parsed = urlparse(url)
scheme = parsed.scheme.rstrip('/')
if scheme not in self.VALID_SCHEMES:
raise ValueError(f"Unsupported URL scheme: {scheme}")
try:
if scheme == 'file':
return await self._handle_file(parsed.path)
elif scheme == 'raw':
return await self._handle_raw(parsed.path)
else: # http or https
return await self._handle_http(url, config)
except Exception as e:
if self.logger:
self.logger.error(
message="Crawl failed: {error}",
tag="CRAWL",
params={"error": str(e), "url": url}
)
raise

View File

@@ -22,7 +22,7 @@ from urllib.parse import urlparse
import random
from abc import ABC, abstractmethod
from .utils import get_true_memory_usage_percent
from .memory_utils import get_true_memory_usage_percent
class RateLimiter:

View File

@@ -49,9 +49,6 @@ from .utils import (
preprocess_html_for_schema,
)
# Import telemetry
from .telemetry import capture_exception, telemetry_decorator, async_telemetry_decorator
class AsyncWebCrawler:
"""
@@ -204,7 +201,6 @@ class AsyncWebCrawler:
"""异步空上下文管理器"""
yield
@async_telemetry_decorator
async def arun(
self,
url: str,
@@ -434,7 +430,6 @@ class AsyncWebCrawler:
)
)
@async_telemetry_decorator
async def aprocess_html(
self,
url: str,

View File

@@ -1385,97 +1385,6 @@ def profiles_cmd():
# Run interactive profile manager
anyio.run(manage_profiles)
@cli.group("telemetry")
def telemetry_cmd():
"""Manage telemetry settings for Crawl4AI
Telemetry helps improve Crawl4AI by sending anonymous crash reports.
No personal data or crawled content is ever collected.
"""
pass
@telemetry_cmd.command("enable")
@click.option("--email", "-e", help="Optional email for follow-up on critical issues")
@click.option("--always/--once", default=True, help="Always send errors (default) or just once")
def telemetry_enable_cmd(email: Optional[str], always: bool):
"""Enable telemetry to help improve Crawl4AI
Examples:
crwl telemetry enable # Enable telemetry
crwl telemetry enable --email me@ex.com # Enable with email
crwl telemetry enable --once # Send only next error
"""
from crawl4ai.telemetry import enable
try:
enable(email=email, always=always, once=not always)
console.print("[green]✅ Telemetry enabled successfully[/green]")
if email:
console.print(f" Email: {email}")
console.print(f" Mode: {'Always send errors' if always else 'Send next error only'}")
except Exception as e:
console.print(f"[red]❌ Failed to enable telemetry: {e}[/red]")
sys.exit(1)
@telemetry_cmd.command("disable")
def telemetry_disable_cmd():
"""Disable telemetry
Stop sending anonymous crash reports to help improve Crawl4AI.
"""
from crawl4ai.telemetry import disable
try:
disable()
console.print("[green]✅ Telemetry disabled successfully[/green]")
except Exception as e:
console.print(f"[red]❌ Failed to disable telemetry: {e}[/red]")
sys.exit(1)
@telemetry_cmd.command("status")
def telemetry_status_cmd():
"""Show current telemetry status
Display whether telemetry is enabled and current settings.
"""
from crawl4ai.telemetry import status
try:
info = status()
# Create status table
table = Table(title="Telemetry Status", show_header=False)
table.add_column("Setting", style="cyan")
table.add_column("Value")
# Status emoji
status_icon = "" if info['enabled'] else ""
table.add_row("Status", f"{status_icon} {'Enabled' if info['enabled'] else 'Disabled'}")
table.add_row("Consent", info['consent'].replace('_', ' ').title())
if info['email']:
table.add_row("Email", info['email'])
table.add_row("Environment", info['environment'])
table.add_row("Provider", info['provider'])
if info['errors_sent'] > 0:
table.add_row("Errors Sent", str(info['errors_sent']))
console.print(table)
# Add helpful messages
if not info['enabled']:
console.print("\n[yellow] Telemetry is disabled. Enable it to help improve Crawl4AI:[/yellow]")
console.print(" [dim]crwl telemetry enable[/dim]")
except Exception as e:
console.print(f"[red]❌ Failed to get telemetry status: {e}[/red]")
sys.exit(1)
@cli.command(name="")
@click.argument("url", required=False)
@click.option("--example", is_flag=True, help="Show usage examples")

View File

@@ -586,6 +586,117 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
return root
def is_data_table(self, table: etree.Element, **kwargs) -> bool:
score = 0
# Check for thead and tbody
has_thead = len(table.xpath(".//thead")) > 0
has_tbody = len(table.xpath(".//tbody")) > 0
if has_thead:
score += 2
if has_tbody:
score += 1
# Check for th elements
th_count = len(table.xpath(".//th"))
if th_count > 0:
score += 2
if has_thead or table.xpath(".//tr[1]/th"):
score += 1
# Check for nested tables
if len(table.xpath(".//table")) > 0:
score -= 3
# Role attribute check
role = table.get("role", "").lower()
if role in {"presentation", "none"}:
score -= 3
# Column consistency
rows = table.xpath(".//tr")
if not rows:
return False
col_counts = [len(row.xpath(".//td|.//th")) for row in rows]
avg_cols = sum(col_counts) / len(col_counts)
variance = sum((c - avg_cols)**2 for c in col_counts) / len(col_counts)
if variance < 1:
score += 2
# Caption and summary
if table.xpath(".//caption"):
score += 2
if table.get("summary"):
score += 1
# Text density
total_text = sum(len(''.join(cell.itertext()).strip()) for row in rows for cell in row.xpath(".//td|.//th"))
total_tags = sum(1 for _ in table.iterdescendants())
text_ratio = total_text / (total_tags + 1e-5)
if text_ratio > 20:
score += 3
elif text_ratio > 10:
score += 2
# Data attributes
data_attrs = sum(1 for attr in table.attrib if attr.startswith('data-'))
score += data_attrs * 0.5
# Size check
if avg_cols >= 2 and len(rows) >= 2:
score += 2
threshold = kwargs.get("table_score_threshold", 7)
return score >= threshold
def extract_table_data(self, table: etree.Element) -> dict:
caption = table.xpath(".//caption/text()")
caption = caption[0].strip() if caption else ""
summary = table.get("summary", "").strip()
# Extract headers with colspan handling
headers = []
thead_rows = table.xpath(".//thead/tr")
if thead_rows:
header_cells = thead_rows[0].xpath(".//th")
for cell in header_cells:
text = cell.text_content().strip()
colspan = int(cell.get("colspan", 1))
headers.extend([text] * colspan)
else:
first_row = table.xpath(".//tr[1]")
if first_row:
for cell in first_row[0].xpath(".//th|.//td"):
text = cell.text_content().strip()
colspan = int(cell.get("colspan", 1))
headers.extend([text] * colspan)
# Extract rows with colspan handling
rows = []
for row in table.xpath(".//tr[not(ancestor::thead)]"):
row_data = []
for cell in row.xpath(".//td"):
text = cell.text_content().strip()
colspan = int(cell.get("colspan", 1))
row_data.extend([text] * colspan)
if row_data:
rows.append(row_data)
# Align rows with headers
max_columns = len(headers) if headers else (max(len(row) for row in rows) if rows else 0)
aligned_rows = []
for row in rows:
aligned = row[:max_columns] + [''] * (max_columns - len(row))
aligned_rows.append(aligned)
if not headers:
headers = [f"Column {i+1}" for i in range(max_columns)]
return {
"headers": headers,
"rows": aligned_rows,
"caption": caption,
"summary": summary,
}
def _scrap(
self,
@@ -728,16 +839,12 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
**kwargs,
)
# Extract tables using the table extraction strategy if provided
if 'table' not in excluded_tags:
table_extraction = kwargs.get('table_extraction')
if table_extraction:
# Pass logger to the strategy if it doesn't have one
if not table_extraction.logger:
table_extraction.logger = self.logger
# Extract tables using the strategy
extracted_tables = table_extraction.extract_tables(body, **kwargs)
media["tables"].extend(extracted_tables)
tables = body.xpath(".//table")
for table in tables:
if self.is_data_table(table, **kwargs):
table_data = self.extract_table_data(table)
media["tables"].append(table_data)
# Handle only_text option
if kwargs.get("only_text", False):

View File

@@ -38,7 +38,14 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self.include_external = include_external
self.score_threshold = score_threshold
self.max_pages = max_pages
self.logger = logger or logging.getLogger(__name__)
# Type check for logger
if isinstance(logger, dict):
logging.getLogger(__name__).warning(
"BFSDeepCrawlStrategy received a dict as logger; falling back to default logger."
)
self.logger = logging.getLogger(__name__)
else:
self.logger = logger or logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0

View File

@@ -30,7 +30,7 @@ class Crawl4aiDockerClient:
def __init__(
self,
base_url: str = "http://localhost:8000",
timeout: float = 30.0,
timeout: float = 600.0, # Increased to 10 minutes for crawling operations
verify_ssl: bool = True,
verbose: bool = True,
log_file: Optional[str] = None
@@ -113,21 +113,12 @@ class Crawl4aiDockerClient:
self.logger.info(f"Crawling {len(urls)} URLs {'(streaming)' if is_streaming else ''}", tag="CRAWL")
if is_streaming:
async def stream_results() -> AsyncGenerator[CrawlResult, None]:
async with self._http_client.stream("POST", f"{self.base_url}/crawl/stream", json=data) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.strip():
result = json.loads(line)
if "error" in result:
self.logger.error_status(url=result.get("url", "unknown"), error=result["error"])
continue
self.logger.url_status(url=result.get("url", "unknown"), success=True, timing=result.get("timing", 0.0))
if result.get("status") == "completed":
continue
else:
yield CrawlResult(**result)
return stream_results()
# For streaming, we need to return the async generator properly
# The caller should be able to do: async for result in await client.crawl(...)
async def streaming_wrapper():
async for result in self._stream_crawl_results(data):
yield result
return streaming_wrapper()
response = await self._request("POST", "/crawl", json=data)
result_data = response.json()
@@ -138,6 +129,35 @@ class Crawl4aiDockerClient:
self.logger.success(f"Crawl completed with {len(results)} results", tag="CRAWL")
return results[0] if len(results) == 1 else results
async def _stream_crawl_results(self, data: Dict[str, Any]) -> AsyncGenerator[CrawlResult, None]:
"""Internal method to handle streaming crawl results."""
async with self._http_client.stream("POST", f"{self.base_url}/crawl/stream", json=data) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.strip():
try:
result = json.loads(line)
if "error" in result:
self.logger.error_status(url=result.get("url", "unknown"), error=result["error"])
continue
# Check if this is a crawl result (has required fields)
if "url" in result and "success" in result:
self.logger.url_status(url=result.get("url", "unknown"), success=result.get("success", False), timing=result.get("timing", 0.0))
# Create CrawlResult object properly
crawl_result = CrawlResult(**result)
yield crawl_result
# Skip status-only messages
elif result.get("status") == "completed":
continue
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse streaming response: {e}", tag="STREAM")
continue
except Exception as e:
self.logger.error(f"Error processing streaming result: {e}", tag="STREAM")
continue
async def get_schema(self) -> Dict[str, Any]:
"""Retrieve configuration schemas."""
response = await self._request("GET", "/schema")

79
crawl4ai/memory_utils.py Normal file
View File

@@ -0,0 +1,79 @@
import psutil
import platform
import subprocess
from typing import Tuple
def get_true_available_memory_gb() -> float:
"""Get truly available memory including inactive pages (cross-platform)"""
vm = psutil.virtual_memory()
if platform.system() == 'Darwin': # macOS
# On macOS, we need to include inactive memory too
try:
# Use vm_stat to get accurate values
result = subprocess.run(['vm_stat'], capture_output=True, text=True)
lines = result.stdout.split('\n')
page_size = 16384 # macOS page size
pages = {}
for line in lines:
if 'Pages free:' in line:
pages['free'] = int(line.split()[-1].rstrip('.'))
elif 'Pages inactive:' in line:
pages['inactive'] = int(line.split()[-1].rstrip('.'))
elif 'Pages speculative:' in line:
pages['speculative'] = int(line.split()[-1].rstrip('.'))
elif 'Pages purgeable:' in line:
pages['purgeable'] = int(line.split()[-1].rstrip('.'))
# Calculate total available (free + inactive + speculative + purgeable)
total_available_pages = (
pages.get('free', 0) +
pages.get('inactive', 0) +
pages.get('speculative', 0) +
pages.get('purgeable', 0)
)
available_gb = (total_available_pages * page_size) / (1024**3)
return available_gb
except:
# Fallback to psutil
return vm.available / (1024**3)
else:
# For Windows and Linux, psutil.available is accurate
return vm.available / (1024**3)
def get_true_memory_usage_percent() -> float:
"""
Get memory usage percentage that accounts for platform differences.
Returns:
float: Memory usage percentage (0-100)
"""
vm = psutil.virtual_memory()
total_gb = vm.total / (1024**3)
available_gb = get_true_available_memory_gb()
# Calculate used percentage based on truly available memory
used_percent = 100.0 * (total_gb - available_gb) / total_gb
# Ensure it's within valid range
return max(0.0, min(100.0, used_percent))
def get_memory_stats() -> Tuple[float, float, float]:
"""
Get comprehensive memory statistics.
Returns:
Tuple[float, float, float]: (used_percent, available_gb, total_gb)
"""
vm = psutil.virtual_memory()
total_gb = vm.total / (1024**3)
available_gb = get_true_available_memory_gb()
used_percent = get_true_memory_usage_percent()
return used_percent, available_gb, total_gb

View File

@@ -1,4 +1,36 @@
from pydantic import BaseModel, HttpUrl, PrivateAttr, Field
"""
Crawl4AI Models Module
This module contains Pydantic models used throughout the Crawl4AI library.
Key Features:
- ORJSONModel: Base model with ORJSON serialization support
- DeprecatedPropertiesMixin: Global system for handling deprecated properties
- CrawlResult: Main result model with backward compatibility support
Deprecated Properties System:
The DeprecatedPropertiesMixin provides a global way to handle deprecated properties
across all models. Instead of manually excluding deprecated properties in each
model_dump() call, you can simply override the get_deprecated_properties() method:
Example:
class MyModel(ORJSONModel):
name: str
old_field: Optional[str] = None
def get_deprecated_properties(self) -> set[str]:
return {'old_field', 'another_deprecated_field'}
@property
def old_field(self):
raise AttributeError("old_field is deprecated, use name instead")
The system automatically excludes these properties from serialization, preventing
property objects from appearing in JSON output.
"""
from pydantic import BaseModel, ConfigDict,HttpUrl, PrivateAttr, Field
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from typing import AsyncGenerator
from typing import Generic, TypeVar
@@ -8,7 +40,7 @@ from .ssl_certificate import SSLCertificate
from datetime import datetime
from datetime import timedelta
import orjson
###############################
# Dispatcher Models
###############################
@@ -91,7 +123,122 @@ class TokenUsage:
completion_tokens_details: Optional[dict] = None
prompt_tokens_details: Optional[dict] = None
class UrlModel(BaseModel):
def orjson_default(obj):
# Handle datetime (if not already handled by orjson)
if isinstance(obj, datetime):
return obj.isoformat()
# Handle property objects (convert to string or something else)
if isinstance(obj, property):
return str(obj)
# Last resort: convert to string
return str(obj)
class DeprecatedPropertiesMixin:
"""
Mixin to handle deprecated properties in Pydantic models.
Classes that inherit from this mixin can define deprecated properties
that will be automatically excluded from serialization.
Usage:
1. Override the get_deprecated_properties() method to return a set of deprecated property names
2. The model_dump method will automatically exclude these properties
Example:
class MyModel(ORJSONModel):
def get_deprecated_properties(self) -> set[str]:
return {'old_field', 'legacy_property'}
name: str
old_field: Optional[str] = None # Field definition
@property
def old_field(self): # Property that overrides the field
raise AttributeError("old_field is deprecated, use name instead")
"""
def get_deprecated_properties(self) -> set[str]:
"""
Get deprecated property names for this model.
Override this method in subclasses to define deprecated properties.
Returns:
set[str]: Set of deprecated property names
"""
return set()
@classmethod
def get_all_deprecated_properties(cls) -> set[str]:
"""
Get all deprecated properties from this class and all parent classes.
Returns:
set[str]: Set of all deprecated property names
"""
deprecated_props = set()
# Create an instance to call the instance method
try:
# Try to create a dummy instance to get deprecated properties
dummy_instance = cls.__new__(cls)
deprecated_props.update(dummy_instance.get_deprecated_properties())
except Exception:
# If we can't create an instance, check for class-level definitions
pass
# Also check parent classes
for klass in cls.__mro__:
if hasattr(klass, 'get_deprecated_properties') and klass != DeprecatedPropertiesMixin:
try:
dummy_instance = klass.__new__(klass)
deprecated_props.update(dummy_instance.get_deprecated_properties())
except Exception:
pass
return deprecated_props
def model_dump(self, *args, **kwargs):
"""
Override model_dump to automatically exclude deprecated properties.
This method:
1. Gets the existing exclude set from kwargs
2. Adds all deprecated properties defined in get_deprecated_properties()
3. Calls the parent model_dump with the updated exclude set
"""
# Get the default exclude set, or create empty set if None
exclude = kwargs.get('exclude', set())
if exclude is None:
exclude = set()
elif not isinstance(exclude, set):
exclude = set(exclude) if exclude else set()
# Add deprecated properties for this instance
exclude.update(self.get_deprecated_properties())
kwargs['exclude'] = exclude
return super().model_dump(*args, **kwargs)
class ORJSONModel(DeprecatedPropertiesMixin, BaseModel):
model_config = ConfigDict(
ser_json_timedelta="iso8601", # Optional: format timedelta
ser_json_bytes="utf8", # Optional: bytes → UTF-8 string
)
def model_dump_json(self, **kwargs) -> bytes:
"""Custom JSON serialization using orjson"""
return orjson.dumps(self.model_dump(**kwargs), default=orjson_default)
@classmethod
def model_validate_json(cls, json_data: Union[str, bytes], **kwargs):
"""Custom JSON deserialization using orjson"""
if isinstance(json_data, str):
json_data = json_data.encode()
return cls.model_validate(orjson.loads(json_data), **kwargs)
class UrlModel(ORJSONModel):
url: HttpUrl
forced: bool = False
@@ -108,7 +255,7 @@ class TraversalStats:
total_depth_reached: int = 0
current_depth: int = 0
class DispatchResult(BaseModel):
class DispatchResult(ORJSONModel):
task_id: str
memory_usage: float
peak_memory: float
@@ -116,7 +263,7 @@ class DispatchResult(BaseModel):
end_time: Union[datetime, float]
error_message: str = ""
class MarkdownGenerationResult(BaseModel):
class MarkdownGenerationResult(ORJSONModel):
raw_markdown: str
markdown_with_citations: str
references_markdown: str
@@ -126,7 +273,7 @@ class MarkdownGenerationResult(BaseModel):
def __str__(self):
return self.raw_markdown
class CrawlResult(BaseModel):
class CrawlResult(ORJSONModel):
url: str
html: str
fit_html: Optional[str] = None
@@ -156,6 +303,10 @@ class CrawlResult(BaseModel):
class Config:
arbitrary_types_allowed = True
def get_deprecated_properties(self) -> set[str]:
"""Define deprecated properties that should be excluded from serialization."""
return {'fit_html', 'fit_markdown', 'markdown_v2'}
# NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters,
# and model_dump override all exist to support a smooth transition from markdown as a string
# to markdown as a MarkdownGenerationResult object, while maintaining backward compatibility.
@@ -245,14 +396,16 @@ class CrawlResult(BaseModel):
1. PrivateAttr fields are excluded from serialization by default
2. We need to maintain backward compatibility by including the 'markdown' field
in the serialized output
3. We're transitioning from 'markdown_v2' to enhancing 'markdown' to hold
the same type of data
3. Uses the DeprecatedPropertiesMixin to automatically exclude deprecated properties
Future developers: This method ensures that the markdown content is properly
serialized despite being stored in a private attribute. If the serialization
requirements change, this is where you would update the logic.
serialized despite being stored in a private attribute. The deprecated properties
are automatically handled by the mixin.
"""
# Use the parent class method which handles deprecated properties automatically
result = super().model_dump(*args, **kwargs)
# Add the markdown content if it exists
if self._markdown is not None:
result["markdown"] = self._markdown.model_dump()
return result
@@ -307,7 +460,7 @@ RunManyReturn = Union[
# 1. Replace the private attribute and property with a standard field
# 2. Update any serialization logic that might depend on the current behavior
class AsyncCrawlResponse(BaseModel):
class AsyncCrawlResponse(ORJSONModel):
html: str
response_headers: Dict[str, str]
js_execution_result: Optional[Dict[str, Any]] = None
@@ -328,7 +481,7 @@ class AsyncCrawlResponse(BaseModel):
###############################
# Scraping Models
###############################
class MediaItem(BaseModel):
class MediaItem(ORJSONModel):
src: Optional[str] = ""
data: Optional[str] = ""
alt: Optional[str] = ""
@@ -340,7 +493,7 @@ class MediaItem(BaseModel):
width: Optional[int] = None
class Link(BaseModel):
class Link(ORJSONModel):
href: Optional[str] = ""
text: Optional[str] = ""
title: Optional[str] = ""
@@ -353,7 +506,7 @@ class Link(BaseModel):
total_score: Optional[float] = None # Combined score from intrinsic and contextual scores
class Media(BaseModel):
class Media(ORJSONModel):
images: List[MediaItem] = []
videos: List[
MediaItem
@@ -364,12 +517,12 @@ class Media(BaseModel):
tables: List[Dict] = [] # Table data extracted from HTML tables
class Links(BaseModel):
class Links(ORJSONModel):
internal: List[Link] = []
external: List[Link] = []
class ScrapingResult(BaseModel):
class ScrapingResult(ORJSONModel):
cleaned_html: str
success: bool
media: Media = Media()

File diff suppressed because it is too large Load Diff

View File

@@ -1,440 +0,0 @@
"""
Crawl4AI Telemetry Module.
Provides opt-in error tracking to improve stability.
"""
import os
import sys
import functools
import traceback
from typing import Optional, Any, Dict, Callable, Type
from contextlib import contextmanager, asynccontextmanager
from .base import TelemetryProvider, NullProvider
from .config import TelemetryConfig, TelemetryConsent
from .consent import ConsentManager
from .environment import Environment, EnvironmentDetector
class TelemetryManager:
"""
Main telemetry manager for Crawl4AI.
Coordinates provider, config, and consent management.
"""
_instance: Optional['TelemetryManager'] = None
def __init__(self):
"""Initialize telemetry manager."""
self.config = TelemetryConfig()
self.consent_manager = ConsentManager(self.config)
self.environment = EnvironmentDetector.detect()
self._provider: Optional[TelemetryProvider] = None
self._initialized = False
self._error_count = 0
self._max_errors = 100 # Prevent telemetry spam
# Load provider based on config
self._setup_provider()
@classmethod
def get_instance(cls) -> 'TelemetryManager':
"""
Get singleton instance of telemetry manager.
Returns:
TelemetryManager instance
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _setup_provider(self) -> None:
"""Setup telemetry provider based on configuration."""
# Update config from environment
self.config.update_from_env()
# Check if telemetry is enabled
if not self.config.is_enabled():
self._provider = NullProvider()
return
# Try to load Sentry provider
try:
from .providers.sentry import SentryProvider
# Get Crawl4AI version for release tracking
try:
from crawl4ai import __version__
release = f"crawl4ai@{__version__}"
except ImportError:
release = "crawl4ai@unknown"
self._provider = SentryProvider(
environment=self.environment.value,
release=release
)
# Initialize provider
if not self._provider.initialize():
# Fallback to null provider if init fails
self._provider = NullProvider()
except ImportError:
# Sentry not installed - use null provider
self._provider = NullProvider()
self._initialized = True
def capture_exception(
self,
exception: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture and send an exception.
Args:
exception: The exception to capture
context: Optional additional context
Returns:
True if exception was sent
"""
# Check error count limit
if self._error_count >= self._max_errors:
return False
# Check consent on first error
if self._error_count == 0:
consent = self.consent_manager.check_and_prompt()
# Update provider if consent changed
if consent == TelemetryConsent.DENIED:
self._provider = NullProvider()
return False
elif consent in [TelemetryConsent.ONCE, TelemetryConsent.ALWAYS]:
if isinstance(self._provider, NullProvider):
self._setup_provider()
# Check if we should send this error
if not self.config.should_send_current():
return False
# Prepare context
full_context = EnvironmentDetector.get_environment_context()
if context:
full_context.update(context)
# Add user email if available
email = self.config.get_email()
if email:
full_context['email'] = email
# Add source info
full_context['source'] = 'crawl4ai'
# Send exception
try:
if self._provider:
success = self._provider.send_exception(exception, full_context)
if success:
self._error_count += 1
return success
except Exception:
# Telemetry itself failed - ignore
pass
return False
def capture_message(
self,
message: str,
level: str = 'info',
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture a message event.
Args:
message: Message to send
level: Message level (info, warning, error)
context: Optional context
Returns:
True if message was sent
"""
if not self.config.is_enabled():
return False
payload = {
'level': level,
'message': message
}
if context:
payload.update(context)
try:
if self._provider:
return self._provider.send_event(message, payload)
except Exception:
pass
return False
def enable(
self,
email: Optional[str] = None,
always: bool = True,
once: bool = False
) -> None:
"""
Enable telemetry.
Args:
email: Optional email for follow-up
always: If True, always send errors
once: If True, send only next error
"""
if once:
consent = TelemetryConsent.ONCE
elif always:
consent = TelemetryConsent.ALWAYS
else:
consent = TelemetryConsent.ALWAYS
self.config.set_consent(consent, email)
self._setup_provider()
print("✅ Telemetry enabled")
if email:
print(f" Email: {email}")
print(f" Mode: {'once' if once else 'always'}")
def disable(self) -> None:
"""Disable telemetry."""
self.config.set_consent(TelemetryConsent.DENIED)
self._provider = NullProvider()
print("✅ Telemetry disabled")
def status(self) -> Dict[str, Any]:
"""
Get telemetry status.
Returns:
Dictionary with status information
"""
return {
'enabled': self.config.is_enabled(),
'consent': self.config.get_consent().value,
'email': self.config.get_email(),
'environment': self.environment.value,
'provider': type(self._provider).__name__ if self._provider else 'None',
'errors_sent': self._error_count
}
def flush(self) -> None:
"""Flush any pending telemetry data."""
if self._provider:
self._provider.flush()
def shutdown(self) -> None:
"""Shutdown telemetry."""
if self._provider:
self._provider.shutdown()
# Global instance
_telemetry_manager: Optional[TelemetryManager] = None
def get_telemetry() -> TelemetryManager:
"""
Get global telemetry manager instance.
Returns:
TelemetryManager instance
"""
global _telemetry_manager
if _telemetry_manager is None:
_telemetry_manager = TelemetryManager.get_instance()
return _telemetry_manager
def capture_exception(
exception: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture an exception for telemetry.
Args:
exception: Exception to capture
context: Optional context
Returns:
True if sent successfully
"""
try:
return get_telemetry().capture_exception(exception, context)
except Exception:
return False
def telemetry_decorator(func: Callable) -> Callable:
"""
Decorator to capture exceptions from a function.
Args:
func: Function to wrap
Returns:
Wrapped function
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# Capture exception
capture_exception(e, {
'function': func.__name__,
'module': func.__module__
})
# Re-raise the exception
raise
return wrapper
def async_telemetry_decorator(func: Callable) -> Callable:
"""
Decorator to capture exceptions from an async function.
Args:
func: Async function to wrap
Returns:
Wrapped async function
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
# Capture exception
capture_exception(e, {
'function': func.__name__,
'module': func.__module__
})
# Re-raise the exception
raise
return wrapper
@contextmanager
def telemetry_context(operation: str):
"""
Context manager for capturing exceptions.
Args:
operation: Name of the operation
Example:
with telemetry_context("web_crawl"):
# Your code here
pass
"""
try:
yield
except Exception as e:
capture_exception(e, {'operation': operation})
raise
@asynccontextmanager
async def async_telemetry_context(operation: str):
"""
Async context manager for capturing exceptions in async code.
Args:
operation: Name of the operation
Example:
async with async_telemetry_context("async_crawl"):
# Your async code here
await something()
"""
try:
yield
except Exception as e:
capture_exception(e, {'operation': operation})
raise
def install_exception_handler():
"""Install global exception handler for uncaught exceptions."""
original_hook = sys.excepthook
def telemetry_exception_hook(exc_type, exc_value, exc_traceback):
"""Custom exception hook with telemetry."""
# Don't capture KeyboardInterrupt
if not issubclass(exc_type, KeyboardInterrupt):
capture_exception(exc_value, {
'uncaught': True,
'type': exc_type.__name__
})
# Call original hook
original_hook(exc_type, exc_value, exc_traceback)
sys.excepthook = telemetry_exception_hook
# Public API
def enable(email: Optional[str] = None, always: bool = True, once: bool = False) -> None:
"""
Enable telemetry.
Args:
email: Optional email for follow-up
always: If True, always send errors (default)
once: If True, send only the next error
"""
get_telemetry().enable(email=email, always=always, once=once)
def disable() -> None:
"""Disable telemetry."""
get_telemetry().disable()
def status() -> Dict[str, Any]:
"""
Get telemetry status.
Returns:
Dictionary with status information
"""
return get_telemetry().status()
# Auto-install exception handler on import
# (Only for main library usage, not for Docker/API)
if EnvironmentDetector.detect() not in [Environment.DOCKER, Environment.API_SERVER]:
install_exception_handler()
__all__ = [
'TelemetryManager',
'get_telemetry',
'capture_exception',
'telemetry_decorator',
'async_telemetry_decorator',
'telemetry_context',
'async_telemetry_context',
'enable',
'disable',
'status',
]

View File

@@ -1,140 +0,0 @@
"""
Base telemetry provider interface for Crawl4AI.
Provides abstraction for different telemetry backends.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Union
import traceback
class TelemetryProvider(ABC):
"""Abstract base class for telemetry providers."""
def __init__(self, **kwargs):
"""Initialize the provider with optional configuration."""
self.config = kwargs
self._initialized = False
@abstractmethod
def initialize(self) -> bool:
"""
Initialize the telemetry provider.
Returns True if initialization successful, False otherwise.
"""
pass
@abstractmethod
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send an exception to the telemetry backend.
Args:
exc: The exception to report
context: Optional context data (email, environment, etc.)
Returns:
True if sent successfully, False otherwise
"""
pass
@abstractmethod
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send a generic telemetry event.
Args:
event_name: Name of the event
payload: Optional event data
Returns:
True if sent successfully, False otherwise
"""
pass
@abstractmethod
def flush(self) -> None:
"""Flush any pending telemetry data."""
pass
@abstractmethod
def shutdown(self) -> None:
"""Clean shutdown of the provider."""
pass
def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Remove sensitive information from telemetry data.
Override in subclasses for custom sanitization.
Args:
data: Raw data dictionary
Returns:
Sanitized data dictionary
"""
# Default implementation - remove common sensitive fields
sensitive_keys = {
'password', 'token', 'api_key', 'secret', 'credential',
'auth', 'authorization', 'cookie', 'session'
}
def _sanitize_dict(d: Dict) -> Dict:
sanitized = {}
for key, value in d.items():
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_keys):
sanitized[key] = '[REDACTED]'
elif isinstance(value, dict):
sanitized[key] = _sanitize_dict(value)
elif isinstance(value, list):
sanitized[key] = [
_sanitize_dict(item) if isinstance(item, dict) else item
for item in value
]
else:
sanitized[key] = value
return sanitized
return _sanitize_dict(data) if isinstance(data, dict) else data
class NullProvider(TelemetryProvider):
"""No-op provider for when telemetry is disabled."""
def initialize(self) -> bool:
"""No initialization needed for null provider."""
self._initialized = True
return True
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""No-op exception sending."""
return True
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""No-op event sending."""
return True
def flush(self) -> None:
"""No-op flush."""
pass
def shutdown(self) -> None:
"""No-op shutdown."""
pass

View File

@@ -1,196 +0,0 @@
"""
Configuration management for Crawl4AI telemetry.
Handles user preferences and persistence.
"""
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
from enum import Enum
class TelemetryConsent(Enum):
"""Telemetry consent levels."""
NOT_SET = "not_set"
DENIED = "denied"
ONCE = "once" # Send current error only
ALWAYS = "always" # Send all errors
class TelemetryConfig:
"""Manages telemetry configuration and persistence."""
def __init__(self, config_dir: Optional[Path] = None):
"""
Initialize configuration manager.
Args:
config_dir: Optional custom config directory
"""
if config_dir:
self.config_dir = config_dir
else:
# Default to ~/.crawl4ai/
self.config_dir = Path.home() / '.crawl4ai'
self.config_file = self.config_dir / 'config.json'
self._config: Dict[str, Any] = {}
self._load_config()
def _ensure_config_dir(self) -> None:
"""Ensure configuration directory exists."""
self.config_dir.mkdir(parents=True, exist_ok=True)
def _load_config(self) -> None:
"""Load configuration from disk."""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
self._config = json.load(f)
except (json.JSONDecodeError, IOError):
# Corrupted or inaccessible config - start fresh
self._config = {}
else:
self._config = {}
def _save_config(self) -> bool:
"""
Save configuration to disk.
Returns:
True if saved successfully
"""
try:
self._ensure_config_dir()
# Write to temporary file first
temp_file = self.config_file.with_suffix('.tmp')
with open(temp_file, 'w') as f:
json.dump(self._config, f, indent=2)
# Atomic rename
temp_file.replace(self.config_file)
return True
except (IOError, OSError):
return False
def get_telemetry_settings(self) -> Dict[str, Any]:
"""
Get current telemetry settings.
Returns:
Dictionary with telemetry settings
"""
return self._config.get('telemetry', {
'consent': TelemetryConsent.NOT_SET.value,
'email': None
})
def get_consent(self) -> TelemetryConsent:
"""
Get current consent status.
Returns:
TelemetryConsent enum value
"""
settings = self.get_telemetry_settings()
consent_value = settings.get('consent', TelemetryConsent.NOT_SET.value)
# Handle legacy boolean values
if isinstance(consent_value, bool):
consent_value = TelemetryConsent.ALWAYS.value if consent_value else TelemetryConsent.DENIED.value
try:
return TelemetryConsent(consent_value)
except ValueError:
return TelemetryConsent.NOT_SET
def set_consent(
self,
consent: TelemetryConsent,
email: Optional[str] = None
) -> bool:
"""
Set telemetry consent and optional email.
Args:
consent: Consent level
email: Optional email for follow-up
Returns:
True if saved successfully
"""
if 'telemetry' not in self._config:
self._config['telemetry'] = {}
self._config['telemetry']['consent'] = consent.value
# Only update email if provided
if email is not None:
self._config['telemetry']['email'] = email
return self._save_config()
def get_email(self) -> Optional[str]:
"""
Get stored email if any.
Returns:
Email address or None
"""
settings = self.get_telemetry_settings()
return settings.get('email')
def is_enabled(self) -> bool:
"""
Check if telemetry is enabled.
Returns:
True if telemetry should send data
"""
consent = self.get_consent()
return consent in [TelemetryConsent.ONCE, TelemetryConsent.ALWAYS]
def should_send_current(self) -> bool:
"""
Check if current error should be sent.
Used for one-time consent.
Returns:
True if current error should be sent
"""
consent = self.get_consent()
if consent == TelemetryConsent.ONCE:
# After sending once, reset to NOT_SET
self.set_consent(TelemetryConsent.NOT_SET)
return True
return consent == TelemetryConsent.ALWAYS
def clear(self) -> bool:
"""
Clear all telemetry settings.
Returns:
True if cleared successfully
"""
if 'telemetry' in self._config:
del self._config['telemetry']
return self._save_config()
return True
def update_from_env(self) -> None:
"""Update configuration from environment variables."""
# Check for telemetry disable flag
if os.environ.get('CRAWL4AI_TELEMETRY') == '0':
self.set_consent(TelemetryConsent.DENIED)
# Check for email override
env_email = os.environ.get('CRAWL4AI_TELEMETRY_EMAIL')
if env_email and self.is_enabled():
current_settings = self.get_telemetry_settings()
self.set_consent(
TelemetryConsent(current_settings['consent']),
email=env_email
)

View File

@@ -1,314 +0,0 @@
"""
User consent handling for Crawl4AI telemetry.
Provides interactive prompts for different environments.
"""
import sys
from typing import Optional, Tuple
from .config import TelemetryConsent, TelemetryConfig
from .environment import Environment, EnvironmentDetector
class ConsentManager:
"""Manages user consent for telemetry."""
def __init__(self, config: Optional[TelemetryConfig] = None):
"""
Initialize consent manager.
Args:
config: Optional TelemetryConfig instance
"""
self.config = config or TelemetryConfig()
self.environment = EnvironmentDetector.detect()
def check_and_prompt(self) -> TelemetryConsent:
"""
Check consent status and prompt if needed.
Returns:
Current consent status
"""
current_consent = self.config.get_consent()
# If already set, return current value
if current_consent != TelemetryConsent.NOT_SET:
return current_consent
# Docker/API server: default enabled (check env var)
if self.environment in [Environment.DOCKER, Environment.API_SERVER]:
return self._handle_docker_consent()
# Interactive environments: prompt user
if EnvironmentDetector.is_interactive():
return self._prompt_for_consent()
# Non-interactive: default disabled
return TelemetryConsent.DENIED
def _handle_docker_consent(self) -> TelemetryConsent:
"""
Handle consent in Docker environment.
Default enabled unless disabled via env var.
"""
import os
if os.environ.get('CRAWL4AI_TELEMETRY') == '0':
self.config.set_consent(TelemetryConsent.DENIED)
return TelemetryConsent.DENIED
# Default enabled for Docker
self.config.set_consent(TelemetryConsent.ALWAYS)
return TelemetryConsent.ALWAYS
def _prompt_for_consent(self) -> TelemetryConsent:
"""
Prompt user for consent based on environment.
Returns:
User's consent choice
"""
if self.environment == Environment.CLI:
return self._cli_prompt()
elif self.environment in [Environment.JUPYTER, Environment.COLAB]:
return self._notebook_prompt()
else:
return TelemetryConsent.DENIED
def _cli_prompt(self) -> TelemetryConsent:
"""
Show CLI prompt for consent.
Returns:
User's consent choice
"""
print("\n" + "="*60)
print("🚨 Crawl4AI Error Detection")
print("="*60)
print("\nWe noticed an error occurred. Help improve Crawl4AI by")
print("sending anonymous crash reports?")
print("\n[1] Yes, send this error only")
print("[2] Yes, always send errors")
print("[3] No, don't send")
print("\n" + "-"*60)
# Get choice
while True:
try:
choice = input("Your choice (1/2/3): ").strip()
if choice == '1':
consent = TelemetryConsent.ONCE
break
elif choice == '2':
consent = TelemetryConsent.ALWAYS
break
elif choice == '3':
consent = TelemetryConsent.DENIED
break
else:
print("Please enter 1, 2, or 3")
except (KeyboardInterrupt, EOFError):
# User cancelled - treat as denial
consent = TelemetryConsent.DENIED
break
# Optional email
email = None
if consent != TelemetryConsent.DENIED:
print("\nOptional: Enter email for follow-up (or press Enter to skip):")
try:
email_input = input("Email: ").strip()
if email_input and '@' in email_input:
email = email_input
except (KeyboardInterrupt, EOFError):
pass
# Save choice
self.config.set_consent(consent, email)
if consent != TelemetryConsent.DENIED:
print("\n✅ Thank you for helping improve Crawl4AI!")
else:
print("\n✅ Telemetry disabled. You can enable it anytime with:")
print(" crawl4ai telemetry enable")
print("="*60 + "\n")
return consent
def _notebook_prompt(self) -> TelemetryConsent:
"""
Show notebook prompt for consent.
Uses widgets if available, falls back to print + code.
Returns:
User's consent choice
"""
if EnvironmentDetector.supports_widgets():
return self._widget_prompt()
else:
return self._notebook_fallback_prompt()
def _widget_prompt(self) -> TelemetryConsent:
"""
Show interactive widget prompt in Jupyter/Colab.
Returns:
User's consent choice
"""
try:
import ipywidgets as widgets
from IPython.display import display, HTML
# Create styled HTML
html = HTML("""
<div style="padding: 15px; border: 2px solid #ff6b6b; border-radius: 8px; background: #fff5f5;">
<h3 style="color: #c92a2a; margin-top: 0;">🚨 Crawl4AI Error Detected</h3>
<p style="color: #495057;">Help us improve by sending anonymous crash reports?</p>
</div>
""")
display(html)
# Create buttons
btn_once = widgets.Button(
description='Send this error',
button_style='info',
icon='check'
)
btn_always = widgets.Button(
description='Always send',
button_style='success',
icon='check-circle'
)
btn_never = widgets.Button(
description='Don\'t send',
button_style='danger',
icon='times'
)
# Email input
email_input = widgets.Text(
placeholder='Optional: your@email.com',
description='Email:',
style={'description_width': 'initial'}
)
# Output area for feedback
output = widgets.Output()
# Container
button_box = widgets.HBox([btn_once, btn_always, btn_never])
container = widgets.VBox([button_box, email_input, output])
# Variable to store choice
consent_choice = {'value': None}
def on_button_click(btn):
"""Handle button click."""
with output:
output.clear_output()
if btn == btn_once:
consent_choice['value'] = TelemetryConsent.ONCE
print("✅ Sending this error only")
elif btn == btn_always:
consent_choice['value'] = TelemetryConsent.ALWAYS
print("✅ Always sending errors")
else:
consent_choice['value'] = TelemetryConsent.DENIED
print("✅ Telemetry disabled")
# Save with email if provided
email = email_input.value.strip() if email_input.value else None
self.config.set_consent(consent_choice['value'], email)
# Disable buttons after choice
btn_once.disabled = True
btn_always.disabled = True
btn_never.disabled = True
email_input.disabled = True
# Attach handlers
btn_once.on_click(on_button_click)
btn_always.on_click(on_button_click)
btn_never.on_click(on_button_click)
# Display widget
display(container)
# Wait for user choice (in notebook, this is non-blocking)
# Return NOT_SET for now, actual choice will be saved via callback
return consent_choice.get('value', TelemetryConsent.NOT_SET)
except Exception:
# Fallback if widgets fail
return self._notebook_fallback_prompt()
def _notebook_fallback_prompt(self) -> TelemetryConsent:
"""
Fallback prompt for notebooks without widget support.
Returns:
User's consent choice (defaults to DENIED)
"""
try:
from IPython.display import display, Markdown
markdown_content = """
### 🚨 Crawl4AI Error Detected
Help us improve by sending anonymous crash reports.
**Telemetry is currently OFF.** To enable, run:
```python
import crawl4ai
crawl4ai.telemetry.enable(email="your@email.com", always=True)
```
To send just this error:
```python
crawl4ai.telemetry.enable(once=True)
```
To keep telemetry disabled:
```python
crawl4ai.telemetry.disable()
```
"""
display(Markdown(markdown_content))
except ImportError:
# Pure print fallback
print("\n" + "="*60)
print("🚨 Crawl4AI Error Detected")
print("="*60)
print("\nTelemetry is OFF. To enable, run:")
print("\nimport crawl4ai")
print('crawl4ai.telemetry.enable(email="you@example.com", always=True)')
print("\n" + "="*60)
# Default to disabled in fallback mode
return TelemetryConsent.DENIED
def force_prompt(self) -> Tuple[TelemetryConsent, Optional[str]]:
"""
Force a consent prompt regardless of current settings.
Used for manual telemetry configuration.
Returns:
Tuple of (consent choice, optional email)
"""
# Temporarily reset consent to force prompt
original_consent = self.config.get_consent()
self.config.set_consent(TelemetryConsent.NOT_SET)
try:
new_consent = self._prompt_for_consent()
email = self.config.get_email()
return new_consent, email
except Exception:
# Restore original on error
self.config.set_consent(original_consent)
raise

View File

@@ -1,199 +0,0 @@
"""
Environment detection for Crawl4AI telemetry.
Detects whether we're running in CLI, Docker, Jupyter, etc.
"""
import os
import sys
from enum import Enum
from typing import Optional
class Environment(Enum):
"""Detected runtime environment."""
CLI = "cli"
DOCKER = "docker"
JUPYTER = "jupyter"
COLAB = "colab"
API_SERVER = "api_server"
UNKNOWN = "unknown"
class EnvironmentDetector:
"""Detects the current runtime environment."""
@staticmethod
def detect() -> Environment:
"""
Detect current runtime environment.
Returns:
Environment enum value
"""
# Check for Docker
if EnvironmentDetector._is_docker():
# Further check if it's API server
if EnvironmentDetector._is_api_server():
return Environment.API_SERVER
return Environment.DOCKER
# Check for Google Colab
if EnvironmentDetector._is_colab():
return Environment.COLAB
# Check for Jupyter
if EnvironmentDetector._is_jupyter():
return Environment.JUPYTER
# Check for CLI
if EnvironmentDetector._is_cli():
return Environment.CLI
return Environment.UNKNOWN
@staticmethod
def _is_docker() -> bool:
"""Check if running inside Docker container."""
# Check for Docker-specific files
if os.path.exists('/.dockerenv'):
return True
# Check cgroup for docker signature
try:
with open('/proc/1/cgroup', 'r') as f:
return 'docker' in f.read()
except (IOError, OSError):
pass
# Check environment variable (if set in Dockerfile)
return os.environ.get('CRAWL4AI_DOCKER', '').lower() == 'true'
@staticmethod
def _is_api_server() -> bool:
"""Check if running as API server."""
# Check for API server indicators
return (
os.environ.get('CRAWL4AI_API_SERVER', '').lower() == 'true' or
'deploy/docker/server.py' in ' '.join(sys.argv) or
'deploy/docker/api.py' in ' '.join(sys.argv)
)
@staticmethod
def _is_jupyter() -> bool:
"""Check if running in Jupyter notebook."""
try:
# Check for IPython
from IPython import get_ipython
ipython = get_ipython()
if ipython is None:
return False
# Check for notebook kernel
if 'IPKernelApp' in ipython.config:
return True
# Check for Jupyter-specific attributes
if hasattr(ipython, 'kernel'):
return True
except (ImportError, AttributeError):
pass
return False
@staticmethod
def _is_colab() -> bool:
"""Check if running in Google Colab."""
try:
import google.colab
return True
except ImportError:
pass
# Alternative check
return 'COLAB_GPU' in os.environ or 'COLAB_TPU_ADDR' in os.environ
@staticmethod
def _is_cli() -> bool:
"""Check if running from command line."""
# Check if we have a terminal
return (
hasattr(sys, 'ps1') or
sys.stdin.isatty() or
bool(os.environ.get('TERM'))
)
@staticmethod
def is_interactive() -> bool:
"""
Check if environment supports interactive prompts.
Returns:
True if interactive prompts are supported
"""
env = EnvironmentDetector.detect()
# Docker/API server are non-interactive
if env in [Environment.DOCKER, Environment.API_SERVER]:
return False
# CLI with TTY is interactive
if env == Environment.CLI:
return sys.stdin.isatty()
# Jupyter/Colab can be interactive with widgets
if env in [Environment.JUPYTER, Environment.COLAB]:
return True
return False
@staticmethod
def supports_widgets() -> bool:
"""
Check if environment supports IPython widgets.
Returns:
True if widgets are supported
"""
env = EnvironmentDetector.detect()
if env not in [Environment.JUPYTER, Environment.COLAB]:
return False
try:
import ipywidgets
from IPython.display import display
return True
except ImportError:
return False
@staticmethod
def get_environment_context() -> dict:
"""
Get environment context for telemetry.
Returns:
Dictionary with environment information
"""
env = EnvironmentDetector.detect()
context = {
'environment_type': env.value,
'python_version': f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
'platform': sys.platform,
}
# Add environment-specific context
if env == Environment.DOCKER:
context['docker'] = True
context['container_id'] = os.environ.get('HOSTNAME', 'unknown')
elif env == Environment.COLAB:
context['colab'] = True
context['gpu'] = bool(os.environ.get('COLAB_GPU'))
elif env == Environment.JUPYTER:
context['jupyter'] = True
return context

View File

@@ -1,15 +0,0 @@
"""
Telemetry providers for Crawl4AI.
"""
from ..base import TelemetryProvider, NullProvider
__all__ = ['TelemetryProvider', 'NullProvider']
# Try to import Sentry provider if available
try:
from .sentry import SentryProvider
__all__.append('SentryProvider')
except ImportError:
# Sentry SDK not installed
pass

View File

@@ -1,234 +0,0 @@
"""
Sentry telemetry provider for Crawl4AI.
"""
import os
from typing import Dict, Any, Optional
from ..base import TelemetryProvider
# Hardcoded DSN for Crawl4AI project
# This is safe to embed as it's the public part of the DSN
# TODO: Replace with actual Crawl4AI Sentry project DSN before release
# Format: "https://<public_key>@<organization>.ingest.sentry.io/<project_id>"
DEFAULT_SENTRY_DSN = "https://your-public-key@sentry.io/your-project-id"
class SentryProvider(TelemetryProvider):
"""Sentry implementation of telemetry provider."""
def __init__(self, dsn: Optional[str] = None, **kwargs):
"""
Initialize Sentry provider.
Args:
dsn: Optional DSN override (for testing/development)
**kwargs: Additional Sentry configuration
"""
super().__init__(**kwargs)
# Allow DSN override via environment variable or parameter
self.dsn = (
dsn or
os.environ.get('CRAWL4AI_SENTRY_DSN') or
DEFAULT_SENTRY_DSN
)
self._sentry_sdk = None
self.environment = kwargs.get('environment', 'production')
self.release = kwargs.get('release', None)
def initialize(self) -> bool:
"""Initialize Sentry SDK."""
try:
import sentry_sdk
from sentry_sdk.integrations.stdlib import StdlibIntegration
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
# Initialize Sentry with minimal integrations
sentry_sdk.init(
dsn=self.dsn,
environment=self.environment,
release=self.release,
# Performance monitoring disabled by default
traces_sample_rate=0.0,
# Only capture errors, not transactions
# profiles_sample_rate=0.0,
# Minimal integrations
integrations=[
StdlibIntegration(),
ExcepthookIntegration(always_run=False),
],
# Privacy settings
send_default_pii=False,
attach_stacktrace=True,
# Before send hook for additional sanitization
before_send=self._before_send,
# Disable automatic breadcrumbs
max_breadcrumbs=0,
# Disable request data collection
# request_bodies='never',
# # Custom transport options
# transport_options={
# 'keepalive': True,
# },
)
self._sentry_sdk = sentry_sdk
self._initialized = True
return True
except ImportError:
# Sentry SDK not installed
return False
except Exception:
# Initialization failed silently
return False
def _before_send(self, event: Dict[str, Any], hint: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Process event before sending to Sentry.
Provides additional privacy protection.
"""
# Remove sensitive data
if 'request' in event:
event['request'] = self._sanitize_request(event['request'])
# Remove local variables that might contain sensitive data
if 'exception' in event and 'values' in event['exception']:
for exc in event['exception']['values']:
if 'stacktrace' in exc and 'frames' in exc['stacktrace']:
for frame in exc['stacktrace']['frames']:
# Remove local variables from frames
frame.pop('vars', None)
# Apply general sanitization
event = self.sanitize_data(event)
return event
def _sanitize_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize request data to remove sensitive information."""
sanitized = request_data.copy()
# Remove sensitive fields
sensitive_fields = ['cookies', 'headers', 'data', 'query_string', 'env']
for field in sensitive_fields:
if field in sanitized:
sanitized[field] = '[REDACTED]'
# Keep only safe fields
safe_fields = ['method', 'url']
return {k: v for k, v in sanitized.items() if k in safe_fields}
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send exception to Sentry.
Args:
exc: Exception to report
context: Optional context (email, environment info)
Returns:
True if sent successfully
"""
if not self._initialized:
if not self.initialize():
return False
try:
if self._sentry_sdk:
with self._sentry_sdk.push_scope() as scope:
# Add user context if email provided
if context and 'email' in context:
scope.set_user({'email': context['email']})
# Add additional context
if context:
for key, value in context.items():
if key != 'email':
scope.set_context(key, value)
# Add tags for filtering
scope.set_tag('source', context.get('source', 'unknown'))
scope.set_tag('environment_type', context.get('environment_type', 'unknown'))
# Capture the exception
self._sentry_sdk.capture_exception(exc)
return True
except Exception:
# Silently fail - telemetry should never crash the app
return False
return False
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send custom event to Sentry.
Args:
event_name: Name of the event
payload: Event data
Returns:
True if sent successfully
"""
if not self._initialized:
if not self.initialize():
return False
try:
if self._sentry_sdk:
# Sanitize payload
safe_payload = self.sanitize_data(payload) if payload else {}
# Send as a message with extra data
self._sentry_sdk.capture_message(
event_name,
level='info',
extras=safe_payload
)
return True
except Exception:
return False
return False
def flush(self) -> None:
"""Flush pending events to Sentry."""
if self._initialized and self._sentry_sdk:
try:
self._sentry_sdk.flush(timeout=2.0)
except Exception:
pass
def shutdown(self) -> None:
"""Shutdown Sentry client."""
if self._initialized and self._sentry_sdk:
try:
self._sentry_sdk.flush(timeout=2.0)
# Note: sentry_sdk doesn't have a shutdown method
# Flush is sufficient for cleanup
except Exception:
pass
finally:
self._initialized = False

View File

@@ -16,7 +16,7 @@ from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, IM
import httpx
from socket import gaierror
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable, Generator, Tuple, Iterable
from typing import Dict, Any, List, Optional, Callable
from urllib.parse import urljoin
import requests
from requests.exceptions import InvalidSchema
@@ -40,7 +40,8 @@ from typing import Sequence
from itertools import chain
from collections import deque
import psutil
from typing import Generator, Iterable
import numpy as np
from urllib.parse import (
@@ -3413,79 +3414,3 @@ def cosine_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""Calculate cosine distance (1 - similarity) between two vectors"""
return 1 - cosine_similarity(vec1, vec2)
# Memory utilities
def get_true_available_memory_gb() -> float:
"""Get truly available memory including inactive pages (cross-platform)"""
vm = psutil.virtual_memory()
if platform.system() == 'Darwin': # macOS
# On macOS, we need to include inactive memory too
try:
# Use vm_stat to get accurate values
result = subprocess.run(['vm_stat'], capture_output=True, text=True)
lines = result.stdout.split('\n')
page_size = 16384 # macOS page size
pages = {}
for line in lines:
if 'Pages free:' in line:
pages['free'] = int(line.split()[-1].rstrip('.'))
elif 'Pages inactive:' in line:
pages['inactive'] = int(line.split()[-1].rstrip('.'))
elif 'Pages speculative:' in line:
pages['speculative'] = int(line.split()[-1].rstrip('.'))
elif 'Pages purgeable:' in line:
pages['purgeable'] = int(line.split()[-1].rstrip('.'))
# Calculate total available (free + inactive + speculative + purgeable)
total_available_pages = (
pages.get('free', 0) +
pages.get('inactive', 0) +
pages.get('speculative', 0) +
pages.get('purgeable', 0)
)
available_gb = (total_available_pages * page_size) / (1024**3)
return available_gb
except:
# Fallback to psutil
return vm.available / (1024**3)
else:
# For Windows and Linux, psutil.available is accurate
return vm.available / (1024**3)
def get_true_memory_usage_percent() -> float:
"""
Get memory usage percentage that accounts for platform differences.
Returns:
float: Memory usage percentage (0-100)
"""
vm = psutil.virtual_memory()
total_gb = vm.total / (1024**3)
available_gb = get_true_available_memory_gb()
# Calculate used percentage based on truly available memory
used_percent = 100.0 * (total_gb - available_gb) / total_gb
# Ensure it's within valid range
return max(0.0, min(100.0, used_percent))
def get_memory_stats() -> Tuple[float, float, float]:
"""
Get comprehensive memory statistics.
Returns:
Tuple[float, float, float]: (used_percent, available_gb, total_gb)
"""
vm = psutil.virtual_memory()
total_gb = vm.total / (1024**3)
available_gb = get_true_available_memory_gb()
used_percent = get_true_memory_usage_percent()
return used_percent, available_gb, total_gb

View File

@@ -1,5 +1,6 @@
import os
import json
import orjson
import asyncio
from typing import List, Tuple, Dict
from functools import partial
@@ -384,27 +385,60 @@ def create_task_response(task: dict, task_id: str, base_url: str) -> dict:
async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]:
"""Stream results with heartbeats and completion markers."""
import json
from utils import datetime_handler
import orjson
from datetime import datetime
import inspect
def orjson_default(obj):
# Handle datetime (if not already handled by orjson)
if isinstance(obj, datetime):
return obj.isoformat()
# Handle property objects (convert to string or something else)
if isinstance(obj, property):
return str(obj)
# Last resort: convert to string
return str(obj)
try:
async for result in results_gen:
try:
server_memory_mb = _get_memory_mb()
result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = json.dumps(result_dict, default=datetime_handler) + "\n"
yield data.encode('utf-8')
except Exception as e:
logger.error(f"Serialization error: {e}")
error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')}
yield (json.dumps(error_response) + "\n").encode('utf-8')
logger.info(f"Starting streaming with results_gen type: {type(results_gen)}")
logger.info(f"Is results_gen async generator: {inspect.isasyncgen(results_gen)}")
# Check if results_gen is actually an async generator vs another type
if inspect.isasyncgen(results_gen):
logger.info("Processing as async generator")
async for result in results_gen:
try:
logger.info(f"Processing streaming result of type: {type(result)}")
# Check if this result is actually a CrawlResult
if hasattr(result, 'model_dump_json'):
server_memory_mb = _get_memory_mb()
result_json = result.model_dump_json()
result_dict = orjson.loads(result_json)
result_dict['server_memory_mb'] = server_memory_mb
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = orjson.dumps(result_dict, default=orjson_default).decode('utf-8') + "\n"
yield data.encode('utf-8')
else:
logger.error(f"Result doesn't have model_dump_json method: {type(result)}")
error_response = {"error": f"Invalid result type: {type(result)}", "url": "unknown"}
yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8')
except Exception as e:
logger.error(f"Serialization error: {e}")
logger.error(f"Result type was: {type(result)}")
error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')}
yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8')
else:
logger.error(f"results_gen is not an async generator: {type(results_gen)}")
error_response = {"error": f"Invalid results_gen type: {type(results_gen)}"}
yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8')
yield json.dumps({"status": "completed"}).encode('utf-8')
yield orjson.dumps({"status": "completed"}).decode('utf-8').encode('utf-8')
except asyncio.CancelledError:
logger.warning("Client disconnected during streaming")
@@ -472,7 +506,9 @@ async def handle_crawl_request(
# Process results to handle PDF bytes
processed_results = []
for result in results:
result_dict = result.model_dump()
# Use ORJSON serialization to handle property objects properly
result_json = result.model_dump_json()
result_dict = orjson.loads(result_json)
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
@@ -522,8 +558,19 @@ async def handle_stream_crawl_request(
browser_config.verbose = False
crawler_config = CrawlerRunConfig.load(crawler_config)
crawler_config.scraping_strategy = LXMLWebScrapingStrategy()
crawler_config.stream = True
# Don't force stream=True here - let the deep crawl strategy control its own streaming behavior
# Apply global base config (this was missing!)
base_config = config["crawler"]["base_config"]
for key, value in base_config.items():
if hasattr(crawler_config, key):
print(f"[DEBUG] Applying base_config: {key} = {value}")
setattr(crawler_config, key, value)
print(f"[DEBUG] Deep crawl strategy: {type(crawler_config.deep_crawl_strategy).__name__ if crawler_config.deep_crawl_strategy else 'None'}")
print(f"[DEBUG] Stream mode: {crawler_config.stream}")
print(f"[DEBUG] Simulate user: {getattr(crawler_config, 'simulate_user', 'Not set')}")
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=config["crawler"]["memory_threshold_percent"],
rate_limiter=RateLimiter(
@@ -537,11 +584,58 @@ async def handle_stream_crawl_request(
# crawler = AsyncWebCrawler(config=browser_config)
# await crawler.start()
results_gen = await crawler.arun_many(
urls=urls,
config=crawler_config,
dispatcher=dispatcher
)
# Use correct method based on URL count (same as regular endpoint)
if len(urls) == 1:
# For single URL, use arun to get CrawlResult, then wrap in async generator
single_result_container = await crawler.arun(
url=urls[0],
config=crawler_config,
dispatcher=dispatcher
)
async def single_result_generator():
# Handle CrawlResultContainer - extract the actual results
if hasattr(single_result_container, '_results'):
# It's a CrawlResultContainer - iterate over the internal results
for result in single_result_container._results:
# Check if the result is an async generator (from deep crawl)
if hasattr(result, '__aiter__'):
async for sub_result in result:
yield sub_result
else:
yield result
elif hasattr(single_result_container, '__aiter__'):
# It's an async generator (from streaming deep crawl)
async for result in single_result_container:
yield result
elif hasattr(single_result_container, '__iter__') and not hasattr(single_result_container, 'url'):
# It's iterable but not a CrawlResult itself
for result in single_result_container:
# Check if each result is an async generator
if hasattr(result, '__aiter__'):
async for sub_result in result:
yield sub_result
else:
yield result
else:
# It's a single CrawlResult
yield single_result_container
results_gen = single_result_generator()
else:
# For multiple URLs, use arun_many
results_gen = await crawler.arun_many(
urls=urls,
config=crawler_config,
dispatcher=dispatcher
)
# If results_gen is a list (e.g., from deep crawl), convert to async generator
if isinstance(results_gen, list):
async def convert_list_to_generator():
for result in results_gen:
yield result
results_gen = convert_list_to_generator()
return crawler, results_gen

View File

@@ -15,4 +15,3 @@ PyJWT==2.10.1
mcp>=1.6.0
websockets>=15.0.1
httpx[http2]>=0.27.2
sentry-sdk>=2.0.0

View File

@@ -7,13 +7,16 @@ Crawl4AI FastAPI entrypoint
"""
# ── stdlib & 3rdparty imports ───────────────────────────────
from datetime import datetime
import orjson
from crawler_pool import get_crawler, close_all, janitor
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from auth import create_access_token, get_token_dependency, TokenRequest
from pydantic import BaseModel
from typing import Optional, List, Dict
from fastapi import Request, Depends
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, ORJSONResponse
import base64
import re
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
@@ -32,6 +35,8 @@ from schemas import (
JSEndpointRequest,
)
# Use the proper serialization functions from async_configs
from crawl4ai.async_configs import to_serializable_dict
from utils import (
FilterType, load_config, setup_logging, verify_email_domain
)
@@ -74,32 +79,6 @@ setup_logging(config)
__version__ = "0.5.1-d1"
# ───────────────────── telemetry setup ────────────────────────
# Docker/API server telemetry: enabled by default unless CRAWL4AI_TELEMETRY=0
import os as _os
if _os.environ.get('CRAWL4AI_TELEMETRY') != '0':
# Set environment variable to indicate we're in API server mode
_os.environ['CRAWL4AI_API_SERVER'] = 'true'
# Import and enable telemetry for Docker/API environment
from crawl4ai.telemetry import enable as enable_telemetry
from crawl4ai.telemetry import capture_exception
# Enable telemetry automatically in Docker mode
enable_telemetry(always=True)
import logging
telemetry_logger = logging.getLogger("telemetry")
telemetry_logger.info("✅ Telemetry enabled for Docker/API server")
else:
# Define no-op for capture_exception if telemetry is disabled
def capture_exception(exc, context=None):
pass
import logging
telemetry_logger = logging.getLogger("telemetry")
telemetry_logger.info("❌ Telemetry disabled via CRAWL4AI_TELEMETRY=0")
# ── global page semaphore (hard cap) ─────────────────────────
MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30)
GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES)
@@ -138,11 +117,26 @@ async def lifespan(_: FastAPI):
app.state.janitor.cancel()
await close_all()
def orjson_default(obj):
# Handle datetime (if not already handled by orjson)
if isinstance(obj, datetime):
return obj.isoformat()
# Handle property objects (convert to string or something else)
if isinstance(obj, property):
return str(obj)
# Last resort: convert to string
return str(obj)
def orjson_dumps(v, *, default):
return orjson.dumps(v, default=orjson_default).decode()
# ───────────────────── FastAPI instance ──────────────────────
app = FastAPI(
title=config["app"]["title"],
version=config["app"]["version"],
lifespan=lifespan,
default_response_class=ORJSONResponse
)
# ── static playground ──────────────────────────────────────
@@ -461,15 +455,20 @@ async def crawl(
"""
Crawl a list of URLs and return the results as JSON.
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
res = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
return JSONResponse(res)
try:
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
res = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
# handle_crawl_request returns a dictionary, so we can pass it directly to ORJSONResponse
return ORJSONResponse(res)
except Exception as e:
print(f"Error occurred: {e}")
return ORJSONResponse({"error": str(e)}, status_code=500)
@app.post("/crawl/stream")

View File

@@ -1,305 +0,0 @@
# 🚀 Crawl4AI v0.7.4: The Intelligent Table Extraction & Performance Update
*August 17, 2025 • 6 min read*
---
Today I'm releasing Crawl4AI v0.7.4—the Intelligent Table Extraction & Performance Update. This release introduces revolutionary LLM-powered table extraction with intelligent chunking, significant performance improvements for concurrent crawling, enhanced browser management, and critical stability fixes that make Crawl4AI more robust for production workloads.
## 🎯 What's New at a Glance
- **🚀 LLMTableExtraction**: Revolutionary table extraction with intelligent chunking for massive tables
- **⚡ Enhanced Concurrency**: True concurrency improvements for fast-completing tasks in batch operations
- **🧹 Memory Management Refactor**: Streamlined memory utilities and better resource management
- **🔧 Browser Manager Fixes**: Resolved race conditions in concurrent page creation
- **⌨️ Cross-Platform Browser Profiler**: Improved keyboard handling and quit mechanisms
- **🔗 Advanced URL Processing**: Better handling of raw URLs and base tag link resolution
- **🛡️ Enhanced Proxy Support**: Flexible proxy configuration with dict and string formats
- **🐳 Docker Improvements**: Better API handling and raw HTML support
## 🚀 LLMTableExtraction: Revolutionary Table Processing
**The Problem:** Complex tables with rowspan, colspan, nested structures, or massive datasets that traditional HTML parsing can't handle effectively. Large tables that exceed token limits crash extraction processes.
**My Solution:** I developed LLMTableExtraction—an intelligent table extraction strategy that uses Large Language Models with automatic chunking to handle tables of any size and complexity.
### Technical Implementation
```python
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
LLMConfig,
LLMTableExtraction,
CacheMode
)
# Configure LLM for table extraction
llm_config = LLMConfig(
provider="openai/gpt-4.1-mini",
api_token="env:OPENAI_API_KEY",
temperature=0.1, # Low temperature for consistency
max_tokens=32000
)
# Create intelligent table extraction strategy
table_strategy = LLMTableExtraction(
llm_config=llm_config,
verbose=True,
max_tries=2,
enable_chunking=True, # Handle massive tables
chunk_token_threshold=5000, # Smart chunking threshold
overlap_threshold=100, # Maintain context between chunks
extraction_type="structured" # Get structured data output
)
# Apply to crawler configuration
config = CrawlerRunConfig(
table_extraction_strategy=table_strategy,
cache_mode=CacheMode.BYPASS
)
async with AsyncWebCrawler() as crawler:
# Extract complex tables with intelligence
result = await crawler.arun(
"https://en.wikipedia.org/wiki/List_of_countries_by_GDP",
config=config
)
# Access extracted tables directly
for i, table in enumerate(result.tables):
print(f"Table {i}: {len(table['data'])} rows × {len(table['headers'])} columns")
# Convert to pandas DataFrame instantly
import pandas as pd
df = pd.DataFrame(table['data'], columns=table['headers'])
print(df.head())
```
**Intelligent Chunking for Massive Tables:**
```python
# Handle tables that exceed token limits
large_table_strategy = LLMTableExtraction(
llm_config=llm_config,
enable_chunking=True,
chunk_token_threshold=3000, # Conservative threshold
overlap_threshold=150, # Preserve context
max_concurrent_chunks=3, # Parallel processing
merge_strategy="intelligent" # Smart chunk merging
)
# Process Wikipedia comparison tables, financial reports, etc.
config = CrawlerRunConfig(
table_extraction_strategy=large_table_strategy,
# Target specific table containers
css_selector="div.wikitable, table.sortable",
delay_before_return_html=2.0
)
result = await crawler.arun(
"https://en.wikipedia.org/wiki/Comparison_of_operating_systems",
config=config
)
# Tables are automatically chunked, processed, and merged
print(f"Extracted {len(result.tables)} complex tables")
for table in result.tables:
print(f"Merged table: {len(table['data'])} total rows")
```
**Advanced Features:**
- **Intelligent Chunking**: Automatically splits massive tables while preserving structure
- **Context Preservation**: Overlapping chunks maintain column relationships
- **Parallel Processing**: Concurrent chunk processing for speed
- **Smart Merging**: Reconstructs complete tables from processed chunks
- **Complex Structure Support**: Handles rowspan, colspan, nested tables
- **Metadata Extraction**: Captures table context, captions, and relationships
**Expected Real-World Impact:**
- **Financial Analysis**: Extract complex earnings tables and financial statements
- **Research & Academia**: Process large datasets from Wikipedia, research papers
- **E-commerce**: Handle product comparison tables with complex layouts
- **Government Data**: Extract census data, statistical tables from official sources
- **Competitive Intelligence**: Process competitor pricing and feature tables
## ⚡ Enhanced Concurrency: True Performance Gains
**The Problem:** The `arun_many()` method wasn't achieving true concurrency for fast-completing tasks, leading to sequential processing bottlenecks in batch operations.
**My Solution:** I implemented true concurrency improvements in the dispatcher that enable genuine parallel processing for fast-completing tasks.
### Performance Optimization
```python
# Before v0.7.4: Sequential-like behavior for fast tasks
# After v0.7.4: True concurrency
async with AsyncWebCrawler() as crawler:
# These will now run with true concurrency
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
# Processes in truly parallel fashion
results = await crawler.arun_many(urls)
# Performance improvement: ~4x faster for fast-completing tasks
print(f"Processed {len(results)} URLs with true concurrency")
```
**Expected Real-World Impact:**
- **API Crawling**: 3-4x faster processing of REST endpoints and API documentation
- **Batch URL Processing**: Significant speedup for large URL lists
- **Monitoring Systems**: Faster health checks and status page monitoring
- **Data Aggregation**: Improved performance for real-time data collection
## 🧹 Memory Management Refactor: Cleaner Architecture
**The Problem:** Memory utilities were scattered and difficult to maintain, with potential import conflicts and unclear organization.
**My Solution:** I consolidated all memory-related utilities into the main `utils.py` module, creating a cleaner, more maintainable architecture.
### Improved Memory Handling
```python
# All memory utilities now consolidated
from crawl4ai.utils import get_true_memory_usage_percent, MemoryMonitor
# Enhanced memory monitoring
monitor = MemoryMonitor()
monitor.start_monitoring()
async with AsyncWebCrawler() as crawler:
# Memory-efficient batch processing
results = await crawler.arun_many(large_url_list)
# Get accurate memory metrics
memory_usage = get_true_memory_usage_percent()
memory_report = monitor.get_report()
print(f"Memory efficiency: {memory_report['efficiency']:.1f}%")
print(f"Peak usage: {memory_report['peak_mb']:.1f} MB")
```
**Expected Real-World Impact:**
- **Production Stability**: More reliable memory tracking and management
- **Code Maintainability**: Cleaner architecture for easier debugging
- **Import Clarity**: Resolved potential conflicts and import issues
- **Developer Experience**: Simpler API for memory monitoring
## 🔧 Critical Stability Fixes
### Browser Manager Race Condition Resolution
**The Problem:** Concurrent page creation in persistent browser contexts caused "Target page/context closed" errors during high-concurrency operations.
**My Solution:** Implemented thread-safe page creation with proper locking mechanisms.
```python
# Fixed: Safe concurrent page creation
browser_config = BrowserConfig(
browser_type="chromium",
use_persistent_context=True, # Now thread-safe
max_concurrent_sessions=10 # Safely handle concurrent requests
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# These concurrent operations are now stable
tasks = [crawler.arun(url) for url in url_list]
results = await asyncio.gather(*tasks) # No more race conditions
```
### Enhanced Browser Profiler
**The Problem:** Inconsistent keyboard handling across platforms and unreliable quit mechanisms.
**My Solution:** Cross-platform keyboard listeners with improved quit handling.
### Advanced URL Processing
**The Problem:** Raw URL formats (`raw://` and `raw:`) weren't properly handled, and base tag link resolution was incomplete.
**My Solution:** Enhanced URL preprocessing and base tag support.
```python
# Now properly handles all URL formats
urls = [
"https://example.com",
"raw://static-html-content",
"raw:file://local-file.html"
]
# Base tag links are now correctly resolved
config = CrawlerRunConfig(
include_links=True, # Links properly resolved with base tags
resolve_absolute_urls=True
)
```
## 🛡️ Enhanced Proxy Configuration
**The Problem:** Proxy configuration only accepted specific formats, limiting flexibility.
**My Solution:** Enhanced ProxyConfig to support both dictionary and string formats.
```python
# Multiple proxy configuration formats now supported
from crawl4ai import BrowserConfig, ProxyConfig
# String format
proxy_config = ProxyConfig("http://proxy.example.com:8080")
# Dictionary format
proxy_config = ProxyConfig({
"server": "http://proxy.example.com:8080",
"username": "user",
"password": "pass"
})
# Use with crawler
browser_config = BrowserConfig(proxy_config=proxy_config)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun("https://httpbin.org/ip")
```
## 🐳 Docker & Infrastructure Improvements
This release includes several Docker and infrastructure improvements:
- **Better API Token Handling**: Improved Docker example scripts with correct endpoints
- **Raw HTML Support**: Enhanced Docker API to handle raw HTML content properly
- **Documentation Updates**: Comprehensive Docker deployment examples
- **Test Coverage**: Expanded test suite with better coverage
## 📚 Documentation & Examples
Enhanced documentation includes:
- **LLM Table Extraction Guide**: Comprehensive examples and best practices
- **Migration Documentation**: Updated patterns for new table extraction methods
- **Docker Deployment**: Complete deployment guide with examples
- **Performance Optimization**: Guidelines for concurrent crawling
## 🙏 Acknowledgments
Thanks to our contributors and community for feedback, bug reports, and feature requests that made this release possible.
## 📚 Resources
- [Full Documentation](https://docs.crawl4ai.com)
- [GitHub Repository](https://github.com/unclecode/crawl4ai)
- [Discord Community](https://discord.gg/crawl4ai)
- [LLM Table Extraction Examples](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/llm_table_extraction_example.py)
---
*Crawl4AI v0.7.4 delivers intelligent table extraction and significant performance improvements. The new LLMTableExtraction strategy handles complex tables that were previously impossible to process, while concurrency improvements make batch operations 3-4x faster. Try the intelligent table extraction—it's a game changer for data extraction workflows!*
**Happy Crawling! 🕷️**
*- The Crawl4AI Team*

View File

@@ -1,356 +0,0 @@
#!/usr/bin/env python3
"""
Example demonstrating LLM-based table extraction in Crawl4AI.
This example shows how to use the LLMTableExtraction strategy to extract
complex tables from web pages, including handling rowspan, colspan, and nested tables.
"""
import os
import sys
# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
LLMConfig,
LLMTableExtraction,
CacheMode
)
import pandas as pd
# Example 1: Basic LLM Table Extraction
async def basic_llm_extraction():
"""Extract tables using LLM with default settings."""
print("\n=== Example 1: Basic LLM Table Extraction ===")
# Configure LLM (using OpenAI GPT-4o-mini for cost efficiency)
llm_config = LLMConfig(
provider="openai/gpt-4.1-mini",
api_token="env:OPENAI_API_KEY", # Uses environment variable
temperature=0.1, # Low temperature for consistency
max_tokens=32000
)
# Create LLM table extraction strategy
table_strategy = LLMTableExtraction(
llm_config=llm_config,
verbose=True,
# css_selector="div.mw-content-ltr",
max_tries=2,
enable_chunking=True,
chunk_token_threshold=5000, # Lower threshold to force chunking
min_rows_per_chunk=10,
max_parallel_chunks=3
)
# Configure crawler with the strategy
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=table_strategy
)
async with AsyncWebCrawler() as crawler:
# Extract tables from a Wikipedia page
result = await crawler.arun(
url="https://en.wikipedia.org/wiki/List_of_chemical_elements",
config=config
)
if result.success:
print(f"✓ Found {len(result.tables)} tables")
# Display first table
if result.tables:
first_table = result.tables[0]
print(f"\nFirst table:")
print(f" Headers: {first_table['headers'][:5]}...")
print(f" Rows: {len(first_table['rows'])}")
# Convert to pandas DataFrame
df = pd.DataFrame(
first_table['rows'],
columns=first_table['headers']
)
print(f"\nDataFrame shape: {df.shape}")
print(df.head())
else:
print(f"✗ Extraction failed: {result.error}")
# Example 2: Focused Extraction with CSS Selector
async def focused_extraction():
"""Extract tables from specific page sections using CSS selectors."""
print("\n=== Example 2: Focused Extraction with CSS Selector ===")
# HTML with multiple tables
test_html = """
<html>
<body>
<div class="sidebar">
<table role="presentation">
<tr><td>Navigation</td></tr>
</table>
</div>
<div class="main-content">
<table id="data-table">
<caption>Quarterly Sales Report</caption>
<thead>
<tr>
<th rowspan="2">Product</th>
<th colspan="3">Q1 2024</th>
</tr>
<tr>
<th>Jan</th>
<th>Feb</th>
<th>Mar</th>
</tr>
</thead>
<tbody>
<tr>
<td>Widget A</td>
<td>100</td>
<td>120</td>
<td>140</td>
</tr>
<tr>
<td>Widget B</td>
<td>200</td>
<td>180</td>
<td>220</td>
</tr>
</tbody>
</table>
</div>
</body>
</html>
"""
llm_config = LLMConfig(
provider="openai/gpt-4.1-mini",
api_token="env:OPENAI_API_KEY"
)
# Focus only on main content area
table_strategy = LLMTableExtraction(
llm_config=llm_config,
css_selector=".main-content", # Only extract from main content
verbose=True
)
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=table_strategy
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url=f"raw:{test_html}",
config=config
)
if result.success and result.tables:
table = result.tables[0]
print(f"✓ Extracted table: {table.get('caption', 'No caption')}")
print(f" Headers: {table['headers']}")
print(f" Metadata: {table['metadata']}")
# The LLM should have handled the rowspan/colspan correctly
print("\nProcessed data (rowspan/colspan handled):")
for i, row in enumerate(table['rows']):
print(f" Row {i+1}: {row}")
# Example 3: Comparing with Default Extraction
async def compare_strategies():
"""Compare LLM extraction with default extraction on complex tables."""
print("\n=== Example 3: Comparing LLM vs Default Extraction ===")
# Complex table with nested structure
complex_html = """
<html>
<body>
<table>
<tr>
<th rowspan="3">Category</th>
<th colspan="2">2023</th>
<th colspan="2">2024</th>
</tr>
<tr>
<th>H1</th>
<th>H2</th>
<th>H1</th>
<th>H2</th>
</tr>
<tr>
<td colspan="4">All values in millions</td>
</tr>
<tr>
<td>Revenue</td>
<td>100</td>
<td>120</td>
<td>130</td>
<td>145</td>
</tr>
<tr>
<td>Profit</td>
<td>20</td>
<td>25</td>
<td>28</td>
<td>32</td>
</tr>
</table>
</body>
</html>
"""
async with AsyncWebCrawler() as crawler:
# Test with default extraction
from crawl4ai import DefaultTableExtraction
default_strategy = DefaultTableExtraction(
table_score_threshold=3,
verbose=True
)
config_default = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=default_strategy
)
result_default = await crawler.arun(
url=f"raw:{complex_html}",
config=config_default
)
# Test with LLM extraction
llm_strategy = LLMTableExtraction(
llm_config=LLMConfig(
provider="openai/gpt-4.1-mini",
api_token="env:OPENAI_API_KEY"
),
verbose=True
)
config_llm = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=llm_strategy
)
result_llm = await crawler.arun(
url=f"raw:{complex_html}",
config=config_llm
)
# Compare results
print("\nDefault Extraction:")
if result_default.tables:
table = result_default.tables[0]
print(f" Headers: {table.get('headers', [])}")
print(f" Rows: {len(table.get('rows', []))}")
for i, row in enumerate(table.get('rows', [])[:3]):
print(f" Row {i+1}: {row}")
print("\nLLM Extraction (handles complex structure better):")
if result_llm.tables:
table = result_llm.tables[0]
print(f" Headers: {table.get('headers', [])}")
print(f" Rows: {len(table.get('rows', []))}")
for i, row in enumerate(table.get('rows', [])):
print(f" Row {i+1}: {row}")
print(f" Metadata: {table.get('metadata', {})}")
# Example 4: Batch Processing Multiple Pages
async def batch_extraction():
"""Extract tables from multiple pages efficiently."""
print("\n=== Example 4: Batch Table Extraction ===")
urls = [
"https://www.worldometers.info/geography/alphabetical-list-of-countries/",
# "https://en.wikipedia.org/wiki/List_of_chemical_elements",
]
llm_config = LLMConfig(
provider="openai/gpt-4.1-mini",
api_token="env:OPENAI_API_KEY",
temperature=0.1,
max_tokens=1500
)
table_strategy = LLMTableExtraction(
llm_config=llm_config,
css_selector="div.datatable-container", # Wikipedia data tables
verbose=False,
enable_chunking=True,
chunk_token_threshold=5000, # Lower threshold to force chunking
min_rows_per_chunk=10,
max_parallel_chunks=3
)
config = CrawlerRunConfig(
table_extraction=table_strategy,
cache_mode=CacheMode.BYPASS
)
all_tables = []
async with AsyncWebCrawler() as crawler:
for url in urls:
print(f"\nProcessing: {url.split('/')[-1][:50]}...")
result = await crawler.arun(url=url, config=config)
if result.success and result.tables:
print(f" ✓ Found {len(result.tables)} tables")
# Store first table from each page
if result.tables:
all_tables.append({
'url': url,
'table': result.tables[0]
})
# Summary
print(f"\n=== Summary ===")
print(f"Extracted {len(all_tables)} tables from {len(urls)} pages")
for item in all_tables:
table = item['table']
print(f"\nFrom {item['url'].split('/')[-1][:30]}:")
print(f" Columns: {len(table['headers'])}")
print(f" Rows: {len(table['rows'])}")
async def main():
"""Run all examples."""
print("=" * 60)
print("LLM TABLE EXTRACTION EXAMPLES")
print("=" * 60)
# Run examples (comment out ones you don't want to run)
# Basic extraction
await basic_llm_extraction()
# # Focused extraction with CSS
# await focused_extraction()
# # Compare strategies
# await compare_strategies()
# # Batch processing
# await batch_extraction()
print("\n" + "=" * 60)
print("ALL EXAMPLES COMPLETED")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,276 +0,0 @@
"""
Example: Using Table Extraction Strategies in Crawl4AI
This example demonstrates how to use different table extraction strategies
to extract tables from web pages.
"""
import asyncio
import pandas as pd
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
CacheMode,
DefaultTableExtraction,
NoTableExtraction,
TableExtractionStrategy
)
from typing import Dict, List, Any
async def example_default_extraction():
"""Example 1: Using default table extraction (automatic)."""
print("\n" + "="*50)
print("Example 1: Default Table Extraction")
print("="*50)
async with AsyncWebCrawler() as crawler:
# No need to specify table_extraction - uses DefaultTableExtraction automatically
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_score_threshold=7 # Adjust sensitivity (default: 7)
)
result = await crawler.arun(
"https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)",
config=config
)
if result.success and result.tables:
print(f"Found {len(result.tables)} tables")
# Convert first table to pandas DataFrame
if result.tables:
first_table = result.tables[0]
df = pd.DataFrame(
first_table['rows'],
columns=first_table['headers'] if first_table['headers'] else None
)
print(f"\nFirst table preview:")
print(df.head())
print(f"Shape: {df.shape}")
async def example_custom_configuration():
"""Example 2: Custom table extraction configuration."""
print("\n" + "="*50)
print("Example 2: Custom Table Configuration")
print("="*50)
async with AsyncWebCrawler() as crawler:
# Create custom extraction strategy with specific settings
table_strategy = DefaultTableExtraction(
table_score_threshold=5, # Lower threshold for more permissive detection
min_rows=3, # Only extract tables with at least 3 rows
min_cols=2, # Only extract tables with at least 2 columns
verbose=True
)
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=table_strategy,
# Target specific tables using CSS selector
css_selector="div.main-content"
)
result = await crawler.arun(
"https://example.com/data",
config=config
)
if result.success:
print(f"Found {len(result.tables)} tables matching criteria")
for i, table in enumerate(result.tables):
print(f"\nTable {i+1}:")
print(f" Caption: {table.get('caption', 'No caption')}")
print(f" Size: {table['metadata']['row_count']} rows × {table['metadata']['column_count']} columns")
print(f" Has headers: {table['metadata']['has_headers']}")
async def example_disable_extraction():
"""Example 3: Disable table extraction when not needed."""
print("\n" + "="*50)
print("Example 3: Disable Table Extraction")
print("="*50)
async with AsyncWebCrawler() as crawler:
# Use NoTableExtraction to skip table processing entirely
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=NoTableExtraction() # No tables will be extracted
)
result = await crawler.arun(
"https://example.com",
config=config
)
if result.success:
print(f"Tables extracted: {len(result.tables)} (should be 0)")
print("Table extraction disabled - better performance for non-table content")
class FinancialTableExtraction(TableExtractionStrategy):
"""
Custom strategy for extracting financial tables with specific requirements.
"""
def __init__(self, currency_symbols=None, **kwargs):
super().__init__(**kwargs)
self.currency_symbols = currency_symbols or ['$', '', '£', '¥']
def extract_tables(self, element, **kwargs):
"""Extract only tables that appear to contain financial data."""
tables_data = []
for table in element.xpath(".//table"):
# Check if table contains currency symbols
table_text = ''.join(table.itertext())
has_currency = any(symbol in table_text for symbol in self.currency_symbols)
if not has_currency:
continue
# Extract using base logic (could reuse DefaultTableExtraction logic)
headers = []
rows = []
# Extract headers
for th in table.xpath(".//thead//th | .//tr[1]//th"):
headers.append(th.text_content().strip())
# Extract rows
for tr in table.xpath(".//tbody//tr | .//tr[position()>1]"):
row = []
for td in tr.xpath(".//td"):
cell_text = td.text_content().strip()
# Clean currency values
for symbol in self.currency_symbols:
cell_text = cell_text.replace(symbol, '')
row.append(cell_text)
if row:
rows.append(row)
if headers or rows:
tables_data.append({
"headers": headers,
"rows": rows,
"caption": table.xpath(".//caption/text()")[0] if table.xpath(".//caption") else "",
"summary": table.get("summary", ""),
"metadata": {
"type": "financial",
"has_currency": True,
"row_count": len(rows),
"column_count": len(headers) if headers else len(rows[0]) if rows else 0
}
})
return tables_data
async def example_custom_strategy():
"""Example 4: Custom table extraction strategy."""
print("\n" + "="*50)
print("Example 4: Custom Financial Table Strategy")
print("="*50)
async with AsyncWebCrawler() as crawler:
# Use custom strategy for financial tables
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=FinancialTableExtraction(
currency_symbols=['$', ''],
verbose=True
)
)
result = await crawler.arun(
"https://finance.yahoo.com/",
config=config
)
if result.success:
print(f"Found {len(result.tables)} financial tables")
for table in result.tables:
if table['metadata'].get('type') == 'financial':
print(f" ✓ Financial table with {table['metadata']['row_count']} rows")
async def example_combined_extraction():
"""Example 5: Combine table extraction with other strategies."""
print("\n" + "="*50)
print("Example 5: Combined Extraction Strategies")
print("="*50)
from crawl4ai import LLMExtractionStrategy, LLMConfig
async with AsyncWebCrawler() as crawler:
# Define schema for structured extraction
schema = {
"type": "object",
"properties": {
"page_title": {"type": "string"},
"main_topic": {"type": "string"},
"key_figures": {
"type": "array",
"items": {"type": "string"}
}
}
}
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
# Table extraction
table_extraction=DefaultTableExtraction(
table_score_threshold=6,
min_rows=2
),
# LLM extraction for structured data
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai"),
schema=schema
)
)
result = await crawler.arun(
"https://en.wikipedia.org/wiki/Economy_of_the_United_States",
config=config
)
if result.success:
print(f"Tables found: {len(result.tables)}")
# Tables are in result.tables
if result.tables:
print(f"First table has {len(result.tables[0]['rows'])} rows")
# Structured data is in result.extracted_content
if result.extracted_content:
import json
structured_data = json.loads(result.extracted_content)
print(f"Page title: {structured_data.get('page_title', 'N/A')}")
print(f"Main topic: {structured_data.get('main_topic', 'N/A')}")
async def main():
"""Run all examples."""
print("\n" + "="*60)
print("CRAWL4AI TABLE EXTRACTION EXAMPLES")
print("="*60)
# Run examples
await example_default_extraction()
await example_custom_configuration()
await example_disable_extraction()
await example_custom_strategy()
# await example_combined_extraction() # Requires OpenAI API key
print("\n" + "="*60)
print("EXAMPLES COMPLETED")
print("="*60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,22 +20,136 @@ Ever wondered why your AI coding assistant struggles with your library despite c
## Latest Release
### [Crawl4AI v0.7.4 The Intelligent Table Extraction & Performance Update](../blog/release-v0.7.4.md)
*August 17, 2025*
### [Crawl4AI v0.7.3 The Multi-Config Intelligence Update](releases/0.7.3.md)
*August 6, 2025*
Crawl4AI v0.7.4 introduces revolutionary LLM-powered table extraction with intelligent chunking, performance improvements for concurrent crawling, enhanced browser management, and critical stability fixes that make Crawl4AI more robust for production workloads.
Crawl4AI v0.7.3 brings smarter URL-specific configurations, flexible Docker deployments, and critical stability improvements. Configure different crawling strategies for different URL patterns in a single batch—perfect for mixed content sites with docs, blogs, and APIs.
Key highlights:
- **🚀 LLMTableExtraction**: Revolutionary table extraction with intelligent chunking for massive tables
- **⚡ Dispatcher Bug Fix**: Fixed sequential processing issue in arun_many for fast-completing tasks
- **🧹 Memory Management Refactor**: Streamlined memory utilities and better resource management
- **🔧 Browser Manager Fixes**: Resolved race conditions in concurrent page creation
- **🔗 Advanced URL Processing**: Better handling of raw URLs and base tag link resolution
- **Multi-URL Configurations**: Different strategies for different URL patterns in one crawl
- **Flexible Docker LLM Providers**: Configure providers via environment variables
- **Bug Fixes**: Critical stability improvements for production deployments
- **Documentation Updates**: Clearer examples and improved API documentation
[Read full release notes →](../blog/release-v0.7.4.md)
[Read full release notes →](releases/0.7.3.md)
---
## Previous Releases
### [Crawl4AI v0.7.0 The Adaptive Intelligence Update](releases/0.7.0.md)
*January 28, 2025*
Introduced groundbreaking intelligence features including Adaptive Crawling, Virtual Scroll support, intelligent Link Preview, and the Async URL Seeder for massive URL discovery.
[Read release notes →](releases/0.7.0.md)
### [Crawl4AI v0.6.0 World-Aware Crawling, Pre-Warmed Browsers, and the MCP API](releases/0.6.0.md)
*December 23, 2024*
Crawl4AI v0.6.0 brought major architectural upgrades including world-aware crawling (set geolocation, locale, and timezone), real-time traffic capture, and a memory-efficient crawler pool with pre-warmed pages.
The Docker server now exposes a full-featured MCP socket + SSE interface, supports streaming, and comes with a new Playground UI. Plus, table extraction is now native, and the new stress-test framework supports crawling 1,000+ URLs.
Other key changes:
* Native support for `result.media["tables"]` to export DataFrames
* Full network + console logs and MHTML snapshot per crawl
* Browser pooling and pre-warming for faster cold starts
* New streaming endpoints via MCP API and Playground
* Robots.txt support, proxy rotation, and improved session handling
* Deprecated old markdown names, legacy modules cleaned up
* Massive repo cleanup: ~36K insertions, ~5K deletions across 121 files
[Read full release notes →](releases/0.6.0.md)
---
### [Crawl4AI v0.5.0: Deep Crawling, Scalability, and a New CLI!](releases/0.5.0.md)
My dear friends and crawlers, there you go, this is the release of Crawl4AI v0.5.0! This release brings a wealth of new features, performance improvements, and a more streamlined developer experience. Here's a breakdown of what's new:
**Major New Features:**
* **Deep Crawling:** Explore entire websites with configurable strategies (BFS, DFS, Best-First). Define custom filters and URL scoring for targeted crawls.
* **Memory-Adaptive Dispatcher:** Handle large-scale crawls with ease! Our new dispatcher dynamically adjusts concurrency based on available memory and includes built-in rate limiting.
* **Multiple Crawler Strategies:** Choose between the full-featured Playwright browser-based crawler or a new, *much* faster HTTP-only crawler for simpler tasks.
* **Docker Deployment:** Deploy Crawl4AI as a scalable, self-contained service with built-in API endpoints and optional JWT authentication.
* **Command-Line Interface (CLI):** Interact with Crawl4AI directly from your terminal. Crawl, configure, and extract data with simple commands.
* **LLM Configuration (`LLMConfig`):** A new, unified way to configure LLM providers (OpenAI, Anthropic, Ollama, etc.) for extraction, filtering, and schema generation. Simplifies API key management and switching between models.
**Minor Updates & Improvements:**
* **LXML Scraping Mode:** Faster HTML parsing with `LXMLWebScrapingStrategy`.
* **Proxy Rotation:** Added `ProxyRotationStrategy` with a `RoundRobinProxyStrategy` implementation.
* **PDF Processing:** Extract text, images, and metadata from PDF files.
* **URL Redirection Tracking:** Automatically follows and records redirects.
* **Robots.txt Compliance:** Optionally respect website crawling rules.
* **LLM-Powered Schema Generation:** Automatically create extraction schemas using an LLM.
* **`LLMContentFilter`:** Generate high-quality, focused markdown using an LLM.
* **Improved Error Handling & Stability:** Numerous bug fixes and performance enhancements.
* **Enhanced Documentation:** Updated guides and examples.
**Breaking Changes & Migration:**
This release includes several breaking changes to improve the library's structure and consistency. Here's what you need to know:
* **`arun_many()` Behavior:** Now uses the `MemoryAdaptiveDispatcher` by default. The return type depends on the `stream` parameter in `CrawlerRunConfig`. Adjust code that relied on unbounded concurrency.
* **`max_depth` Location:** Moved to `CrawlerRunConfig` and now controls *crawl depth*.
* **Deep Crawling Imports:** Import `DeepCrawlStrategy` and related classes from `crawl4ai.deep_crawling`.
* **`BrowserContext` API:** Updated; the old `get_context` method is deprecated.
* **Optional Model Fields:** Many data model fields are now optional. Handle potential `None` values.
* **`ScrapingMode` Enum:** Replaced with strategy pattern (`WebScrapingStrategy`, `LXMLWebScrapingStrategy`).
* **`content_filter` Parameter:** Removed from `CrawlerRunConfig`. Use extraction strategies or markdown generators with filters.
* **Removed Functionality:** The synchronous `WebCrawler`, the old CLI, and docs management tools have been removed.
* **Docker:** Significant changes to deployment. See the [Docker documentation](../deploy/docker/README.md).
* **`ssl_certificate.json`:** This file has been removed.
* **Config**: FastFilterChain has been replaced with FilterChain
* **Deep-Crawl**: DeepCrawlStrategy.arun now returns Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
* **Proxy**: Removed synchronous WebCrawler support and related rate limiting configurations
* **LLM Parameters:** Use the new `LLMConfig` object instead of passing `provider`, `api_token`, `base_url`, and `api_base` directly to `LLMExtractionStrategy` and `LLMContentFilter`.
**In short:** Update imports, adjust `arun_many()` usage, check for optional fields, and review the Docker deployment guide.
## License Change
Crawl4AI v0.5.0 updates the license to Apache 2.0 *with a required attribution clause*. This means you are free to use, modify, and distribute Crawl4AI (even commercially), but you *must* clearly attribute the project in any public use or distribution. See the updated `LICENSE` file for the full legal text and specific requirements.
**Get Started:**
* **Installation:** `pip install "crawl4ai[all]"` (or use the Docker image)
* **Documentation:** [https://docs.crawl4ai.com](https://docs.crawl4ai.com)
* **GitHub:** [https://github.com/unclecode/crawl4ai](https://github.com/unclecode/crawl4ai)
I'm very excited to see what you build with Crawl4AI v0.5.0!
---
### [0.4.2 - Configurable Crawlers, Session Management, and Smarter Screenshots](releases/0.4.2.md)
*December 12, 2024*
The 0.4.2 update brings massive improvements to configuration, making crawlers and browsers easier to manage with dedicated objects. You can now import/export local storage for seamless session management. Plus, long-page screenshots are faster and cleaner, and full-page PDF exports are now possible. Check out all the new features to make your crawling experience even smoother.
[Read full release notes →](releases/0.4.2.md)
---
### [0.4.1 - Smarter Crawling with Lazy-Load Handling, Text-Only Mode, and More](releases/0.4.1.md)
*December 8, 2024*
This release brings major improvements to handling lazy-loaded images, a blazing-fast Text-Only Mode, full-page scanning for infinite scrolls, dynamic viewport adjustments, and session reuse for efficient crawling. If you're looking to improve speed, reliability, or handle dynamic content with ease, this update has you covered.
[Read full release notes →](releases/0.4.1.md)
---
### [0.4.0 - Major Content Filtering Update](releases/0.4.0.md)
*December 1, 2024*
Introduced significant improvements to content filtering, multi-threaded environment handling, and user-agent generation. This release features the new PruningContentFilter, enhanced thread safety, and improved test coverage.
[Read full release notes →](releases/0.4.0.md)
## Project History
Curious about how Crawl4AI has evolved? Check out our [complete changelog](https://github.com/unclecode/crawl4ai/blob/main/CHANGELOG.md) for a detailed history of all versions and updates.

View File

@@ -1,807 +0,0 @@
# Table Extraction Strategies
## Overview
**New in v0.7.3+**: Table extraction now follows the **Strategy Design Pattern**, providing unprecedented flexibility and power for handling different table structures. Don't worry - **your existing code still works!** We maintain full backward compatibility while offering new capabilities.
### What's Changed?
- **Architecture**: Table extraction now uses pluggable strategies
- **Backward Compatible**: Your existing code with `table_score_threshold` continues to work
- **More Power**: Choose from multiple strategies or create your own
- **Same Default Behavior**: By default, uses `DefaultTableExtraction` (same as before)
### Key Points
**Old code still works** - No breaking changes
**Same default behavior** - Uses the proven extraction algorithm
**New capabilities** - Add LLM extraction or custom strategies when needed
**Strategy pattern** - Clean, extensible architecture
## Quick Start
### The Simplest Way (Works Like Before)
If you're already using Crawl4AI, nothing changes:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
async def extract_tables():
async with AsyncWebCrawler() as crawler:
# This works exactly like before - uses DefaultTableExtraction internally
result = await crawler.arun("https://example.com/data")
# Tables are automatically extracted and available in result.tables
for table in result.tables:
print(f"Table with {len(table['rows'])} rows and {len(table['headers'])} columns")
print(f"Headers: {table['headers']}")
print(f"First row: {table['rows'][0] if table['rows'] else 'No data'}")
asyncio.run(extract_tables())
```
### Using the Old Configuration (Still Supported)
Your existing code with `table_score_threshold` continues to work:
```python
# This old approach STILL WORKS - we maintain backward compatibility
config = CrawlerRunConfig(
table_score_threshold=7 # Internally creates DefaultTableExtraction(table_score_threshold=7)
)
result = await crawler.arun(url, config)
```
## Table Extraction Strategies
### Understanding the Strategy Pattern
The strategy pattern allows you to choose different table extraction algorithms at runtime. Think of it as having different tools in a toolbox - you pick the right one for the job:
- **No explicit strategy?** → Uses `DefaultTableExtraction` automatically (same as v0.7.2 and earlier)
- **Need complex table handling?** → Choose `LLMTableExtraction` (costs money, use sparingly)
- **Want to disable tables?** → Use `NoTableExtraction`
- **Have special requirements?** → Create a custom strategy
### Available Strategies
| Strategy | Description | Use Case | Cost | When to Use |
|----------|-------------|----------|------|-------------|
| `DefaultTableExtraction` | **RECOMMENDED**: Same algorithm as before v0.7.3 | General purpose (default) | Free | **Use this first - handles 95% of cases** |
| `LLMTableExtraction` | AI-powered extraction for complex tables | Tables with complex rowspan/colspan | **$$$ Per API call** | Only when DefaultTableExtraction fails |
| `NoTableExtraction` | Disables table extraction | When tables aren't needed | Free | For text-only extraction |
| Custom strategies | User-defined extraction logic | Specialized requirements | Free | Domain-specific needs |
> **⚠️ CRITICAL COST WARNING for LLMTableExtraction**:
>
> **DO NOT USE `LLMTableExtraction` UNLESS ABSOLUTELY NECESSARY!**
>
> - **Always try `DefaultTableExtraction` first** - It's free and handles most tables perfectly
> - LLM extraction **costs money** with every API call
> - For large tables (100+ rows), LLM extraction can be **very slow**
> - **For large tables**: If you must use LLM, choose fast providers:
> - ✅ **Groq** (fastest inference)
> - ✅ **Cerebras** (optimized for speed)
> - ⚠️ Avoid: OpenAI, Anthropic for large tables (slower)
>
> **🚧 WORK IN PROGRESS**:
> We are actively developing an **advanced non-LLM algorithm** that will handle complex table structures (rowspan, colspan, nested tables) for **FREE**. This will replace the need for costly LLM extraction in most cases. Coming soon!
### DefaultTableExtraction
The default strategy uses a sophisticated scoring system to identify data tables:
```python
from crawl4ai import DefaultTableExtraction, CrawlerRunConfig
# Customize the default extraction
table_strategy = DefaultTableExtraction(
table_score_threshold=7, # Scoring threshold (default: 7)
min_rows=2, # Minimum rows required
min_cols=2, # Minimum columns required
verbose=True # Enable detailed logging
)
config = CrawlerRunConfig(
table_extraction=table_strategy
)
```
#### Scoring System
The scoring system evaluates multiple factors:
| Factor | Score Impact | Description |
|--------|--------------|-------------|
| Has `<thead>` | +2 | Semantic table structure |
| Has `<tbody>` | +1 | Organized table body |
| Has `<th>` elements | +2 | Header cells present |
| Headers in correct position | +1 | Proper semantic structure |
| Consistent column count | +2 | Regular data structure |
| Has caption | +2 | Descriptive caption |
| Has summary | +1 | Summary attribute |
| High text density | +2 to +3 | Content-rich cells |
| Data attributes | +0.5 each | Data-* attributes |
| Nested tables | -3 | Often indicates layout |
| Role="presentation" | -3 | Explicitly non-data |
| Too few rows | -2 | Insufficient data |
### LLMTableExtraction (Use Sparingly!)
**⚠️ WARNING**: Only use this when `DefaultTableExtraction` fails with complex tables!
LLMTableExtraction uses AI to understand complex table structures that traditional parsers struggle with. It automatically handles large tables through intelligent chunking and parallel processing:
```python
from crawl4ai import LLMTableExtraction, LLMConfig, CrawlerRunConfig
# Configure LLM (costs money per call!)
llm_config = LLMConfig(
provider="groq/llama-3.3-70b-versatile", # Fast provider for large tables
api_token="your_api_key",
temperature=0.1
)
# Create LLM extraction strategy with smart chunking
table_strategy = LLMTableExtraction(
llm_config=llm_config,
max_tries=3, # Retry up to 3 times if extraction fails
css_selector="table", # Optional: focus on specific tables
enable_chunking=True, # Automatically chunk large tables (default: True)
chunk_token_threshold=3000, # Split tables larger than this (default: 3000 tokens)
min_rows_per_chunk=10, # Minimum rows per chunk (default: 10)
max_parallel_chunks=5, # Process up to 5 chunks in parallel (default: 5)
verbose=True
)
config = CrawlerRunConfig(
table_extraction=table_strategy
)
result = await crawler.arun(url, config)
```
#### When to Use LLMTableExtraction
**Use ONLY when**:
- Tables have complex merged cells (rowspan/colspan) that break DefaultTableExtraction
- Nested tables that need semantic understanding
- Tables with irregular structures
- You've tried DefaultTableExtraction and it failed
**Never use when**:
- DefaultTableExtraction works (99% of cases)
- Tables are simple or well-structured
- You're processing many pages (costs add up!)
- Tables have 100+ rows (very slow)
#### How Smart Chunking Works
LLMTableExtraction automatically handles large tables through intelligent chunking:
1. **Automatic Detection**: Tables exceeding the token threshold are automatically split
2. **Smart Splitting**: Chunks are created at row boundaries, preserving table structure
3. **Header Preservation**: Each chunk includes the original headers for context
4. **Parallel Processing**: Multiple chunks are processed simultaneously for speed
5. **Intelligent Merging**: Results are merged back into a single, complete table
**Chunking Parameters**:
- `enable_chunking` (default: `True`): Automatically handle large tables
- `chunk_token_threshold` (default: `3000`): When to split tables
- `min_rows_per_chunk` (default: `10`): Ensures meaningful chunk sizes
- `max_parallel_chunks` (default: `5`): Concurrent processing for speed
The chunking is completely transparent - you get the same output format whether the table was processed in one piece or multiple chunks.
#### Performance Optimization for LLMTableExtraction
**Provider Recommendations by Table Size**:
| Table Size | Recommended Providers | Why |
|------------|----------------------|-----|
| Small (<50 rows) | Any provider | Fast enough |
| Medium (50-200 rows) | Groq, Cerebras | Optimized inference |
| Large (200+ rows) | **Groq** (best), Cerebras | Fastest inference + automatic chunking |
| Very Large (500+ rows) | Groq with chunking | Parallel processing keeps it fast |
### NoTableExtraction
Disable table extraction for better performance when tables aren't needed:
```python
from crawl4ai import NoTableExtraction, CrawlerRunConfig
config = CrawlerRunConfig(
table_extraction=NoTableExtraction()
)
# Tables won't be extracted, improving performance
result = await crawler.arun(url, config)
assert len(result.tables) == 0
```
## Extracted Table Structure
Each extracted table contains:
```python
{
"headers": ["Column 1", "Column 2", ...], # Column headers
"rows": [ # Data rows
["Row 1 Col 1", "Row 1 Col 2", ...],
["Row 2 Col 1", "Row 2 Col 2", ...],
],
"caption": "Table Caption", # If present
"summary": "Table Summary", # If present
"metadata": {
"row_count": 10, # Number of rows
"column_count": 3, # Number of columns
"has_headers": True, # Headers detected
"has_caption": True, # Caption exists
"has_summary": False, # Summary exists
"id": "data-table-1", # Table ID if present
"class": "financial-data" # Table class if present
}
}
```
## Configuration Options
### Basic Configuration
```python
config = CrawlerRunConfig(
# Table extraction settings
table_score_threshold=7, # Default threshold (backward compatible)
table_extraction=strategy, # Optional: custom strategy
# Filter what to process
css_selector="main", # Focus on specific area
excluded_tags=["nav", "aside"] # Exclude page sections
)
```
### Advanced Configuration
```python
from crawl4ai import DefaultTableExtraction, CrawlerRunConfig
# Fine-tuned extraction
strategy = DefaultTableExtraction(
table_score_threshold=5, # Lower = more permissive
min_rows=3, # Require at least 3 rows
min_cols=2, # Require at least 2 columns
verbose=True # Detailed logging
)
config = CrawlerRunConfig(
table_extraction=strategy,
css_selector="article.content", # Target specific content
exclude_domains=["ads.com"], # Exclude ad domains
cache_mode=CacheMode.BYPASS # Fresh extraction
)
```
## Working with Extracted Tables
### Convert to Pandas DataFrame
```python
import pandas as pd
async def tables_to_dataframes(url):
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(url)
dataframes = []
for table_data in result.tables:
# Create DataFrame
if table_data['headers']:
df = pd.DataFrame(
table_data['rows'],
columns=table_data['headers']
)
else:
df = pd.DataFrame(table_data['rows'])
# Add metadata as DataFrame attributes
df.attrs['caption'] = table_data.get('caption', '')
df.attrs['metadata'] = table_data.get('metadata', {})
dataframes.append(df)
return dataframes
```
### Filter Tables by Criteria
```python
async def extract_large_tables(url):
async with AsyncWebCrawler() as crawler:
# Configure minimum size requirements
strategy = DefaultTableExtraction(
min_rows=10,
min_cols=3,
table_score_threshold=6
)
config = CrawlerRunConfig(
table_extraction=strategy
)
result = await crawler.arun(url, config)
# Further filter results
large_tables = [
table for table in result.tables
if table['metadata']['row_count'] > 10
and table['metadata']['column_count'] > 3
]
return large_tables
```
### Export Tables to Different Formats
```python
import json
import csv
async def export_tables(url):
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(url)
for i, table in enumerate(result.tables):
# Export as JSON
with open(f'table_{i}.json', 'w') as f:
json.dump(table, f, indent=2)
# Export as CSV
with open(f'table_{i}.csv', 'w', newline='') as f:
writer = csv.writer(f)
if table['headers']:
writer.writerow(table['headers'])
writer.writerows(table['rows'])
# Export as Markdown
with open(f'table_{i}.md', 'w') as f:
# Write headers
if table['headers']:
f.write('| ' + ' | '.join(table['headers']) + ' |\n')
f.write('|' + '---|' * len(table['headers']) + '\n')
# Write rows
for row in table['rows']:
f.write('| ' + ' | '.join(str(cell) for cell in row) + ' |\n')
```
## Creating Custom Strategies
Extend `TableExtractionStrategy` to create custom extraction logic:
### Example: Financial Table Extractor
```python
from crawl4ai import TableExtractionStrategy
from typing import List, Dict, Any
import re
class FinancialTableExtractor(TableExtractionStrategy):
"""Extract tables containing financial data."""
def __init__(self, currency_symbols=None, require_numbers=True, **kwargs):
super().__init__(**kwargs)
self.currency_symbols = currency_symbols or ['$', '', '£', '¥']
self.require_numbers = require_numbers
self.number_pattern = re.compile(r'\d+[,.]?\d*')
def extract_tables(self, element, **kwargs):
tables_data = []
for table in element.xpath(".//table"):
# Check if table contains financial indicators
table_text = ''.join(table.itertext())
# Must contain currency symbols
has_currency = any(sym in table_text for sym in self.currency_symbols)
if not has_currency:
continue
# Must contain numbers if required
if self.require_numbers:
numbers = self.number_pattern.findall(table_text)
if len(numbers) < 3: # Arbitrary minimum
continue
# Extract the table data
table_data = self._extract_financial_data(table)
if table_data:
tables_data.append(table_data)
return tables_data
def _extract_financial_data(self, table):
"""Extract and clean financial data from table."""
headers = []
rows = []
# Extract headers
for th in table.xpath(".//thead//th | .//tr[1]//th"):
headers.append(th.text_content().strip())
# Extract and clean rows
for tr in table.xpath(".//tbody//tr | .//tr[position()>1]"):
row = []
for td in tr.xpath(".//td"):
text = td.text_content().strip()
# Clean currency formatting
text = re.sub(r'[$€£¥,]', '', text)
row.append(text)
if row:
rows.append(row)
return {
"headers": headers,
"rows": rows,
"caption": self._get_caption(table),
"summary": table.get("summary", ""),
"metadata": {
"type": "financial",
"row_count": len(rows),
"column_count": len(headers) or len(rows[0]) if rows else 0
}
}
def _get_caption(self, table):
caption = table.xpath(".//caption/text()")
return caption[0].strip() if caption else ""
# Usage
strategy = FinancialTableExtractor(
currency_symbols=['$', 'EUR'],
require_numbers=True
)
config = CrawlerRunConfig(
table_extraction=strategy
)
```
### Example: Specific Table Extractor
```python
class SpecificTableExtractor(TableExtractionStrategy):
"""Extract only tables matching specific criteria."""
def __init__(self,
required_headers=None,
id_pattern=None,
class_pattern=None,
**kwargs):
super().__init__(**kwargs)
self.required_headers = required_headers or []
self.id_pattern = id_pattern
self.class_pattern = class_pattern
def extract_tables(self, element, **kwargs):
tables_data = []
for table in element.xpath(".//table"):
# Check ID pattern
if self.id_pattern:
table_id = table.get('id', '')
if not re.match(self.id_pattern, table_id):
continue
# Check class pattern
if self.class_pattern:
table_class = table.get('class', '')
if not re.match(self.class_pattern, table_class):
continue
# Extract headers to check requirements
headers = self._extract_headers(table)
# Check if required headers are present
if self.required_headers:
if not all(req in headers for req in self.required_headers):
continue
# Extract full table data
table_data = self._extract_table_data(table, headers)
tables_data.append(table_data)
return tables_data
```
## Combining with Other Strategies
Table extraction works seamlessly with other Crawl4AI strategies:
```python
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
DefaultTableExtraction,
LLMExtractionStrategy,
JsonCssExtractionStrategy
)
async def combined_extraction(url):
async with AsyncWebCrawler() as crawler:
config = CrawlerRunConfig(
# Table extraction
table_extraction=DefaultTableExtraction(
table_score_threshold=6,
min_rows=2
),
# CSS-based extraction for specific elements
extraction_strategy=JsonCssExtractionStrategy({
"title": "h1",
"summary": "p.summary",
"date": "time"
}),
# Focus on main content
css_selector="main.content"
)
result = await crawler.arun(url, config)
# Access different extraction results
tables = result.tables # Table data
structured = json.loads(result.extracted_content) # CSS extraction
return {
"tables": tables,
"structured_data": structured,
"markdown": result.markdown
}
```
## Performance Considerations
### Optimization Tips
1. **Disable when not needed**: Use `NoTableExtraction` if tables aren't required
2. **Target specific areas**: Use `css_selector` to limit processing scope
3. **Set minimum thresholds**: Filter out small/irrelevant tables early
4. **Cache results**: Use appropriate cache modes for repeated extractions
```python
# Optimized configuration for large pages
config = CrawlerRunConfig(
# Only process main content area
css_selector="article.main-content",
# Exclude navigation and sidebars
excluded_tags=["nav", "aside", "footer"],
# Higher threshold for stricter filtering
table_extraction=DefaultTableExtraction(
table_score_threshold=8,
min_rows=5,
min_cols=3
),
# Enable caching for repeated access
cache_mode=CacheMode.ENABLED
)
```
## Migration Guide
### Important: Your Code Still Works!
**No changes required!** The transition to the strategy pattern is **fully backward compatible**.
### How It Works Internally
#### v0.7.2 and Earlier
```python
# Old way - directly passing table_score_threshold
config = CrawlerRunConfig(
table_score_threshold=7
)
# Internally: No strategy pattern, direct implementation
```
#### v0.7.3+ (Current)
```python
# Old way STILL WORKS - we handle it internally
config = CrawlerRunConfig(
table_score_threshold=7
)
# Internally: Automatically creates DefaultTableExtraction(table_score_threshold=7)
```
### Taking Advantage of New Features
While your old code works, you can now use the strategy pattern for more control:
```python
# Option 1: Keep using the old way (perfectly fine!)
config = CrawlerRunConfig(
table_score_threshold=7 # Still supported
)
# Option 2: Use the new strategy pattern (more flexibility)
from crawl4ai import DefaultTableExtraction
strategy = DefaultTableExtraction(
table_score_threshold=7,
min_rows=2, # New capability!
min_cols=2 # New capability!
)
config = CrawlerRunConfig(
table_extraction=strategy
)
# Option 3: Use advanced strategies when needed
from crawl4ai import LLMTableExtraction, LLMConfig
# Only for complex tables that DefaultTableExtraction can't handle
# Automatically handles large tables with smart chunking
llm_strategy = LLMTableExtraction(
llm_config=LLMConfig(
provider="groq/llama-3.3-70b-versatile",
api_token="your_key"
),
max_tries=3,
enable_chunking=True, # Automatically chunk large tables
chunk_token_threshold=3000, # Chunk when exceeding 3000 tokens
max_parallel_chunks=5 # Process up to 5 chunks in parallel
)
config = CrawlerRunConfig(
table_extraction=llm_strategy # Advanced extraction with automatic chunking
)
```
### Summary
-**No breaking changes** - Old code works as-is
-**Same defaults** - DefaultTableExtraction is automatically used
-**Gradual adoption** - Use new features when you need them
-**Full compatibility** - result.tables structure unchanged
## Best Practices
### 1. Choose the Right Strategy (Cost-Conscious Approach)
**Decision Flow**:
```
1. Do you need tables?
→ No: Use NoTableExtraction
→ Yes: Continue to #2
2. Try DefaultTableExtraction first (FREE)
→ Works? Done! ✅
→ Fails? Continue to #3
3. Is the table critical and complex?
→ No: Accept DefaultTableExtraction results
→ Yes: Continue to #4
4. Use LLMTableExtraction (COSTS MONEY)
→ Small table (<50 rows): Any LLM provider
→ Large table (50+ rows): Use Groq or Cerebras
→ Very large (500+ rows): Reconsider - maybe chunk the page
```
**Strategy Selection Guide**:
- **DefaultTableExtraction**: Use for 99% of cases - it's free and effective
- **LLMTableExtraction**: Only for complex tables with merged cells that break DefaultTableExtraction
- **NoTableExtraction**: When you only need text/markdown content
- **Custom Strategy**: For specialized requirements (financial, scientific, etc.)
### 2. Validate Extracted Data
```python
def validate_table(table):
"""Validate table data quality."""
# Check structure
if not table.get('rows'):
return False
# Check consistency
if table.get('headers'):
expected_cols = len(table['headers'])
for row in table['rows']:
if len(row) != expected_cols:
return False
# Check minimum content
total_cells = sum(len(row) for row in table['rows'])
non_empty = sum(1 for row in table['rows']
for cell in row if cell.strip())
if non_empty / total_cells < 0.5: # Less than 50% non-empty
return False
return True
# Filter valid tables
valid_tables = [t for t in result.tables if validate_table(t)]
```
### 3. Handle Edge Cases
```python
async def robust_table_extraction(url):
"""Extract tables with error handling."""
async with AsyncWebCrawler() as crawler:
try:
config = CrawlerRunConfig(
table_extraction=DefaultTableExtraction(
table_score_threshold=6,
verbose=True
)
)
result = await crawler.arun(url, config)
if not result.success:
print(f"Crawl failed: {result.error}")
return []
# Process tables safely
processed_tables = []
for table in result.tables:
try:
# Validate and process
if validate_table(table):
processed_tables.append(table)
except Exception as e:
print(f"Error processing table: {e}")
continue
return processed_tables
except Exception as e:
print(f"Extraction error: {e}")
return []
```
## Troubleshooting
### Common Issues and Solutions
| Issue | Cause | Solution |
|-------|-------|----------|
| No tables extracted | Score too high | Lower `table_score_threshold` |
| Layout tables included | Score too low | Increase `table_score_threshold` |
| Missing tables | CSS selector too specific | Broaden or remove `css_selector` |
| Incomplete data | Complex table structure | Create custom strategy |
| Performance issues | Processing entire page | Use `css_selector` to limit scope |
### Debug Logging
Enable verbose logging to understand extraction decisions:
```python
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG)
# Enable verbose mode in strategy
strategy = DefaultTableExtraction(
table_score_threshold=7,
verbose=True # Detailed extraction logs
)
config = CrawlerRunConfig(
table_extraction=strategy,
verbose=True # General crawler logs
)
```
## See Also
- [Extraction Strategies](extraction-strategies.md) - Overview of all extraction strategies
- [Content Selection](content-selection.md) - Using CSS selectors and filters
- [Performance Optimization](../optimization/performance-tuning.md) - Speed up extraction
- [Examples](../examples/table_extraction_example.py) - Complete working examples

View File

@@ -1,242 +0,0 @@
# Telemetry
Crawl4AI includes **opt-in telemetry** to help improve stability by capturing anonymous crash reports. No personal data or crawled content is ever collected.
!!! info "Privacy First"
Telemetry is completely optional and respects your privacy. Only exception information is collected - no URLs, no personal data, no crawled content.
## Overview
- **Privacy-first**: Only exceptions and crashes are reported
- **Opt-in by default**: You control when telemetry is enabled (except in Docker where it's on by default)
- **No PII**: No URLs, request data, or personal information is collected
- **Provider-agnostic**: Currently uses Sentry, but designed to support multiple backends
## Installation
Telemetry requires the optional Sentry SDK:
```bash
# Install with telemetry support
pip install crawl4ai[telemetry]
# Or install Sentry SDK separately
pip install sentry-sdk>=2.0.0
```
## Environments
### 1. Python Library & CLI
On first exception, you'll see an interactive prompt:
```
🚨 Crawl4AI Error Detection
==============================================================
We noticed an error occurred. Help improve Crawl4AI by
sending anonymous crash reports?
[1] Yes, send this error only
[2] Yes, always send errors
[3] No, don't send
Your choice (1/2/3):
```
Control via CLI:
```bash
# Enable telemetry
crwl telemetry enable
crwl telemetry enable --email you@example.com
# Disable telemetry
crwl telemetry disable
# Check status
crwl telemetry status
```
### 2. Docker / API Server
!!! warning "Default Enabled in Docker"
Telemetry is **enabled by default** in Docker environments to help identify container-specific issues. This is different from the CLI where it's opt-in.
To disable:
```bash
# Via environment variable
docker run -e CRAWL4AI_TELEMETRY=0 ...
# In docker-compose.yml
environment:
- CRAWL4AI_TELEMETRY=0
```
### 3. Jupyter / Google Colab
In notebooks, you'll see an interactive widget (if available) or a code snippet:
```python
import crawl4ai
# Enable telemetry
crawl4ai.telemetry.enable(email="you@example.com", always=True)
# Send only next error
crawl4ai.telemetry.enable(once=True)
# Disable telemetry
crawl4ai.telemetry.disable()
# Check status
crawl4ai.telemetry.status()
```
## Python API
### Basic Usage
```python
from crawl4ai import telemetry
# Enable/disable telemetry
telemetry.enable(email="optional@email.com", always=True)
telemetry.disable()
# Check current status
status = telemetry.status()
print(f"Telemetry enabled: {status['enabled']}")
print(f"Consent: {status['consent']}")
```
### Manual Exception Capture
```python
from crawl4ai.telemetry import capture_exception
try:
# Your code here
risky_operation()
except Exception as e:
# Manually capture exception with context
capture_exception(e, {
'operation': 'custom_crawler',
'url': 'https://example.com' # Will be sanitized
})
raise
```
### Decorator Pattern
```python
from crawl4ai.telemetry import telemetry_decorator
@telemetry_decorator
def my_crawler_function():
# Exceptions will be automatically captured
pass
```
### Context Manager
```python
from crawl4ai.telemetry import telemetry_context
with telemetry_context("data_extraction"):
# Any exceptions in this block will be captured
result = extract_data(html)
```
## Configuration
Settings are stored in `~/.crawl4ai/config.json`:
```json
{
"telemetry": {
"consent": "always",
"email": "user@example.com"
}
}
```
Consent levels:
- `"not_set"` - No decision made yet
- `"denied"` - Telemetry disabled
- `"once"` - Send current error only
- `"always"` - Always send errors
## Environment Variables
- `CRAWL4AI_TELEMETRY=0` - Disable telemetry (overrides config)
- `CRAWL4AI_TELEMETRY_EMAIL=email@example.com` - Set email for follow-up
- `CRAWL4AI_SENTRY_DSN=https://...` - Override default DSN (for maintainers)
## What's Collected
### Collected ✅
- Exception type and traceback
- Crawl4AI version
- Python version
- Operating system
- Environment type (CLI, Docker, Jupyter)
- Optional email (if provided)
### NOT Collected ❌
- URLs being crawled
- HTML content
- Request/response data
- Cookies or authentication tokens
- IP addresses
- Any personally identifiable information
## Provider Architecture
Telemetry is designed to be provider-agnostic:
```python
from crawl4ai.telemetry.base import TelemetryProvider
class CustomProvider(TelemetryProvider):
def send_exception(self, exc, context=None):
# Your implementation
pass
```
## FAQ
### Q: Can I completely disable telemetry?
A: Yes! Use `crwl telemetry disable` or set `CRAWL4AI_TELEMETRY=0`
### Q: Is telemetry required?
A: No, it's completely optional (except enabled by default in Docker)
### Q: What if I don't install sentry-sdk?
A: Telemetry will gracefully degrade to a no-op state
### Q: Can I see what's being sent?
A: Yes, check the source code in `crawl4ai/telemetry/`
### Q: How do I remove my email?
A: Delete `~/.crawl4ai/config.json` or edit it to remove the email field
## Privacy Commitment
1. **Transparency**: All telemetry code is open source
2. **Control**: You can enable/disable at any time
3. **Minimal**: Only crash data, no user content
4. **Secure**: Data transmitted over HTTPS to Sentry
5. **Anonymous**: No tracking or user identification
## Contributing
Help improve telemetry:
- Report issues with telemetry itself
- Suggest privacy improvements
- Add new provider backends
## Support
If you have concerns about telemetry:
- Open an issue on GitHub
- Email the maintainers
- Review the code in `crawl4ai/telemetry/`

View File

@@ -1,376 +0,0 @@
# Migration Guide: Table Extraction v0.7.3
## Overview
Version 0.7.3 introduces the **Table Extraction Strategy Pattern**, providing a more flexible and extensible approach to table extraction while maintaining full backward compatibility.
## What's New
### Strategy Pattern Implementation
Table extraction now follows the same strategy pattern used throughout Crawl4AI:
- **Consistent Architecture**: Aligns with extraction, chunking, and markdown strategies
- **Extensibility**: Easy to create custom table extraction strategies
- **Better Separation**: Table logic moved from content scraping to dedicated module
- **Full Control**: Fine-grained control over table detection and extraction
### New Classes
```python
from crawl4ai import (
TableExtractionStrategy, # Abstract base class
DefaultTableExtraction, # Current implementation (default)
NoTableExtraction # Explicitly disable extraction
)
```
## Backward Compatibility
**✅ All existing code continues to work without changes.**
### No Changes Required
If your code looks like this, it will continue to work:
```python
# This still works exactly the same
config = CrawlerRunConfig(
table_score_threshold=7
)
result = await crawler.arun(url, config)
tables = result.tables # Same structure, same data
```
### What Happens Behind the Scenes
When you don't specify a `table_extraction` strategy:
1. `CrawlerRunConfig` automatically creates `DefaultTableExtraction`
2. It uses your `table_score_threshold` parameter
3. Tables are extracted exactly as before
4. Results appear in `result.tables` with the same structure
## New Capabilities
### 1. Explicit Strategy Configuration
You can now explicitly configure table extraction:
```python
# New: Explicit control
strategy = DefaultTableExtraction(
table_score_threshold=7,
min_rows=2, # New: minimum row filter
min_cols=2, # New: minimum column filter
verbose=True # New: detailed logging
)
config = CrawlerRunConfig(
table_extraction=strategy
)
```
### 2. Disable Table Extraction
Improve performance when tables aren't needed:
```python
# New: Skip table extraction entirely
config = CrawlerRunConfig(
table_extraction=NoTableExtraction()
)
# No CPU cycles spent on table detection/extraction
```
### 3. Custom Extraction Strategies
Create specialized extractors:
```python
class MyTableExtractor(TableExtractionStrategy):
def extract_tables(self, element, **kwargs):
# Custom extraction logic
return custom_tables
config = CrawlerRunConfig(
table_extraction=MyTableExtractor()
)
```
## Migration Scenarios
### Scenario 1: Basic Usage (No Changes Needed)
**Before (v0.7.2):**
```python
config = CrawlerRunConfig()
result = await crawler.arun(url, config)
for table in result.tables:
print(table['headers'])
```
**After (v0.7.3):**
```python
# Exactly the same - no changes required
config = CrawlerRunConfig()
result = await crawler.arun(url, config)
for table in result.tables:
print(table['headers'])
```
### Scenario 2: Custom Threshold (No Changes Needed)
**Before (v0.7.2):**
```python
config = CrawlerRunConfig(
table_score_threshold=5
)
```
**After (v0.7.3):**
```python
# Still works the same
config = CrawlerRunConfig(
table_score_threshold=5
)
# Or use new explicit approach for more control
strategy = DefaultTableExtraction(
table_score_threshold=5,
min_rows=2 # Additional filtering
)
config = CrawlerRunConfig(
table_extraction=strategy
)
```
### Scenario 3: Advanced Filtering (New Feature)
**Before (v0.7.2):**
```python
# Had to filter after extraction
config = CrawlerRunConfig(
table_score_threshold=5
)
result = await crawler.arun(url, config)
# Manual filtering
large_tables = [
t for t in result.tables
if len(t['rows']) >= 5 and len(t['headers']) >= 3
]
```
**After (v0.7.3):**
```python
# Filter during extraction (more efficient)
strategy = DefaultTableExtraction(
table_score_threshold=5,
min_rows=5,
min_cols=3
)
config = CrawlerRunConfig(
table_extraction=strategy
)
result = await crawler.arun(url, config)
# result.tables already filtered
```
## Code Organization Changes
### Module Structure
**Before (v0.7.2):**
```
crawl4ai/
content_scraping_strategy.py
- LXMLWebScrapingStrategy
- is_data_table() # Table detection
- extract_table_data() # Table extraction
```
**After (v0.7.3):**
```
crawl4ai/
content_scraping_strategy.py
- LXMLWebScrapingStrategy
# Table methods removed, uses strategy
table_extraction.py (NEW)
- TableExtractionStrategy # Base class
- DefaultTableExtraction # Moved logic here
- NoTableExtraction # New option
```
### Import Changes
**New imports available (optional):**
```python
# These are now available but not required for existing code
from crawl4ai import (
TableExtractionStrategy,
DefaultTableExtraction,
NoTableExtraction
)
```
## Performance Implications
### No Performance Impact
For existing code, performance remains identical:
- Same extraction logic
- Same scoring algorithm
- Same processing time
### Performance Improvements Available
New options for better performance:
```python
# Skip tables entirely (faster)
config = CrawlerRunConfig(
table_extraction=NoTableExtraction()
)
# Process only specific areas (faster)
config = CrawlerRunConfig(
css_selector="main.content",
table_extraction=DefaultTableExtraction(
min_rows=5, # Skip small tables
min_cols=3
)
)
```
## Testing Your Migration
### Verification Script
Run this to verify your extraction still works:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
async def verify_extraction():
url = "your_url_here"
async with AsyncWebCrawler() as crawler:
# Test 1: Old approach
config_old = CrawlerRunConfig(
table_score_threshold=7
)
result_old = await crawler.arun(url, config_old)
# Test 2: New explicit approach
from crawl4ai import DefaultTableExtraction
config_new = CrawlerRunConfig(
table_extraction=DefaultTableExtraction(
table_score_threshold=7
)
)
result_new = await crawler.arun(url, config_new)
# Compare results
assert len(result_old.tables) == len(result_new.tables)
print(f"✓ Both approaches extracted {len(result_old.tables)} tables")
# Verify structure
for old, new in zip(result_old.tables, result_new.tables):
assert old['headers'] == new['headers']
assert old['rows'] == new['rows']
print("✓ Table content identical")
asyncio.run(verify_extraction())
```
## Deprecation Notes
### No Deprecations
- All existing parameters continue to work
- `table_score_threshold` in `CrawlerRunConfig` is still supported
- No breaking changes
### Internal Changes (Transparent to Users)
- `LXMLWebScrapingStrategy.is_data_table()` - Moved to `DefaultTableExtraction`
- `LXMLWebScrapingStrategy.extract_table_data()` - Moved to `DefaultTableExtraction`
These methods were internal and not part of the public API.
## Benefits of Upgrading
While not required, using the new pattern provides:
1. **Better Control**: Filter tables during extraction, not after
2. **Performance Options**: Skip extraction when not needed
3. **Extensibility**: Create custom extractors for specific needs
4. **Consistency**: Same pattern as other Crawl4AI strategies
5. **Future-Proof**: Ready for upcoming advanced strategies
## Troubleshooting
### Issue: Different Number of Tables
**Cause**: Threshold or filtering differences
**Solution**:
```python
# Ensure same threshold
strategy = DefaultTableExtraction(
table_score_threshold=7, # Match your old setting
min_rows=0, # No filtering (default)
min_cols=0 # No filtering (default)
)
```
### Issue: Import Errors
**Cause**: Using new classes without importing
**Solution**:
```python
# Add imports if using new features
from crawl4ai import (
DefaultTableExtraction,
NoTableExtraction,
TableExtractionStrategy
)
```
### Issue: Custom Strategy Not Working
**Cause**: Incorrect method signature
**Solution**:
```python
class CustomExtractor(TableExtractionStrategy):
def extract_tables(self, element, **kwargs): # Correct signature
# Not: extract_tables(self, html)
# Not: extract(self, element)
return tables_list
```
## Getting Help
If you encounter issues:
1. Check your `table_score_threshold` matches previous settings
2. Verify imports if using new classes
3. Enable verbose logging: `DefaultTableExtraction(verbose=True)`
4. Review the [Table Extraction Documentation](../core/table_extraction.md)
5. Check [examples](../examples/table_extraction_example.py)
## Summary
-**Full backward compatibility** - No code changes required
-**Same results** - Identical extraction behavior by default
-**New options** - Additional control when needed
-**Better architecture** - Consistent with Crawl4AI patterns
-**Ready for future** - Foundation for advanced strategies
The migration to v0.7.3 is seamless with no required changes while providing new capabilities for those who need them.

View File

@@ -35,7 +35,6 @@ nav:
- "Page Interaction": "core/page-interaction.md"
- "Content Selection": "core/content-selection.md"
- "Cache Modes": "core/cache-modes.md"
- "Telemetry": "core/telemetry.md"
- "Local Files & Raw HTML": "core/local-files.md"
- "Link & Media": "core/link-media.md"
- Advanced:

View File

@@ -64,7 +64,6 @@ torch = ["torch", "nltk", "scikit-learn"]
transformer = ["transformers", "tokenizers", "sentence-transformers"]
cosine = ["torch", "transformers", "nltk", "sentence-transformers"]
sync = ["selenium"]
telemetry = ["sentry-sdk>=2.0.0", "ipywidgets>=8.0.0"]
all = [
"PyPDF2",
"torch",
@@ -73,9 +72,7 @@ all = [
"transformers",
"tokenizers",
"sentence-transformers",
"selenium",
"sentry-sdk>=2.0.0",
"ipywidgets>=8.0.0"
"selenium"
]
[project.scripts]

View File

@@ -1,16 +0,0 @@
[pytest]
testpaths = tests
python_paths = .
addopts = --maxfail=1 --disable-warnings -q --tb=short -v
asyncio_mode = auto
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
privacy: marks tests related to privacy compliance
performance: marks tests related to performance
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
env =
CRAWL4AI_TEST_MODE=1

View File

@@ -1,151 +0,0 @@
"""
Shared pytest fixtures for Crawl4AI tests.
"""
import pytest
import tempfile
import os
from pathlib import Path
from unittest.mock import Mock, patch
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
from crawl4ai.telemetry.environment import Environment
@pytest.fixture
def temp_config_dir():
"""Provide a temporary directory for telemetry config testing."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def mock_telemetry_config(temp_config_dir):
"""Provide a mocked telemetry config for testing."""
config = TelemetryConfig(config_dir=temp_config_dir)
yield config
@pytest.fixture
def clean_environment():
"""Clean environment variables before and after test."""
# Store original environment
original_env = os.environ.copy()
# Clean telemetry-related env vars
telemetry_vars = [
'CRAWL4AI_TELEMETRY',
'CRAWL4AI_DOCKER',
'CRAWL4AI_API_SERVER',
'CRAWL4AI_TEST_MODE'
]
for var in telemetry_vars:
if var in os.environ:
del os.environ[var]
# Set test mode
os.environ['CRAWL4AI_TEST_MODE'] = '1'
yield
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
@pytest.fixture
def mock_sentry_provider():
"""Provide a mocked Sentry provider for testing."""
with patch('crawl4ai.telemetry.providers.sentry.SentryProvider') as mock:
provider_instance = Mock()
provider_instance.initialize.return_value = True
provider_instance.send_exception.return_value = True
provider_instance.is_initialized = True
mock.return_value = provider_instance
yield provider_instance
@pytest.fixture
def enabled_telemetry_config(temp_config_dir): # noqa: F811
"""Provide a telemetry config with telemetry enabled."""
config = Mock()
config.get_consent.return_value = TelemetryConsent.ALWAYS
config.is_enabled.return_value = True
config.should_send_current.return_value = True
config.get_email.return_value = "test@example.com"
config.update_from_env.return_value = None
yield config
@pytest.fixture
def disabled_telemetry_config(temp_config_dir): # noqa: F811
"""Provide a telemetry config with telemetry disabled."""
config = Mock()
config.get_consent.return_value = TelemetryConsent.DENIED
config.is_enabled.return_value = False
config.should_send_current.return_value = False
config.update_from_env.return_value = None
yield config
@pytest.fixture
def docker_environment():
"""Mock Docker environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
yield
@pytest.fixture
def cli_environment():
"""Mock CLI environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.CLI):
with patch('sys.stdin.isatty', return_value=True):
yield
@pytest.fixture
def jupyter_environment():
"""Mock Jupyter environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.JUPYTER):
yield
@pytest.fixture(autouse=True)
def reset_telemetry_singleton():
"""Reset telemetry singleton between tests."""
from crawl4ai.telemetry import TelemetryManager
# Reset the singleton instance
if hasattr(TelemetryManager, '_instance'):
TelemetryManager._instance = None # noqa: SLF001
yield
# Clean up after test
if hasattr(TelemetryManager, '_instance'):
TelemetryManager._instance = None # noqa: SLF001
@pytest.fixture
def sample_exception():
"""Provide a sample exception for testing."""
try:
raise ValueError("Test exception for telemetry")
except ValueError as e:
return e
@pytest.fixture
def privacy_test_data():
"""Provide test data that should NOT be captured by telemetry."""
return {
'url': 'https://example.com/private-page',
'content': 'This is private content that should not be sent',
'user_data': {
'email': 'user@private.com',
'password': 'secret123',
'api_key': 'sk-1234567890abcdef'
},
'pii': {
'ssn': '123-45-6789',
'phone': '+1-555-123-4567',
'address': '123 Main St, Anytown, USA'
}
}

View File

@@ -1,64 +0,0 @@
"""
Test configuration and utilities for telemetry testing.
"""
import os
import pytest
def pytest_configure(config): # noqa: ARG001
"""Configure pytest for telemetry tests."""
# Add custom markers
config.addinivalue_line("markers", "unit: Unit tests")
config.addinivalue_line("markers", "integration: Integration tests")
config.addinivalue_line("markers", "privacy: Privacy compliance tests")
config.addinivalue_line("markers", "performance: Performance tests")
config.addinivalue_line("markers", "slow: Slow running tests")
def pytest_collection_modifyitems(config, items): # noqa: ARG001
"""Modify test collection to add markers automatically."""
for item in items:
# Add markers based on test location and name
if "telemetry" in str(item.fspath):
if "integration" in item.name or "test_integration" in str(item.fspath):
item.add_marker(pytest.mark.integration)
elif "privacy" in item.name or "performance" in item.name:
if "privacy" in item.name:
item.add_marker(pytest.mark.privacy)
if "performance" in item.name:
item.add_marker(pytest.mark.performance)
else:
item.add_marker(pytest.mark.unit)
# Mark slow tests
if "slow" in item.name or any(mark.name == "slow" for mark in item.iter_markers()):
item.add_marker(pytest.mark.slow)
@pytest.fixture(autouse=True)
def setup_test_environment():
"""Set up test environment variables."""
# Ensure we're in test mode
os.environ['CRAWL4AI_TEST_MODE'] = '1'
# Disable actual telemetry during tests unless explicitly enabled
if 'CRAWL4AI_TELEMETRY_TEST_REAL' not in os.environ:
os.environ['CRAWL4AI_TELEMETRY'] = '0'
yield
# Clean up after tests
test_vars = ['CRAWL4AI_TEST_MODE', 'CRAWL4AI_TELEMETRY_TEST_REAL']
for var in test_vars:
if var in os.environ:
del os.environ[var]
def pytest_report_header(config): # noqa: ARG001
"""Add information to pytest header."""
return [
"Crawl4AI Telemetry Tests",
f"Test mode: {'ENABLED' if os.environ.get('CRAWL4AI_TEST_MODE') else 'DISABLED'}",
f"Real telemetry: {'ENABLED' if os.environ.get('CRAWL4AI_TELEMETRY_TEST_REAL') else 'DISABLED'}"
]

View File

@@ -1,216 +0,0 @@
"""
Integration tests for telemetry CLI commands.
"""
import pytest
import subprocess
import sys
import os
from unittest.mock import patch, Mock
@pytest.mark.integration
class TestTelemetryCLI:
"""Test telemetry CLI commands integration."""
def test_telemetry_status_command(self, clean_environment, temp_config_dir):
"""Test the telemetry status CLI command."""
# Import with mocked config
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = 'not_set'
mock_config.is_enabled.return_value = False
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test status command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'status']):
try:
main()
except SystemExit:
pass # CLI commands often call sys.exit()
def test_telemetry_enable_command(self, clean_environment, temp_config_dir):
"""Test the telemetry enable CLI command."""
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test enable command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'enable', '--email', 'test@example.com']):
try:
main()
except SystemExit:
pass
def test_telemetry_disable_command(self, clean_environment, temp_config_dir):
"""Test the telemetry disable CLI command."""
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test disable command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'disable']):
try:
main()
except SystemExit:
pass
@pytest.mark.slow
def test_cli_subprocess_integration(self, temp_config_dir):
"""Test CLI commands as subprocess calls."""
env = os.environ.copy()
env['CRAWL4AI_CONFIG_DIR'] = str(temp_config_dir)
# Test status command via subprocess
try:
result = subprocess.run(
[sys.executable, '-m', 'crawl4ai.cli', 'telemetry', 'status'],
env=env,
capture_output=True,
text=True,
timeout=10
)
# Should not crash, regardless of exit code
assert result.returncode in [0, 1] # May return 1 if not configured
except subprocess.TimeoutExpired:
pytest.skip("CLI command timed out")
except FileNotFoundError:
pytest.skip("CLI module not found")
@pytest.mark.integration
class TestAsyncWebCrawlerIntegration:
"""Test AsyncWebCrawler telemetry integration."""
@pytest.mark.asyncio
async def test_crawler_telemetry_decorator(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that AsyncWebCrawler methods are decorated with telemetry."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
from crawl4ai import AsyncWebCrawler
# Check if the arun method has telemetry decoration
crawler = AsyncWebCrawler()
assert hasattr(crawler.arun, '__wrapped__') or callable(crawler.arun)
@pytest.mark.asyncio
async def test_crawler_exception_capture_integration(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that exceptions in AsyncWebCrawler are captured."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
with patch('crawl4ai.telemetry.capture_exception') as _mock_capture:
from crawl4ai import AsyncWebCrawler
async with AsyncWebCrawler() as crawler:
try:
# This should cause an exception
await crawler.arun(url="invalid://url")
except Exception:
pass # We expect this to fail
# The decorator should have attempted to capture the exception
# Note: This might not always be called depending on where the exception occurs
@pytest.mark.asyncio
async def test_crawler_with_disabled_telemetry(self, disabled_telemetry_config):
"""Test that AsyncWebCrawler works normally with disabled telemetry."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
from crawl4ai import AsyncWebCrawler
# Should work normally even with telemetry disabled
async with AsyncWebCrawler() as crawler:
assert crawler is not None
@pytest.mark.integration
class TestDockerIntegration:
"""Test Docker environment telemetry integration."""
def test_docker_environment_detection(self, docker_environment, temp_config_dir):
"""Test that Docker environment is detected correctly."""
from crawl4ai.telemetry.environment import EnvironmentDetector
env = EnvironmentDetector.detect()
from crawl4ai.telemetry.environment import Environment
assert env == Environment.DOCKER
def test_docker_default_telemetry_enabled(self, temp_config_dir):
"""Test that telemetry is enabled by default in Docker."""
from crawl4ai.telemetry.environment import Environment
# Clear any existing environment variables that might interfere
with patch.dict(os.environ, {}, clear=True):
# Set only the Docker environment variable
os.environ['CRAWL4AI_DOCKER'] = 'true'
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
from crawl4ai.telemetry.consent import ConsentManager
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
config = TelemetryConfig(config_dir=temp_config_dir)
consent_manager = ConsentManager(config)
# Should set consent to ALWAYS for Docker
consent_manager.check_and_prompt()
assert config.get_consent() == TelemetryConsent.ALWAYS
def test_docker_telemetry_can_be_disabled(self, temp_config_dir):
"""Test that Docker telemetry can be disabled via environment variable."""
from crawl4ai.telemetry.environment import Environment
with patch.dict(os.environ, {'CRAWL4AI_TELEMETRY': '0', 'CRAWL4AI_DOCKER': 'true'}):
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
from crawl4ai.telemetry.consent import ConsentManager
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
config = TelemetryConfig(config_dir=temp_config_dir)
consent_manager = ConsentManager(config)
# Should set consent to DENIED when env var is 0
consent_manager.check_and_prompt()
assert config.get_consent() == TelemetryConsent.DENIED
@pytest.mark.integration
class TestTelemetryProviderIntegration:
"""Test telemetry provider integration."""
def test_sentry_provider_initialization(self, enabled_telemetry_config):
"""Test that Sentry provider initializes correctly."""
try:
from crawl4ai.telemetry.providers.sentry import SentryProvider
provider = SentryProvider()
# Should not crash during initialization
assert provider is not None
except ImportError:
pytest.skip("Sentry provider not available")
def test_null_provider_fallback(self, disabled_telemetry_config):
"""Test that NullProvider is used when telemetry is disabled."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
from crawl4ai.telemetry import TelemetryManager
from crawl4ai.telemetry.base import NullProvider
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider) # noqa: SLF001
def test_graceful_degradation_without_sentry(self, enabled_telemetry_config):
"""Test graceful degradation when sentry-sdk is not available."""
with patch.dict('sys.modules', {'sentry_sdk': None}):
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
from crawl4ai.telemetry import TelemetryManager
from crawl4ai.telemetry.base import NullProvider
# Should fall back to NullProvider when Sentry is not available
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider) # noqa: SLF001
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,283 +0,0 @@
"""
Privacy and performance tests for telemetry system.
"""
import pytest
import time
import asyncio
from unittest.mock import patch
from crawl4ai.telemetry import telemetry_decorator, async_telemetry_decorator, TelemetryManager
@pytest.mark.privacy
class TestTelemetryPrivacy:
"""Test privacy compliance of telemetry system."""
def test_no_url_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that URLs are not captured in telemetry data."""
# Ensure config is properly set for sending
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
# Mock the provider directly in the manager
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
# Create exception with URL in context
exception = ValueError("Test error")
context = {'url': privacy_test_data['url']}
manager.capture_exception(exception, context)
# Verify that the provider was called
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that context was passed to the provider (filtering happens in provider)
assert len(call_args) >= 2
def test_no_content_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that crawled content is not captured."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = {
'content': privacy_test_data['content'],
'html': '<html><body>Private content</body></html>',
'text': 'Extracted private text'
}
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that the provider was called (actual filtering would happen in provider)
assert len(call_args) >= 2
def test_no_pii_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that PII is not captured in telemetry."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = privacy_test_data['user_data'].copy()
context.update(privacy_test_data['pii'])
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that the provider was called (actual filtering would happen in provider)
assert len(call_args) >= 2
def test_sanitized_context_captured(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that only safe context is captured."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = {
'operation': 'crawl', # Safe to capture
'status_code': 404, # Safe to capture
'retry_count': 3, # Safe to capture
'user_email': 'secret@example.com', # Should be in context (not filtered at this level)
'content': 'private content' # Should be in context (not filtered at this level)
}
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Get the actual arguments passed to the mock
args, kwargs = call_args
assert len(args) >= 2, f"Expected at least 2 args, got {len(args)}"
# The second argument should be the context
captured_context = args[1]
# The basic context should be present (this tests the manager, not the provider filtering)
assert 'operation' in captured_context, f"operation not found in {captured_context}"
assert captured_context.get('operation') == 'crawl'
assert captured_context.get('status_code') == 404
assert captured_context.get('retry_count') == 3
@pytest.mark.performance
class TestTelemetryPerformance:
"""Test performance impact of telemetry system."""
def test_decorator_overhead_sync(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test performance overhead of sync telemetry decorator."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
@telemetry_decorator
def test_function():
"""Test function with telemetry decorator."""
time.sleep(0.001) # Simulate small amount of work
return "success"
# Measure time with telemetry
start_time = time.time()
for _ in range(100):
test_function()
telemetry_time = time.time() - start_time
# Telemetry should add minimal overhead
assert telemetry_time < 1.0 # Should complete 100 calls in under 1 second
@pytest.mark.asyncio
async def test_decorator_overhead_async(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test performance overhead of async telemetry decorator."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
@async_telemetry_decorator
async def test_async_function():
"""Test async function with telemetry decorator."""
await asyncio.sleep(0.001) # Simulate small amount of async work
return "success"
# Measure time with telemetry
start_time = time.time()
tasks = [test_async_function() for _ in range(100)]
await asyncio.gather(*tasks)
telemetry_time = time.time() - start_time
# Telemetry should add minimal overhead to async operations
assert telemetry_time < 2.0 # Should complete 100 async calls in under 2 seconds
def test_disabled_telemetry_performance(self, disabled_telemetry_config):
"""Test that disabled telemetry has zero overhead."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
@telemetry_decorator
def test_function():
"""Test function with disabled telemetry."""
time.sleep(0.001)
return "success"
# Measure time with disabled telemetry
start_time = time.time()
for _ in range(100):
test_function()
disabled_time = time.time() - start_time
# Should be very fast when disabled
assert disabled_time < 0.5 # Should be faster than enabled telemetry
def test_telemetry_manager_initialization_performance(self):
"""Test that TelemetryManager initializes quickly."""
start_time = time.time()
# Initialize multiple managers (should use singleton)
for _ in range(10):
TelemetryManager.get_instance()
init_time = time.time() - start_time
# Initialization should be fast
assert init_time < 0.1 # Should initialize in under 100ms
def test_config_loading_performance(self, temp_config_dir):
"""Test that config loading is fast."""
from crawl4ai.telemetry.config import TelemetryConfig
# Create config with some data
config = TelemetryConfig(config_dir=temp_config_dir)
from crawl4ai.telemetry.config import TelemetryConsent
config.set_consent(TelemetryConsent.ALWAYS, email="test@example.com")
start_time = time.time()
# Load config multiple times
for _ in range(100):
new_config = TelemetryConfig(config_dir=temp_config_dir)
new_config.get_consent()
load_time = time.time() - start_time
# Config loading should be fast
assert load_time < 0.5 # Should load 100 times in under 500ms
@pytest.mark.performance
class TestTelemetryScalability:
"""Test telemetry system scalability."""
def test_multiple_exception_capture(self, enabled_telemetry_config, mock_sentry_provider):
"""Test capturing multiple exceptions in sequence."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
start_time = time.time()
# Capture many exceptions
for i in range(50):
exception = ValueError(f"Test error {i}")
manager.capture_exception(exception, {'operation': f'test_{i}'})
capture_time = time.time() - start_time
# Should handle multiple exceptions efficiently
assert capture_time < 1.0 # Should capture 50 exceptions in under 1 second
assert mock_sentry_provider.send_exception.call_count <= 50 # May be less due to consent checks
@pytest.mark.asyncio
async def test_concurrent_exception_capture(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test concurrent exception capture performance."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
async def capture_exception_async(i):
exception = ValueError(f"Concurrent error {i}")
return manager.capture_exception(exception, {'operation': f'concurrent_{i}'})
start_time = time.time()
# Capture exceptions concurrently
tasks = [capture_exception_async(i) for i in range(20)]
await asyncio.gather(*tasks)
capture_time = time.time() - start_time
# Should handle concurrent exceptions efficiently
assert capture_time < 1.0 # Should capture 20 concurrent exceptions in under 1 second
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,241 +0,0 @@
"""
Tests for Crawl4AI telemetry functionality.
"""
import pytest
import os
import tempfile
from pathlib import Path
import json
from unittest.mock import Mock, patch, MagicMock
from crawl4ai.telemetry import (
TelemetryManager,
capture_exception,
enable,
disable,
status
)
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
from crawl4ai.telemetry.environment import Environment, EnvironmentDetector
from crawl4ai.telemetry.base import NullProvider
from crawl4ai.telemetry.consent import ConsentManager
class TestTelemetryConfig:
"""Test telemetry configuration management."""
def test_config_initialization(self):
"""Test config initialization with custom directory."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
assert config.config_dir == Path(tmpdir)
assert config.get_consent() == TelemetryConsent.NOT_SET
def test_consent_persistence(self):
"""Test that consent is saved and loaded correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
# Set consent
config.set_consent(TelemetryConsent.ALWAYS, email="test@example.com")
# Create new config instance to test persistence
config2 = TelemetryConfig(config_dir=Path(tmpdir))
assert config2.get_consent() == TelemetryConsent.ALWAYS
assert config2.get_email() == "test@example.com"
def test_environment_variable_override(self):
"""Test that environment variables override config."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
config.set_consent(TelemetryConsent.ALWAYS)
# Set environment variable to disable
os.environ['CRAWL4AI_TELEMETRY'] = '0'
try:
config.update_from_env()
assert config.get_consent() == TelemetryConsent.DENIED
finally:
del os.environ['CRAWL4AI_TELEMETRY']
class TestEnvironmentDetection:
"""Test environment detection functionality."""
def test_cli_detection(self):
"""Test CLI environment detection."""
# Mock sys.stdin.isatty
with patch('sys.stdin.isatty', return_value=True):
env = EnvironmentDetector.detect()
# Should detect as CLI in most test environments
assert env in [Environment.CLI, Environment.UNKNOWN]
def test_docker_detection(self):
"""Test Docker environment detection."""
# Mock Docker environment
with patch.dict(os.environ, {'CRAWL4AI_DOCKER': 'true'}):
env = EnvironmentDetector.detect()
assert env == Environment.DOCKER
def test_api_server_detection(self):
"""Test API server detection."""
with patch.dict(os.environ, {'CRAWL4AI_API_SERVER': 'true', 'CRAWL4AI_DOCKER': 'true'}):
env = EnvironmentDetector.detect()
assert env == Environment.API_SERVER
class TestTelemetryManager:
"""Test the main telemetry manager."""
def test_singleton_pattern(self):
"""Test that TelemetryManager is a singleton."""
manager1 = TelemetryManager.get_instance()
manager2 = TelemetryManager.get_instance()
assert manager1 is manager2
def test_exception_capture(self):
"""Test exception capture functionality."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create manager with custom config dir
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = TelemetryConsent.ALWAYS
mock_config.is_enabled.return_value = True
mock_config.should_send_current.return_value = True
mock_config.get_email.return_value = "test@example.com"
mock_config.update_from_env.return_value = None
MockConfig.return_value = mock_config
# Mock the provider setup
with patch('crawl4ai.telemetry.providers.sentry.SentryProvider') as MockSentryProvider:
mock_provider = Mock()
mock_provider.initialize.return_value = True
mock_provider.send_exception.return_value = True
MockSentryProvider.return_value = mock_provider
manager = TelemetryManager()
# Test exception capture
test_exception = ValueError("Test error")
result = manager.capture_exception(test_exception, {'test': 'context'})
# Verify the exception was processed
assert mock_config.should_send_current.called
def test_null_provider_when_disabled(self):
"""Test that NullProvider is used when telemetry is disabled."""
with tempfile.TemporaryDirectory() as tmpdir:
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = TelemetryConsent.DENIED
mock_config.is_enabled.return_value = False
MockConfig.return_value = mock_config
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider)
class TestConsentManager:
"""Test consent management functionality."""
def test_docker_default_enabled(self):
"""Test that Docker environment has telemetry enabled by default."""
with patch('crawl4ai.telemetry.consent.EnvironmentDetector.detect', return_value=Environment.DOCKER):
with patch('os.environ.get') as mock_env_get:
# Mock os.environ.get to return None for CRAWL4AI_TELEMETRY
mock_env_get.return_value = None
config = Mock()
config.get_consent.return_value = TelemetryConsent.NOT_SET
consent_manager = ConsentManager(config)
consent_manager.check_and_prompt()
# Should be enabled by default in Docker
assert config.set_consent.called
assert config.set_consent.call_args[0][0] == TelemetryConsent.ALWAYS
def test_docker_disabled_by_env(self):
"""Test that Docker telemetry can be disabled via environment variable."""
with patch('crawl4ai.telemetry.consent.EnvironmentDetector.detect', return_value=Environment.DOCKER):
with patch.dict(os.environ, {'CRAWL4AI_TELEMETRY': '0'}):
config = Mock()
config.get_consent.return_value = TelemetryConsent.NOT_SET
consent_manager = ConsentManager(config)
consent = consent_manager.check_and_prompt()
# Should be disabled
assert config.set_consent.called
assert config.set_consent.call_args[0][0] == TelemetryConsent.DENIED
class TestPublicAPI:
"""Test the public API functions."""
@patch('crawl4ai.telemetry.get_telemetry')
def test_enable_function(self, mock_get_telemetry):
"""Test the enable() function."""
mock_manager = Mock()
mock_get_telemetry.return_value = mock_manager
enable(email="test@example.com", always=True)
mock_manager.enable.assert_called_once_with(
email="test@example.com",
always=True,
once=False
)
@patch('crawl4ai.telemetry.get_telemetry')
def test_disable_function(self, mock_get_telemetry):
"""Test the disable() function."""
mock_manager = Mock()
mock_get_telemetry.return_value = mock_manager
disable()
mock_manager.disable.assert_called_once()
@patch('crawl4ai.telemetry.get_telemetry')
def test_status_function(self, mock_get_telemetry):
"""Test the status() function."""
mock_manager = Mock()
mock_manager.status.return_value = {
'enabled': True,
'consent': 'always',
'email': 'test@example.com'
}
mock_get_telemetry.return_value = mock_manager
result = status()
assert result['enabled'] is True
assert result['consent'] == 'always'
assert result['email'] == 'test@example.com'
class TestIntegration:
"""Integration tests for telemetry with AsyncWebCrawler."""
@pytest.mark.asyncio
async def test_crawler_exception_capture(self):
"""Test that AsyncWebCrawler captures exceptions."""
from crawl4ai import AsyncWebCrawler
with patch('crawl4ai.telemetry.capture_exception') as mock_capture:
# This should trigger an exception for invalid URL
async with AsyncWebCrawler() as crawler:
try:
# Use an invalid URL that will cause an error
result = await crawler.arun(url="not-a-valid-url")
except Exception:
pass
# Check if exception was captured (may not be called if error is handled)
# This is more of a smoke test to ensure the integration doesn't break
if __name__ == "__main__":
pytest.main([__file__, "-v"])

File diff suppressed because it is too large Load Diff

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Test LLMTableExtraction with controlled HTML
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import asyncio
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
LLMConfig,
LLMTableExtraction,
DefaultTableExtraction,
CacheMode
)
async def test_controlled_html():
"""Test with controlled HTML content."""
print("\n" + "=" * 60)
print("LLM TABLE EXTRACTION TEST")
print("=" * 60)
url = "https://en.wikipedia.org/wiki/List_of_chemical_elements"
# url = "https://en.wikipedia.org/wiki/List_of_prime_ministers_of_India"
# Configure LLM
llm_config = LLMConfig(
# provider="openai/gpt-4.1-mini",
# api_token=os.getenv("OPENAI_API_KEY"),
provider="groq/llama-3.3-70b-versatile",
api_token="GROQ_API_TOKEN",
temperature=0.1,
max_tokens=32000
)
print("\n1. Testing LLMTableExtraction:")
# Create LLM extraction strategy
llm_strategy = LLMTableExtraction(
llm_config=llm_config,
verbose=True,
# css_selector="div.w3-example"
css_selector="div.mw-content-ltr",
# css_selector="table.wikitable",
max_tries=2,
enable_chunking=True,
chunk_token_threshold=5000, # Lower threshold to force chunking
min_rows_per_chunk=10,
max_parallel_chunks=3
)
config_llm = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
table_extraction=llm_strategy
)
async with AsyncWebCrawler() as crawler:
# Test with LLM extraction
result_llm = await crawler.arun(
# url=f"raw:{test_html}",
url=url,
config=config_llm
)
if result_llm.success:
print(f"\n ✓ LLM Extraction: Found {len(result_llm.tables)} table(s)")
for i, table in enumerate(result_llm.tables, 1):
print(f"\n Table {i}:")
print(f" - Caption: {table.get('caption', 'No caption')}")
print(f" - Headers: {table['headers']}")
print(f" - Rows: {len(table['rows'])}")
# Show how colspan/rowspan were handled
print(f" - Sample rows:")
for j, row in enumerate(table['rows'][:2], 1):
print(f" Row {j}: {row}")
metadata = table.get('metadata', {})
print(f" - Metadata:")
print(f" • Has merged cells: {metadata.get('has_merged_cells', False)}")
print(f" • Table type: {metadata.get('table_type', 'unknown')}")
# # Compare with default extraction
# print("\n2. Comparing with DefaultTableExtraction:")
# default_strategy = DefaultTableExtraction(
# table_score_threshold=3,
# verbose=False
# )
# config_default = CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
# table_extraction=default_strategy
# )
# result_default = await crawler.arun(
# # url=f"raw:{test_html}",
# url=url,
# config=config_default
# )
# if result_default.success:
# print(f" ✓ Default Extraction: Found {len(result_default.tables)} table(s)")
# # Compare handling of complex structures
# print("\n3. Comparison Summary:")
# print(f" LLM found: {len(result_llm.tables)} tables")
# print(f" Default found: {len(result_default.tables)} tables")
# if result_llm.tables and result_default.tables:
# llm_first = result_llm.tables[0]
# default_first = result_default.tables[0]
# print(f"\n First table comparison:")
# print(f" LLM headers: {len(llm_first['headers'])} columns")
# print(f" Default headers: {len(default_first['headers'])} columns")
# # Check if LLM better handled the complex structure
# if llm_first.get('metadata', {}).get('has_merged_cells'):
# print(" ✓ LLM correctly identified merged cells")
# # Test pandas compatibility
# try:
# import pandas as pd
# print("\n4. Testing Pandas compatibility:")
# # Create DataFrame from LLM extraction
# df_llm = pd.DataFrame(
# llm_first['rows'],
# columns=llm_first['headers']
# )
# print(f" ✓ LLM table -> DataFrame: Shape {df_llm.shape}")
# # Create DataFrame from default extraction
# df_default = pd.DataFrame(
# default_first['rows'],
# columns=default_first['headers']
# )
# print(f" ✓ Default table -> DataFrame: Shape {df_default.shape}")
# print("\n LLM DataFrame preview:")
# print(df_llm.head(2).to_string())
# except ImportError:
# print("\n4. Pandas not installed, skipping DataFrame test")
print("\n✅ Test completed successfully!")
async def main():
"""Run the test."""
# Check for API key
if not os.getenv("OPENAI_API_KEY"):
print("⚠️ OPENAI_API_KEY not set. Please set it to test LLM extraction.")
print(" You can set it with: export OPENAI_API_KEY='your-key-here'")
return
await test_controlled_html()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -4,7 +4,7 @@
import psutil
import platform
import time
from crawl4ai.utils import get_true_memory_usage_percent, get_memory_stats, get_true_available_memory_gb
from crawl4ai.memory_utils import get_true_memory_usage_percent, get_memory_stats, get_true_available_memory_gb
def test_memory_calculation():