Compare commits

..

2 Commits

Author SHA1 Message Date
AHMET YILMAZ
d48d382d18 feat(tests): Implement comprehensive testing framework for telemetry system 2025-09-22 19:06:20 +08:00
ntohidi
7f360577d9 feat(telemetry): Add opt-in telemetry system for error tracking and stability improvement
Implement a privacy-first, provider-agnostic telemetry system to help improve Crawl4AI stability
through anonymous crash reporting. The system is designed with user privacy as the top priority,
collecting only exception information without any PII, URLs, or crawled content.

Architecture & Design:
- Provider-agnostic architecture with base TelemetryProvider interface
- Sentry as the initial provider implementation with easy extensibility
- Separate handling for sync and async code paths
- Environment-aware behavior (CLI, Docker, Jupyter/Colab)

Key Features:
- Opt-in by default for CLI/library usage with interactive consent prompt
- Opt-out by default for Docker/API server (enabled unless CRAWL4AI_TELEMETRY=0)
- Jupyter/Colab support with widget-based consent (fallback to code snippets)
- Persistent consent storage in ~/.crawl4ai/config.json
- Optional email collection for critical issue follow-up

CLI Integration:
- `crwl telemetry enable [--email <email>] [--once]` - Enable telemetry
- `crwl telemetry disable` - Disable telemetry
- `crwl telemetry status` - Check current status

Python API:
- Decorators: @telemetry_decorator, @async_telemetry_decorator
- Context managers: telemetry_context(), async_telemetry_context()
- Manual capture: capture_exception(exc, context)
- Control: telemetry.enable(), telemetry.disable(), telemetry.status()

Privacy Safeguards:
- No URL collection
- No request/response data
- No authentication tokens or cookies
- No crawled content
- Automatic sanitization of sensitive fields
- Local consent storage only

Testing:
- Comprehensive test suite with 15 test cases
- Coverage for all environments and consent flows
- Mock providers for testing without external dependencies

Documentation:
- Detailed documentation in docs/md_v2/core/telemetry.md
- Added to mkdocs navigation under Core section
- Privacy commitment and FAQ included
- Examples for all usage patterns

Installation:
- Optional dependency: pip install crawl4ai[telemetry]
- Graceful degradation if sentry-sdk not installed
- Added to pyproject.toml optional dependencies
- Docker requirements updated

Integration Points:
- AsyncWebCrawler: Automatic exception capture in arun() and aprocess_html()
- Docker server: Automatic initialization with environment control
- Global exception handler for uncaught exceptions (CLI only)

This implementation provides valuable error insights to improve Crawl4AI while maintaining
complete transparency and user control over data collection.
2025-08-20 16:49:44 +08:00
78 changed files with 3495 additions and 5807 deletions

2
.gitignore vendored
View File

@@ -265,7 +265,7 @@ CLAUDE.md
tests/**/test_site
tests/**/reports
tests/**/benchmark_reports
test_scripts/
docs/**/data
.codecat/

View File

@@ -5,16 +5,6 @@ All notable changes to Crawl4AI will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- **🔒 HTTPS Preservation for Internal Links**: New `preserve_https_for_internal_links` configuration flag
- Maintains HTTPS scheme for internal links even when servers redirect to HTTP
- Prevents security downgrades during deep crawling
- Useful for security-conscious crawling and sites supporting both protocols
- Fully backward compatible with opt-in flag (default: `False`)
- Fixes issue #1410 where HTTPS URLs were being downgraded to HTTP
## [0.7.3] - 2025-08-09
### Added

136
Makefile.telemetry Normal file
View File

@@ -0,0 +1,136 @@
# Makefile for Crawl4AI Telemetry Testing
# Usage: make test-telemetry, make test-unit, make test-integration, etc.
.PHONY: help test-all test-telemetry test-unit test-integration test-privacy test-performance test-slow test-coverage test-verbose clean
# Default Python executable
PYTHON := .venv/bin/python
PYTEST := $(PYTHON) -m pytest
help:
@echo "Crawl4AI Telemetry Testing Commands:"
@echo ""
@echo " test-all Run all telemetry tests"
@echo " test-telemetry Run all telemetry tests (same as test-all)"
@echo " test-unit Run unit tests only"
@echo " test-integration Run integration tests only"
@echo " test-privacy Run privacy compliance tests only"
@echo " test-performance Run performance tests only"
@echo " test-slow Run slow tests only"
@echo " test-coverage Run tests with coverage report"
@echo " test-verbose Run tests with verbose output"
@echo " test-specific TEST= Run specific test (e.g., make test-specific TEST=test_telemetry.py::TestTelemetryConfig)"
@echo " clean Clean test artifacts"
@echo ""
@echo "Environment Variables:"
@echo " CRAWL4AI_TELEMETRY_TEST_REAL=1 Enable real telemetry during tests"
@echo " PYTEST_ARGS Additional pytest arguments"
# Run all telemetry tests
test-all test-telemetry:
$(PYTEST) tests/telemetry/ -v
# Run unit tests only
test-unit:
$(PYTEST) tests/telemetry/ -m "unit" -v
# Run integration tests only
test-integration:
$(PYTEST) tests/telemetry/ -m "integration" -v
# Run privacy compliance tests only
test-privacy:
$(PYTEST) tests/telemetry/ -m "privacy" -v
# Run performance tests only
test-performance:
$(PYTEST) tests/telemetry/ -m "performance" -v
# Run slow tests only
test-slow:
$(PYTEST) tests/telemetry/ -m "slow" -v
# Run tests with coverage
test-coverage:
$(PYTEST) tests/telemetry/ --cov=crawl4ai.telemetry --cov-report=html --cov-report=term-missing -v
# Run tests with verbose output
test-verbose:
$(PYTEST) tests/telemetry/ -vvv --tb=long
# Run specific test
test-specific:
$(PYTEST) tests/telemetry/$(TEST) -v
# Run tests excluding slow ones
test-fast:
$(PYTEST) tests/telemetry/ -m "not slow" -v
# Run tests in parallel
test-parallel:
$(PYTEST) tests/telemetry/ -n auto -v
# Clean test artifacts
clean:
rm -rf .pytest_cache/
rm -rf htmlcov/
rm -rf .coverage
find tests/ -name "*.pyc" -delete
find tests/ -name "__pycache__" -type d -exec rm -rf {} +
rm -rf tests/telemetry/__pycache__/
# Lint test files
lint-tests:
$(PYTHON) -m flake8 tests/telemetry/
$(PYTHON) -m pylint tests/telemetry/
# Type check test files
typecheck-tests:
$(PYTHON) -m mypy tests/telemetry/
# Run all quality checks
check-tests: lint-tests typecheck-tests test-unit
# Install test dependencies
install-test-deps:
$(PYTHON) -m pip install pytest pytest-asyncio pytest-mock pytest-cov pytest-xdist
# Setup development environment for testing
setup-dev:
$(PYTHON) -m pip install -e .
$(MAKE) install-test-deps
# Generate test report
test-report:
$(PYTEST) tests/telemetry/ --html=test-report.html --self-contained-html -v
# Run performance benchmarks
benchmark:
$(PYTEST) tests/telemetry/test_privacy_performance.py::TestTelemetryPerformance -v --benchmark-only
# Test different environments
test-docker-env:
CRAWL4AI_DOCKER=true $(PYTEST) tests/telemetry/ -k "docker" -v
test-cli-env:
$(PYTEST) tests/telemetry/ -k "cli" -v
# Validate telemetry implementation
validate:
@echo "Running telemetry validation suite..."
$(MAKE) test-unit
$(MAKE) test-privacy
$(MAKE) test-performance
@echo "Validation complete!"
# Debug failing tests
debug:
$(PYTEST) tests/telemetry/ --pdb -x -v
# Show test markers
show-markers:
$(PYTEST) --markers
# Show test collection (dry run)
show-tests:
$(PYTEST) tests/telemetry/ --collect-only -q

View File

@@ -373,7 +373,7 @@ async def main():
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://docs.micronaut.io/4.9.9/guide/",
url="https://docs.micronaut.io/4.7.6/guide/",
config=run_config
)
print(len(result.markdown.raw_markdown))
@@ -425,7 +425,7 @@ async def main():
"type": "attribute",
"attribute": "src"
}
]
}
}
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)

View File

@@ -0,0 +1,190 @@
# Crawl4AI Telemetry Testing Implementation
## Overview
This document summarizes the comprehensive testing strategy implementation for Crawl4AI's opt-in telemetry system. The implementation provides thorough test coverage across unit tests, integration tests, privacy compliance tests, and performance tests.
## Implementation Summary
### 📊 Test Statistics
- **Total Tests**: 40 tests
- **Success Rate**: 100% (40/40 passing)
- **Test Categories**: 4 categories (Unit, Integration, Privacy, Performance)
- **Code Coverage**: 51% (625 statements, 308 missing)
### 🗂️ Test Structure
#### 1. **Unit Tests** (`tests/telemetry/test_telemetry.py`)
- `TestTelemetryConfig`: Configuration management and persistence
- `TestEnvironmentDetection`: CLI, Docker, API server environment detection
- `TestTelemetryManager`: Singleton pattern and exception capture
- `TestConsentManager`: Docker default behavior and environment overrides
- `TestPublicAPI`: Public enable/disable/status functions
- `TestIntegration`: Crawler exception capture integration
#### 2. **Integration Tests** (`tests/telemetry/test_integration.py`)
- `TestTelemetryCLI`: CLI command testing (status, enable, disable)
- `TestAsyncWebCrawlerIntegration`: Real crawler integration with decorators
- `TestDockerIntegration`: Docker environment-specific behavior
- `TestTelemetryProviderIntegration`: Sentry provider initialization and fallbacks
#### 3. **Privacy & Performance Tests** (`tests/telemetry/test_privacy_performance.py`)
- `TestTelemetryPrivacy`: Data sanitization and PII protection
- `TestTelemetryPerformance`: Decorator overhead measurement
- `TestTelemetryScalability`: Multiple and concurrent exception handling
#### 4. **Hello World Test** (`tests/telemetry/test_hello_world_telemetry.py`)
- Basic telemetry functionality validation
### 🔧 Testing Infrastructure
#### **Pytest Configuration** (`pytest.ini`)
```ini
[pytest]
testpaths = tests/telemetry
markers =
unit: Unit tests
integration: Integration tests
privacy: Privacy compliance tests
performance: Performance tests
asyncio_mode = auto
```
#### **Test Fixtures** (`tests/conftest.py`)
- `temp_config_dir`: Temporary configuration directory
- `enabled_telemetry_config`: Pre-configured enabled telemetry
- `disabled_telemetry_config`: Pre-configured disabled telemetry
- `mock_sentry_provider`: Mocked Sentry provider for testing
#### **Makefile Targets** (`Makefile.telemetry`)
```makefile
test-all: Run all telemetry tests
test-unit: Run unit tests only
test-integration: Run integration tests only
test-privacy: Run privacy tests only
test-performance: Run performance tests only
test-coverage: Run tests with coverage report
test-watch: Run tests in watch mode
test-parallel: Run tests in parallel
```
## 🎯 Key Features Tested
### Privacy Compliance
- ✅ No URLs captured in telemetry data
- ✅ No content captured in telemetry data
- ✅ No PII (personally identifiable information) captured
- ✅ Sanitized context only (error types, stack traces without content)
### Performance Impact
- ✅ Telemetry decorator overhead < 1ms
- ✅ Async decorator overhead < 1ms
- ✅ Disabled telemetry has minimal performance impact
- ✅ Configuration loading performance acceptable
- ✅ Multiple exception capture scalability
- ✅ Concurrent exception capture handling
### Integration Points
- ✅ CLI command integration (status, enable, disable)
- ✅ AsyncWebCrawler decorator integration
- ✅ Docker environment auto-detection
- ✅ Sentry provider initialization
- ✅ Graceful degradation without Sentry
- ✅ Environment variable overrides
### Core Functionality
- ✅ Configuration persistence and loading
- ✅ Consent management (Docker defaults, user prompts)
- ✅ Environment detection (CLI, Docker, Jupyter, etc.)
- ✅ Singleton pattern for TelemetryManager
- ✅ Exception capture and forwarding
- ✅ Provider abstraction (Sentry, Null)
## 🚀 Usage Examples
### Run All Tests
```bash
make -f Makefile.telemetry test-all
```
### Run Specific Test Categories
```bash
# Unit tests only
make -f Makefile.telemetry test-unit
# Integration tests only
make -f Makefile.telemetry test-integration
# Privacy tests only
make -f Makefile.telemetry test-privacy
# Performance tests only
make -f Makefile.telemetry test-performance
```
### Coverage Report
```bash
make -f Makefile.telemetry test-coverage
```
### Parallel Execution
```bash
make -f Makefile.telemetry test-parallel
```
## 📁 File Structure
```
tests/
├── conftest.py # Shared pytest fixtures
└── telemetry/
├── test_hello_world_telemetry.py # Basic functionality test
├── test_telemetry.py # Unit tests
├── test_integration.py # Integration tests
└── test_privacy_performance.py # Privacy & performance tests
# Configuration
pytest.ini # Pytest configuration with markers
Makefile.telemetry # Convenient test execution targets
```
## 🔍 Test Isolation & Mocking
### Environment Isolation
- Tests run in isolated temporary directories
- Environment variables are properly mocked/isolated
- No interference between test runs
- Clean state for each test
### Mock Strategies
- `unittest.mock` for external dependencies
- Temporary file systems for configuration testing
- Subprocess mocking for CLI command testing
- Time measurement for performance testing
## 📈 Coverage Analysis
Current test coverage: **51%** (625 statements)
### Well-Covered Areas:
- Core configuration management (78%)
- Telemetry initialization (69%)
- Environment detection (64%)
### Areas for Future Enhancement:
- Consent management UI (20% - interactive prompts)
- Sentry provider implementation (25% - network calls)
- Base provider abstractions (49% - error handling paths)
## 🎉 Implementation Success
The comprehensive testing strategy has been **successfully implemented** with:
-**100% test pass rate** (40/40 tests passing)
-**Complete test infrastructure** (fixtures, configuration, targets)
-**Privacy compliance verification** (no PII, URLs, or content captured)
-**Performance validation** (minimal overhead confirmed)
-**Integration testing** (CLI, Docker, AsyncWebCrawler)
-**CI/CD ready** (Makefile targets for automation)
The telemetry system now has robust test coverage ensuring reliability, privacy compliance, and performance characteristics while maintaining comprehensive validation of all core functionality.

View File

@@ -19,7 +19,7 @@ import re
from pathlib import Path
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig, LinkPreviewConfig, LLMConfig
from crawl4ai.async_configs import CrawlerRunConfig, LinkPreviewConfig
from crawl4ai.models import Link, CrawlResult
import numpy as np
@@ -178,7 +178,7 @@ class AdaptiveConfig:
# Embedding strategy parameters
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
embedding_llm_config: Optional[Union[LLMConfig, Dict]] = None # Separate config for embeddings
embedding_llm_config: Optional[Dict] = None # Separate config for embeddings
n_query_variations: int = 10
coverage_threshold: float = 0.85
alpha_shape_alpha: float = 0.5
@@ -250,30 +250,6 @@ class AdaptiveConfig:
assert 0 <= self.embedding_quality_max_confidence <= 1, "embedding_quality_max_confidence must be between 0 and 1"
assert self.embedding_quality_scale_factor > 0, "embedding_quality_scale_factor must be positive"
assert 0 <= self.embedding_min_confidence_threshold <= 1, "embedding_min_confidence_threshold must be between 0 and 1"
@property
def _embedding_llm_config_dict(self) -> Optional[Dict]:
"""Convert LLMConfig to dict format for backward compatibility."""
if self.embedding_llm_config is None:
return None
if isinstance(self.embedding_llm_config, dict):
# Already a dict - return as-is for backward compatibility
return self.embedding_llm_config
# Convert LLMConfig object to dict format
return {
'provider': self.embedding_llm_config.provider,
'api_token': self.embedding_llm_config.api_token,
'base_url': getattr(self.embedding_llm_config, 'base_url', None),
'temperature': getattr(self.embedding_llm_config, 'temperature', None),
'max_tokens': getattr(self.embedding_llm_config, 'max_tokens', None),
'top_p': getattr(self.embedding_llm_config, 'top_p', None),
'frequency_penalty': getattr(self.embedding_llm_config, 'frequency_penalty', None),
'presence_penalty': getattr(self.embedding_llm_config, 'presence_penalty', None),
'stop': getattr(self.embedding_llm_config, 'stop', None),
'n': getattr(self.embedding_llm_config, 'n', None),
}
class CrawlStrategy(ABC):
@@ -617,7 +593,7 @@ class StatisticalStrategy(CrawlStrategy):
class EmbeddingStrategy(CrawlStrategy):
"""Embedding-based adaptive crawling using semantic space coverage"""
def __init__(self, embedding_model: str = None, llm_config: Union[LLMConfig, Dict] = None):
def __init__(self, embedding_model: str = None, llm_config: Dict = None):
self.embedding_model = embedding_model or "sentence-transformers/all-MiniLM-L6-v2"
self.llm_config = llm_config
self._embedding_cache = {}
@@ -629,24 +605,14 @@ class EmbeddingStrategy(CrawlStrategy):
self._kb_embeddings_hash = None # Track KB changes
self._validation_embeddings_cache = None # Cache validation query embeddings
self._kb_similarity_threshold = 0.95 # Threshold for deduplication
def _get_embedding_llm_config_dict(self) -> Dict:
"""Get embedding LLM config as dict with fallback to default."""
if hasattr(self, 'config') and self.config:
config_dict = self.config._embedding_llm_config_dict
if config_dict:
return config_dict
# Fallback to default if no config provided
return {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
async def _get_embeddings(self, texts: List[str]) -> Any:
"""Get embeddings using configured method"""
from .utils import get_text_embeddings
embedding_llm_config = self._get_embedding_llm_config_dict()
embedding_llm_config = {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
return await get_text_embeddings(
texts,
embedding_llm_config,
@@ -713,20 +679,8 @@ class EmbeddingStrategy(CrawlStrategy):
Return as a JSON array of strings."""
# Use the LLM for query generation
# Convert LLMConfig to dict if needed
llm_config_dict = None
if self.llm_config:
if isinstance(self.llm_config, dict):
llm_config_dict = self.llm_config
else:
# Convert LLMConfig object to dict
llm_config_dict = {
'provider': self.llm_config.provider,
'api_token': self.llm_config.api_token
}
provider = llm_config_dict.get('provider', 'openai/gpt-4o-mini') if llm_config_dict else 'openai/gpt-4o-mini'
api_token = llm_config_dict.get('api_token') if llm_config_dict else None
provider = self.llm_config.get('provider', 'openai/gpt-4o-mini') if self.llm_config else 'openai/gpt-4o-mini'
api_token = self.llm_config.get('api_token') if self.llm_config else None
# response = perform_completion_with_backoff(
# provider=provider,
@@ -889,7 +843,10 @@ class EmbeddingStrategy(CrawlStrategy):
# Batch embed only uncached links
if texts_to_embed:
embedding_llm_config = self._get_embedding_llm_config_dict()
embedding_llm_config = {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
new_embeddings = await get_text_embeddings(texts_to_embed, embedding_llm_config, self.embedding_model)
# Cache the new embeddings
@@ -1227,7 +1184,10 @@ class EmbeddingStrategy(CrawlStrategy):
return
# Get embeddings for new texts
embedding_llm_config = self._get_embedding_llm_config_dict()
embedding_llm_config = {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
new_embeddings = await get_text_embeddings(new_texts, embedding_llm_config, self.embedding_model)
# Deduplicate embeddings before adding to KB
@@ -1296,12 +1256,10 @@ class AdaptiveCrawler:
if strategy_name == "statistical":
return StatisticalStrategy()
elif strategy_name == "embedding":
strategy = EmbeddingStrategy(
return EmbeddingStrategy(
embedding_model=self.config.embedding_model,
llm_config=self.config.embedding_llm_config
)
strategy.config = self.config # Pass config to strategy
return strategy
else:
raise ValueError(f"Unknown strategy: {strategy_name}")

View File

@@ -1,6 +1,5 @@
import os
from typing import Union
import warnings
from .config import (
DEFAULT_PROVIDER,
DEFAULT_PROVIDER_API_KEY,
@@ -98,16 +97,13 @@ def to_serializable_dict(obj: Any, ignore_default_value : bool = False) -> Dict:
if value != param.default and not ignore_default_value:
current_values[name] = to_serializable_dict(value)
# Don't serialize private __slots__ - they're internal implementation details
# not constructor parameters. This was causing URLPatternFilter to fail
# because _simple_suffixes was being serialized as 'simple_suffixes'
# if hasattr(obj, '__slots__'):
# for slot in obj.__slots__:
# if slot.startswith('_'): # Handle private slots
# attr_name = slot[1:] # Remove leading '_'
# value = getattr(obj, slot, None)
# if value is not None:
# current_values[attr_name] = to_serializable_dict(value)
if hasattr(obj, '__slots__'):
for slot in obj.__slots__:
if slot.startswith('_'): # Handle private slots
attr_name = slot[1:] # Remove leading '_'
value = getattr(obj, slot, None)
if value is not None:
current_values[attr_name] = to_serializable_dict(value)
@@ -258,39 +254,24 @@ class ProxyConfig:
@staticmethod
def from_string(proxy_str: str) -> "ProxyConfig":
"""Create a ProxyConfig from a string.
Supported formats:
- 'http://username:password@ip:port'
- 'http://ip:port'
- 'socks5://ip:port'
- 'ip:port:username:password'
- 'ip:port'
"""
s = (proxy_str or "").strip()
# URL with credentials
if "@" in s and "://" in s:
auth_part, server_part = s.split("@", 1)
protocol, credentials = auth_part.split("://", 1)
if ":" in credentials:
username, password = credentials.split(":", 1)
return ProxyConfig(
server=f"{protocol}://{server_part}",
username=username,
password=password,
)
# URL without credentials (keep scheme)
if "://" in s and "@" not in s:
return ProxyConfig(server=s)
# Colon separated forms
parts = s.split(":")
if len(parts) == 4:
"""Create a ProxyConfig from a string in the format 'ip:port:username:password'."""
parts = proxy_str.split(":")
if len(parts) == 4: # ip:port:username:password
ip, port, username, password = parts
return ProxyConfig(server=f"http://{ip}:{port}", username=username, password=password)
if len(parts) == 2:
return ProxyConfig(
server=f"http://{ip}:{port}",
username=username,
password=password,
ip=ip
)
elif len(parts) == 2: # ip:port only
ip, port = parts
return ProxyConfig(server=f"http://{ip}:{port}")
raise ValueError(f"Invalid proxy string format: {proxy_str}")
return ProxyConfig(
server=f"http://{ip}:{port}",
ip=ip
)
else:
raise ValueError(f"Invalid proxy string format: {proxy_str}")
@staticmethod
def from_dict(proxy_dict: Dict) -> "ProxyConfig":
@@ -454,7 +435,6 @@ class BrowserConfig:
host: str = "localhost",
enable_stealth: bool = False,
):
self.browser_type = browser_type
self.headless = headless
self.browser_mode = browser_mode
@@ -467,22 +447,13 @@ class BrowserConfig:
if self.browser_type in ["firefox", "webkit"]:
self.channel = ""
self.chrome_channel = ""
if proxy:
warnings.warn("The 'proxy' parameter is deprecated and will be removed in a future release. Use 'proxy_config' instead.", UserWarning)
self.proxy = proxy
self.proxy_config = proxy_config
if isinstance(self.proxy_config, dict):
self.proxy_config = ProxyConfig.from_dict(self.proxy_config)
if isinstance(self.proxy_config, str):
self.proxy_config = ProxyConfig.from_string(self.proxy_config)
if self.proxy and self.proxy_config:
warnings.warn("Both 'proxy' and 'proxy_config' are provided. 'proxy_config' will take precedence.", UserWarning)
self.proxy = None
elif self.proxy:
# Convert proxy string to ProxyConfig if proxy_config is not provided
self.proxy_config = ProxyConfig.from_string(self.proxy)
self.proxy = None
self.viewport_width = viewport_width
self.viewport_height = viewport_height
@@ -860,6 +831,12 @@ class HTTPCrawlerConfig:
return HTTPCrawlerConfig.from_kwargs(config)
class CrawlerRunConfig():
_UNWANTED_PROPS = {
'disable_cache' : 'Instead, use cache_mode=CacheMode.DISABLED',
'bypass_cache' : 'Instead, use cache_mode=CacheMode.BYPASS',
'no_cache_read' : 'Instead, use cache_mode=CacheMode.WRITE_ONLY',
'no_cache_write' : 'Instead, use cache_mode=CacheMode.READ_ONLY',
}
"""
Configuration class for controlling how the crawler runs each crawl operation.
@@ -1066,12 +1043,6 @@ class CrawlerRunConfig():
url: str = None # This is not a compulsory parameter
"""
_UNWANTED_PROPS = {
'disable_cache' : 'Instead, use cache_mode=CacheMode.DISABLED',
'bypass_cache' : 'Instead, use cache_mode=CacheMode.BYPASS',
'no_cache_read' : 'Instead, use cache_mode=CacheMode.WRITE_ONLY',
'no_cache_write' : 'Instead, use cache_mode=CacheMode.READ_ONLY',
}
def __init__(
self,
@@ -1150,7 +1121,6 @@ class CrawlerRunConfig():
exclude_domains: list = None,
exclude_internal_links: bool = False,
score_links: bool = False,
preserve_https_for_internal_links: bool = False,
# Debugging and Logging Parameters
verbose: bool = True,
log_console: bool = False,
@@ -1274,7 +1244,6 @@ class CrawlerRunConfig():
self.exclude_domains = exclude_domains or []
self.exclude_internal_links = exclude_internal_links
self.score_links = score_links
self.preserve_https_for_internal_links = preserve_https_for_internal_links
# Debugging and Logging Parameters
self.verbose = verbose
@@ -1548,7 +1517,6 @@ class CrawlerRunConfig():
exclude_domains=kwargs.get("exclude_domains", []),
exclude_internal_links=kwargs.get("exclude_internal_links", False),
score_links=kwargs.get("score_links", False),
preserve_https_for_internal_links=kwargs.get("preserve_https_for_internal_links", False),
# Debugging and Logging Parameters
verbose=kwargs.get("verbose", True),
log_console=kwargs.get("log_console", False),
@@ -1655,7 +1623,6 @@ class CrawlerRunConfig():
"exclude_domains": self.exclude_domains,
"exclude_internal_links": self.exclude_internal_links,
"score_links": self.score_links,
"preserve_https_for_internal_links": self.preserve_https_for_internal_links,
"verbose": self.verbose,
"log_console": self.log_console,
"capture_network_requests": self.capture_network_requests,

View File

@@ -49,6 +49,9 @@ from .utils import (
preprocess_html_for_schema,
)
# Import telemetry
from .telemetry import capture_exception, telemetry_decorator, async_telemetry_decorator
class AsyncWebCrawler:
"""
@@ -201,6 +204,7 @@ class AsyncWebCrawler:
"""异步空上下文管理器"""
yield
@async_telemetry_decorator
async def arun(
self,
url: str,
@@ -354,7 +358,6 @@ class AsyncWebCrawler:
###############################################################
# Process the HTML content, Call CrawlerStrategy.process_html #
###############################################################
from urllib.parse import urlparse
crawl_result: CrawlResult = await self.aprocess_html(
url=url,
html=html,
@@ -365,7 +368,6 @@ class AsyncWebCrawler:
verbose=config.verbose,
is_raw_html=True if url.startswith("raw:") else False,
redirected_url=async_response.redirected_url,
original_scheme=urlparse(url).scheme,
**kwargs,
)
@@ -432,6 +434,7 @@ class AsyncWebCrawler:
)
)
@async_telemetry_decorator
async def aprocess_html(
self,
url: str,

View File

@@ -15,7 +15,6 @@ from .js_snippet import load_js_script
from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig
from .utils import get_chromium_path
import warnings
BROWSER_DISABLE_OPTIONS = [
@@ -742,18 +741,17 @@ class BrowserManager:
)
os.makedirs(browser_args["downloads_path"], exist_ok=True)
if self.config.proxy:
warnings.warn(
"BrowserConfig.proxy is deprecated and ignored. Use proxy_config instead.",
DeprecationWarning,
)
if self.config.proxy_config:
if self.config.proxy or self.config.proxy_config:
from playwright.async_api import ProxySettings
proxy_settings = ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
proxy_settings = (
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
)
browser_args["proxy"] = proxy_settings

View File

@@ -1385,6 +1385,97 @@ def profiles_cmd():
# Run interactive profile manager
anyio.run(manage_profiles)
@cli.group("telemetry")
def telemetry_cmd():
"""Manage telemetry settings for Crawl4AI
Telemetry helps improve Crawl4AI by sending anonymous crash reports.
No personal data or crawled content is ever collected.
"""
pass
@telemetry_cmd.command("enable")
@click.option("--email", "-e", help="Optional email for follow-up on critical issues")
@click.option("--always/--once", default=True, help="Always send errors (default) or just once")
def telemetry_enable_cmd(email: Optional[str], always: bool):
"""Enable telemetry to help improve Crawl4AI
Examples:
crwl telemetry enable # Enable telemetry
crwl telemetry enable --email me@ex.com # Enable with email
crwl telemetry enable --once # Send only next error
"""
from crawl4ai.telemetry import enable
try:
enable(email=email, always=always, once=not always)
console.print("[green]✅ Telemetry enabled successfully[/green]")
if email:
console.print(f" Email: {email}")
console.print(f" Mode: {'Always send errors' if always else 'Send next error only'}")
except Exception as e:
console.print(f"[red]❌ Failed to enable telemetry: {e}[/red]")
sys.exit(1)
@telemetry_cmd.command("disable")
def telemetry_disable_cmd():
"""Disable telemetry
Stop sending anonymous crash reports to help improve Crawl4AI.
"""
from crawl4ai.telemetry import disable
try:
disable()
console.print("[green]✅ Telemetry disabled successfully[/green]")
except Exception as e:
console.print(f"[red]❌ Failed to disable telemetry: {e}[/red]")
sys.exit(1)
@telemetry_cmd.command("status")
def telemetry_status_cmd():
"""Show current telemetry status
Display whether telemetry is enabled and current settings.
"""
from crawl4ai.telemetry import status
try:
info = status()
# Create status table
table = Table(title="Telemetry Status", show_header=False)
table.add_column("Setting", style="cyan")
table.add_column("Value")
# Status emoji
status_icon = "" if info['enabled'] else ""
table.add_row("Status", f"{status_icon} {'Enabled' if info['enabled'] else 'Disabled'}")
table.add_row("Consent", info['consent'].replace('_', ' ').title())
if info['email']:
table.add_row("Email", info['email'])
table.add_row("Environment", info['environment'])
table.add_row("Provider", info['provider'])
if info['errors_sent'] > 0:
table.add_row("Errors Sent", str(info['errors_sent']))
console.print(table)
# Add helpful messages
if not info['enabled']:
console.print("\n[yellow] Telemetry is disabled. Enable it to help improve Crawl4AI:[/yellow]")
console.print(" [dim]crwl telemetry enable[/dim]")
except Exception as e:
console.print(f"[red]❌ Failed to get telemetry status: {e}[/red]")
sys.exit(1)
@cli.command(name="")
@click.argument("url", required=False)
@click.option("--example", is_flag=True, help="Show usage examples")

View File

@@ -258,11 +258,7 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
continue
try:
normalized_href = normalize_url(
href, url,
preserve_https=kwargs.get('preserve_https_for_internal_links', False),
original_scheme=kwargs.get('original_scheme')
)
normalized_href = normalize_url(href, url)
link_data = {
"href": normalized_href,
"text": link.text_content().strip(),

View File

@@ -47,13 +47,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
self.url_scorer = url_scorer
self.include_external = include_external
self.max_pages = max_pages
# self.logger = logger or logging.getLogger(__name__)
# Ensure logger is always a Logger instance, not a dict from serialization
if isinstance(logger, logging.Logger):
self.logger = logger
else:
# Create a new logger if logger is None, dict, or any other non-Logger type
self.logger = logging.getLogger(__name__)
self.logger = logger or logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0
@@ -122,6 +116,11 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
valid_links.append(base_url)
# If we have more valid links than capacity, limit them
if len(valid_links) > remaining_capacity:
valid_links = valid_links[:remaining_capacity]
self.logger.info(f"Limiting to {remaining_capacity} URLs due to max_pages limit")
# Record the new depths and add to next_links
for url in valid_links:
depths[url] = new_depth
@@ -141,8 +140,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
"""
queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
# Push the initial URL with score 0 and depth 0.
initial_score = self.url_scorer.score(start_url) if self.url_scorer else 0
await queue.put((-initial_score, 0, start_url, None))
await queue.put((0, 0, start_url, None))
visited: Set[str] = set()
depths: Dict[str, int] = {start_url: 0}
@@ -189,7 +187,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
result.metadata = result.metadata or {}
result.metadata["depth"] = depth
result.metadata["parent_url"] = parent_url
result.metadata["score"] = -score
result.metadata["score"] = score
# Count only successful crawls toward max_pages limit
if result.success:
@@ -210,7 +208,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
for new_url, new_parent in new_links:
new_depth = depths.get(new_url, depth + 1)
new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
await queue.put((-new_score, new_depth, new_url, new_parent))
await queue.put((new_score, new_depth, new_url, new_parent))
# End of crawl.

View File

@@ -38,13 +38,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self.include_external = include_external
self.score_threshold = score_threshold
self.max_pages = max_pages
# self.logger = logger or logging.getLogger(__name__)
# Ensure logger is always a Logger instance, not a dict from serialization
if isinstance(logger, logging.Logger):
self.logger = logger
else:
# Create a new logger if logger is None, dict, or any other non-Logger type
self.logger = logging.getLogger(__name__)
self.logger = logger or logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0

View File

@@ -120,9 +120,6 @@ class URLPatternFilter(URLFilter):
"""Pattern filter balancing speed and completeness"""
__slots__ = (
"patterns", # Store original patterns for serialization
"use_glob", # Store original use_glob for serialization
"reverse", # Store original reverse for serialization
"_simple_suffixes",
"_simple_prefixes",
"_domain_patterns",
@@ -145,11 +142,6 @@ class URLPatternFilter(URLFilter):
reverse: bool = False,
):
super().__init__()
# Store original constructor params for serialization
self.patterns = patterns
self.use_glob = use_glob
self.reverse = reverse
self._reverse = reverse
patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns

View File

@@ -253,16 +253,6 @@ class CrawlResult(BaseModel):
requirements change, this is where you would update the logic.
"""
result = super().model_dump(*args, **kwargs)
# Remove any property descriptors that might have been included
# These deprecated properties should not be in the serialized output
for key in ['fit_html', 'fit_markdown', 'markdown_v2']:
if key in result and isinstance(result[key], property):
# del result[key]
# Nasrin: I decided to convert it to string instead of removing it.
result[key] = str(result[key])
# Add the markdown field properly
if self._markdown is not None:
result["markdown"] = self._markdown.model_dump()
return result

View File

@@ -0,0 +1,440 @@
"""
Crawl4AI Telemetry Module.
Provides opt-in error tracking to improve stability.
"""
import os
import sys
import functools
import traceback
from typing import Optional, Any, Dict, Callable, Type
from contextlib import contextmanager, asynccontextmanager
from .base import TelemetryProvider, NullProvider
from .config import TelemetryConfig, TelemetryConsent
from .consent import ConsentManager
from .environment import Environment, EnvironmentDetector
class TelemetryManager:
"""
Main telemetry manager for Crawl4AI.
Coordinates provider, config, and consent management.
"""
_instance: Optional['TelemetryManager'] = None
def __init__(self):
"""Initialize telemetry manager."""
self.config = TelemetryConfig()
self.consent_manager = ConsentManager(self.config)
self.environment = EnvironmentDetector.detect()
self._provider: Optional[TelemetryProvider] = None
self._initialized = False
self._error_count = 0
self._max_errors = 100 # Prevent telemetry spam
# Load provider based on config
self._setup_provider()
@classmethod
def get_instance(cls) -> 'TelemetryManager':
"""
Get singleton instance of telemetry manager.
Returns:
TelemetryManager instance
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _setup_provider(self) -> None:
"""Setup telemetry provider based on configuration."""
# Update config from environment
self.config.update_from_env()
# Check if telemetry is enabled
if not self.config.is_enabled():
self._provider = NullProvider()
return
# Try to load Sentry provider
try:
from .providers.sentry import SentryProvider
# Get Crawl4AI version for release tracking
try:
from crawl4ai import __version__
release = f"crawl4ai@{__version__}"
except ImportError:
release = "crawl4ai@unknown"
self._provider = SentryProvider(
environment=self.environment.value,
release=release
)
# Initialize provider
if not self._provider.initialize():
# Fallback to null provider if init fails
self._provider = NullProvider()
except ImportError:
# Sentry not installed - use null provider
self._provider = NullProvider()
self._initialized = True
def capture_exception(
self,
exception: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture and send an exception.
Args:
exception: The exception to capture
context: Optional additional context
Returns:
True if exception was sent
"""
# Check error count limit
if self._error_count >= self._max_errors:
return False
# Check consent on first error
if self._error_count == 0:
consent = self.consent_manager.check_and_prompt()
# Update provider if consent changed
if consent == TelemetryConsent.DENIED:
self._provider = NullProvider()
return False
elif consent in [TelemetryConsent.ONCE, TelemetryConsent.ALWAYS]:
if isinstance(self._provider, NullProvider):
self._setup_provider()
# Check if we should send this error
if not self.config.should_send_current():
return False
# Prepare context
full_context = EnvironmentDetector.get_environment_context()
if context:
full_context.update(context)
# Add user email if available
email = self.config.get_email()
if email:
full_context['email'] = email
# Add source info
full_context['source'] = 'crawl4ai'
# Send exception
try:
if self._provider:
success = self._provider.send_exception(exception, full_context)
if success:
self._error_count += 1
return success
except Exception:
# Telemetry itself failed - ignore
pass
return False
def capture_message(
self,
message: str,
level: str = 'info',
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture a message event.
Args:
message: Message to send
level: Message level (info, warning, error)
context: Optional context
Returns:
True if message was sent
"""
if not self.config.is_enabled():
return False
payload = {
'level': level,
'message': message
}
if context:
payload.update(context)
try:
if self._provider:
return self._provider.send_event(message, payload)
except Exception:
pass
return False
def enable(
self,
email: Optional[str] = None,
always: bool = True,
once: bool = False
) -> None:
"""
Enable telemetry.
Args:
email: Optional email for follow-up
always: If True, always send errors
once: If True, send only next error
"""
if once:
consent = TelemetryConsent.ONCE
elif always:
consent = TelemetryConsent.ALWAYS
else:
consent = TelemetryConsent.ALWAYS
self.config.set_consent(consent, email)
self._setup_provider()
print("✅ Telemetry enabled")
if email:
print(f" Email: {email}")
print(f" Mode: {'once' if once else 'always'}")
def disable(self) -> None:
"""Disable telemetry."""
self.config.set_consent(TelemetryConsent.DENIED)
self._provider = NullProvider()
print("✅ Telemetry disabled")
def status(self) -> Dict[str, Any]:
"""
Get telemetry status.
Returns:
Dictionary with status information
"""
return {
'enabled': self.config.is_enabled(),
'consent': self.config.get_consent().value,
'email': self.config.get_email(),
'environment': self.environment.value,
'provider': type(self._provider).__name__ if self._provider else 'None',
'errors_sent': self._error_count
}
def flush(self) -> None:
"""Flush any pending telemetry data."""
if self._provider:
self._provider.flush()
def shutdown(self) -> None:
"""Shutdown telemetry."""
if self._provider:
self._provider.shutdown()
# Global instance
_telemetry_manager: Optional[TelemetryManager] = None
def get_telemetry() -> TelemetryManager:
"""
Get global telemetry manager instance.
Returns:
TelemetryManager instance
"""
global _telemetry_manager
if _telemetry_manager is None:
_telemetry_manager = TelemetryManager.get_instance()
return _telemetry_manager
def capture_exception(
exception: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture an exception for telemetry.
Args:
exception: Exception to capture
context: Optional context
Returns:
True if sent successfully
"""
try:
return get_telemetry().capture_exception(exception, context)
except Exception:
return False
def telemetry_decorator(func: Callable) -> Callable:
"""
Decorator to capture exceptions from a function.
Args:
func: Function to wrap
Returns:
Wrapped function
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# Capture exception
capture_exception(e, {
'function': func.__name__,
'module': func.__module__
})
# Re-raise the exception
raise
return wrapper
def async_telemetry_decorator(func: Callable) -> Callable:
"""
Decorator to capture exceptions from an async function.
Args:
func: Async function to wrap
Returns:
Wrapped async function
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
# Capture exception
capture_exception(e, {
'function': func.__name__,
'module': func.__module__
})
# Re-raise the exception
raise
return wrapper
@contextmanager
def telemetry_context(operation: str):
"""
Context manager for capturing exceptions.
Args:
operation: Name of the operation
Example:
with telemetry_context("web_crawl"):
# Your code here
pass
"""
try:
yield
except Exception as e:
capture_exception(e, {'operation': operation})
raise
@asynccontextmanager
async def async_telemetry_context(operation: str):
"""
Async context manager for capturing exceptions in async code.
Args:
operation: Name of the operation
Example:
async with async_telemetry_context("async_crawl"):
# Your async code here
await something()
"""
try:
yield
except Exception as e:
capture_exception(e, {'operation': operation})
raise
def install_exception_handler():
"""Install global exception handler for uncaught exceptions."""
original_hook = sys.excepthook
def telemetry_exception_hook(exc_type, exc_value, exc_traceback):
"""Custom exception hook with telemetry."""
# Don't capture KeyboardInterrupt
if not issubclass(exc_type, KeyboardInterrupt):
capture_exception(exc_value, {
'uncaught': True,
'type': exc_type.__name__
})
# Call original hook
original_hook(exc_type, exc_value, exc_traceback)
sys.excepthook = telemetry_exception_hook
# Public API
def enable(email: Optional[str] = None, always: bool = True, once: bool = False) -> None:
"""
Enable telemetry.
Args:
email: Optional email for follow-up
always: If True, always send errors (default)
once: If True, send only the next error
"""
get_telemetry().enable(email=email, always=always, once=once)
def disable() -> None:
"""Disable telemetry."""
get_telemetry().disable()
def status() -> Dict[str, Any]:
"""
Get telemetry status.
Returns:
Dictionary with status information
"""
return get_telemetry().status()
# Auto-install exception handler on import
# (Only for main library usage, not for Docker/API)
if EnvironmentDetector.detect() not in [Environment.DOCKER, Environment.API_SERVER]:
install_exception_handler()
__all__ = [
'TelemetryManager',
'get_telemetry',
'capture_exception',
'telemetry_decorator',
'async_telemetry_decorator',
'telemetry_context',
'async_telemetry_context',
'enable',
'disable',
'status',
]

140
crawl4ai/telemetry/base.py Normal file
View File

@@ -0,0 +1,140 @@
"""
Base telemetry provider interface for Crawl4AI.
Provides abstraction for different telemetry backends.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Union
import traceback
class TelemetryProvider(ABC):
"""Abstract base class for telemetry providers."""
def __init__(self, **kwargs):
"""Initialize the provider with optional configuration."""
self.config = kwargs
self._initialized = False
@abstractmethod
def initialize(self) -> bool:
"""
Initialize the telemetry provider.
Returns True if initialization successful, False otherwise.
"""
pass
@abstractmethod
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send an exception to the telemetry backend.
Args:
exc: The exception to report
context: Optional context data (email, environment, etc.)
Returns:
True if sent successfully, False otherwise
"""
pass
@abstractmethod
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send a generic telemetry event.
Args:
event_name: Name of the event
payload: Optional event data
Returns:
True if sent successfully, False otherwise
"""
pass
@abstractmethod
def flush(self) -> None:
"""Flush any pending telemetry data."""
pass
@abstractmethod
def shutdown(self) -> None:
"""Clean shutdown of the provider."""
pass
def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Remove sensitive information from telemetry data.
Override in subclasses for custom sanitization.
Args:
data: Raw data dictionary
Returns:
Sanitized data dictionary
"""
# Default implementation - remove common sensitive fields
sensitive_keys = {
'password', 'token', 'api_key', 'secret', 'credential',
'auth', 'authorization', 'cookie', 'session'
}
def _sanitize_dict(d: Dict) -> Dict:
sanitized = {}
for key, value in d.items():
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_keys):
sanitized[key] = '[REDACTED]'
elif isinstance(value, dict):
sanitized[key] = _sanitize_dict(value)
elif isinstance(value, list):
sanitized[key] = [
_sanitize_dict(item) if isinstance(item, dict) else item
for item in value
]
else:
sanitized[key] = value
return sanitized
return _sanitize_dict(data) if isinstance(data, dict) else data
class NullProvider(TelemetryProvider):
"""No-op provider for when telemetry is disabled."""
def initialize(self) -> bool:
"""No initialization needed for null provider."""
self._initialized = True
return True
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""No-op exception sending."""
return True
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""No-op event sending."""
return True
def flush(self) -> None:
"""No-op flush."""
pass
def shutdown(self) -> None:
"""No-op shutdown."""
pass

View File

@@ -0,0 +1,196 @@
"""
Configuration management for Crawl4AI telemetry.
Handles user preferences and persistence.
"""
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
from enum import Enum
class TelemetryConsent(Enum):
"""Telemetry consent levels."""
NOT_SET = "not_set"
DENIED = "denied"
ONCE = "once" # Send current error only
ALWAYS = "always" # Send all errors
class TelemetryConfig:
"""Manages telemetry configuration and persistence."""
def __init__(self, config_dir: Optional[Path] = None):
"""
Initialize configuration manager.
Args:
config_dir: Optional custom config directory
"""
if config_dir:
self.config_dir = config_dir
else:
# Default to ~/.crawl4ai/
self.config_dir = Path.home() / '.crawl4ai'
self.config_file = self.config_dir / 'config.json'
self._config: Dict[str, Any] = {}
self._load_config()
def _ensure_config_dir(self) -> None:
"""Ensure configuration directory exists."""
self.config_dir.mkdir(parents=True, exist_ok=True)
def _load_config(self) -> None:
"""Load configuration from disk."""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
self._config = json.load(f)
except (json.JSONDecodeError, IOError):
# Corrupted or inaccessible config - start fresh
self._config = {}
else:
self._config = {}
def _save_config(self) -> bool:
"""
Save configuration to disk.
Returns:
True if saved successfully
"""
try:
self._ensure_config_dir()
# Write to temporary file first
temp_file = self.config_file.with_suffix('.tmp')
with open(temp_file, 'w') as f:
json.dump(self._config, f, indent=2)
# Atomic rename
temp_file.replace(self.config_file)
return True
except (IOError, OSError):
return False
def get_telemetry_settings(self) -> Dict[str, Any]:
"""
Get current telemetry settings.
Returns:
Dictionary with telemetry settings
"""
return self._config.get('telemetry', {
'consent': TelemetryConsent.NOT_SET.value,
'email': None
})
def get_consent(self) -> TelemetryConsent:
"""
Get current consent status.
Returns:
TelemetryConsent enum value
"""
settings = self.get_telemetry_settings()
consent_value = settings.get('consent', TelemetryConsent.NOT_SET.value)
# Handle legacy boolean values
if isinstance(consent_value, bool):
consent_value = TelemetryConsent.ALWAYS.value if consent_value else TelemetryConsent.DENIED.value
try:
return TelemetryConsent(consent_value)
except ValueError:
return TelemetryConsent.NOT_SET
def set_consent(
self,
consent: TelemetryConsent,
email: Optional[str] = None
) -> bool:
"""
Set telemetry consent and optional email.
Args:
consent: Consent level
email: Optional email for follow-up
Returns:
True if saved successfully
"""
if 'telemetry' not in self._config:
self._config['telemetry'] = {}
self._config['telemetry']['consent'] = consent.value
# Only update email if provided
if email is not None:
self._config['telemetry']['email'] = email
return self._save_config()
def get_email(self) -> Optional[str]:
"""
Get stored email if any.
Returns:
Email address or None
"""
settings = self.get_telemetry_settings()
return settings.get('email')
def is_enabled(self) -> bool:
"""
Check if telemetry is enabled.
Returns:
True if telemetry should send data
"""
consent = self.get_consent()
return consent in [TelemetryConsent.ONCE, TelemetryConsent.ALWAYS]
def should_send_current(self) -> bool:
"""
Check if current error should be sent.
Used for one-time consent.
Returns:
True if current error should be sent
"""
consent = self.get_consent()
if consent == TelemetryConsent.ONCE:
# After sending once, reset to NOT_SET
self.set_consent(TelemetryConsent.NOT_SET)
return True
return consent == TelemetryConsent.ALWAYS
def clear(self) -> bool:
"""
Clear all telemetry settings.
Returns:
True if cleared successfully
"""
if 'telemetry' in self._config:
del self._config['telemetry']
return self._save_config()
return True
def update_from_env(self) -> None:
"""Update configuration from environment variables."""
# Check for telemetry disable flag
if os.environ.get('CRAWL4AI_TELEMETRY') == '0':
self.set_consent(TelemetryConsent.DENIED)
# Check for email override
env_email = os.environ.get('CRAWL4AI_TELEMETRY_EMAIL')
if env_email and self.is_enabled():
current_settings = self.get_telemetry_settings()
self.set_consent(
TelemetryConsent(current_settings['consent']),
email=env_email
)

View File

@@ -0,0 +1,314 @@
"""
User consent handling for Crawl4AI telemetry.
Provides interactive prompts for different environments.
"""
import sys
from typing import Optional, Tuple
from .config import TelemetryConsent, TelemetryConfig
from .environment import Environment, EnvironmentDetector
class ConsentManager:
"""Manages user consent for telemetry."""
def __init__(self, config: Optional[TelemetryConfig] = None):
"""
Initialize consent manager.
Args:
config: Optional TelemetryConfig instance
"""
self.config = config or TelemetryConfig()
self.environment = EnvironmentDetector.detect()
def check_and_prompt(self) -> TelemetryConsent:
"""
Check consent status and prompt if needed.
Returns:
Current consent status
"""
current_consent = self.config.get_consent()
# If already set, return current value
if current_consent != TelemetryConsent.NOT_SET:
return current_consent
# Docker/API server: default enabled (check env var)
if self.environment in [Environment.DOCKER, Environment.API_SERVER]:
return self._handle_docker_consent()
# Interactive environments: prompt user
if EnvironmentDetector.is_interactive():
return self._prompt_for_consent()
# Non-interactive: default disabled
return TelemetryConsent.DENIED
def _handle_docker_consent(self) -> TelemetryConsent:
"""
Handle consent in Docker environment.
Default enabled unless disabled via env var.
"""
import os
if os.environ.get('CRAWL4AI_TELEMETRY') == '0':
self.config.set_consent(TelemetryConsent.DENIED)
return TelemetryConsent.DENIED
# Default enabled for Docker
self.config.set_consent(TelemetryConsent.ALWAYS)
return TelemetryConsent.ALWAYS
def _prompt_for_consent(self) -> TelemetryConsent:
"""
Prompt user for consent based on environment.
Returns:
User's consent choice
"""
if self.environment == Environment.CLI:
return self._cli_prompt()
elif self.environment in [Environment.JUPYTER, Environment.COLAB]:
return self._notebook_prompt()
else:
return TelemetryConsent.DENIED
def _cli_prompt(self) -> TelemetryConsent:
"""
Show CLI prompt for consent.
Returns:
User's consent choice
"""
print("\n" + "="*60)
print("🚨 Crawl4AI Error Detection")
print("="*60)
print("\nWe noticed an error occurred. Help improve Crawl4AI by")
print("sending anonymous crash reports?")
print("\n[1] Yes, send this error only")
print("[2] Yes, always send errors")
print("[3] No, don't send")
print("\n" + "-"*60)
# Get choice
while True:
try:
choice = input("Your choice (1/2/3): ").strip()
if choice == '1':
consent = TelemetryConsent.ONCE
break
elif choice == '2':
consent = TelemetryConsent.ALWAYS
break
elif choice == '3':
consent = TelemetryConsent.DENIED
break
else:
print("Please enter 1, 2, or 3")
except (KeyboardInterrupt, EOFError):
# User cancelled - treat as denial
consent = TelemetryConsent.DENIED
break
# Optional email
email = None
if consent != TelemetryConsent.DENIED:
print("\nOptional: Enter email for follow-up (or press Enter to skip):")
try:
email_input = input("Email: ").strip()
if email_input and '@' in email_input:
email = email_input
except (KeyboardInterrupt, EOFError):
pass
# Save choice
self.config.set_consent(consent, email)
if consent != TelemetryConsent.DENIED:
print("\n✅ Thank you for helping improve Crawl4AI!")
else:
print("\n✅ Telemetry disabled. You can enable it anytime with:")
print(" crawl4ai telemetry enable")
print("="*60 + "\n")
return consent
def _notebook_prompt(self) -> TelemetryConsent:
"""
Show notebook prompt for consent.
Uses widgets if available, falls back to print + code.
Returns:
User's consent choice
"""
if EnvironmentDetector.supports_widgets():
return self._widget_prompt()
else:
return self._notebook_fallback_prompt()
def _widget_prompt(self) -> TelemetryConsent:
"""
Show interactive widget prompt in Jupyter/Colab.
Returns:
User's consent choice
"""
try:
import ipywidgets as widgets
from IPython.display import display, HTML
# Create styled HTML
html = HTML("""
<div style="padding: 15px; border: 2px solid #ff6b6b; border-radius: 8px; background: #fff5f5;">
<h3 style="color: #c92a2a; margin-top: 0;">🚨 Crawl4AI Error Detected</h3>
<p style="color: #495057;">Help us improve by sending anonymous crash reports?</p>
</div>
""")
display(html)
# Create buttons
btn_once = widgets.Button(
description='Send this error',
button_style='info',
icon='check'
)
btn_always = widgets.Button(
description='Always send',
button_style='success',
icon='check-circle'
)
btn_never = widgets.Button(
description='Don\'t send',
button_style='danger',
icon='times'
)
# Email input
email_input = widgets.Text(
placeholder='Optional: your@email.com',
description='Email:',
style={'description_width': 'initial'}
)
# Output area for feedback
output = widgets.Output()
# Container
button_box = widgets.HBox([btn_once, btn_always, btn_never])
container = widgets.VBox([button_box, email_input, output])
# Variable to store choice
consent_choice = {'value': None}
def on_button_click(btn):
"""Handle button click."""
with output:
output.clear_output()
if btn == btn_once:
consent_choice['value'] = TelemetryConsent.ONCE
print("✅ Sending this error only")
elif btn == btn_always:
consent_choice['value'] = TelemetryConsent.ALWAYS
print("✅ Always sending errors")
else:
consent_choice['value'] = TelemetryConsent.DENIED
print("✅ Telemetry disabled")
# Save with email if provided
email = email_input.value.strip() if email_input.value else None
self.config.set_consent(consent_choice['value'], email)
# Disable buttons after choice
btn_once.disabled = True
btn_always.disabled = True
btn_never.disabled = True
email_input.disabled = True
# Attach handlers
btn_once.on_click(on_button_click)
btn_always.on_click(on_button_click)
btn_never.on_click(on_button_click)
# Display widget
display(container)
# Wait for user choice (in notebook, this is non-blocking)
# Return NOT_SET for now, actual choice will be saved via callback
return consent_choice.get('value', TelemetryConsent.NOT_SET)
except Exception:
# Fallback if widgets fail
return self._notebook_fallback_prompt()
def _notebook_fallback_prompt(self) -> TelemetryConsent:
"""
Fallback prompt for notebooks without widget support.
Returns:
User's consent choice (defaults to DENIED)
"""
try:
from IPython.display import display, Markdown
markdown_content = """
### 🚨 Crawl4AI Error Detected
Help us improve by sending anonymous crash reports.
**Telemetry is currently OFF.** To enable, run:
```python
import crawl4ai
crawl4ai.telemetry.enable(email="your@email.com", always=True)
```
To send just this error:
```python
crawl4ai.telemetry.enable(once=True)
```
To keep telemetry disabled:
```python
crawl4ai.telemetry.disable()
```
"""
display(Markdown(markdown_content))
except ImportError:
# Pure print fallback
print("\n" + "="*60)
print("🚨 Crawl4AI Error Detected")
print("="*60)
print("\nTelemetry is OFF. To enable, run:")
print("\nimport crawl4ai")
print('crawl4ai.telemetry.enable(email="you@example.com", always=True)')
print("\n" + "="*60)
# Default to disabled in fallback mode
return TelemetryConsent.DENIED
def force_prompt(self) -> Tuple[TelemetryConsent, Optional[str]]:
"""
Force a consent prompt regardless of current settings.
Used for manual telemetry configuration.
Returns:
Tuple of (consent choice, optional email)
"""
# Temporarily reset consent to force prompt
original_consent = self.config.get_consent()
self.config.set_consent(TelemetryConsent.NOT_SET)
try:
new_consent = self._prompt_for_consent()
email = self.config.get_email()
return new_consent, email
except Exception:
# Restore original on error
self.config.set_consent(original_consent)
raise

View File

@@ -0,0 +1,199 @@
"""
Environment detection for Crawl4AI telemetry.
Detects whether we're running in CLI, Docker, Jupyter, etc.
"""
import os
import sys
from enum import Enum
from typing import Optional
class Environment(Enum):
"""Detected runtime environment."""
CLI = "cli"
DOCKER = "docker"
JUPYTER = "jupyter"
COLAB = "colab"
API_SERVER = "api_server"
UNKNOWN = "unknown"
class EnvironmentDetector:
"""Detects the current runtime environment."""
@staticmethod
def detect() -> Environment:
"""
Detect current runtime environment.
Returns:
Environment enum value
"""
# Check for Docker
if EnvironmentDetector._is_docker():
# Further check if it's API server
if EnvironmentDetector._is_api_server():
return Environment.API_SERVER
return Environment.DOCKER
# Check for Google Colab
if EnvironmentDetector._is_colab():
return Environment.COLAB
# Check for Jupyter
if EnvironmentDetector._is_jupyter():
return Environment.JUPYTER
# Check for CLI
if EnvironmentDetector._is_cli():
return Environment.CLI
return Environment.UNKNOWN
@staticmethod
def _is_docker() -> bool:
"""Check if running inside Docker container."""
# Check for Docker-specific files
if os.path.exists('/.dockerenv'):
return True
# Check cgroup for docker signature
try:
with open('/proc/1/cgroup', 'r') as f:
return 'docker' in f.read()
except (IOError, OSError):
pass
# Check environment variable (if set in Dockerfile)
return os.environ.get('CRAWL4AI_DOCKER', '').lower() == 'true'
@staticmethod
def _is_api_server() -> bool:
"""Check if running as API server."""
# Check for API server indicators
return (
os.environ.get('CRAWL4AI_API_SERVER', '').lower() == 'true' or
'deploy/docker/server.py' in ' '.join(sys.argv) or
'deploy/docker/api.py' in ' '.join(sys.argv)
)
@staticmethod
def _is_jupyter() -> bool:
"""Check if running in Jupyter notebook."""
try:
# Check for IPython
from IPython import get_ipython
ipython = get_ipython()
if ipython is None:
return False
# Check for notebook kernel
if 'IPKernelApp' in ipython.config:
return True
# Check for Jupyter-specific attributes
if hasattr(ipython, 'kernel'):
return True
except (ImportError, AttributeError):
pass
return False
@staticmethod
def _is_colab() -> bool:
"""Check if running in Google Colab."""
try:
import google.colab
return True
except ImportError:
pass
# Alternative check
return 'COLAB_GPU' in os.environ or 'COLAB_TPU_ADDR' in os.environ
@staticmethod
def _is_cli() -> bool:
"""Check if running from command line."""
# Check if we have a terminal
return (
hasattr(sys, 'ps1') or
sys.stdin.isatty() or
bool(os.environ.get('TERM'))
)
@staticmethod
def is_interactive() -> bool:
"""
Check if environment supports interactive prompts.
Returns:
True if interactive prompts are supported
"""
env = EnvironmentDetector.detect()
# Docker/API server are non-interactive
if env in [Environment.DOCKER, Environment.API_SERVER]:
return False
# CLI with TTY is interactive
if env == Environment.CLI:
return sys.stdin.isatty()
# Jupyter/Colab can be interactive with widgets
if env in [Environment.JUPYTER, Environment.COLAB]:
return True
return False
@staticmethod
def supports_widgets() -> bool:
"""
Check if environment supports IPython widgets.
Returns:
True if widgets are supported
"""
env = EnvironmentDetector.detect()
if env not in [Environment.JUPYTER, Environment.COLAB]:
return False
try:
import ipywidgets
from IPython.display import display
return True
except ImportError:
return False
@staticmethod
def get_environment_context() -> dict:
"""
Get environment context for telemetry.
Returns:
Dictionary with environment information
"""
env = EnvironmentDetector.detect()
context = {
'environment_type': env.value,
'python_version': f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
'platform': sys.platform,
}
# Add environment-specific context
if env == Environment.DOCKER:
context['docker'] = True
context['container_id'] = os.environ.get('HOSTNAME', 'unknown')
elif env == Environment.COLAB:
context['colab'] = True
context['gpu'] = bool(os.environ.get('COLAB_GPU'))
elif env == Environment.JUPYTER:
context['jupyter'] = True
return context

View File

@@ -0,0 +1,15 @@
"""
Telemetry providers for Crawl4AI.
"""
from ..base import TelemetryProvider, NullProvider
__all__ = ['TelemetryProvider', 'NullProvider']
# Try to import Sentry provider if available
try:
from .sentry import SentryProvider
__all__.append('SentryProvider')
except ImportError:
# Sentry SDK not installed
pass

View File

@@ -0,0 +1,234 @@
"""
Sentry telemetry provider for Crawl4AI.
"""
import os
from typing import Dict, Any, Optional
from ..base import TelemetryProvider
# Hardcoded DSN for Crawl4AI project
# This is safe to embed as it's the public part of the DSN
# TODO: Replace with actual Crawl4AI Sentry project DSN before release
# Format: "https://<public_key>@<organization>.ingest.sentry.io/<project_id>"
DEFAULT_SENTRY_DSN = "https://your-public-key@sentry.io/your-project-id"
class SentryProvider(TelemetryProvider):
"""Sentry implementation of telemetry provider."""
def __init__(self, dsn: Optional[str] = None, **kwargs):
"""
Initialize Sentry provider.
Args:
dsn: Optional DSN override (for testing/development)
**kwargs: Additional Sentry configuration
"""
super().__init__(**kwargs)
# Allow DSN override via environment variable or parameter
self.dsn = (
dsn or
os.environ.get('CRAWL4AI_SENTRY_DSN') or
DEFAULT_SENTRY_DSN
)
self._sentry_sdk = None
self.environment = kwargs.get('environment', 'production')
self.release = kwargs.get('release', None)
def initialize(self) -> bool:
"""Initialize Sentry SDK."""
try:
import sentry_sdk
from sentry_sdk.integrations.stdlib import StdlibIntegration
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
# Initialize Sentry with minimal integrations
sentry_sdk.init(
dsn=self.dsn,
environment=self.environment,
release=self.release,
# Performance monitoring disabled by default
traces_sample_rate=0.0,
# Only capture errors, not transactions
# profiles_sample_rate=0.0,
# Minimal integrations
integrations=[
StdlibIntegration(),
ExcepthookIntegration(always_run=False),
],
# Privacy settings
send_default_pii=False,
attach_stacktrace=True,
# Before send hook for additional sanitization
before_send=self._before_send,
# Disable automatic breadcrumbs
max_breadcrumbs=0,
# Disable request data collection
# request_bodies='never',
# # Custom transport options
# transport_options={
# 'keepalive': True,
# },
)
self._sentry_sdk = sentry_sdk
self._initialized = True
return True
except ImportError:
# Sentry SDK not installed
return False
except Exception:
# Initialization failed silently
return False
def _before_send(self, event: Dict[str, Any], hint: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Process event before sending to Sentry.
Provides additional privacy protection.
"""
# Remove sensitive data
if 'request' in event:
event['request'] = self._sanitize_request(event['request'])
# Remove local variables that might contain sensitive data
if 'exception' in event and 'values' in event['exception']:
for exc in event['exception']['values']:
if 'stacktrace' in exc and 'frames' in exc['stacktrace']:
for frame in exc['stacktrace']['frames']:
# Remove local variables from frames
frame.pop('vars', None)
# Apply general sanitization
event = self.sanitize_data(event)
return event
def _sanitize_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize request data to remove sensitive information."""
sanitized = request_data.copy()
# Remove sensitive fields
sensitive_fields = ['cookies', 'headers', 'data', 'query_string', 'env']
for field in sensitive_fields:
if field in sanitized:
sanitized[field] = '[REDACTED]'
# Keep only safe fields
safe_fields = ['method', 'url']
return {k: v for k, v in sanitized.items() if k in safe_fields}
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send exception to Sentry.
Args:
exc: Exception to report
context: Optional context (email, environment info)
Returns:
True if sent successfully
"""
if not self._initialized:
if not self.initialize():
return False
try:
if self._sentry_sdk:
with self._sentry_sdk.push_scope() as scope:
# Add user context if email provided
if context and 'email' in context:
scope.set_user({'email': context['email']})
# Add additional context
if context:
for key, value in context.items():
if key != 'email':
scope.set_context(key, value)
# Add tags for filtering
scope.set_tag('source', context.get('source', 'unknown'))
scope.set_tag('environment_type', context.get('environment_type', 'unknown'))
# Capture the exception
self._sentry_sdk.capture_exception(exc)
return True
except Exception:
# Silently fail - telemetry should never crash the app
return False
return False
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send custom event to Sentry.
Args:
event_name: Name of the event
payload: Event data
Returns:
True if sent successfully
"""
if not self._initialized:
if not self.initialize():
return False
try:
if self._sentry_sdk:
# Sanitize payload
safe_payload = self.sanitize_data(payload) if payload else {}
# Send as a message with extra data
self._sentry_sdk.capture_message(
event_name,
level='info',
extras=safe_payload
)
return True
except Exception:
return False
return False
def flush(self) -> None:
"""Flush pending events to Sentry."""
if self._initialized and self._sentry_sdk:
try:
self._sentry_sdk.flush(timeout=2.0)
except Exception:
pass
def shutdown(self) -> None:
"""Shutdown Sentry client."""
if self._initialized and self._sentry_sdk:
try:
self._sentry_sdk.flush(timeout=2.0)
# Note: sentry_sdk doesn't have a shutdown method
# Flush is sufficient for cleanup
except Exception:
pass
finally:
self._initialized = False

View File

@@ -1790,10 +1790,6 @@ def perform_completion_with_backoff(
except RateLimitError as e:
print("Rate limit error:", str(e))
if attempt == max_attempts - 1:
# Last attempt failed, raise the error.
raise
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
@@ -2150,9 +2146,7 @@ def normalize_url(
drop_query_tracking=True,
sort_query=True,
keep_fragment=False,
extra_drop_params=None,
preserve_https=False,
original_scheme=None
extra_drop_params=None
):
"""
Extended URL normalizer
@@ -2177,42 +2171,21 @@ def normalize_url(
str | None
A clean, canonical URL or None if href is empty/None.
"""
if not href or not href.strip():
if not href:
return None
# Resolve relative paths first
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Parse once, edit parts, then rebuild
parsed = urlparse(full_url)
# ── netloc ──
netloc = parsed.netloc.lower()
# Remove default ports
if ':' in netloc:
host, port = netloc.rsplit(':', 1)
if (parsed.scheme == 'http' and port == '80') or (parsed.scheme == 'https' and port == '443'):
netloc = host
else:
netloc = f"{host}:{port}"
# ── path ──
# Strip duplicate slashes and trailing "/" (except root)
# IMPORTANT: Don't use quote(unquote()) as it mangles + signs in URLs
# The path from urlparse is already properly encoded
path = parsed.path
# Strip duplicate slashes and trailing “/” (except root)
path = quote(unquote(parsed.path))
if path.endswith('/') and path != '/':
path = path.rstrip('/')
@@ -2220,25 +2193,21 @@ def normalize_url(
query = parsed.query
if query:
# explode, mutate, then rebuild
params = list(parse_qsl(query, keep_blank_values=True)) # Parse query string into key-value pairs, preserving blank values
params = [(k.lower(), v) for k, v in parse_qsl(query, keep_blank_values=True)]
if drop_query_tracking:
# Define default tracking parameters to remove for cleaner URLs
default_tracking = {
'utm_source', 'utm_medium', 'utm_campaign', 'utm_term',
'utm_content', 'gclid', 'fbclid', 'ref', 'ref_src'
}
if extra_drop_params:
default_tracking |= {p.lower() for p in extra_drop_params} # Add any extra parameters to drop, case-insensitive
params = [(k, v) for k, v in params if k not in default_tracking] # Filter out tracking parameters
# Normalize parameter keys
params = [(k, v) for k, v in params]
default_tracking |= {p.lower() for p in extra_drop_params}
params = [(k, v) for k, v in params if k not in default_tracking]
if sort_query:
params.sort(key=lambda kv: kv[0]) # Sort parameters alphabetically by key (now lowercase)
params.sort(key=lambda kv: kv[0])
query = urlencode(params, doseq=True) if params else '' # Rebuild query string, handling sequences properly
query = urlencode(params, doseq=True) if params else ''
# ── fragment ──
fragment = parsed.fragment if keep_fragment else ''
@@ -2256,7 +2225,7 @@ def normalize_url(
return normalized
def normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_scheme=None):
def normalize_url_for_deep_crawl(href, base_url):
"""Normalize URLs to ensure consistent format"""
from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
@@ -2267,17 +2236,6 @@ def normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_
# Use urljoin to handle relative URLs
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Parse the URL for normalization
parsed = urlparse(full_url)
@@ -2315,7 +2273,7 @@ def normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_
return normalized
@lru_cache(maxsize=10000)
def efficient_normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_scheme=None):
def efficient_normalize_url_for_deep_crawl(href, base_url):
"""Efficient URL normalization with proper parsing"""
from urllib.parse import urljoin
@@ -2325,17 +2283,6 @@ def efficient_normalize_url_for_deep_crawl(href, base_url, preserve_https=False,
# Resolve relative URLs
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Use proper URL parsing
parsed = urlparse(full_url)

View File

@@ -10,23 +10,4 @@ GEMINI_API_TOKEN=your_gemini_key_here
# Optional: Override the default LLM provider
# Examples: "openai/gpt-4", "anthropic/claude-3-opus", "deepseek/chat", etc.
# If not set, uses the provider specified in config.yml (default: openai/gpt-4o-mini)
# LLM_PROVIDER=anthropic/claude-3-opus
# Optional: Global LLM temperature setting (0.0-2.0)
# Controls randomness in responses. Lower = more focused, Higher = more creative
# LLM_TEMPERATURE=0.7
# Optional: Global custom API base URL
# Use this to point to custom endpoints or proxy servers
# LLM_BASE_URL=https://api.custom.com/v1
# Optional: Provider-specific temperature overrides
# These take precedence over the global LLM_TEMPERATURE
# OPENAI_TEMPERATURE=0.5
# ANTHROPIC_TEMPERATURE=0.3
# GROQ_TEMPERATURE=0.8
# Optional: Provider-specific base URL overrides
# Use for provider-specific proxy endpoints
# OPENAI_BASE_URL=https://custom-openai.company.com/v1
# GROQ_BASE_URL=https://custom-groq.company.com/v1
# LLM_PROVIDER=anthropic/claude-3-opus

View File

@@ -692,7 +692,8 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini" # Can be overridden by LLM_PROVIDER env var
# api_key: sk-... # If you pass the API key directly (not recommended)
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored
# Redis Configuration (Used by internal Redis server managed by supervisord)
redis:

View File

@@ -4,7 +4,7 @@ import asyncio
from typing import List, Tuple, Dict
from functools import partial
from uuid import uuid4
from datetime import datetime, timezone
from datetime import datetime
from base64 import b64encode
import logging
@@ -42,9 +42,7 @@ from utils import (
should_cleanup_task,
decode_redis_hash,
get_llm_api_key,
validate_llm_provider,
get_llm_temperature,
get_llm_base_url
validate_llm_provider
)
import psutil, time
@@ -98,9 +96,7 @@ async def handle_llm_qa(
response = perform_completion_with_backoff(
provider=config["llm"]["provider"],
prompt_with_variables=prompt,
api_token=get_llm_api_key(config), # Returns None to let litellm handle it
temperature=get_llm_temperature(config),
base_url=get_llm_base_url(config)
api_token=get_llm_api_key(config)
)
return response.choices[0].message.content
@@ -119,9 +115,7 @@ async def process_llm_extraction(
instruction: str,
schema: Optional[str] = None,
cache: str = "0",
provider: Optional[str] = None,
temperature: Optional[float] = None,
base_url: Optional[str] = None
provider: Optional[str] = None
) -> None:
"""Process LLM extraction in background."""
try:
@@ -133,13 +127,11 @@ async def process_llm_extraction(
"error": error_msg
})
return
api_key = get_llm_api_key(config, provider) # Returns None to let litellm handle it
api_key = get_llm_api_key(config, provider)
llm_strategy = LLMExtractionStrategy(
llm_config=LLMConfig(
provider=provider or config["llm"]["provider"],
api_token=api_key,
temperature=temperature or get_llm_temperature(config, provider),
base_url=base_url or get_llm_base_url(config, provider)
api_token=api_key
),
instruction=instruction,
schema=json.loads(schema) if schema else None,
@@ -186,9 +178,7 @@ async def handle_markdown_request(
query: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None,
temperature: Optional[float] = None,
base_url: Optional[str] = None
provider: Optional[str] = None
) -> str:
"""Handle markdown generation requests."""
try:
@@ -213,9 +203,7 @@ async def handle_markdown_request(
FilterType.LLM: LLMContentFilter(
llm_config=LLMConfig(
provider=provider or config["llm"]["provider"],
api_token=get_llm_api_key(config, provider), # Returns None to let litellm handle it
temperature=temperature or get_llm_temperature(config, provider),
base_url=base_url or get_llm_base_url(config, provider)
api_token=get_llm_api_key(config, provider),
),
instruction=query or "Extract main content"
)
@@ -260,9 +248,7 @@ async def handle_llm_request(
schema: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
provider: Optional[str] = None
) -> JSONResponse:
"""Handle LLM extraction requests."""
base_url = get_base_url(request)
@@ -293,9 +279,7 @@ async def handle_llm_request(
cache,
base_url,
config,
provider,
temperature,
api_base_url
provider
)
except Exception as e:
@@ -340,9 +324,7 @@ async def create_new_task(
cache: str,
base_url: str,
config: dict,
provider: Optional[str] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
provider: Optional[str] = None
) -> JSONResponse:
"""Create and initialize a new task."""
decoded_url = unquote(input_path)
@@ -367,9 +349,7 @@ async def create_new_task(
query,
schema,
cache,
provider,
temperature,
api_base_url
provider
)
return JSONResponse({
@@ -413,9 +393,6 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
server_memory_mb = _get_memory_mb()
result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb
# Ensure fit_html is JSON-serializable
if "fit_html" in result_dict and not (result_dict["fit_html"] is None or isinstance(result_dict["fit_html"], str)):
result_dict["fit_html"] = None
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
@@ -496,9 +473,6 @@ async def handle_crawl_request(
processed_results = []
for result in results:
result_dict = result.model_dump()
# if fit_html is not a string, set it to None to avoid serialization errors
if "fit_html" in result_dict and not (result_dict["fit_html"] is None or isinstance(result_dict["fit_html"], str)):
result_dict["fit_html"] = None
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
@@ -602,7 +576,7 @@ async def handle_crawl_job(
task_id = f"crawl_{uuid4().hex[:8]}"
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.PROCESSING, # <-- keep enum values consistent
"created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
"created_at": datetime.utcnow().isoformat(),
"url": json.dumps(urls), # store list as JSON string
"result": "",
"error": "",

View File

@@ -28,43 +28,25 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -
signing_key = get_jwk_from_secret(SECRET_KEY)
return instance.encode(to_encode, signing_key, alg='HS256')
def verify_token(credentials: HTTPAuthorizationCredentials) -> Dict:
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
"""Verify the JWT token from the Authorization header."""
if not credentials or not credentials.credentials:
raise HTTPException(
status_code=401,
detail="No token provided",
headers={"WWW-Authenticate": "Bearer"}
)
if credentials is None:
return None
token = credentials.credentials
verifying_key = get_jwk_from_secret(SECRET_KEY)
try:
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
return payload
except Exception as e:
raise HTTPException(
status_code=401,
detail=f"Invalid or expired token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"}
)
except Exception:
raise HTTPException(status_code=401, detail="Invalid or expired token")
def get_token_dependency(config: Dict):
"""Return the token dependency if JWT is enabled, else a function that returns None."""
if config.get("security", {}).get("jwt_enabled", False):
def jwt_required(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
"""Enforce JWT authentication when enabled."""
if credentials is None:
raise HTTPException(
status_code=401,
detail="Authentication required. Please provide a valid Bearer token.",
headers={"WWW-Authenticate": "Bearer"}
)
return verify_token(credentials)
return jwt_required
return verify_token
else:
return lambda: None

View File

@@ -7520,18 +7520,17 @@ class BrowserManager:
)
os.makedirs(browser_args["downloads_path"], exist_ok=True)
if self.config.proxy:
warnings.warn(
"BrowserConfig.proxy is deprecated and ignored. Use proxy_config instead.",
DeprecationWarning,
)
if self.config.proxy_config:
if self.config.proxy or self.config.proxy_config:
from playwright.async_api import ProxySettings
proxy_settings = ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
proxy_settings = (
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
)
browser_args["proxy"] = proxy_settings

View File

@@ -2241,7 +2241,7 @@ docker build -t crawl4ai
| Argument | Description | Default | Options |
|----------|-------------|---------|----------|
| PYTHON_VERSION | Python version | 3.10 | 3.10, 3.11, 3.12, 3.13 |
| PYTHON_VERSION | Python version | 3.10 | 3.8, 3.9, 3.10 |
| INSTALL_TYPE | Feature set | default | default, all, torch, transformer |
| ENABLE_GPU | GPU support | false | true, false |
| APP_HOME | Install path | /app | any valid path |

View File

@@ -11,7 +11,8 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini"
# api_key: sk-... # If you pass the API key directly (not recommended)
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored
# Redis Configuration
redis:
@@ -38,8 +39,8 @@ rate_limiting:
# Security Configuration
security:
enabled: false
jwt_enabled: false
enabled: false
jwt_enabled: false
https_redirect: false
trusted_hosts: ["*"]
headers:

View File

@@ -37,8 +37,6 @@ class LlmJobPayload(BaseModel):
schema: Optional[str] = None
cache: bool = False
provider: Optional[str] = None
temperature: Optional[float] = None
base_url: Optional[str] = None
class CrawlJobPayload(BaseModel):
@@ -65,8 +63,6 @@ async def llm_job_enqueue(
cache=payload.cache,
config=_config,
provider=payload.provider,
temperature=payload.temperature,
api_base_url=payload.base_url,
)
@@ -76,7 +72,7 @@ async def llm_job_status(
task_id: str,
_td: Dict = Depends(lambda: _token_dep())
):
return await handle_task_status(_redis, task_id, base_url=str(request.base_url))
return await handle_task_status(_redis, task_id)
# ---------- CRAWL job -------------------------------------------------------

View File

@@ -15,3 +15,4 @@ PyJWT==2.10.1
mcp>=1.6.0
websockets>=15.0.1
httpx[http2]>=0.27.2
sentry-sdk>=2.0.0

View File

@@ -16,8 +16,6 @@ class MarkdownRequest(BaseModel):
q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters")
c: Optional[str] = Field("0", description="Cachebust / revision counter")
provider: Optional[str] = Field(None, description="LLM provider override (e.g., 'anthropic/claude-3-opus')")
temperature: Optional[float] = Field(None, description="LLM temperature override (0.0-2.0)")
base_url: Optional[str] = Field(None, description="LLM API base URL override")
class RawCode(BaseModel):

View File

@@ -74,6 +74,32 @@ setup_logging(config)
__version__ = "0.5.1-d1"
# ───────────────────── telemetry setup ────────────────────────
# Docker/API server telemetry: enabled by default unless CRAWL4AI_TELEMETRY=0
import os as _os
if _os.environ.get('CRAWL4AI_TELEMETRY') != '0':
# Set environment variable to indicate we're in API server mode
_os.environ['CRAWL4AI_API_SERVER'] = 'true'
# Import and enable telemetry for Docker/API environment
from crawl4ai.telemetry import enable as enable_telemetry
from crawl4ai.telemetry import capture_exception
# Enable telemetry automatically in Docker mode
enable_telemetry(always=True)
import logging
telemetry_logger = logging.getLogger("telemetry")
telemetry_logger.info("✅ Telemetry enabled for Docker/API server")
else:
# Define no-op for capture_exception if telemetry is disabled
def capture_exception(exc, context=None):
pass
import logging
telemetry_logger = logging.getLogger("telemetry")
telemetry_logger.info("❌ Telemetry disabled via CRAWL4AI_TELEMETRY=0")
# ── global page semaphore (hard cap) ─────────────────────────
MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30)
GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES)
@@ -241,8 +267,7 @@ async def get_markdown(
raise HTTPException(
400, "Invalid URL format. Must start with http://, https://, or for raw HTML (raw:, raw://)")
markdown = await handle_markdown_request(
body.url, body.f, body.q, body.c, config, body.provider,
body.temperature, body.base_url
body.url, body.f, body.q, body.c, config, body.provider
)
return JSONResponse({
"url": body.url,
@@ -267,26 +292,12 @@ async def generate_html(
Use when you need sanitized HTML structures for building schemas or further processing.
"""
cfg = CrawlerRunConfig()
try:
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
# Check if the crawl was successful
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
raw_html = results[0].html
from crawl4ai.utils import preprocess_html_for_schema
processed_html = preprocess_html_for_schema(raw_html)
return JSONResponse({"html": processed_html, "url": body.url, "success": True})
except Exception as e:
# Log and raise as HTTP 500 for other exceptions
raise HTTPException(
status_code=500,
detail=str(e)
)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
raw_html = results[0].html
from crawl4ai.utils import preprocess_html_for_schema
processed_html = preprocess_html_for_schema(raw_html)
return JSONResponse({"html": processed_html, "url": body.url, "success": True})
# Screenshot endpoint
@@ -304,29 +315,18 @@ async def generate_screenshot(
Use when you need an image snapshot of the rendered page. Its recommened to provide an output path to save the screenshot.
Then in result instead of the screenshot you will get a path to the saved file.
"""
try:
cfg = CrawlerRunConfig(
screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
screenshot_data = results[0].screenshot
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(base64.b64decode(screenshot_data))
return {"success": True, "path": abs_path}
return {"success": True, "screenshot": screenshot_data}
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
cfg = CrawlerRunConfig(
screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
screenshot_data = results[0].screenshot
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(base64.b64decode(screenshot_data))
return {"success": True, "path": abs_path}
return {"success": True, "screenshot": screenshot_data}
# PDF endpoint
@@ -344,28 +344,17 @@ async def generate_pdf(
Use when you need a printable or archivable snapshot of the page. It is recommended to provide an output path to save the PDF.
Then in result instead of the PDF you will get a path to the saved file.
"""
try:
cfg = CrawlerRunConfig(pdf=True)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
pdf_data = results[0].pdf
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(pdf_data)
return {"success": True, "path": abs_path}
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
cfg = CrawlerRunConfig(pdf=True)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
pdf_data = results[0].pdf
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(pdf_data)
return {"success": True, "path": abs_path}
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
@app.post("/execute_js")
@@ -421,23 +410,12 @@ async def execute_js(
```
"""
try:
cfg = CrawlerRunConfig(js_code=body.scripts)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
# Return JSON-serializable dict of the first CrawlResult
data = results[0].model_dump()
return JSONResponse(data)
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
cfg = CrawlerRunConfig(js_code=body.scripts)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
# Return JSON-serializable dict of the first CrawlResult
data = results[0].model_dump()
return JSONResponse(data)
@app.get("/llm/{url:path}")
@@ -482,24 +460,16 @@ async def crawl(
):
"""
Crawl a list of URLs and return the results as JSON.
For streaming responses, use /crawl/stream endpoint.
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
# Check whether it is a redirection for a streaming request
crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config)
if crawler_config.stream:
return await stream_process(crawl_request=crawl_request)
results = await handle_crawl_request(
res = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
# check if all of the results are not successful
if all(not result["success"] for result in results["results"]):
raise HTTPException(500, f"Crawl request failed: {results['results'][0]['error_message']}")
return JSONResponse(results)
return JSONResponse(res)
@app.post("/crawl/stream")
@@ -511,16 +481,12 @@ async def crawl_stream(
):
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
return await stream_process(crawl_request=crawl_request)
async def stream_process(crawl_request: CrawlRequest):
crawler, gen = await handle_stream_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
)
return StreamingResponse(
stream_results(crawler, gen),
media_type="application/x-ndjson",

View File

@@ -371,7 +371,7 @@
<div class="flex items-center">
<input id="st-stream" type="checkbox" class="mr-2">
<label for="st-stream" class="text-sm">Enable streaming mode</label>
<label for="st-stream" class="text-sm">Use /crawl/stream</label>
<button id="st-run"
class="ml-auto bg-accent text-dark px-4 py-2 rounded hover:bg-opacity-90 font-medium">
Run Stress Test
@@ -596,14 +596,6 @@
forceHighlightElement(curlCodeEl);
}
// Detect if stream is requested inside payload
function shouldUseStream(payload) {
const toBool = (v) => v === true || (typeof v === 'string' && v.toLowerCase() === 'true');
const fromCrawler = payload && payload.crawler_config && payload.crawler_config.params && payload.crawler_config.params.stream;
const direct = payload && payload.stream;
return toBool(fromCrawler) || toBool(direct);
}
// Main run function
async function runCrawl() {
const endpoint = document.getElementById('endpoint').value;
@@ -619,24 +611,16 @@
: { browser_config: cfgJson };
}
} catch (err) {
const codeText = cm.getValue();
const streamFlag = /stream\s*=\s*True/i.test(codeText);
const isCrawlEndpoint = document.getElementById('endpoint').value === 'crawl';
if (isCrawlEndpoint && streamFlag) {
// Fallback: proceed with minimal config only for stream
advConfig = { crawler_config: { stream: true } };
} else {
updateStatus('error');
document.querySelector('#response-content code').textContent =
JSON.stringify({ error: err.message }, null, 2);
forceHighlightElement(document.querySelector('#response-content code'));
return; // stop run
}
updateStatus('error');
document.querySelector('#response-content code').textContent =
JSON.stringify({ error: err.message }, null, 2);
forceHighlightElement(document.querySelector('#response-content code'));
return; // stop run
}
const endpointMap = {
crawl: '/crawl',
crawl_stream: '/crawl/stream', // Keep for backward compatibility
// crawl_stream: '/crawl/stream',
md: '/md',
llm: '/llm'
};
@@ -663,7 +647,7 @@
// This will be handled directly in the fetch below
payload = null;
} else {
// Default payload for /crawl (supports both streaming and batch modes)
// Default payload for /crawl and /crawl/stream
payload = {
urls,
...advConfig
@@ -675,7 +659,6 @@
try {
const startTime = performance.now();
let response, responseData;
const useStreamOverride = (endpoint === 'crawl') && shouldUseStream(payload);
if (endpoint === 'llm') {
// Special handling for LLM endpoint which uses URL pattern: /llm/{encoded_url}?q={query}
@@ -698,8 +681,8 @@
document.querySelector('#response-content code').textContent = JSON.stringify(responseData, null, 2);
document.querySelector('#response-content code').className = 'json hljs';
forceHighlightElement(document.querySelector('#response-content code'));
} else if (endpoint === 'crawl_stream' || useStreamOverride) {
// Stream processing - now handled directly by /crawl endpoint
} else if (endpoint === 'crawl_stream') {
// Stream processing
response = await fetch(api, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -774,7 +757,6 @@
const question = document.getElementById('llm-question').value.trim() || "What is this page about?";
generateSnippets(`${api}/${encodedUrl}?q=${encodeURIComponent(question)}`, null, 'GET');
} else {
// Use the same API endpoint for both streaming and non-streaming
generateSnippets(api, payload);
}
} catch (error) {
@@ -804,7 +786,7 @@
document.getElementById('stress-avg-time').textContent = '0';
document.getElementById('stress-peak-mem').textContent = '0';
const api = '/crawl'; // Always use /crawl - backend handles streaming internally
const api = useStream ? '/crawl/stream' : '/crawl';
const urls = Array.from({ length: total }, (_, i) => `https://httpbin.org/anything/stress-${i}-${Date.now()}`);
const chunks = [];

View File

@@ -71,7 +71,7 @@ def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[str]:
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> str:
"""Get the appropriate API key based on the LLM provider.
Args:
@@ -79,14 +79,19 @@ def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[st
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The API key if directly configured, otherwise None to let litellm handle it
The API key for the provider, or empty string if not found
"""
# Check if direct API key is configured (for backward compatibility)
# Use provided provider or fall back to config
if not provider:
provider = config["llm"]["provider"]
# Check if direct API key is configured
if "api_key" in config["llm"]:
return config["llm"]["api_key"]
# Return None - litellm will automatically find the right environment variable
return None
# Fall back to the configured api_key_env if no match
return os.environ.get(config["llm"].get("api_key_env", ""), "")
def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple[bool, str]:
@@ -99,78 +104,19 @@ def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple
Returns:
Tuple of (is_valid, error_message)
"""
# If a direct API key is configured, validation passes
if "api_key" in config["llm"]:
return True, ""
# Use provided provider or fall back to config
if not provider:
provider = config["llm"]["provider"]
# Get the API key for this provider
api_key = get_llm_api_key(config, provider)
if not api_key:
return False, f"No API key found for provider '{provider}'. Please set the appropriate environment variable."
# Otherwise, trust that litellm will find the appropriate environment variable
# We can't easily validate this without reimplementing litellm's logic
return True, ""
def get_llm_temperature(config: Dict, provider: Optional[str] = None) -> Optional[float]:
"""Get temperature setting based on the LLM provider.
Priority order:
1. Provider-specific environment variable (e.g., OPENAI_TEMPERATURE)
2. Global LLM_TEMPERATURE environment variable
3. None (to use litellm/provider defaults)
Args:
config: The application configuration dictionary
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The temperature setting if configured, otherwise None
"""
# Check provider-specific temperature first
if provider:
provider_name = provider.split('/')[0].upper()
provider_temp = os.environ.get(f"{provider_name}_TEMPERATURE")
if provider_temp:
try:
return float(provider_temp)
except ValueError:
logging.warning(f"Invalid temperature value for {provider_name}: {provider_temp}")
# Check global LLM_TEMPERATURE
global_temp = os.environ.get("LLM_TEMPERATURE")
if global_temp:
try:
return float(global_temp)
except ValueError:
logging.warning(f"Invalid global temperature value: {global_temp}")
# Return None to use litellm/provider defaults
return None
def get_llm_base_url(config: Dict, provider: Optional[str] = None) -> Optional[str]:
"""Get base URL setting based on the LLM provider.
Priority order:
1. Provider-specific environment variable (e.g., OPENAI_BASE_URL)
2. Global LLM_BASE_URL environment variable
3. None (to use default endpoints)
Args:
config: The application configuration dictionary
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The base URL if configured, otherwise None
"""
# Check provider-specific base URL first
if provider:
provider_name = provider.split('/')[0].upper()
provider_url = os.environ.get(f"{provider_name}_BASE_URL")
if provider_url:
return provider_url
# Check global LLM_BASE_URL
return os.environ.get("LLM_BASE_URL")
def verify_email_domain(email: str) -> bool:
try:
domain = email.split('@')[1]

View File

@@ -1,154 +0,0 @@
import asyncio
import os
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig, LLMConfig
async def test_configuration(name: str, config: AdaptiveConfig, url: str, query: str):
"""Test a specific configuration"""
print(f"\n{'='*60}")
print(f"Configuration: {name}")
print(f"{'='*60}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
result = await adaptive.digest(start_url=url, query=query)
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
print(f"\n{'='*50}")
print(f"Pages crawled: {len(result.crawled_urls)}")
print(f"Final confidence: {adaptive.confidence:.1%}")
print(f"Stopped reason: {result.metrics.get('stopped_reason', 'max_pages')}")
if result.metrics.get('is_irrelevant', False):
print("⚠️ Query detected as irrelevant!")
return result
async def llm_embedding():
"""Demonstrate various embedding configurations"""
print("EMBEDDING STRATEGY CONFIGURATION EXAMPLES")
print("=" * 60)
# Base URL and query for testing
test_url = "https://docs.python.org/3/library/asyncio.html"
openai_llm_config = LLMConfig(
provider='openai/text-embedding-3-small',
api_token=os.getenv('OPENAI_API_KEY'),
temperature=0.7,
max_tokens=2000
)
config_openai = AdaptiveConfig(
strategy="embedding",
max_pages=10,
# Use OpenAI embeddings
embedding_llm_config=openai_llm_config,
# embedding_llm_config={
# 'provider': 'openai/text-embedding-3-small',
# 'api_token': os.getenv('OPENAI_API_KEY')
# },
# OpenAI embeddings are high quality, can be stricter
embedding_k_exp=4.0,
n_query_variations=12
)
await test_configuration(
"OpenAI Embeddings",
config_openai,
test_url,
# "event-driven architecture patterns"
"async await context managers coroutines"
)
return
async def basic_adaptive_crawling():
"""Basic adaptive crawling example"""
# Initialize the crawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Create an adaptive crawler with default settings (statistical strategy)
adaptive = AdaptiveCrawler(crawler)
# Note: You can also use embedding strategy for semantic understanding:
# from crawl4ai import AdaptiveConfig
# config = AdaptiveConfig(strategy="embedding")
# adaptive = AdaptiveCrawler(crawler, config)
# Start adaptive crawling
print("Starting adaptive crawl for Python async programming information...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/library/asyncio.html",
query="async await context managers coroutines"
)
# Display crawl statistics
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
# Show final confidence
print(f"\n{'='*50}")
print(f"Final Confidence: {adaptive.confidence:.2%}")
print(f"Total Pages Crawled: {len(result.crawled_urls)}")
print(f"Knowledge Base Size: {len(adaptive.state.knowledge_base)} documents")
if adaptive.confidence >= 0.8:
print("✓ High confidence - can answer detailed questions about async Python")
elif adaptive.confidence >= 0.6:
print("~ Moderate confidence - can answer basic questions")
else:
print("✗ Low confidence - need more information")
if __name__ == "__main__":
asyncio.run(llm_embedding())
# asyncio.run(basic_adaptive_crawling())

View File

@@ -1,221 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
#pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
#directories
models
schemas
saved_requests

View File

@@ -1,252 +0,0 @@
# Web Scraper API with Custom Model Support
A powerful web scraping API that converts any website into structured data using AI. Features a beautiful minimalist frontend interface and support for custom LLM models!
## Features
- **AI-Powered Scraping**: Provide a URL and plain English query to extract structured data
- **Beautiful Frontend**: Modern minimalist black-and-white interface with smooth UX
- **Custom Model Support**: Use any LLM provider (OpenAI, Gemini, Anthropic, etc.) with your own API keys
- **Model Management**: Save, list, and manage multiple model configurations via web interface
- **Dual Scraping Approaches**: Choose between Schema-based (faster) or LLM-based (more flexible) extraction
- **API Request History**: Automatic saving and display of all API requests with cURL commands
- **Schema Caching**: Intelligent caching of generated schemas for faster subsequent requests
- **Duplicate Prevention**: Avoids saving duplicate requests (same URL + query)
- **RESTful API**: Easy-to-use HTTP endpoints for all operations
## Quick Start
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
### 2. Start the API Server
```bash
python app.py
```
The server will start on `http://localhost:8000` with a beautiful web interface!
### 3. Using the Web Interface
Once the server is running, open your browser and go to `http://localhost:8000` to access the modern web interface!
#### Pages:
- **Scrape Data**: Enter URLs and queries to extract structured data
- **Models**: Manage your AI model configurations (add, list, delete)
- **API Requests**: View history of all scraping requests with cURL commands
#### Features:
- **Minimalist Design**: Clean black-and-white theme inspired by modern web apps
- **Real-time Results**: See extracted data in formatted JSON
- **Copy to Clipboard**: Easy copying of results
- **Toast Notifications**: User-friendly feedback
- **Dual Scraping Modes**: Choose between Schema-based and LLM-based approaches
## Model Management
### Adding Models via Web Interface
1. Go to the **Models** page
2. Enter your model details:
- **Provider**: LLM provider (e.g., `gemini/gemini-2.5-flash`, `openai/gpt-4o`)
- **API Token**: Your API key for the provider
3. Click "Add Model"
### API Usage for Model Management
#### Save a Model Configuration
```bash
curl -X POST "http://localhost:8000/models" \
-H "Content-Type: application/json" \
-d '{
"provider": "gemini/gemini-2.5-flash",
"api_token": "your-api-key-here"
}'
```
#### List Saved Models
```bash
curl -X GET "http://localhost:8000/models"
```
#### Delete a Model Configuration
```bash
curl -X DELETE "http://localhost:8000/models/my-gemini"
```
## Scraping Approaches
### 1. Schema-based Scraping (Faster)
- Generates CSS selectors for targeted extraction
- Caches schemas for repeated requests
- Faster execution for structured websites
### 2. LLM-based Scraping (More Flexible)
- Direct LLM extraction without schema generation
- More flexible for complex or dynamic content
- Better for unstructured data extraction
## Supported LLM Providers
The API supports any LLM provider that crawl4ai supports, including:
- **Google Gemini**: `gemini/gemini-2.5-flash`, `gemini/gemini-pro`
- **OpenAI**: `openai/gpt-4`, `openai/gpt-3.5-turbo`
- **Anthropic**: `anthropic/claude-3-opus`, `anthropic/claude-3-sonnet`
- **And more...**
## API Endpoints
### Core Endpoints
- `POST /scrape` - Schema-based scraping
- `POST /scrape-with-llm` - LLM-based scraping
- `GET /schemas` - List cached schemas
- `POST /clear-cache` - Clear schema cache
- `GET /health` - Health check
### Model Management Endpoints
- `GET /models` - List saved model configurations
- `POST /models` - Save a new model configuration
- `DELETE /models/{model_name}` - Delete a model configuration
### API Request History
- `GET /saved-requests` - List all saved API requests
- `DELETE /saved-requests/{request_id}` - Delete a saved request
## Request/Response Examples
### Scrape Request
```json
{
"url": "https://example.com",
"query": "Extract the product name, price, and description",
"model_name": "my-custom-model"
}
```
### Scrape Response
```json
{
"success": true,
"url": "https://example.com",
"query": "Extract the product name, price, and description",
"extracted_data": {
"product_name": "Example Product",
"price": "$99.99",
"description": "This is an example product description"
},
"schema_used": { ... },
"timestamp": "2024-01-01T12:00:00Z"
}
```
### Model Configuration Request
```json
{
"provider": "gemini/gemini-2.5-flash",
"api_token": "your-api-key-here"
}
```
## Testing
Run the test script to verify the model management functionality:
```bash
python test_models.py
```
## File Structure
```
parse_example/
├── api_server.py # FastAPI server with all endpoints
├── web_scraper_lib.py # Core scraping library
├── test_models.py # Test script for model management
├── requirements.txt # Dependencies
├── static/ # Frontend files
│ ├── index.html # Main HTML interface
│ ├── styles.css # CSS styles (minimalist theme)
│ └── script.js # JavaScript functionality
├── schemas/ # Cached schemas
├── models/ # Saved model configurations
├── saved_requests/ # API request history
└── README.md # This file
```
## Advanced Usage
### Using the Library Directly
```python
from web_scraper_lib import WebScraperAgent
# Initialize agent
agent = WebScraperAgent()
# Save a model configuration
agent.save_model_config(
model_name="my-model",
provider="openai/gpt-4",
api_token="your-api-key"
)
# Schema-based scraping
result = await agent.scrape_data(
url="https://example.com",
query="Extract product information",
model_name="my-model"
)
# LLM-based scraping
result = await agent.scrape_data_with_llm(
url="https://example.com",
query="Extract product information",
model_name="my-model"
)
```
### Schema Caching
The system automatically caches generated schemas based on URL and query combinations:
- **First request**: Generates schema using AI
- **Subsequent requests**: Uses cached schema for faster extraction
### API Request History
All API requests are automatically saved with:
- Request details (URL, query, model used)
- Response data
- Timestamp
- cURL command for re-execution
### Duplicate Prevention
The system prevents saving duplicate requests:
- Same URL + query combinations are not saved multiple times
- Returns existing request ID for duplicates
- Keeps the API request history clean
## Error Handling
The API provides detailed error messages for common issues:
- Invalid URLs
- Missing model configurations
- API key errors
- Network timeouts
- Parsing errors

View File

@@ -1,363 +0,0 @@
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import Dict, Any, Optional, Union, List
import uvicorn
import asyncio
import os
import json
from datetime import datetime
from web_scraper_lib import WebScraperAgent, scrape_website
app = FastAPI(
title="Web Scraper API",
description="Convert any website into a structured data API. Provide a URL and tell AI what data you need in plain English.",
version="1.0.0"
)
# Mount static files
if os.path.exists("static"):
app.mount("/static", StaticFiles(directory="static"), name="static")
# Mount assets directory
if os.path.exists("assets"):
app.mount("/assets", StaticFiles(directory="assets"), name="assets")
# Initialize the scraper agent
scraper_agent = WebScraperAgent()
# Create directory for saved API requests
os.makedirs("saved_requests", exist_ok=True)
class ScrapeRequest(BaseModel):
url: HttpUrl
query: str
model_name: Optional[str] = None
class ModelConfigRequest(BaseModel):
model_name: str
provider: str
api_token: str
class ScrapeResponse(BaseModel):
success: bool
url: str
query: str
extracted_data: Union[Dict[str, Any], list]
schema_used: Optional[Dict[str, Any]] = None
timestamp: Optional[str] = None
error: Optional[str] = None
class SavedApiRequest(BaseModel):
id: str
endpoint: str
method: str
headers: Dict[str, str]
body: Dict[str, Any]
timestamp: str
response: Optional[Dict[str, Any]] = None
def save_api_request(endpoint: str, method: str, headers: Dict[str, str], body: Dict[str, Any], response: Optional[Dict[str, Any]] = None) -> str:
"""Save an API request to a JSON file."""
# Check for duplicate requests (same URL and query)
if endpoint in ["/scrape", "/scrape-with-llm"] and "url" in body and "query" in body:
existing_requests = get_saved_requests()
for existing_request in existing_requests:
if (existing_request.endpoint == endpoint and
existing_request.body.get("url") == body["url"] and
existing_request.body.get("query") == body["query"]):
print(f"Duplicate request found for URL: {body['url']} and query: {body['query']}")
return existing_request.id # Return existing request ID instead of creating new one
request_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
saved_request = SavedApiRequest(
id=request_id,
endpoint=endpoint,
method=method,
headers=headers,
body=body,
timestamp=datetime.now().isoformat(),
response=response
)
file_path = os.path.join("saved_requests", f"{request_id}.json")
with open(file_path, "w") as f:
json.dump(saved_request.dict(), f, indent=2)
return request_id
def get_saved_requests() -> List[SavedApiRequest]:
"""Get all saved API requests."""
requests = []
if os.path.exists("saved_requests"):
for filename in os.listdir("saved_requests"):
if filename.endswith('.json'):
file_path = os.path.join("saved_requests", filename)
try:
with open(file_path, "r") as f:
data = json.load(f)
requests.append(SavedApiRequest(**data))
except Exception as e:
print(f"Error loading saved request {filename}: {e}")
# Sort by timestamp (newest first)
requests.sort(key=lambda x: x.timestamp, reverse=True)
return requests
@app.get("/")
async def root():
"""Serve the frontend interface."""
if os.path.exists("static/index.html"):
return FileResponse("static/index.html")
else:
return {
"message": "Web Scraper API",
"description": "Convert any website into structured data with AI",
"endpoints": {
"/scrape": "POST - Scrape data from a website",
"/schemas": "GET - List cached schemas",
"/clear-cache": "POST - Clear schema cache",
"/models": "GET - List saved model configurations",
"/models": "POST - Save a new model configuration",
"/models/{model_name}": "DELETE - Delete a model configuration",
"/saved-requests": "GET - List saved API requests"
}
}
@app.post("/scrape", response_model=ScrapeResponse)
async def scrape_website_endpoint(request: ScrapeRequest):
"""
Scrape structured data from any website.
This endpoint:
1. Takes a URL and plain English query
2. Generates a custom scraper using AI
3. Returns structured data
"""
try:
# Save the API request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
result = await scraper_agent.scrape_data(
url=str(request.url),
query=request.query,
model_name=request.model_name
)
response_data = ScrapeResponse(
success=True,
url=result["url"],
query=result["query"],
extracted_data=result["extracted_data"],
schema_used=result["schema_used"],
timestamp=result["timestamp"]
)
# Save the request with response
save_api_request(
endpoint="/scrape",
method="POST",
headers=headers,
body=body,
response=response_data.dict()
)
return response_data
except Exception as e:
# Save the failed request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
save_api_request(
endpoint="/scrape",
method="POST",
headers=headers,
body=body,
response={"error": str(e)}
)
raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
@app.post("/scrape-with-llm", response_model=ScrapeResponse)
async def scrape_website_endpoint_with_llm(request: ScrapeRequest):
"""
Scrape structured data from any website using a custom LLM model.
"""
try:
# Save the API request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
result = await scraper_agent.scrape_data_with_llm(
url=str(request.url),
query=request.query,
model_name=request.model_name
)
response_data = ScrapeResponse(
success=True,
url=result["url"],
query=result["query"],
extracted_data=result["extracted_data"],
timestamp=result["timestamp"]
)
# Save the request with response
save_api_request(
endpoint="/scrape-with-llm",
method="POST",
headers=headers,
body=body,
response=response_data.dict()
)
return response_data
except Exception as e:
# Save the failed request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
save_api_request(
endpoint="/scrape-with-llm",
method="POST",
headers=headers,
body=body,
response={"error": str(e)}
)
raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
@app.get("/saved-requests")
async def list_saved_requests():
"""List all saved API requests."""
try:
requests = get_saved_requests()
return {
"success": True,
"requests": [req.dict() for req in requests],
"count": len(requests)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list saved requests: {str(e)}")
@app.delete("/saved-requests/{request_id}")
async def delete_saved_request(request_id: str):
"""Delete a saved API request."""
try:
file_path = os.path.join("saved_requests", f"{request_id}.json")
if os.path.exists(file_path):
os.remove(file_path)
return {
"success": True,
"message": f"Saved request '{request_id}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Saved request '{request_id}' not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete saved request: {str(e)}")
@app.get("/schemas")
async def list_cached_schemas():
"""List all cached schemas."""
try:
schemas = await scraper_agent.get_cached_schemas()
return {
"success": True,
"cached_schemas": schemas,
"count": len(schemas)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list schemas: {str(e)}")
@app.post("/clear-cache")
async def clear_schema_cache():
"""Clear all cached schemas."""
try:
scraper_agent.clear_cache()
return {
"success": True,
"message": "Schema cache cleared successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")
@app.get("/models")
async def list_models():
"""List all saved model configurations."""
try:
models = scraper_agent.list_saved_models()
return {
"success": True,
"models": models,
"count": len(models)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list models: {str(e)}")
@app.post("/models")
async def save_model_config(request: ModelConfigRequest):
"""Save a new model configuration."""
try:
success = scraper_agent.save_model_config(
model_name=request.model_name,
provider=request.provider,
api_token=request.api_token
)
if success:
return {
"success": True,
"message": f"Model configuration '{request.model_name}' saved successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to save model configuration")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save model: {str(e)}")
@app.delete("/models/{model_name}")
async def delete_model_config(model_name: str):
"""Delete a model configuration."""
try:
success = scraper_agent.delete_model_config(model_name)
if success:
return {
"success": True,
"message": f"Model configuration '{model_name}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Model configuration '{model_name}' not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete model: {str(e)}")
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "web-scraper-api"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env python3
"""
Startup script for the Web Scraper API with frontend interface.
"""
import os
import sys
import uvicorn
from pathlib import Path
def main():
# Check if static directory exists
static_dir = Path("static")
if not static_dir.exists():
print("❌ Static directory not found!")
print("Please make sure the 'static' directory exists with the frontend files.")
sys.exit(1)
# Check if required frontend files exist
required_files = ["index.html", "styles.css", "script.js"]
missing_files = []
for file in required_files:
if not (static_dir / file).exists():
missing_files.append(file)
if missing_files:
print(f"❌ Missing frontend files: {', '.join(missing_files)}")
print("Please make sure all frontend files are present in the static directory.")
sys.exit(1)
print("🚀 Starting Web Scraper API with Frontend Interface")
print("=" * 50)
print("📁 Static files found and ready to serve")
print("🌐 Frontend will be available at: http://localhost:8000")
print("🔌 API endpoints available at: http://localhost:8000/docs")
print("=" * 50)
# Start the server
uvicorn.run(
"api_server:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
if __name__ == "__main__":
main()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.8 KiB

View File

@@ -1,5 +0,0 @@
crawl4ai
fastapi
uvicorn
pydantic
litellm

View File

@@ -1,201 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Web2API Example</title>
<link rel="stylesheet" href="/static/styles.css">
<link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
</head>
<body>
<!-- Header -->
<header class="header">
<div class="header-content">
<div class="logo">
<img src="/assets/crawl4ai_logo.jpg" alt="Crawl4AI Logo" class="logo-image">
<span>Web2API Example</span>
</div>
<nav class="nav-links">
<a href="#" class="nav-link active" data-page="scrape">Scrape</a>
<a href="#" class="nav-link" data-page="models">Models</a>
<a href="#" class="nav-link" data-page="requests">API Requests</a>
</nav>
</div>
</header>
<!-- Main Content -->
<main class="main-content">
<!-- Scrape Page -->
<div id="scrape-page" class="page active">
<div class="hero-section">
<h1 class="hero-title">Turn Any Website Into An API</h1>
<p class="hero-subtitle">This example shows how to turn any website into an API using Crawl4AI.</p>
</div>
<!-- Workflow Demonstration -->
<div class="workflow-demo">
<div class="workflow-step">
<h3 class="step-title">1. Your Request</h3>
<div class="request-box">
<div class="input-group">
<label>URL:</label>
<input type="url" id="url" name="url" placeholder="https://example-bookstore.com/new-releases" required>
</div>
<div class="input-group">
<label>QUERY:</label>
<textarea id="query" name="query" placeholder="Extract all the book titles, their authors, and the biography of the author" required></textarea>
</div>
<div class="form-options">
<div class="option-group">
<label for="scraping-approach">Approach:</label>
<select id="scraping-approach" name="scraping_approach">
<option value="llm">LLM-based (More Flexible)</option>
<option value="schema">Schema-based (Uses LLM once!)</option>
</select>
</div>
<div class="option-group">
<label for="model-select">Model:</label>
<select id="model-select" name="model_name" required>
<option value="">Select a Model</option>
</select>
</div>
</div>
<button type="submit" id="extract-btn" class="extract-btn">
<i class="fas fa-magic"></i>
Extract Data
</button>
</div>
</div>
<div class="workflow-arrow"></div>
<div class="workflow-step">
<h3 class="step-title">2. Your Instant API & Data</h3>
<div class="response-container">
<div class="api-request-box">
<label>API Request (cURL):</label>
<pre id="curl-example">curl -X POST http://localhost:8000/scrape -H "Content-Type: application/json" -d '{"url": "...", "query": "..."}'
# Or for LLM-based approach:
curl -X POST http://localhost:8000/scrape-with-llm -H "Content-Type: application/json" -d '{"url": "...", "query": "..."}'</pre>
</div>
<div class="json-response-box">
<label>JSON Response:</label>
<pre id="json-output">{
"success": true,
"extracted_data": [
{
"title": "Example Book",
"author": "John Doe",
"description": "A great book..."
}
]
}</pre>
</div>
</div>
</div>
</div>
<!-- Results Section -->
<div id="results-section" class="results-section" style="display: none;">
<div class="results-header">
<h2>Extracted Data</h2>
<button id="copy-json" class="copy-btn">
<i class="fas fa-copy"></i>
Copy JSON
</button>
</div>
<div class="results-content">
<div class="result-info">
<div class="info-item">
<span class="label">URL:</span>
<span id="result-url" class="value"></span>
</div>
<div class="info-item">
<span class="label">Query:</span>
<span id="result-query" class="value"></span>
</div>
<div class="info-item">
<span class="label">Model Used:</span>
<span id="result-model" class="value"></span>
</div>
</div>
<div class="json-display">
<pre id="actual-json-output"></pre>
</div>
</div>
</div>
<!-- Loading State -->
<div id="loading" class="loading" style="display: none;">
<div class="spinner"></div>
<p>AI is analyzing the website and extracting data...</p>
</div>
</div>
<!-- Models Page -->
<div id="models-page" class="page">
<div class="models-header">
<h1>Model Configuration</h1>
<p>Configure and manage your AI model configurations</p>
</div>
<div class="models-container">
<!-- Add New Model Form -->
<div class="model-form-section">
<h3>Add New Model</h3>
<form id="model-form" class="model-form">
<div class="form-row">
<div class="input-group">
<label for="model-name">Model Name:</label>
<input type="text" id="model-name" name="model_name" placeholder="my-gemini" required>
</div>
<div class="input-group">
<label for="provider">Provider:</label>
<input type="text" id="provider" name="provider" placeholder="gemini/gemini-2.5-flash" required>
</div>
</div>
<div class="input-group">
<label for="api-token">API Token:</label>
<input type="password" id="api-token" name="api_token" placeholder="Enter your API token" required>
</div>
<button type="submit" class="save-btn">
<i class="fas fa-save"></i>
Save Model
</button>
</form>
</div>
<!-- Saved Models List -->
<div class="saved-models-section">
<h3>Saved Models</h3>
<div id="models-list" class="models-list">
<!-- Models will be loaded here -->
</div>
</div>
</div>
</div>
<!-- API Requests Page -->
<div id="requests-page" class="page">
<div class="requests-header">
<h1>Saved API Requests</h1>
<p>View and manage your previous API requests</p>
</div>
<div class="requests-container">
<div class="requests-list" id="requests-list">
<!-- Saved requests will be loaded here -->
</div>
</div>
</div>
</main>
<!-- Toast Notifications -->
<div id="toast-container" class="toast-container"></div>
<script src="/static/script.js"></script>
</body>
</html>

View File

@@ -1,401 +0,0 @@
// API Configuration
const API_BASE_URL = 'http://localhost:8000';
// DOM Elements
const navLinks = document.querySelectorAll('.nav-link');
const pages = document.querySelectorAll('.page');
const scrapeForm = document.getElementById('scrape-form');
const modelForm = document.getElementById('model-form');
const modelSelect = document.getElementById('model-select');
const modelsList = document.getElementById('models-list');
const resultsSection = document.getElementById('results-section');
const loadingSection = document.getElementById('loading');
const copyJsonBtn = document.getElementById('copy-json');
// Navigation
navLinks.forEach(link => {
link.addEventListener('click', (e) => {
e.preventDefault();
const targetPage = link.dataset.page;
// Update active nav link
navLinks.forEach(l => l.classList.remove('active'));
link.classList.add('active');
// Show target page
pages.forEach(page => page.classList.remove('active'));
document.getElementById(`${targetPage}-page`).classList.add('active');
// Load data for the page
if (targetPage === 'models') {
loadModels();
} else if (targetPage === 'requests') {
loadSavedRequests();
}
});
});
// Scrape Form Handler
document.getElementById('extract-btn').addEventListener('click', async (e) => {
e.preventDefault();
// Scroll to results section immediately when button is clicked
document.getElementById('results-section').scrollIntoView({
behavior: 'smooth',
block: 'start'
});
const url = document.getElementById('url').value;
const query = document.getElementById('query').value;
const headless = true; // Always use headless mode
const model_name = document.getElementById('model-select').value || null;
const scraping_approach = document.getElementById('scraping-approach').value;
if (!url || !query) {
showToast('Please fill in both URL and query fields', 'error');
return;
}
if (!model_name) {
showToast('Please select a model from the dropdown or add one from the Models page', 'error');
return;
}
const data = {
url: url,
query: query,
headless: headless,
model_name: model_name
};
// Show loading state
showLoading(true);
hideResults();
try {
// Choose endpoint based on scraping approach
const endpoint = scraping_approach === 'llm' ? '/scrape-with-llm' : '/scrape';
const response = await fetch(`${API_BASE_URL}${endpoint}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
if (response.ok) {
displayResults(result);
showToast(`Data extracted successfully using ${scraping_approach === 'llm' ? 'LLM-based' : 'Schema-based'} approach!`, 'success');
} else {
throw new Error(result.detail || 'Failed to extract data');
}
} catch (error) {
console.error('Scraping error:', error);
showToast(`Error: ${error.message}`, 'error');
} finally {
showLoading(false);
}
});
// Model Form Handler
modelForm.addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData(modelForm);
const data = {
model_name: formData.get('model_name'),
provider: formData.get('provider'),
api_token: formData.get('api_token')
};
try {
const response = await fetch(`${API_BASE_URL}/models`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
if (response.ok) {
showToast('Model saved successfully!', 'success');
modelForm.reset();
loadModels();
loadModelSelect();
} else {
throw new Error(result.detail || 'Failed to save model');
}
} catch (error) {
console.error('Model save error:', error);
showToast(`Error: ${error.message}`, 'error');
}
});
// Copy JSON Button
copyJsonBtn.addEventListener('click', () => {
const actualJsonOutput = document.getElementById('actual-json-output');
const textToCopy = actualJsonOutput.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
showToast('JSON copied to clipboard!', 'success');
}).catch(() => {
showToast('Failed to copy JSON', 'error');
});
});
// Load Models
async function loadModels() {
try {
const response = await fetch(`${API_BASE_URL}/models`);
const result = await response.json();
if (response.ok) {
displayModels(result.models);
} else {
throw new Error(result.detail || 'Failed to load models');
}
} catch (error) {
console.error('Load models error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Display Models
function displayModels(models) {
if (models.length === 0) {
modelsList.innerHTML = '<p style="text-align: center; color: #7f8c8d; padding: 2rem;">No models saved yet. Add your first model above!</p>';
return;
}
modelsList.innerHTML = models.map(model => `
<div class="model-card">
<div class="model-info">
<div class="model-name">${model}</div>
<div class="model-provider">Model Configuration</div>
</div>
<div class="model-actions">
<button class="btn btn-danger" onclick="deleteModel('${model}')">
<i class="fas fa-trash"></i>
Delete
</button>
</div>
</div>
`).join('');
}
// Delete Model
async function deleteModel(modelName) {
if (!confirm(`Are you sure you want to delete the model "${modelName}"?`)) {
return;
}
try {
const response = await fetch(`${API_BASE_URL}/models/${modelName}`, {
method: 'DELETE'
});
const result = await response.json();
if (response.ok) {
showToast('Model deleted successfully!', 'success');
loadModels();
loadModelSelect();
} else {
throw new Error(result.detail || 'Failed to delete model');
}
} catch (error) {
console.error('Delete model error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Load Model Select Options
async function loadModelSelect() {
try {
const response = await fetch(`${API_BASE_URL}/models`);
const result = await response.json();
if (response.ok) {
// Clear existing options
modelSelect.innerHTML = '<option value="">Select a Model</option>';
// Add model options
result.models.forEach(model => {
const option = document.createElement('option');
option.value = model;
option.textContent = model;
modelSelect.appendChild(option);
});
}
} catch (error) {
console.error('Load model select error:', error);
}
}
// Display Results
function displayResults(result) {
// Update result info
document.getElementById('result-url').textContent = result.url;
document.getElementById('result-query').textContent = result.query;
document.getElementById('result-model').textContent = result.model_name || 'Default Model';
// Display JSON in the actual results section
const actualJsonOutput = document.getElementById('actual-json-output');
actualJsonOutput.textContent = JSON.stringify(result.extracted_data, null, 2);
// Don't update the sample JSON in the workflow demo - keep it as example
// Update the cURL example based on the approach used
const scraping_approach = document.getElementById('scraping-approach').value;
const endpoint = scraping_approach === 'llm' ? '/scrape-with-llm' : '/scrape';
const curlExample = document.getElementById('curl-example');
curlExample.textContent = `curl -X POST http://localhost:8000${endpoint} -H "Content-Type: application/json" -d '{"url": "${result.url}", "query": "${result.query}"}'`;
// Show results section
resultsSection.style.display = 'block';
resultsSection.scrollIntoView({ behavior: 'smooth' });
}
// Show/Hide Loading
function showLoading(show) {
loadingSection.style.display = show ? 'block' : 'none';
}
// Hide Results
function hideResults() {
resultsSection.style.display = 'none';
}
// Toast Notifications
function showToast(message, type = 'info') {
const toastContainer = document.getElementById('toast-container');
const toast = document.createElement('div');
toast.className = `toast ${type}`;
const icon = type === 'success' ? 'fas fa-check-circle' :
type === 'error' ? 'fas fa-exclamation-circle' :
'fas fa-info-circle';
toast.innerHTML = `
<i class="${icon}"></i>
<span>${message}</span>
`;
toastContainer.appendChild(toast);
// Auto remove after 5 seconds
setTimeout(() => {
toast.remove();
}, 5000);
}
// Load Saved Requests
async function loadSavedRequests() {
try {
const response = await fetch(`${API_BASE_URL}/saved-requests`);
const result = await response.json();
if (response.ok) {
displaySavedRequests(result.requests);
} else {
throw new Error(result.detail || 'Failed to load saved requests');
}
} catch (error) {
console.error('Load saved requests error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Display Saved Requests
function displaySavedRequests(requests) {
const requestsList = document.getElementById('requests-list');
if (requests.length === 0) {
requestsList.innerHTML = '<p style="text-align: center; color: #CCCCCC; padding: 2rem;">No saved API requests yet. Make your first request from the Scrape page!</p>';
return;
}
requestsList.innerHTML = requests.map(request => {
const url = request.body.url;
const query = request.body.query;
const model = request.body.model_name || 'Default Model';
const endpoint = request.endpoint;
// Create curl command
const curlCommand = `curl -X POST http://localhost:8000${endpoint} \\
-H "Content-Type: application/json" \\
-d '{
"url": "${url}",
"query": "${query}",
"model_name": "${model}"
}'`;
return `
<div class="request-card">
<div class="request-header">
<div class="request-info">
<div class="request-url">${url}</div>
<div class="request-query">${query}</div>
</div>
<div class="request-actions">
<button class="btn-danger" onclick="deleteSavedRequest('${request.id}')">
<i class="fas fa-trash"></i>
Delete
</button>
</div>
</div>
<div class="request-curl">
<h4>cURL Command:</h4>
<pre>${curlCommand}</pre>
</div>
</div>
`;
}).join('');
}
// Delete Saved Request
async function deleteSavedRequest(requestId) {
if (!confirm('Are you sure you want to delete this saved request?')) {
return;
}
try {
const response = await fetch(`${API_BASE_URL}/saved-requests/${requestId}`, {
method: 'DELETE'
});
const result = await response.json();
if (response.ok) {
showToast('Saved request deleted successfully!', 'success');
loadSavedRequests();
} else {
throw new Error(result.detail || 'Failed to delete saved request');
}
} catch (error) {
console.error('Delete saved request error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Initialize
document.addEventListener('DOMContentLoaded', () => {
loadModelSelect();
// Check if API is available
fetch(`${API_BASE_URL}/health`)
.then(response => {
if (!response.ok) {
showToast('Warning: API server might not be running', 'error');
}
})
.catch(() => {
showToast('Warning: Cannot connect to API server. Make sure it\'s running on localhost:8000', 'error');
});
});

View File

@@ -1,765 +0,0 @@
/* Reset and Base Styles */
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #000000;
color: #FFFFFF;
line-height: 1.6;
font-size: 16px;
}
/* Header */
.header {
border-bottom: 1px solid #333;
padding: 1rem 0;
background: #000000;
position: sticky;
top: 0;
z-index: 100;
}
.header-content {
max-width: 1200px;
margin: 0 auto;
padding: 0 2rem;
display: flex;
justify-content: space-between;
align-items: center;
}
.logo {
display: flex;
align-items: center;
gap: 0.5rem;
font-size: 1.5rem;
font-weight: 600;
color: #FFFFFF;
}
.logo-image {
width: 40px;
height: 40px;
border-radius: 4px;
object-fit: contain;
}
.nav-links {
display: flex;
gap: 2rem;
}
.nav-link {
color: #CCCCCC;
text-decoration: none;
font-weight: 500;
transition: color 0.2s ease;
}
.nav-link:hover,
.nav-link.active {
color: #FFFFFF;
}
/* Main Content */
.main-content {
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
}
.page {
display: none;
}
.page.active {
display: block;
}
/* Hero Section */
.hero-section {
text-align: center;
margin-bottom: 4rem;
padding: 2rem 0;
}
.hero-title {
font-size: 3rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
line-height: 1.2;
}
.hero-subtitle {
font-size: 1.25rem;
color: #CCCCCC;
max-width: 600px;
margin: 0 auto;
}
/* Workflow Demo */
.workflow-demo {
display: grid;
grid-template-columns: 1fr auto 1fr;
gap: 2rem;
align-items: start;
margin-bottom: 4rem;
}
.workflow-step {
display: flex;
flex-direction: column;
gap: 1rem;
}
.step-title {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
text-align: center;
margin-bottom: 1rem;
}
.workflow-arrow {
font-size: 2rem;
font-weight: 700;
color: #09b5a5;
display: flex;
align-items: center;
justify-content: center;
margin-top: 20rem;
}
/* Request Box */
.request-box {
border: 2px solid #333;
border-radius: 8px;
padding: 2rem;
background: #111111;
}
.input-group {
margin-bottom: 1.5rem;
}
.input-group label {
display: block;
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}
.input-group input,
.input-group textarea,
.input-group select {
width: 100%;
padding: 0.75rem;
border: 1px solid #333;
border-radius: 4px;
font-family: 'Courier New', monospace;
font-size: 0.9rem;
background: #1A1A1A;
color: #FFFFFF;
transition: border-color 0.2s ease;
}
.input-group input:focus,
.input-group textarea:focus,
.input-group select:focus {
outline: none;
border-color: #09b5a5;
}
.input-group textarea {
min-height: 80px;
resize: vertical;
}
.form-options {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
margin-bottom: 1.5rem;
}
.option-group {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.option-group label {
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
font-size: 0.9rem;
}
.option-group input[type="checkbox"] {
width: auto;
margin-right: 0.5rem;
}
.extract-btn {
width: 100%;
padding: 1rem;
background: #09b5a5;
color: #000000;
border: none;
border-radius: 4px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.extract-btn:hover {
background: #09b5a5;
}
/* Dropdown specific styling */
select,
.input-group select,
.option-group select {
cursor: pointer !important;
appearance: none !important;
-webkit-appearance: none !important;
-moz-appearance: none !important;
-ms-appearance: none !important;
background-image: url("data:image/svg+xml;charset=UTF-8,%3csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 24 24' fill='none' stroke='%23FFFFFF' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3e%3cpolyline points='6,9 12,15 18,9'%3e%3c/polyline%3e%3c/svg%3e") !important;
background-repeat: no-repeat !important;
background-position: right 0.75rem center !important;
background-size: 1rem !important;
padding-right: 2.5rem !important;
border: 1px solid #333 !important;
border-radius: 4px !important;
font-family: 'Courier New', monospace !important;
font-size: 0.9rem !important;
background-color: #1A1A1A !important;
color: #FFFFFF !important;
}
select:hover,
.input-group select:hover,
.option-group select:hover {
border-color: #09b5a5 !important;
}
select:focus,
.input-group select:focus,
.option-group select:focus {
outline: none !important;
border-color: #09b5a5 !important;
}
select option,
.input-group select option,
.option-group select option {
background: #1A1A1A !important;
color: #FFFFFF !important;
padding: 0.5rem !important;
}
/* Response Container */
.response-container {
display: flex;
flex-direction: column;
gap: 1rem;
}
.api-request-box,
.json-response-box {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
background: #111111;
}
.api-request-box label,
.json-response-box label {
display: block;
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}
.api-request-box pre,
.json-response-box pre {
font-family: 'Courier New', monospace;
font-size: 0.85rem;
line-height: 1.5;
color: #FFFFFF;
background: #1A1A1A;
padding: 1rem;
border-radius: 4px;
overflow-x: auto;
white-space: pre-wrap;
word-break: break-all;
}
/* Results Section */
.results-section {
border: 2px solid #333;
border-radius: 8px;
overflow: hidden;
margin-top: 2rem;
background: #111111;
}
.results-header {
background: #1A1A1A;
color: #FFFFFF;
padding: 1rem 1.5rem;
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 1px solid #333;
}
.results-header h2 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
}
.copy-btn {
background: #09b5a5;
color: #000000;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
display: flex;
align-items: center;
gap: 0.5rem;
transition: background-color 0.2s ease;
}
.copy-btn:hover {
background: #09b5a5;
}
.results-content {
padding: 1.5rem;
}
.result-info {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 1rem;
margin-bottom: 1.5rem;
padding: 1rem;
background: #1A1A1A;
border-radius: 4px;
border: 1px solid #333;
}
.info-item {
display: flex;
flex-direction: column;
gap: 0.25rem;
}
.info-item .label {
font-weight: 600;
color: #FFFFFF;
font-size: 0.9rem;
}
.info-item .value {
color: #CCCCCC;
word-break: break-all;
}
.json-display {
background: #1A1A1A;
border-radius: 4px;
overflow: hidden;
border: 1px solid #333;
}
.json-display pre {
color: #FFFFFF;
padding: 1.5rem;
margin: 0;
overflow-x: auto;
font-family: 'Courier New', monospace;
font-size: 0.9rem;
line-height: 1.5;
}
/* Loading State */
.loading {
text-align: center;
padding: 3rem;
}
.spinner {
width: 40px;
height: 40px;
border: 3px solid #333;
border-top: 3px solid #09b5a5;
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 0 auto 1rem;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* Models Page */
.models-header {
text-align: center;
margin-bottom: 3rem;
}
.models-header h1 {
font-size: 2.5rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
}
.models-header p {
font-size: 1.1rem;
color: #CCCCCC;
}
/* API Requests Page */
.requests-header {
text-align: center;
margin-bottom: 3rem;
}
.requests-header h1 {
font-size: 2.5rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
}
.requests-header p {
font-size: 1.1rem;
color: #CCCCCC;
}
.requests-container {
max-width: 1200px;
margin: 0 auto;
}
.requests-list {
display: grid;
gap: 1.5rem;
}
.request-card {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
background: #111111;
transition: border-color 0.2s ease;
}
.request-card:hover {
border-color: #09b5a5;
}
.request-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
padding-bottom: 1rem;
border-bottom: 1px solid #333;
}
.request-info {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.request-url {
font-family: 'Courier New', monospace;
font-weight: 600;
color: #09b5a5;
font-size: 1.1rem;
word-break: break-all;
}
.request-query {
color: #CCCCCC;
font-size: 0.9rem;
margin-top: 0.5rem;
word-break: break-all;
}
.request-actions {
display: flex;
gap: 0.5rem;
}
.request-curl {
background: #1A1A1A;
border: 1px solid #333;
border-radius: 4px;
padding: 1rem;
margin-top: 1rem;
}
.request-curl h4 {
color: #FFFFFF;
font-size: 0.9rem;
font-weight: 600;
margin-bottom: 0.5rem;
font-family: 'Courier New', monospace;
}
.request-curl pre {
color: #CCCCCC;
font-size: 0.8rem;
line-height: 1.4;
overflow-x: auto;
white-space: pre-wrap;
word-break: break-all;
background: #111111;
padding: 0.75rem;
border-radius: 4px;
border: 1px solid #333;
}
.models-container {
max-width: 800px;
margin: 0 auto;
}
.model-form-section {
border: 2px solid #333;
border-radius: 8px;
padding: 2rem;
margin-bottom: 2rem;
background: #111111;
}
.model-form-section h3 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 1.5rem;
}
.model-form {
display: flex;
flex-direction: column;
gap: 1.5rem;
}
.form-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
}
.save-btn {
padding: 1rem;
background: #09b5a5;
color: #000000;
border: none;
border-radius: 4px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.save-btn:hover {
background: #09b5a5;
}
.saved-models-section h3 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 1.5rem;
}
.models-list {
display: grid;
gap: 1rem;
}
.model-card {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
display: flex;
justify-content: space-between;
align-items: center;
transition: border-color 0.2s ease;
background: #111111;
}
.model-card:hover {
border-color: #09b5a5;
}
.model-info {
flex: 1;
}
.model-name {
font-weight: 600;
color: #FFFFFF;
font-size: 1.1rem;
margin-bottom: 0.5rem;
}
.model-provider {
color: #CCCCCC;
font-size: 0.9rem;
}
.model-actions {
display: flex;
gap: 0.5rem;
}
.btn-danger {
background: #FF4444;
color: #FFFFFF;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
gap: 0.5rem;
}
.btn-danger:hover {
background: #CC3333;
}
/* Toast Notifications */
.toast-container {
position: fixed;
top: 20px;
right: 20px;
z-index: 1000;
}
.toast {
background: #111111;
border: 2px solid #333;
border-radius: 4px;
padding: 1rem 1.5rem;
margin-bottom: 0.5rem;
display: flex;
align-items: center;
gap: 0.5rem;
animation: slideIn 0.3s ease;
max-width: 400px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3);
color: #FFFFFF;
}
.toast.success {
border-color: #09b5a5;
background: #0A1A1A;
}
.toast.error {
border-color: #FF4444;
background: #1A0A0A;
}
.toast.info {
border-color: #09b5a5;
background: #0A1A1A;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}
/* Responsive Design */
@media (max-width: 768px) {
.header-content {
padding: 0 1rem;
}
.main-content {
padding: 1rem;
}
.hero-title {
font-size: 2rem;
}
.workflow-demo {
grid-template-columns: 1fr;
gap: 1rem;
}
.workflow-arrow {
transform: rotate(90deg);
margin: 1rem 0;
}
.form-options {
grid-template-columns: 1fr;
}
.form-row {
grid-template-columns: 1fr;
}
.result-info {
grid-template-columns: 1fr;
}
.model-card {
flex-direction: column;
gap: 1rem;
text-align: center;
}
.model-actions {
width: 100%;
justify-content: center;
}
}

View File

@@ -1,28 +0,0 @@
import asyncio
from web_scraper_lib import scrape_website
import os
async def test_library():
"""Test the mini library directly."""
print("=== Testing Mini Library ===")
# Test 1: Scrape with a custom model
url = "https://marketplace.mainstreet.co.in/collections/adidas-yeezy/products/adidas-yeezy-boost-350-v2-yecheil-non-reflective"
query = "Extract the following data: Product name, Product price, Product description, Product size. DO NOT EXTRACT ANYTHING ELSE."
if os.path.exists("models"):
model_name = os.listdir("models")[0].split(".")[0]
else:
raise Exception("No models found in models directory")
print(f"Scraping: {url}")
print(f"Query: {query}")
try:
result = await scrape_website(url, query, model_name)
print("✅ Library test successful!")
print(f"Extracted data: {result['extracted_data']}")
except Exception as e:
print(f"❌ Library test failed: {e}")
if __name__ == "__main__":
asyncio.run(test_library())

View File

@@ -1,67 +0,0 @@
#!/usr/bin/env python3
"""
Test script for the new model management functionality.
This script demonstrates how to save and use custom model configurations.
"""
import asyncio
import requests
import json
# API base URL
BASE_URL = "http://localhost:8000"
def test_model_management():
"""Test the model management endpoints."""
print("=== Testing Model Management ===")
# 1. List current models
print("\n1. Listing current models:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 2. Save another model configuration (OpenAI example)
print("\n2. Saving OpenAI model configuration:")
openai_config = {
"model_name": "my-openai",
"provider": "openai",
"api_token": "your-openai-api-key-here"
}
response = requests.post(f"{BASE_URL}/models", json=openai_config)
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 3. List models again to see the new ones
print("\n3. Listing models after adding new ones:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 4. Delete a model configuration
print("\n4. Deleting a model configuration:")
response = requests.delete(f"{BASE_URL}/models/my-openai")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 5. Final list of models
print("\n5. Final list of models:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
if __name__ == "__main__":
print("Model Management Test Script")
print("Make sure the API server is running on http://localhost:8000")
print("=" * 50)
try:
test_model_management()
except requests.exceptions.ConnectionError:
print("Error: Could not connect to the API server.")
print("Make sure the server is running with: python api_server.py")
except Exception as e:
print(f"Error: {e}")

View File

@@ -1,397 +0,0 @@
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CacheMode,
CrawlerRunConfig,
LLMConfig,
JsonCssExtractionStrategy,
LLMExtractionStrategy
)
import os
import json
import hashlib
from typing import Dict, Any, Optional, List
from litellm import completion
class ModelConfig:
"""Configuration for LLM models."""
def __init__(self, provider: str, api_token: str):
self.provider = provider
self.api_token = api_token
def to_dict(self) -> Dict[str, Any]:
return {
"provider": self.provider,
"api_token": self.api_token
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ModelConfig':
return cls(
provider=data["provider"],
api_token=data["api_token"]
)
class WebScraperAgent:
"""
A mini library that converts any website into a structured data API.
Features:
1. Provide a URL and tell AI what data you need in plain English
2. Generate: Agent reverse-engineers the site and deploys custom scraper
3. Integrate: Use private API endpoint to get structured data
4. Support for custom LLM models and API keys
"""
def __init__(self, schemas_dir: str = "schemas", models_dir: str = "models"):
self.schemas_dir = schemas_dir
self.models_dir = models_dir
os.makedirs(self.schemas_dir, exist_ok=True)
os.makedirs(self.models_dir, exist_ok=True)
def _generate_schema_key(self, url: str, query: str) -> str:
"""Generate a unique key for schema caching based on URL and query."""
content = f"{url}:{query}"
return hashlib.md5(content.encode()).hexdigest()
def save_model_config(self, model_name: str, provider: str, api_token: str) -> bool:
"""
Save a model configuration for later use.
Args:
model_name: User-friendly name for the model
provider: LLM provider (e.g., 'gemini', 'openai', 'anthropic')
api_token: API token for the provider
Returns:
True if saved successfully
"""
try:
model_config = ModelConfig(provider, api_token)
config_path = os.path.join(self.models_dir, f"{model_name}.json")
with open(config_path, "w") as f:
json.dump(model_config.to_dict(), f, indent=2)
print(f"Model configuration saved: {model_name}")
return True
except Exception as e:
print(f"Failed to save model configuration: {e}")
return False
def load_model_config(self, model_name: str) -> Optional[ModelConfig]:
"""
Load a saved model configuration.
Args:
model_name: Name of the saved model configuration
Returns:
ModelConfig object or None if not found
"""
try:
config_path = os.path.join(self.models_dir, f"{model_name}.json")
if not os.path.exists(config_path):
return None
with open(config_path, "r") as f:
data = json.load(f)
return ModelConfig.from_dict(data)
except Exception as e:
print(f"Failed to load model configuration: {e}")
return None
def list_saved_models(self) -> List[str]:
"""List all saved model configurations."""
models = []
for filename in os.listdir(self.models_dir):
if filename.endswith('.json'):
models.append(filename[:-5]) # Remove .json extension
return models
def delete_model_config(self, model_name: str) -> bool:
"""
Delete a saved model configuration.
Args:
model_name: Name of the model configuration to delete
Returns:
True if deleted successfully
"""
try:
config_path = os.path.join(self.models_dir, f"{model_name}.json")
if os.path.exists(config_path):
os.remove(config_path)
print(f"Model configuration deleted: {model_name}")
return True
return False
except Exception as e:
print(f"Failed to delete model configuration: {e}")
return False
async def _load_or_generate_schema(self, url: str, query: str, session_id: str = "schema_generator", model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Loads schema from cache if exists, otherwise generates using AI.
This is the "Generate" step - our agent reverse-engineers the site.
Args:
url: URL to scrape
query: Query for data extraction
session_id: Session identifier
model_name: Name of saved model configuration to use
"""
schema_key = self._generate_schema_key(url, query)
schema_path = os.path.join(self.schemas_dir, f"{schema_key}.json")
if os.path.exists(schema_path):
print(f"Schema found in cache for {url}")
with open(schema_path, "r") as f:
return json.load(f)
print(f"Generating new schema for {url}")
print(f"Query: {query}")
query += """
IMPORTANT:
GENERATE THE SCHEMA WITH ONLY THE FIELDS MENTIONED IN THE QUERY. MAKE SURE THE NUMBER OF FIELDS IN THE SCHEME MATCH THE NUMBER OF FIELDS IN THE QUERY.
"""
# Step 1: Fetch the page HTML
async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
session_id=session_id,
simulate_user=True,
remove_overlay_elements=True,
delay_before_return_html=5,
)
)
html = result.fit_html
# Step 2: Generate schema using AI with custom model if specified
print("AI is analyzing the page structure...")
# Use custom model configuration if provided
if model_name:
model_config = self.load_model_config(model_name)
if model_config:
llm_config = LLMConfig(
provider=model_config.provider,
api_token=model_config.api_token
)
print(f"Using custom model: {model_name}")
else:
raise ValueError(f"Model configuration '{model_name}' not found. Please add it from the Models page.")
else:
# Require a model to be specified
raise ValueError("No model specified. Please select a model from the dropdown or add one from the Models page.")
schema = JsonCssExtractionStrategy.generate_schema(
html=html,
llm_config=llm_config,
query=query
)
# Step 3: Cache the generated schema
print(f"Schema generated and cached: {json.dumps(schema, indent=2)}")
with open(schema_path, "w") as f:
json.dump(schema, f, indent=2)
return schema
def _generate_llm_schema(self, query: str, llm_config: LLMConfig) -> Dict[str, Any]:
"""
Generate a schema for a given query using a custom LLM model.
Args:
query: Plain English description of what data to extract
model_config: Model configuration to use
"""
# ask the model to generate a schema for the given query in the form of a json.
prompt = f"""
IDENTIFY THE FIELDS FOR EXTRACTION MENTIONED IN THE QUERY and GENERATE A JSON SCHEMA FOR THE FIELDS.
eg.
{{
"name": "str",
"age": "str",
"email": "str",
"product_name": "str",
"product_price": "str",
"product_description": "str",
"product_image": "str",
"product_url": "str",
"product_rating": "str",
"product_reviews": "str",
}}
Here is the query:
{query}
IMPORTANT:
THE RESULT SHOULD BE A JSON OBJECT.
MAKE SURE THE NUMBER OF FIELDS IN THE RESULT MATCH THE NUMBER OF FIELDS IN THE QUERY.
THE RESULT SHOULD BE A JSON OBJECT.
"""
response = completion(
model=llm_config.provider,
messages=[{"role": "user", "content": prompt}],
api_key=llm_config.api_token,
result_type="json"
)
return response.json()["choices"][0]["message"]["content"]
async def scrape_data_with_llm(self, url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Scrape structured data from any website using a custom LLM model.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
"""
if model_name:
model_config = self.load_model_config(model_name)
if model_config:
llm_config = LLMConfig(
provider=model_config.provider,
api_token=model_config.api_token
)
print(f"Using custom model: {model_name}")
else:
raise ValueError(f"Model configuration '{model_name}' not found. Please add it from the Models page.")
else:
# Require a model to be specified
raise ValueError("No model specified. Please select a model from the dropdown or add one from the Models page.")
query += """\n
IMPORTANT:
THE RESULT SHOULD BE A JSON OBJECT WITH THE ONLY THE FIELDS MENTIONED IN THE QUERY.
MAKE SURE THE NUMBER OF FIELDS IN THE RESULT MATCH THE NUMBER OF FIELDS IN THE QUERY.
THE RESULT SHOULD BE A JSON OBJECT.
"""
schema = self._generate_llm_schema(query, llm_config)
print(f"Schema: {schema}")
llm_extraction_strategy = LLMExtractionStrategy(
llm_config=llm_config,
instruction=query,
result_type="json",
schema=schema
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
simulate_user=True,
extraction_strategy=llm_extraction_strategy,
)
)
extracted_data = result.extracted_content
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If it's not valid JSON, keep it as string
pass
return {
"url": url,
"query": query,
"extracted_data": extracted_data,
"timestamp": result.timestamp if hasattr(result, 'timestamp') else None
}
async def scrape_data(self, url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Main method to scrape structured data from any website.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
Returns:
Structured data extracted from the website
"""
# Step 1: Generate or load schema (reverse-engineer the site)
schema = await self._load_or_generate_schema(url=url, query=query, model_name=model_name)
# Step 2: Deploy custom high-speed scraper
print(f"Deploying custom scraper for {url}")
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
run_config = CrawlerRunConfig(
extraction_strategy=JsonCssExtractionStrategy(schema=schema),
)
result = await crawler.arun(url=url, config=run_config)
# Step 3: Return structured data
# Parse extracted_content if it's a JSON string
extracted_data = result.extracted_content
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If it's not valid JSON, keep it as string
pass
return {
"url": url,
"query": query,
"extracted_data": extracted_data,
"schema_used": schema,
"timestamp": result.timestamp if hasattr(result, 'timestamp') else None
}
async def get_cached_schemas(self) -> Dict[str, str]:
"""Get list of cached schemas."""
schemas = {}
for filename in os.listdir(self.schemas_dir):
if filename.endswith('.json'):
schema_key = filename[:-5] # Remove .json extension
schemas[schema_key] = filename
return schemas
def clear_cache(self):
"""Clear all cached schemas."""
import shutil
if os.path.exists(self.schemas_dir):
shutil.rmtree(self.schemas_dir)
os.makedirs(self.schemas_dir, exist_ok=True)
print("Schema cache cleared")
# Convenience function for simple usage
async def scrape_website(url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Simple function to scrape any website with plain English instructions.
Args:
url: Website URL
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
Returns:
Extracted structured data
"""
agent = WebScraperAgent()
return await agent.scrape_data(url, query, model_name)
async def scrape_website_with_llm(url: str, query: str, model_name: Optional[str] = None):
"""
Scrape structured data from any website using a custom LLM model.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
"""
agent = WebScraperAgent()
return await agent.scrape_data_with_llm(url, query, model_name)

View File

@@ -126,6 +126,30 @@ Factors:
- URL depth (fewer slashes = higher authority)
- Clean URL structure
### Custom Link Scoring
```python
class CustomLinkScorer:
def score(self, link: Link, query: str, state: CrawlState) -> float:
# Prioritize specific URL patterns
if "/api/reference/" in link.href:
return 2.0 # Double the score
# Deprioritize certain sections
if "/archive/" in link.href:
return 0.1 # Reduce score by 90%
# Default scoring
return 1.0
# Use with adaptive crawler
adaptive = AdaptiveCrawler(
crawler,
config=config,
link_scorer=CustomLinkScorer()
)
```
## Domain-Specific Configurations
### Technical Documentation
@@ -206,12 +230,8 @@ config = AdaptiveConfig(
# Periodically clean state
if len(state.knowledge_base) > 1000:
# Keep only the top 500 most relevant docs
top_content = adaptive.get_relevant_content(top_k=500)
keep_indices = {d["index"] for d in top_content}
state.knowledge_base = [
doc for i, doc in enumerate(state.knowledge_base) if i in keep_indices
]
# Keep only most relevant
state.knowledge_base = get_top_relevant(state.knowledge_base, 500)
```
### Parallel Processing
@@ -232,6 +252,18 @@ tasks = [
results = await asyncio.gather(*tasks)
```
### Caching Strategy
```python
# Enable caching for repeated crawls
async with AsyncWebCrawler(
config=BrowserConfig(
cache_mode=CacheMode.ENABLED
)
) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
```
## Debugging & Analysis
### Enable Verbose Logging
@@ -290,9 +322,9 @@ with open("crawl_analysis.json", "w") as f:
### Implementing a Custom Strategy
```python
from crawl4ai.adaptive_crawler import CrawlStrategy
from crawl4ai.adaptive_crawler import BaseStrategy
class DomainSpecificStrategy(CrawlStrategy):
class DomainSpecificStrategy(BaseStrategy):
def calculate_coverage(self, state: CrawlState) -> float:
# Custom coverage calculation
# e.g., weight certain terms more heavily
@@ -319,7 +351,7 @@ adaptive = AdaptiveCrawler(
### Combining Strategies
```python
class HybridStrategy(CrawlStrategy):
class HybridStrategy(BaseStrategy):
def __init__(self):
self.strategies = [
TechnicalDocStrategy(),

View File

@@ -7,13 +7,13 @@ Simple proxy configuration with `BrowserConfig`:
```python
from crawl4ai.async_configs import BrowserConfig
# Using HTTP proxy
browser_config = BrowserConfig(proxy_config={"server": "http://proxy.example.com:8080"})
# Using proxy URL
browser_config = BrowserConfig(proxy="http://proxy.example.com:8080")
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
# Using SOCKS proxy
browser_config = BrowserConfig(proxy_config={"server": "socks5://proxy.example.com:1080"})
browser_config = BrowserConfig(proxy="socks5://proxy.example.com:1080")
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
```
@@ -25,11 +25,7 @@ Use an authenticated proxy with `BrowserConfig`:
```python
from crawl4ai.async_configs import BrowserConfig
browser_config = BrowserConfig(proxy_config={
"server": "http://[host]:[port]",
"username": "[username]",
"password": "[password]",
})
browser_config = BrowserConfig(proxy="http://[username]:[password]@[host]:[port]")
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
```

View File

@@ -23,7 +23,7 @@ browser_cfg = BrowserConfig(
| **`headless`** | `bool` (default: `True`) | Headless means no visible UI. `False` is handy for debugging. |
| **`viewport_width`** | `int` (default: `1080`) | Initial page width (in px). Useful for testing responsive layouts. |
| **`viewport_height`** | `int` (default: `600`) | Initial page height (in px). |
| **`proxy`** | `str` (deprecated) | Deprecated. Use `proxy_config` instead. If set, it will be auto-converted internally. |
| **`proxy`** | `str` (default: `None`) | Single-proxy URL if you want all traffic to go through it, e.g. `"http://user:pass@proxy:8080"`. |
| **`proxy_config`** | `dict` (default: `None`) | For advanced or multi-proxy needs, specify details like `{"server": "...", "username": "...", ...}`. |
| **`use_persistent_context`** | `bool` (default: `False`) | If `True`, uses a **persistent** browser context (keep cookies, sessions across runs). Also sets `use_managed_browser=True`. |
| **`user_data_dir`** | `str or None` (default: `None`) | Directory to store user data (profiles, cookies). Must be set if you want permanent sessions. |
@@ -155,7 +155,6 @@ If your page is a single-page app with repeated JS updates, set `js_only=True` i
| **`exclude_external_links`** | `bool` (False) | Removes all links pointing outside the current domain. |
| **`exclude_social_media_links`** | `bool` (False) | Strips links specifically to social sites (like Facebook or Twitter). |
| **`exclude_domains`** | `list` ([]) | Provide a custom list of domains to exclude (like `["ads.com", "trackers.io"]`). |
| **`preserve_https_for_internal_links`** | `bool` (False) | If `True`, preserves HTTPS scheme for internal links even when the server redirects to HTTP. Useful for security-conscious crawling. |
Use these for link-level content filtering (often to keep crawls “internal” or to remove spammy domains).

View File

@@ -108,19 +108,7 @@ config = AdaptiveConfig(
embedding_min_confidence_threshold=0.1 # Stop if completely irrelevant
)
# With custom LLM provider for query expansion (recommended)
from crawl4ai import LLMConfig
config = AdaptiveConfig(
strategy="embedding",
embedding_llm_config=LLMConfig(
provider='openai/text-embedding-3-small',
api_token='your-api-key',
temperature=0.7
)
)
# Alternative: Dictionary format (backward compatible)
# With custom embedding provider (e.g., OpenAI)
config = AdaptiveConfig(
strategy="embedding",
embedding_llm_config={

View File

@@ -472,17 +472,6 @@ Note that for BestFirstCrawlingStrategy, score_threshold is not needed since pag
5.**Balance breadth vs. depth.** Choose your strategy wisely - BFS for comprehensive coverage, DFS for deep exploration, BestFirst for focused relevance-based crawling.
6.**Preserve HTTPS for security.** If crawling HTTPS sites that redirect to HTTP, use `preserve_https_for_internal_links=True` to maintain secure connections:
```python
config = CrawlerRunConfig(
deep_crawl_strategy=BFSDeepCrawlStrategy(max_depth=2),
preserve_https_for_internal_links=True # Keep HTTPS even if server redirects to HTTP
)
```
This is especially useful for security-conscious crawling or when dealing with sites that support both protocols.
---
## 10. Summary & Next Steps

View File

@@ -89,16 +89,6 @@ ANTHROPIC_API_KEY=your-anthropic-key
# TOGETHER_API_KEY=your-together-key
# MISTRAL_API_KEY=your-mistral-key
# GEMINI_API_TOKEN=your-gemini-token
# Optional: Global LLM settings
# LLM_PROVIDER=openai/gpt-4o-mini
# LLM_TEMPERATURE=0.7
# LLM_BASE_URL=https://api.custom.com/v1
# Optional: Provider-specific overrides
# OPENAI_TEMPERATURE=0.5
# OPENAI_BASE_URL=https://custom-openai.com/v1
# ANTHROPIC_TEMPERATURE=0.3
EOL
```
> 🔑 **Note**: Keep your API keys secure! Never commit `.llm.env` to version control.
@@ -166,43 +156,27 @@ cp deploy/docker/.llm.env.example .llm.env
**Flexible LLM Provider Configuration:**
The Docker setup now supports flexible LLM provider configuration through a hierarchical system:
The Docker setup now supports flexible LLM provider configuration through three methods:
1. **API Request Parameters** (Highest Priority): Specify per request
1. **Environment Variable** (Highest Priority): Set `LLM_PROVIDER` to override the default
```bash
export LLM_PROVIDER="anthropic/claude-3-opus"
# Or in your .llm.env file:
# LLM_PROVIDER=anthropic/claude-3-opus
```
2. **API Request Parameter**: Specify provider per request
```json
{
"url": "https://example.com",
"f": "llm",
"provider": "groq/mixtral-8x7b",
"temperature": 0.7,
"base_url": "https://api.custom.com/v1"
"provider": "groq/mixtral-8x7b"
}
```
2. **Provider-Specific Environment Variables**: Override for specific providers
```bash
# In your .llm.env file:
OPENAI_TEMPERATURE=0.5
OPENAI_BASE_URL=https://custom-openai.com/v1
ANTHROPIC_TEMPERATURE=0.3
```
3. **Config File Default**: Falls back to `config.yml` (default: `openai/gpt-4o-mini`)
3. **Global Environment Variables**: Set defaults for all providers
```bash
# In your .llm.env file:
LLM_PROVIDER=anthropic/claude-3-opus
LLM_TEMPERATURE=0.7
LLM_BASE_URL=https://api.proxy.com/v1
```
4. **Config File Default**: Falls back to `config.yml` (default: `openai/gpt-4o-mini`)
The system automatically selects the appropriate API key based on the provider. LiteLLM handles finding the correct environment variable for each provider (e.g., OPENAI_API_KEY for OpenAI, GEMINI_API_TOKEN for Google Gemini, etc.).
**Supported LLM Parameters:**
- `provider`: LLM provider and model (e.g., "openai/gpt-4", "anthropic/claude-3-opus")
- `temperature`: Controls randomness (0.0-2.0, lower = more focused, higher = more creative)
- `base_url`: Custom API endpoint for proxy servers or alternative endpoints
The system automatically selects the appropriate API key based on the configured `api_key_env` in the config file.
#### 3. Build and Run with Compose
@@ -581,101 +555,6 @@ Crucially, when sending configurations directly via JSON, they **must** follow t
**LLM Extraction Strategy** *(Keep example, ensure schema uses type/value wrapper)*
*(Keep Deep Crawler Example)*
### LLM Configuration Examples
The Docker API supports dynamic LLM configuration through multiple levels:
#### Temperature Control
Temperature affects the randomness of LLM responses (0.0 = deterministic, 2.0 = very creative):
```python
import requests
# Low temperature for factual extraction
response = requests.post(
"http://localhost:11235/md",
json={
"url": "https://example.com",
"f": "llm",
"q": "Extract all dates and numbers from this page",
"temperature": 0.2 # Very focused, deterministic
}
)
# High temperature for creative tasks
response = requests.post(
"http://localhost:11235/md",
json={
"url": "https://example.com",
"f": "llm",
"q": "Write a creative summary of this content",
"temperature": 1.2 # More creative, varied responses
}
)
```
#### Custom API Endpoints
Use custom base URLs for proxy servers or alternative API endpoints:
```python
# Using a local LLM server
response = requests.post(
"http://localhost:11235/md",
json={
"url": "https://example.com",
"f": "llm",
"q": "Extract key information",
"provider": "ollama/llama2",
"base_url": "http://localhost:11434/v1"
}
)
```
#### Dynamic Provider Selection
Switch between providers based on task requirements:
```python
async def smart_extraction(url: str, content_type: str):
"""Select provider and temperature based on content type"""
configs = {
"technical": {
"provider": "openai/gpt-4",
"temperature": 0.3,
"query": "Extract technical specifications and code examples"
},
"creative": {
"provider": "anthropic/claude-3-opus",
"temperature": 0.9,
"query": "Create an engaging narrative summary"
},
"quick": {
"provider": "groq/mixtral-8x7b",
"temperature": 0.5,
"query": "Quick summary in bullet points"
}
}
config = configs.get(content_type, configs["quick"])
response = await httpx.post(
"http://localhost:11235/md",
json={
"url": url,
"f": "llm",
"q": config["query"],
"provider": config["provider"],
"temperature": config["temperature"]
}
)
return response.json()
```
### REST API Examples
Update URLs to use port `11235`.
@@ -814,8 +693,8 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini" # Can be overridden by LLM_PROVIDER env var
# api_key: sk-... # If you pass the API key directly (not recommended)
# temperature and base_url are controlled via environment variables or request parameters
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored
# Redis Configuration (Used by internal Redis server managed by supervisord)
redis:

View File

@@ -79,7 +79,7 @@ if __name__ == "__main__":
asyncio.run(main())
```
> IMPORTANT: By default cache mode is set to `CacheMode.BYPASS` to have fresh content. Set `CacheMode.ENABLED` to enable caching.
> IMPORTANT: By default cache mode is set to `CacheMode.ENABLED`. So to have fresh content, you need to set it to `CacheMode.BYPASS`
Well explore more advanced config in later tutorials (like enabling proxies, PDF output, multi-tab sessions, etc.). For now, just note how you pass these objects to manage crawling.

View File

@@ -0,0 +1,242 @@
# Telemetry
Crawl4AI includes **opt-in telemetry** to help improve stability by capturing anonymous crash reports. No personal data or crawled content is ever collected.
!!! info "Privacy First"
Telemetry is completely optional and respects your privacy. Only exception information is collected - no URLs, no personal data, no crawled content.
## Overview
- **Privacy-first**: Only exceptions and crashes are reported
- **Opt-in by default**: You control when telemetry is enabled (except in Docker where it's on by default)
- **No PII**: No URLs, request data, or personal information is collected
- **Provider-agnostic**: Currently uses Sentry, but designed to support multiple backends
## Installation
Telemetry requires the optional Sentry SDK:
```bash
# Install with telemetry support
pip install crawl4ai[telemetry]
# Or install Sentry SDK separately
pip install sentry-sdk>=2.0.0
```
## Environments
### 1. Python Library & CLI
On first exception, you'll see an interactive prompt:
```
🚨 Crawl4AI Error Detection
==============================================================
We noticed an error occurred. Help improve Crawl4AI by
sending anonymous crash reports?
[1] Yes, send this error only
[2] Yes, always send errors
[3] No, don't send
Your choice (1/2/3):
```
Control via CLI:
```bash
# Enable telemetry
crwl telemetry enable
crwl telemetry enable --email you@example.com
# Disable telemetry
crwl telemetry disable
# Check status
crwl telemetry status
```
### 2. Docker / API Server
!!! warning "Default Enabled in Docker"
Telemetry is **enabled by default** in Docker environments to help identify container-specific issues. This is different from the CLI where it's opt-in.
To disable:
```bash
# Via environment variable
docker run -e CRAWL4AI_TELEMETRY=0 ...
# In docker-compose.yml
environment:
- CRAWL4AI_TELEMETRY=0
```
### 3. Jupyter / Google Colab
In notebooks, you'll see an interactive widget (if available) or a code snippet:
```python
import crawl4ai
# Enable telemetry
crawl4ai.telemetry.enable(email="you@example.com", always=True)
# Send only next error
crawl4ai.telemetry.enable(once=True)
# Disable telemetry
crawl4ai.telemetry.disable()
# Check status
crawl4ai.telemetry.status()
```
## Python API
### Basic Usage
```python
from crawl4ai import telemetry
# Enable/disable telemetry
telemetry.enable(email="optional@email.com", always=True)
telemetry.disable()
# Check current status
status = telemetry.status()
print(f"Telemetry enabled: {status['enabled']}")
print(f"Consent: {status['consent']}")
```
### Manual Exception Capture
```python
from crawl4ai.telemetry import capture_exception
try:
# Your code here
risky_operation()
except Exception as e:
# Manually capture exception with context
capture_exception(e, {
'operation': 'custom_crawler',
'url': 'https://example.com' # Will be sanitized
})
raise
```
### Decorator Pattern
```python
from crawl4ai.telemetry import telemetry_decorator
@telemetry_decorator
def my_crawler_function():
# Exceptions will be automatically captured
pass
```
### Context Manager
```python
from crawl4ai.telemetry import telemetry_context
with telemetry_context("data_extraction"):
# Any exceptions in this block will be captured
result = extract_data(html)
```
## Configuration
Settings are stored in `~/.crawl4ai/config.json`:
```json
{
"telemetry": {
"consent": "always",
"email": "user@example.com"
}
}
```
Consent levels:
- `"not_set"` - No decision made yet
- `"denied"` - Telemetry disabled
- `"once"` - Send current error only
- `"always"` - Always send errors
## Environment Variables
- `CRAWL4AI_TELEMETRY=0` - Disable telemetry (overrides config)
- `CRAWL4AI_TELEMETRY_EMAIL=email@example.com` - Set email for follow-up
- `CRAWL4AI_SENTRY_DSN=https://...` - Override default DSN (for maintainers)
## What's Collected
### Collected ✅
- Exception type and traceback
- Crawl4AI version
- Python version
- Operating system
- Environment type (CLI, Docker, Jupyter)
- Optional email (if provided)
### NOT Collected ❌
- URLs being crawled
- HTML content
- Request/response data
- Cookies or authentication tokens
- IP addresses
- Any personally identifiable information
## Provider Architecture
Telemetry is designed to be provider-agnostic:
```python
from crawl4ai.telemetry.base import TelemetryProvider
class CustomProvider(TelemetryProvider):
def send_exception(self, exc, context=None):
# Your implementation
pass
```
## FAQ
### Q: Can I completely disable telemetry?
A: Yes! Use `crwl telemetry disable` or set `CRAWL4AI_TELEMETRY=0`
### Q: Is telemetry required?
A: No, it's completely optional (except enabled by default in Docker)
### Q: What if I don't install sentry-sdk?
A: Telemetry will gracefully degrade to a no-op state
### Q: Can I see what's being sent?
A: Yes, check the source code in `crawl4ai/telemetry/`
### Q: How do I remove my email?
A: Delete `~/.crawl4ai/config.json` or edit it to remove the email field
## Privacy Commitment
1. **Transparency**: All telemetry code is open source
2. **Control**: You can enable/disable at any time
3. **Minimal**: Only crash data, no user content
4. **Secure**: Data transmitted over HTTPS to Sentry
5. **Anonymous**: No tracking or user identification
## Contributing
Help improve telemetry:
- Report issues with telemetry itself
- Suggest privacy improvements
- Add new provider backends
## Support
If you have concerns about telemetry:
- Open an issue on GitHub
- Email the maintainers
- Review the code in `crawl4ai/telemetry/`

View File

@@ -102,16 +102,16 @@ async def smart_blog_crawler():
# Step 2: Configure discovery - let's find all blog posts
config = SeedingConfig(
source="sitemap+cc", # Use the website's sitemap+cc
pattern="*/courses/*", # Only courses related posts
source="sitemap", # Use the website's sitemap
pattern="*/blog/*.html", # Only blog posts
extract_head=True, # Get page metadata
max_urls=100 # Limit for this example
)
# Step 3: Discover URLs from the Python blog
print("🔍 Discovering course posts...")
print("🔍 Discovering blog posts...")
urls = await seeder.urls("realpython.com", config)
print(f"✅ Found {len(urls)} course posts")
print(f"✅ Found {len(urls)} blog posts")
# Step 4: Filter for Python tutorials (using metadata!)
tutorials = [
@@ -134,8 +134,7 @@ async def smart_blog_crawler():
async with AsyncWebCrawler() as crawler:
config = CrawlerRunConfig(
only_text=True,
word_count_threshold=300, # Only substantial articles
stream=True
word_count_threshold=300 # Only substantial articles
)
# Extract URLs and crawl them
@@ -156,7 +155,7 @@ asyncio.run(smart_blog_crawler())
**What just happened?**
1. We discovered all blog URLs from the sitemap+cc
1. We discovered all blog URLs from the sitemap
2. We filtered using metadata (no crawling needed!)
3. We crawled only the relevant tutorials
4. We saved tons of time and bandwidth
@@ -283,8 +282,8 @@ config = SeedingConfig(
live_check=True, # Verify each URL is accessible
concurrency=20 # Check 20 URLs in parallel
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("example.com", config)
urls = await seeder.urls("example.com", config)
# Now you can filter by status
live_urls = [u for u in urls if u["status"] == "valid"]
@@ -312,8 +311,8 @@ This is where URL seeding gets really powerful. Instead of crawling entire pages
config = SeedingConfig(
extract_head=True # Extract metadata from <head> section
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("example.com", config)
urls = await seeder.urls("example.com", config)
# Now each URL has rich metadata
for url in urls[:3]:
@@ -388,8 +387,8 @@ config = SeedingConfig(
scoring_method="bm25",
score_threshold=0.3
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("example.com", config)
urls = await seeder.urls("example.com", config)
# URLs are scored based on:
# 1. Domain parts matching (e.g., 'python' in python.example.com)
@@ -430,8 +429,8 @@ config = SeedingConfig(
extract_head=True,
live_check=True
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("blog.example.com", config)
urls = await seeder.urls("blog.example.com", config)
# Analyze the results
for url in urls[:5]:
@@ -489,8 +488,8 @@ config = SeedingConfig(
scoring_method="bm25", # Use BM25 algorithm
score_threshold=0.3 # Minimum relevance score
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("realpython.com", config)
urls = await seeder.urls("realpython.com", config)
# Results are automatically sorted by relevance!
for url in urls[:5]:
@@ -512,8 +511,8 @@ config = SeedingConfig(
score_threshold=0.5,
max_urls=20
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("docs.example.com", config)
urls = await seeder.urls("docs.example.com", config)
# The highest scoring URLs will be API docs!
```
@@ -530,8 +529,8 @@ config = SeedingConfig(
score_threshold=0.4,
pattern="*/product/*" # Combine with pattern matching
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("shop.example.com", config)
urls = await seeder.urls("shop.example.com", config)
# Filter further by price (from metadata)
affordable = [
@@ -551,8 +550,8 @@ config = SeedingConfig(
scoring_method="bm25",
score_threshold=0.35
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("technews.com", config)
urls = await seeder.urls("technews.com", config)
# Filter by date
from datetime import datetime, timedelta
@@ -592,8 +591,8 @@ for query in queries:
score_threshold=0.4,
max_urls=10 # Top 10 per topic
)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("learning-platform.com", config)
urls = await seeder.urls("learning-platform.com", config)
all_tutorials.extend(urls)
# Remove duplicates while preserving order
@@ -626,8 +625,7 @@ config = SeedingConfig(
)
# Returns a dictionary: {domain: [urls]}
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(domains, config)
results = await seeder.many_urls(domains, config)
# Process results
for domain, urls in results.items():
@@ -656,8 +654,8 @@ config = SeedingConfig(
pattern="*/blog/*",
max_urls=100
)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(competitors, config)
results = await seeder.many_urls(competitors, config)
# Analyze content types
for domain, urls in results.items():
@@ -692,8 +690,8 @@ config = SeedingConfig(
score_threshold=0.3,
max_urls=20 # Per site
)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(educational_sites, config)
results = await seeder.many_urls(educational_sites, config)
# Find the best beginner tutorials
all_tutorials = []
@@ -733,8 +731,8 @@ config = SeedingConfig(
score_threshold=0.5, # High threshold for relevance
max_urls=10
)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(news_sites, config)
results = await seeder.many_urls(news_sites, config)
# Collect all mentions
mentions = []

View File

@@ -35,6 +35,7 @@ nav:
- "Page Interaction": "core/page-interaction.md"
- "Content Selection": "core/content-selection.md"
- "Cache Modes": "core/cache-modes.md"
- "Telemetry": "core/telemetry.md"
- "Local Files & Raw HTML": "core/local-files.md"
- "Link & Media": "core/link-media.md"
- Advanced:

View File

@@ -7,7 +7,7 @@ name = "Crawl4AI"
dynamic = ["version"]
description = "🚀🤖 Crawl4AI: Open-source LLM Friendly Web Crawler & scraper"
readme = "README.md"
requires-python = ">=3.10"
requires-python = ">=3.9"
license = "Apache-2.0"
authors = [
{name = "Unclecode", email = "unclecode@kidocode.com"}
@@ -36,7 +36,6 @@ dependencies = [
"PyYAML>=6.0",
"nltk>=3.9.1",
"rich>=13.9.4",
"cssselect>=1.2.0",
"httpx>=0.27.2",
"httpx[http2]>=0.27.2",
"fake-useragent>=2.0.3",
@@ -52,6 +51,7 @@ classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
@@ -64,6 +64,7 @@ torch = ["torch", "nltk", "scikit-learn"]
transformer = ["transformers", "tokenizers", "sentence-transformers"]
cosine = ["torch", "transformers", "nltk", "sentence-transformers"]
sync = ["selenium"]
telemetry = ["sentry-sdk>=2.0.0", "ipywidgets>=8.0.0"]
all = [
"PyPDF2",
"torch",
@@ -72,7 +73,9 @@ all = [
"transformers",
"tokenizers",
"sentence-transformers",
"selenium"
"selenium",
"sentry-sdk>=2.0.0",
"ipywidgets>=8.0.0"
]
[project.scripts]

16
pytest.ini Normal file
View File

@@ -0,0 +1,16 @@
[pytest]
testpaths = tests
python_paths = .
addopts = --maxfail=1 --disable-warnings -q --tb=short -v
asyncio_mode = auto
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
privacy: marks tests related to privacy compliance
performance: marks tests related to performance
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
env =
CRAWL4AI_TEST_MODE=1

View File

@@ -24,7 +24,6 @@ psutil>=6.1.1
PyYAML>=6.0
nltk>=3.9.1
rich>=13.9.4
cssselect>=1.2.0
chardet>=5.2.0
brotli>=1.1.0
httpx[http2]>=0.27.2

View File

@@ -56,10 +56,11 @@ setup(
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
],
python_requires=">=3.10",
python_requires=">=3.9",
)

View File

@@ -1,154 +0,0 @@
import asyncio
import os
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig, LLMConfig
async def test_configuration(name: str, config: AdaptiveConfig, url: str, query: str):
"""Test a specific configuration"""
print(f"\n{'='*60}")
print(f"Configuration: {name}")
print(f"{'='*60}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
result = await adaptive.digest(start_url=url, query=query)
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
print(f"\n{'='*50}")
print(f"Pages crawled: {len(result.crawled_urls)}")
print(f"Final confidence: {adaptive.confidence:.1%}")
print(f"Stopped reason: {result.metrics.get('stopped_reason', 'max_pages')}")
if result.metrics.get('is_irrelevant', False):
print("⚠️ Query detected as irrelevant!")
return result
async def llm_embedding():
"""Demonstrate various embedding configurations"""
print("EMBEDDING STRATEGY CONFIGURATION EXAMPLES")
print("=" * 60)
# Base URL and query for testing
test_url = "https://docs.python.org/3/library/asyncio.html"
openai_llm_config = LLMConfig(
provider='openai/text-embedding-3-small',
api_token=os.getenv('OPENAI_API_KEY'),
temperature=0.7,
max_tokens=2000
)
config_openai = AdaptiveConfig(
strategy="embedding",
max_pages=10,
# Use OpenAI embeddings
embedding_llm_config=openai_llm_config,
# embedding_llm_config={
# 'provider': 'openai/text-embedding-3-small',
# 'api_token': os.getenv('OPENAI_API_KEY')
# },
# OpenAI embeddings are high quality, can be stricter
embedding_k_exp=4.0,
n_query_variations=12
)
await test_configuration(
"OpenAI Embeddings",
config_openai,
test_url,
# "event-driven architecture patterns"
"async await context managers coroutines"
)
return
async def basic_adaptive_crawling():
"""Basic adaptive crawling example"""
# Initialize the crawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Create an adaptive crawler with default settings (statistical strategy)
adaptive = AdaptiveCrawler(crawler)
# Note: You can also use embedding strategy for semantic understanding:
# from crawl4ai import AdaptiveConfig
# config = AdaptiveConfig(strategy="embedding")
# adaptive = AdaptiveCrawler(crawler, config)
# Start adaptive crawling
print("Starting adaptive crawl for Python async programming information...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/library/asyncio.html",
query="async await context managers coroutines"
)
# Display crawl statistics
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
# Show final confidence
print(f"\n{'='*50}")
print(f"Final Confidence: {adaptive.confidence:.2%}")
print(f"Total Pages Crawled: {len(result.crawled_urls)}")
print(f"Knowledge Base Size: {len(adaptive.state.knowledge_base)} documents")
if adaptive.confidence >= 0.8:
print("✓ High confidence - can answer detailed questions about async Python")
elif adaptive.confidence >= 0.6:
print("~ Moderate confidence - can answer basic questions")
else:
print("✗ Low confidence - need more information")
if __name__ == "__main__":
asyncio.run(llm_embedding())
# asyncio.run(basic_adaptive_crawling())

View File

@@ -112,7 +112,7 @@ async def test_proxy_settings():
headless=True,
verbose=False,
user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
proxy_config={"server": "http://127.0.0.1:8080"}, # Assuming local proxy server for test
proxy="http://127.0.0.1:8080", # Assuming local proxy server for test
use_managed_browser=False,
use_persistent_context=False,
) as crawler:

151
tests/conftest.py Normal file
View File

@@ -0,0 +1,151 @@
"""
Shared pytest fixtures for Crawl4AI tests.
"""
import pytest
import tempfile
import os
from pathlib import Path
from unittest.mock import Mock, patch
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
from crawl4ai.telemetry.environment import Environment
@pytest.fixture
def temp_config_dir():
"""Provide a temporary directory for telemetry config testing."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def mock_telemetry_config(temp_config_dir):
"""Provide a mocked telemetry config for testing."""
config = TelemetryConfig(config_dir=temp_config_dir)
yield config
@pytest.fixture
def clean_environment():
"""Clean environment variables before and after test."""
# Store original environment
original_env = os.environ.copy()
# Clean telemetry-related env vars
telemetry_vars = [
'CRAWL4AI_TELEMETRY',
'CRAWL4AI_DOCKER',
'CRAWL4AI_API_SERVER',
'CRAWL4AI_TEST_MODE'
]
for var in telemetry_vars:
if var in os.environ:
del os.environ[var]
# Set test mode
os.environ['CRAWL4AI_TEST_MODE'] = '1'
yield
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
@pytest.fixture
def mock_sentry_provider():
"""Provide a mocked Sentry provider for testing."""
with patch('crawl4ai.telemetry.providers.sentry.SentryProvider') as mock:
provider_instance = Mock()
provider_instance.initialize.return_value = True
provider_instance.send_exception.return_value = True
provider_instance.is_initialized = True
mock.return_value = provider_instance
yield provider_instance
@pytest.fixture
def enabled_telemetry_config(temp_config_dir): # noqa: F811
"""Provide a telemetry config with telemetry enabled."""
config = Mock()
config.get_consent.return_value = TelemetryConsent.ALWAYS
config.is_enabled.return_value = True
config.should_send_current.return_value = True
config.get_email.return_value = "test@example.com"
config.update_from_env.return_value = None
yield config
@pytest.fixture
def disabled_telemetry_config(temp_config_dir): # noqa: F811
"""Provide a telemetry config with telemetry disabled."""
config = Mock()
config.get_consent.return_value = TelemetryConsent.DENIED
config.is_enabled.return_value = False
config.should_send_current.return_value = False
config.update_from_env.return_value = None
yield config
@pytest.fixture
def docker_environment():
"""Mock Docker environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
yield
@pytest.fixture
def cli_environment():
"""Mock CLI environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.CLI):
with patch('sys.stdin.isatty', return_value=True):
yield
@pytest.fixture
def jupyter_environment():
"""Mock Jupyter environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.JUPYTER):
yield
@pytest.fixture(autouse=True)
def reset_telemetry_singleton():
"""Reset telemetry singleton between tests."""
from crawl4ai.telemetry import TelemetryManager
# Reset the singleton instance
if hasattr(TelemetryManager, '_instance'):
TelemetryManager._instance = None # noqa: SLF001
yield
# Clean up after test
if hasattr(TelemetryManager, '_instance'):
TelemetryManager._instance = None # noqa: SLF001
@pytest.fixture
def sample_exception():
"""Provide a sample exception for testing."""
try:
raise ValueError("Test exception for telemetry")
except ValueError as e:
return e
@pytest.fixture
def privacy_test_data():
"""Provide test data that should NOT be captured by telemetry."""
return {
'url': 'https://example.com/private-page',
'content': 'This is private content that should not be sent',
'user_data': {
'email': 'user@private.com',
'password': 'secret123',
'api_key': 'sk-1234567890abcdef'
},
'pii': {
'ssn': '123-45-6789',
'phone': '+1-555-123-4567',
'address': '123 Main St, Anytown, USA'
}
}

View File

@@ -1,201 +0,0 @@
"""
Test the complete fix for both the filter serialization and JSON serialization issues.
"""
import asyncio
import httpx
from crawl4ai import BrowserConfig, CacheMode, CrawlerRunConfig
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, FilterChain, URLPatternFilter
BASE_URL = "http://localhost:11234/" # Adjust port as needed
async def test_with_docker_client():
"""Test using the Docker client (same as 1419.py)."""
from crawl4ai.docker_client import Crawl4aiDockerClient
print("=" * 60)
print("Testing with Docker Client")
print("=" * 60)
try:
async with Crawl4aiDockerClient(
base_url=BASE_URL,
verbose=True,
) as client:
# Create filter chain - testing the serialization fix
filter_chain = [
URLPatternFilter(
# patterns=["*about*", "*privacy*", "*terms*"],
patterns=["*advanced*"],
reverse=True
),
]
crawler_config = CrawlerRunConfig(
deep_crawl_strategy=BFSDeepCrawlStrategy(
max_depth=2, # Keep it shallow for testing
# max_pages=5, # Limit pages for testing
filter_chain=FilterChain(filter_chain)
),
cache_mode=CacheMode.BYPASS,
)
print("\n1. Testing crawl with filters...")
results = await client.crawl(
["https://docs.crawl4ai.com"], # Simple test page
browser_config=BrowserConfig(headless=True),
crawler_config=crawler_config,
)
if results:
print(f"✅ Crawl succeeded! Type: {type(results)}")
if hasattr(results, 'success'):
print(f"✅ Results success: {results.success}")
# Test that we can iterate results without JSON errors
if hasattr(results, '__iter__'):
for i, result in enumerate(results):
if hasattr(result, 'url'):
print(f" Result {i}: {result.url[:50]}...")
else:
print(f" Result {i}: {str(result)[:50]}...")
else:
# Handle list of results
print(f"✅ Got {len(results)} results")
for i, result in enumerate(results[:3]): # Show first 3
print(f" Result {i}: {result.url[:50]}...")
else:
print("❌ Crawl failed - no results returned")
return False
print("\n✅ Docker client test completed successfully!")
return True
except Exception as e:
print(f"❌ Docker client test failed: {e}")
import traceback
traceback.print_exc()
return False
async def test_with_rest_api():
"""Test using REST API directly."""
print("\n" + "=" * 60)
print("Testing with REST API")
print("=" * 60)
# Create filter configuration
deep_crawl_strategy_payload = {
"type": "BFSDeepCrawlStrategy",
"params": {
"max_depth": 2,
# "max_pages": 5,
"filter_chain": {
"type": "FilterChain",
"params": {
"filters": [
{
"type": "URLPatternFilter",
"params": {
"patterns": ["*advanced*"],
"reverse": True
}
}
]
}
}
}
}
crawl_payload = {
"urls": ["https://docs.crawl4ai.com"],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"deep_crawl_strategy": deep_crawl_strategy_payload,
"cache_mode": "bypass"
}
}
}
try:
async with httpx.AsyncClient() as client:
print("\n1. Sending crawl request to REST API...")
response = await client.post(
f"{BASE_URL}crawl",
json=crawl_payload,
timeout=30
)
if response.status_code == 200:
print(f"✅ REST API returned 200 OK")
data = response.json()
if data.get("success"):
results = data.get("results", [])
print(f"✅ Got {len(results)} results")
for i, result in enumerate(results[:3]):
print(f" Result {i}: {result.get('url', 'unknown')[:50]}...")
else:
print(f"❌ Crawl not successful: {data}")
return False
else:
print(f"❌ REST API returned {response.status_code}")
print(f" Response: {response.text[:500]}")
return False
print("\n✅ REST API test completed successfully!")
return True
except Exception as e:
print(f"❌ REST API test failed: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests."""
print("\n🧪 TESTING COMPLETE FIX FOR DOCKER FILTER AND JSON ISSUES")
print("=" * 60)
print("Make sure the server is running with the updated code!")
print("=" * 60)
results = []
# Test 1: Docker client
docker_passed = await test_with_docker_client()
results.append(("Docker Client", docker_passed))
# Test 2: REST API
rest_passed = await test_with_rest_api()
results.append(("REST API", rest_passed))
# Summary
print("\n" + "=" * 60)
print("FINAL TEST SUMMARY")
print("=" * 60)
all_passed = True
for test_name, passed in results:
status = "✅ PASSED" if passed else "❌ FAILED"
print(f"{test_name:20} {status}")
if not passed:
all_passed = False
print("=" * 60)
if all_passed:
print("🎉 ALL TESTS PASSED! Both issues are fully resolved!")
print("\nThe fixes:")
print("1. Filter serialization: Fixed by not serializing private __slots__")
print("2. JSON serialization: Fixed by removing property descriptors from model_dump()")
else:
print("⚠️ Some tests failed. Please check the server logs for details.")
return 0 if all_passed else 1
if __name__ == "__main__":
import sys
sys.exit(asyncio.run(main()))

View File

@@ -1,349 +0,0 @@
#!/usr/bin/env python3
"""
Test script for LLM temperature and base_url parameters in Crawl4AI Docker API.
This demonstrates the new hierarchical configuration system:
1. Request-level parameters (highest priority)
2. Provider-specific environment variables
3. Global environment variables
4. System defaults (lowest priority)
"""
import asyncio
import httpx
import json
import os
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
console = Console()
# Configuration
BASE_URL = "http://localhost:11235" # Docker API endpoint
TEST_URL = "https://httpbin.org/html" # Simple test page
# --- Helper Functions ---
async def check_server_health(client: httpx.AsyncClient) -> bool:
"""Check if the server is healthy."""
console.print("[bold cyan]Checking server health...[/]", end="")
try:
response = await client.get("/health", timeout=10.0)
response.raise_for_status()
console.print(" [bold green]✓ Server is healthy![/]")
return True
except Exception as e:
console.print(f"\n[bold red]✗ Server health check failed: {e}[/]")
console.print(f"Is the server running at {BASE_URL}?")
return False
def print_request(endpoint: str, payload: dict, title: str = "Request"):
"""Pretty print the request."""
syntax = Syntax(json.dumps(payload, indent=2), "json", theme="monokai")
console.print(Panel.fit(
f"[cyan]POST {endpoint}[/cyan]\n{syntax}",
title=f"[bold blue]{title}[/]",
border_style="blue"
))
def print_response(response: dict, title: str = "Response"):
"""Pretty print relevant parts of the response."""
# Extract only the relevant parts
relevant = {}
if "markdown" in response:
relevant["markdown"] = response["markdown"][:200] + "..." if len(response.get("markdown", "")) > 200 else response.get("markdown", "")
if "success" in response:
relevant["success"] = response["success"]
if "url" in response:
relevant["url"] = response["url"]
if "filter" in response:
relevant["filter"] = response["filter"]
console.print(Panel.fit(
Syntax(json.dumps(relevant, indent=2), "json", theme="monokai"),
title=f"[bold green]{title}[/]",
border_style="green"
))
# --- Test Functions ---
async def test_default_no_params(client: httpx.AsyncClient):
"""Test 1: No temperature or base_url specified - uses defaults"""
console.rule("[bold yellow]Test 1: Default Configuration (No Parameters)[/]")
payload = {
"url": TEST_URL,
"f": "llm",
"q": "What is the main heading of this page? Answer in exactly 5 words."
}
print_request("/md", payload, "Request without temperature/base_url")
try:
response = await client.post("/md", json=payload, timeout=30.0)
response.raise_for_status()
data = response.json()
print_response(data, "Response (using system defaults)")
console.print("[dim]→ This used system defaults or environment variables if set[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_request_temperature(client: httpx.AsyncClient):
"""Test 2: Request-level temperature (highest priority)"""
console.rule("[bold yellow]Test 2: Request-Level Temperature[/]")
# Test with low temperature (more focused)
payload_low = {
"url": TEST_URL,
"f": "llm",
"q": "What is the main heading? Be creative and poetic.",
"temperature": 0.1 # Very low - should be less creative
}
print_request("/md", payload_low, "Low Temperature (0.1)")
try:
response = await client.post("/md", json=payload_low, timeout=30.0)
response.raise_for_status()
data_low = response.json()
print_response(data_low, "Response with Low Temperature")
console.print("[dim]→ Low temperature (0.1) should produce focused, less creative output[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
console.print()
# Test with high temperature (more creative)
payload_high = {
"url": TEST_URL,
"f": "llm",
"q": "What is the main heading? Be creative and poetic.",
"temperature": 1.5 # High - should be more creative
}
print_request("/md", payload_high, "High Temperature (1.5)")
try:
response = await client.post("/md", json=payload_high, timeout=30.0)
response.raise_for_status()
data_high = response.json()
print_response(data_high, "Response with High Temperature")
console.print("[dim]→ High temperature (1.5) should produce more creative, varied output[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_provider_override(client: httpx.AsyncClient):
"""Test 3: Provider override with temperature"""
console.rule("[bold yellow]Test 3: Provider Override with Temperature[/]")
provider = "gemini/gemini-2.5-flash-lite"
payload = {
"url": TEST_URL,
"f": "llm",
"q": "Summarize this page in one sentence.",
"provider": provider, # Explicitly set provider
"temperature": 0.7
}
print_request("/md", payload, "Provider + Temperature Override")
try:
response = await client.post("/md", json=payload, timeout=30.0)
response.raise_for_status()
data = response.json()
print_response(data, "Response with Provider Override")
console.print(f"[dim]→ This explicitly uses {provider} with temperature 0.7[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_base_url_custom(client: httpx.AsyncClient):
"""Test 4: Custom base_url (will fail unless you have a custom endpoint)"""
console.rule("[bold yellow]Test 4: Custom Base URL (Demo Only)[/]")
payload = {
"url": TEST_URL,
"f": "llm",
"q": "What is this page about?",
"base_url": "https://api.custom-endpoint.com/v1", # Custom endpoint
"temperature": 0.5
}
print_request("/md", payload, "Custom Base URL Request")
console.print("[yellow]Note: This will fail unless you have a custom endpoint set up[/]")
try:
response = await client.post("/md", json=payload, timeout=10.0)
response.raise_for_status()
data = response.json()
print_response(data, "Response from Custom Endpoint")
except httpx.HTTPStatusError as e:
console.print(f"[yellow]Expected failure (no custom endpoint): Status {e.response.status_code}[/]")
except Exception as e:
console.print(f"[yellow]Expected error: {e}[/]")
async def test_llm_job_endpoint(client: httpx.AsyncClient):
"""Test 5: Test the /llm/job endpoint with temperature and base_url"""
console.rule("[bold yellow]Test 5: LLM Job Endpoint with Parameters[/]")
payload = {
"url": TEST_URL,
"q": "Extract the main title and any key information",
"temperature": 0.3,
# "base_url": "https://api.openai.com/v1" # Optional
}
print_request("/llm/job", payload, "LLM Job with Temperature")
try:
# Submit the job
response = await client.post("/llm/job", json=payload, timeout=30.0)
response.raise_for_status()
job_data = response.json()
if "task_id" in job_data:
task_id = job_data["task_id"]
console.print(f"[green]Job created with task_id: {task_id}[/]")
# Poll for result (simplified - in production use proper polling)
await asyncio.sleep(3)
status_response = await client.get(f"/llm/job/{task_id}")
status_data = status_response.json()
if status_data.get("status") == "completed":
console.print("[green]Job completed successfully![/]")
if "result" in status_data:
console.print(Panel.fit(
Syntax(json.dumps(status_data["result"], indent=2), "json", theme="monokai"),
title="Extraction Result",
border_style="green"
))
else:
console.print(f"[yellow]Job status: {status_data.get('status', 'unknown')}[/]")
else:
console.print(f"[red]Unexpected response: {job_data}[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_llm_endpoint(client: httpx.AsyncClient):
"""
Quick QA round-trip with /llm.
Asks a trivial question against SIMPLE_URL just to show wiring.
"""
import time
import urllib.parse
page_url = "https://kidocode.com"
question = "What is the title of this page?"
enc = urllib.parse.quote_plus(page_url, safe="")
console.print(f"GET /llm/{enc}?q={question}")
try:
t0 = time.time()
resp = await client.get(f"/llm/{enc}", params={"q": question})
dt = time.time() - t0
console.print(
f"Response Status: [bold {'green' if resp.is_success else 'red'}]{resp.status_code}[/] (took {dt:.2f}s)")
resp.raise_for_status()
answer = resp.json().get("answer", "")
console.print(Panel(answer or "No answer returned",
title="LLM answer", border_style="magenta", expand=False))
except Exception as e:
console.print(f"[bold red]Error hitting /llm:[/] {e}")
async def show_environment_info():
"""Display current environment configuration"""
console.rule("[bold cyan]Current Environment Configuration[/]")
table = Table(title="LLM Environment Variables", show_header=True, header_style="bold magenta")
table.add_column("Variable", style="cyan", width=30)
table.add_column("Value", style="yellow")
table.add_column("Description", style="dim")
env_vars = [
("LLM_PROVIDER", "Global default provider"),
("LLM_TEMPERATURE", "Global default temperature"),
("LLM_BASE_URL", "Global custom API endpoint"),
("OPENAI_API_KEY", "OpenAI API key"),
("OPENAI_TEMPERATURE", "OpenAI-specific temperature"),
("OPENAI_BASE_URL", "OpenAI-specific endpoint"),
("ANTHROPIC_API_KEY", "Anthropic API key"),
("ANTHROPIC_TEMPERATURE", "Anthropic-specific temperature"),
("GROQ_API_KEY", "Groq API key"),
("GROQ_TEMPERATURE", "Groq-specific temperature"),
]
for var, desc in env_vars:
value = os.environ.get(var, "[not set]")
if "API_KEY" in var and value != "[not set]":
# Mask API keys for security
value = value[:10] + "..." if len(value) > 10 else "***"
table.add_row(var, value, desc)
console.print(table)
console.print()
# --- Main Test Runner ---
async def main():
"""Run all tests"""
console.print(Panel.fit(
"[bold cyan]Crawl4AI LLM Parameters Test Suite[/]\n" +
"Testing temperature and base_url configuration hierarchy",
border_style="cyan"
))
# Show current environment
# await show_environment_info()
# Create HTTP client
async with httpx.AsyncClient(base_url=BASE_URL, timeout=60.0) as client:
# Check server health
if not await check_server_health(client):
console.print("[red]Server is not available. Please ensure the Docker container is running.[/]")
return
# Run tests
tests = [
("Default Configuration", test_default_no_params),
("Request Temperature", test_request_temperature),
("Provider Override", test_provider_override),
("Custom Base URL", test_base_url_custom),
("LLM Job Endpoint", test_llm_job_endpoint),
("LLM Endpoint", test_llm_endpoint),
]
for i, (name, test_func) in enumerate(tests, 1):
if i > 1:
console.print() # Add spacing between tests
try:
await test_func(client)
except Exception as e:
console.print(f"[red]Test '{name}' failed with error: {e}[/]")
console.print_exception(show_locals=False)
console.rule("[bold green]All Tests Complete![/]", style="green")
# Summary
console.print("\n[bold cyan]Configuration Hierarchy Summary:[/]")
console.print("1. [yellow]Request parameters[/] - Highest priority (temperature, base_url in API call)")
console.print("2. [yellow]Provider-specific env[/] - e.g., OPENAI_TEMPERATURE, GROQ_BASE_URL")
console.print("3. [yellow]Global env variables[/] - LLM_TEMPERATURE, LLM_BASE_URL")
console.print("4. [yellow]System defaults[/] - Lowest priority (provider/litellm defaults)")
console.print()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
console.print("\n[yellow]Tests interrupted by user.[/]")
except Exception as e:
console.print(f"\n[bold red]An error occurred:[/]")
console.print_exception(show_locals=False)

View File

@@ -143,40 +143,7 @@ class TestCrawlEndpoints:
assert "<h1>Herman Melville - Moby-Dick</h1>" in result["html"]
# We don't specify a markdown generator in this test, so don't make assumptions about markdown field
# It might be null, missing, or populated depending on the server's default behavior
async def test_crawl_with_stream_direct(self, async_client: httpx.AsyncClient):
"""Test that /crawl endpoint handles stream=True directly without redirect."""
payload = {
"urls": [SIMPLE_HTML_URL],
"browser_config": {
"type": "BrowserConfig",
"params": {
"headless": True,
}
},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"stream": True, # Set stream to True for direct streaming
"screenshot": False,
"cache_mode": CacheMode.BYPASS.value
}
}
}
# Send a request to the /crawl endpoint - should handle streaming directly
async with async_client.stream("POST", "/crawl", json=payload) as response:
assert response.status_code == 200
assert response.headers["content-type"] == "application/x-ndjson"
assert response.headers.get("x-stream-status") == "active"
results = await process_streaming_response(response)
assert len(results) == 1
result = results[0]
await assert_crawl_result_structure(result)
assert result["success"] is True
assert result["url"] == SIMPLE_HTML_URL
assert "<h1>Herman Melville - Moby-Dick</h1>" in result["html"]
async def test_simple_crawl_single_url_streaming(self, async_client: httpx.AsyncClient):
"""Test /crawl/stream with a single URL and simple config values."""
payload = {
@@ -668,209 +635,7 @@ class TestCrawlEndpoints:
pytest.fail(f"LLM extracted content parsing or validation failed: {e}\nContent: {result['extracted_content']}")
except Exception as e: # Catch any other unexpected error
pytest.fail(f"An unexpected error occurred during LLM result processing: {e}\nContent: {result['extracted_content']}")
# 7. Error Handling Tests
async def test_invalid_url_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for invalid URLs."""
payload = {
"urls": ["invalid-url", "https://nonexistent-domain-12345.com"],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {"type": "CrawlerRunConfig", "params": {"cache_mode": CacheMode.BYPASS.value}}
}
response = await async_client.post("/crawl", json=payload)
# Should return 200 with failed results, not 500
print(f"Status code: {response.status_code}")
print(f"Response: {response.text}")
assert response.status_code == 500
data = response.json()
assert data["detail"].startswith("Crawl request failed:")
async def test_mixed_success_failure_urls(self, async_client: httpx.AsyncClient):
"""Test handling of mixed success/failure URLs."""
payload = {
"urls": [
SIMPLE_HTML_URL, # Should succeed
"https://nonexistent-domain-12345.com", # Should fail
"https://invalid-url-with-special-chars-!@#$%^&*()", # Should fail
],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"cache_mode": CacheMode.BYPASS.value,
"markdown_generator": {
"type": "DefaultMarkdownGenerator",
"params": {
"content_filter": {
"type": "PruningContentFilter",
"params": {"threshold": 0.5}
}
}
}
}
}
}
response = await async_client.post("/crawl", json=payload)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert len(data["results"]) == 3
success_count = 0
failure_count = 0
for result in data["results"]:
if result["success"]:
success_count += 1
else:
failure_count += 1
assert "error_message" in result
assert len(result["error_message"]) > 0
assert success_count >= 1 # At least one should succeed
assert failure_count >= 1 # At least one should fail
async def test_streaming_mixed_urls(self, async_client: httpx.AsyncClient):
"""Test streaming with mixed success/failure URLs."""
payload = {
"urls": [
SIMPLE_HTML_URL, # Should succeed
"https://nonexistent-domain-12345.com", # Should fail
],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"stream": True,
"cache_mode": CacheMode.BYPASS.value
}
}
}
async with async_client.stream("POST", "/crawl/stream", json=payload) as response:
response.raise_for_status()
results = await process_streaming_response(response)
assert len(results) == 2
success_count = 0
failure_count = 0
for result in results:
if result["success"]:
success_count += 1
assert result["url"] == SIMPLE_HTML_URL
else:
failure_count += 1
assert "error_message" in result
assert result["error_message"] is not None
assert success_count == 1
assert failure_count == 1
async def test_markdown_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for markdown endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url", "f": "fit"}
response = await async_client.post("/md", json=invalid_payload)
# Should return 400 for invalid URL format
assert response.status_code == 400
# Test non-existent URL
nonexistent_payload = {"url": "https://nonexistent-domain-12345.com", "f": "fit"}
response = await async_client.post("/md", json=nonexistent_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_html_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for HTML endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url"}
response = await async_client.post("/html", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_screenshot_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for screenshot endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url"}
response = await async_client.post("/screenshot", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_pdf_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for PDF endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url"}
response = await async_client.post("/pdf", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_execute_js_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for execute_js endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url", "scripts": ["return document.title;"]}
response = await async_client.post("/execute_js", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_llm_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for LLM endpoint."""
# Test missing query parameter
response = await async_client.get("/llm/https://example.com")
assert response.status_code == 422 # FastAPI validation error, not 400
# Test invalid URL
response = await async_client.get("/llm/invalid-url?q=test")
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_ask_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for ask endpoint."""
# Test invalid context_type
response = await async_client.get("/ask?context_type=invalid")
assert response.status_code == 422 # Validation error
# Test invalid score_ratio
response = await async_client.get("/ask?score_ratio=2.0") # > 1.0
assert response.status_code == 422 # Validation error
# Test invalid max_results
response = await async_client.get("/ask?max_results=0") # < 1
assert response.status_code == 422 # Validation error
async def test_config_dump_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for config dump endpoint."""
# Test invalid code
invalid_payload = {"code": "invalid_code"}
response = await async_client.post("/config/dump", json=invalid_payload)
assert response.status_code == 400
# Test nested function calls (not allowed)
nested_payload = {"code": "CrawlerRunConfig(BrowserConfig())"}
response = await async_client.post("/config/dump", json=nested_payload)
assert response.status_code == 400
async def test_malformed_request_handling(self, async_client: httpx.AsyncClient):
"""Test handling of malformed requests."""
# Test missing required fields
malformed_payload = {"urls": []} # Missing browser_config and crawler_config
response = await async_client.post("/crawl", json=malformed_payload)
print(f"Response: {response.text}")
assert response.status_code == 422 # Validation error
# Test empty URLs list
empty_urls_payload = {
"urls": [],
"browser_config": {"type": "BrowserConfig", "params": {}},
"crawler_config": {"type": "CrawlerRunConfig", "params": {}}
}
response = await async_client.post("/crawl", json=empty_urls_payload)
assert response.status_code == 422 # "At least one URL required"
if __name__ == "__main__":
# Define arguments for pytest programmatically
# -v: verbose output

View File

@@ -1,117 +0,0 @@
#!/usr/bin/env python3
"""
Simple test to verify BestFirstCrawlingStrategy fixes.
This test crawls a real website and shows that:
1. Higher-scoring pages are crawled first (priority queue fix)
2. Links are scored before truncation (link discovery fix)
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.deep_crawling import BestFirstCrawlingStrategy
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
async def test_best_first_strategy():
"""Test BestFirstCrawlingStrategy with keyword scoring"""
print("=" * 70)
print("Testing BestFirstCrawlingStrategy with Real URL")
print("=" * 70)
print("\nThis test will:")
print("1. Crawl Python.org documentation")
print("2. Score pages based on keywords: 'tutorial', 'guide', 'reference'")
print("3. Show that higher-scoring pages are crawled first")
print("-" * 70)
# Create a keyword scorer that prioritizes tutorial/guide pages
scorer = KeywordRelevanceScorer(
keywords=["tutorial", "guide", "reference", "documentation"],
weight=1.0,
case_sensitive=False
)
# Create the strategy with scoring
strategy = BestFirstCrawlingStrategy(
max_depth=2, # Crawl 2 levels deep
max_pages=10, # Limit to 10 pages total
url_scorer=scorer, # Use keyword scoring
include_external=False # Only internal links
)
# Configure browser and crawler
browser_config = BrowserConfig(
headless=True, # Run in background
verbose=False # Reduce output noise
)
crawler_config = CrawlerRunConfig(
deep_crawl_strategy=strategy,
verbose=False
)
print("\nStarting crawl of https://docs.python.org/3/")
print("Looking for pages with keywords: tutorial, guide, reference, documentation")
print("-" * 70)
crawled_urls = []
async with AsyncWebCrawler(config=browser_config) as crawler:
# Crawl and collect results
results = await crawler.arun(
url="https://docs.python.org/3/",
config=crawler_config
)
# Process results
if isinstance(results, list):
for result in results:
score = result.metadata.get('score', 0) if result.metadata else 0
depth = result.metadata.get('depth', 0) if result.metadata else 0
crawled_urls.append({
'url': result.url,
'score': score,
'depth': depth,
'success': result.success
})
print("\n" + "=" * 70)
print("CRAWL RESULTS (in order of crawling)")
print("=" * 70)
for i, item in enumerate(crawled_urls, 1):
status = "" if item['success'] else ""
# Highlight high-scoring pages
if item['score'] > 0.5:
print(f"{i:2}. [{status}] Score: {item['score']:.2f} | Depth: {item['depth']} | {item['url']}")
print(f" ^ HIGH SCORE - Contains keywords!")
else:
print(f"{i:2}. [{status}] Score: {item['score']:.2f} | Depth: {item['depth']} | {item['url']}")
print("\n" + "=" * 70)
print("ANALYSIS")
print("=" * 70)
# Check if higher scores appear early in the crawl
scores = [item['score'] for item in crawled_urls[1:]] # Skip initial URL
high_score_indices = [i for i, s in enumerate(scores) if s > 0.3]
if high_score_indices and high_score_indices[0] < len(scores) / 2:
print("✅ SUCCESS: Higher-scoring pages (with keywords) were crawled early!")
print(" This confirms the priority queue fix is working.")
else:
print("⚠️ Check the crawl order above - higher scores should appear early")
# Show score distribution
print(f"\nScore Statistics:")
print(f" - Total pages crawled: {len(crawled_urls)}")
print(f" - Average score: {sum(item['score'] for item in crawled_urls) / len(crawled_urls):.2f}")
print(f" - Max score: {max(item['score'] for item in crawled_urls):.2f}")
print(f" - Pages with keywords: {sum(1 for item in crawled_urls if item['score'] > 0.3)}")
print("\n" + "=" * 70)
print("TEST COMPLETE")
print("=" * 70)
if __name__ == "__main__":
print("\n🔍 BestFirstCrawlingStrategy Simple Test\n")
asyncio.run(test_best_first_strategy())

View File

@@ -24,7 +24,7 @@ CASES = [
# --- BrowserConfig variants ---
"BrowserConfig()",
"BrowserConfig(headless=False, extra_args=['--disable-gpu'])",
"BrowserConfig(browser_mode='builtin', proxy_config={'server': 'http://1.2.3.4:8080'})",
"BrowserConfig(browser_mode='builtin', proxy='http://1.2.3.4:8080')",
]
for code in CASES:

View File

@@ -1,42 +0,0 @@
import warnings
import pytest
from crawl4ai.async_configs import BrowserConfig, ProxyConfig
def test_browser_config_proxy_string_emits_deprecation_and_autoconverts():
warnings.simplefilter("always", DeprecationWarning)
proxy_str = "23.95.150.145:6114:username:password"
with warnings.catch_warnings(record=True) as caught:
cfg = BrowserConfig(proxy=proxy_str, headless=True)
dep_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
assert dep_warnings, "Expected DeprecationWarning when using BrowserConfig(proxy=...)"
assert cfg.proxy is None, "cfg.proxy should be None after auto-conversion"
assert isinstance(cfg.proxy_config, ProxyConfig), "cfg.proxy_config should be ProxyConfig instance"
assert cfg.proxy_config.username == "username"
assert cfg.proxy_config.password == "password"
assert cfg.proxy_config.server.startswith("http://")
assert cfg.proxy_config.server.endswith(":6114")
def test_browser_config_with_proxy_config_emits_no_deprecation():
warnings.simplefilter("always", DeprecationWarning)
with warnings.catch_warnings(record=True) as caught:
cfg = BrowserConfig(
headless=True,
proxy_config={
"server": "http://127.0.0.1:8080",
"username": "u",
"password": "p",
},
)
dep_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
assert not dep_warnings, "Did not expect DeprecationWarning when using proxy_config"
assert cfg.proxy is None
assert isinstance(cfg.proxy_config, ProxyConfig)

View File

@@ -0,0 +1,64 @@
"""
Test configuration and utilities for telemetry testing.
"""
import os
import pytest
def pytest_configure(config): # noqa: ARG001
"""Configure pytest for telemetry tests."""
# Add custom markers
config.addinivalue_line("markers", "unit: Unit tests")
config.addinivalue_line("markers", "integration: Integration tests")
config.addinivalue_line("markers", "privacy: Privacy compliance tests")
config.addinivalue_line("markers", "performance: Performance tests")
config.addinivalue_line("markers", "slow: Slow running tests")
def pytest_collection_modifyitems(config, items): # noqa: ARG001
"""Modify test collection to add markers automatically."""
for item in items:
# Add markers based on test location and name
if "telemetry" in str(item.fspath):
if "integration" in item.name or "test_integration" in str(item.fspath):
item.add_marker(pytest.mark.integration)
elif "privacy" in item.name or "performance" in item.name:
if "privacy" in item.name:
item.add_marker(pytest.mark.privacy)
if "performance" in item.name:
item.add_marker(pytest.mark.performance)
else:
item.add_marker(pytest.mark.unit)
# Mark slow tests
if "slow" in item.name or any(mark.name == "slow" for mark in item.iter_markers()):
item.add_marker(pytest.mark.slow)
@pytest.fixture(autouse=True)
def setup_test_environment():
"""Set up test environment variables."""
# Ensure we're in test mode
os.environ['CRAWL4AI_TEST_MODE'] = '1'
# Disable actual telemetry during tests unless explicitly enabled
if 'CRAWL4AI_TELEMETRY_TEST_REAL' not in os.environ:
os.environ['CRAWL4AI_TELEMETRY'] = '0'
yield
# Clean up after tests
test_vars = ['CRAWL4AI_TEST_MODE', 'CRAWL4AI_TELEMETRY_TEST_REAL']
for var in test_vars:
if var in os.environ:
del os.environ[var]
def pytest_report_header(config): # noqa: ARG001
"""Add information to pytest header."""
return [
"Crawl4AI Telemetry Tests",
f"Test mode: {'ENABLED' if os.environ.get('CRAWL4AI_TEST_MODE') else 'DISABLED'}",
f"Real telemetry: {'ENABLED' if os.environ.get('CRAWL4AI_TELEMETRY_TEST_REAL') else 'DISABLED'}"
]

View File

@@ -0,0 +1,216 @@
"""
Integration tests for telemetry CLI commands.
"""
import pytest
import subprocess
import sys
import os
from unittest.mock import patch, Mock
@pytest.mark.integration
class TestTelemetryCLI:
"""Test telemetry CLI commands integration."""
def test_telemetry_status_command(self, clean_environment, temp_config_dir):
"""Test the telemetry status CLI command."""
# Import with mocked config
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = 'not_set'
mock_config.is_enabled.return_value = False
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test status command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'status']):
try:
main()
except SystemExit:
pass # CLI commands often call sys.exit()
def test_telemetry_enable_command(self, clean_environment, temp_config_dir):
"""Test the telemetry enable CLI command."""
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test enable command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'enable', '--email', 'test@example.com']):
try:
main()
except SystemExit:
pass
def test_telemetry_disable_command(self, clean_environment, temp_config_dir):
"""Test the telemetry disable CLI command."""
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test disable command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'disable']):
try:
main()
except SystemExit:
pass
@pytest.mark.slow
def test_cli_subprocess_integration(self, temp_config_dir):
"""Test CLI commands as subprocess calls."""
env = os.environ.copy()
env['CRAWL4AI_CONFIG_DIR'] = str(temp_config_dir)
# Test status command via subprocess
try:
result = subprocess.run(
[sys.executable, '-m', 'crawl4ai.cli', 'telemetry', 'status'],
env=env,
capture_output=True,
text=True,
timeout=10
)
# Should not crash, regardless of exit code
assert result.returncode in [0, 1] # May return 1 if not configured
except subprocess.TimeoutExpired:
pytest.skip("CLI command timed out")
except FileNotFoundError:
pytest.skip("CLI module not found")
@pytest.mark.integration
class TestAsyncWebCrawlerIntegration:
"""Test AsyncWebCrawler telemetry integration."""
@pytest.mark.asyncio
async def test_crawler_telemetry_decorator(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that AsyncWebCrawler methods are decorated with telemetry."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
from crawl4ai import AsyncWebCrawler
# Check if the arun method has telemetry decoration
crawler = AsyncWebCrawler()
assert hasattr(crawler.arun, '__wrapped__') or callable(crawler.arun)
@pytest.mark.asyncio
async def test_crawler_exception_capture_integration(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that exceptions in AsyncWebCrawler are captured."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
with patch('crawl4ai.telemetry.capture_exception') as _mock_capture:
from crawl4ai import AsyncWebCrawler
async with AsyncWebCrawler() as crawler:
try:
# This should cause an exception
await crawler.arun(url="invalid://url")
except Exception:
pass # We expect this to fail
# The decorator should have attempted to capture the exception
# Note: This might not always be called depending on where the exception occurs
@pytest.mark.asyncio
async def test_crawler_with_disabled_telemetry(self, disabled_telemetry_config):
"""Test that AsyncWebCrawler works normally with disabled telemetry."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
from crawl4ai import AsyncWebCrawler
# Should work normally even with telemetry disabled
async with AsyncWebCrawler() as crawler:
assert crawler is not None
@pytest.mark.integration
class TestDockerIntegration:
"""Test Docker environment telemetry integration."""
def test_docker_environment_detection(self, docker_environment, temp_config_dir):
"""Test that Docker environment is detected correctly."""
from crawl4ai.telemetry.environment import EnvironmentDetector
env = EnvironmentDetector.detect()
from crawl4ai.telemetry.environment import Environment
assert env == Environment.DOCKER
def test_docker_default_telemetry_enabled(self, temp_config_dir):
"""Test that telemetry is enabled by default in Docker."""
from crawl4ai.telemetry.environment import Environment
# Clear any existing environment variables that might interfere
with patch.dict(os.environ, {}, clear=True):
# Set only the Docker environment variable
os.environ['CRAWL4AI_DOCKER'] = 'true'
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
from crawl4ai.telemetry.consent import ConsentManager
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
config = TelemetryConfig(config_dir=temp_config_dir)
consent_manager = ConsentManager(config)
# Should set consent to ALWAYS for Docker
consent_manager.check_and_prompt()
assert config.get_consent() == TelemetryConsent.ALWAYS
def test_docker_telemetry_can_be_disabled(self, temp_config_dir):
"""Test that Docker telemetry can be disabled via environment variable."""
from crawl4ai.telemetry.environment import Environment
with patch.dict(os.environ, {'CRAWL4AI_TELEMETRY': '0', 'CRAWL4AI_DOCKER': 'true'}):
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
from crawl4ai.telemetry.consent import ConsentManager
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
config = TelemetryConfig(config_dir=temp_config_dir)
consent_manager = ConsentManager(config)
# Should set consent to DENIED when env var is 0
consent_manager.check_and_prompt()
assert config.get_consent() == TelemetryConsent.DENIED
@pytest.mark.integration
class TestTelemetryProviderIntegration:
"""Test telemetry provider integration."""
def test_sentry_provider_initialization(self, enabled_telemetry_config):
"""Test that Sentry provider initializes correctly."""
try:
from crawl4ai.telemetry.providers.sentry import SentryProvider
provider = SentryProvider()
# Should not crash during initialization
assert provider is not None
except ImportError:
pytest.skip("Sentry provider not available")
def test_null_provider_fallback(self, disabled_telemetry_config):
"""Test that NullProvider is used when telemetry is disabled."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
from crawl4ai.telemetry import TelemetryManager
from crawl4ai.telemetry.base import NullProvider
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider) # noqa: SLF001
def test_graceful_degradation_without_sentry(self, enabled_telemetry_config):
"""Test graceful degradation when sentry-sdk is not available."""
with patch.dict('sys.modules', {'sentry_sdk': None}):
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
from crawl4ai.telemetry import TelemetryManager
from crawl4ai.telemetry.base import NullProvider
# Should fall back to NullProvider when Sentry is not available
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider) # noqa: SLF001
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,283 @@
"""
Privacy and performance tests for telemetry system.
"""
import pytest
import time
import asyncio
from unittest.mock import patch
from crawl4ai.telemetry import telemetry_decorator, async_telemetry_decorator, TelemetryManager
@pytest.mark.privacy
class TestTelemetryPrivacy:
"""Test privacy compliance of telemetry system."""
def test_no_url_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that URLs are not captured in telemetry data."""
# Ensure config is properly set for sending
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
# Mock the provider directly in the manager
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
# Create exception with URL in context
exception = ValueError("Test error")
context = {'url': privacy_test_data['url']}
manager.capture_exception(exception, context)
# Verify that the provider was called
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that context was passed to the provider (filtering happens in provider)
assert len(call_args) >= 2
def test_no_content_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that crawled content is not captured."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = {
'content': privacy_test_data['content'],
'html': '<html><body>Private content</body></html>',
'text': 'Extracted private text'
}
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that the provider was called (actual filtering would happen in provider)
assert len(call_args) >= 2
def test_no_pii_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that PII is not captured in telemetry."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = privacy_test_data['user_data'].copy()
context.update(privacy_test_data['pii'])
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that the provider was called (actual filtering would happen in provider)
assert len(call_args) >= 2
def test_sanitized_context_captured(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that only safe context is captured."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = {
'operation': 'crawl', # Safe to capture
'status_code': 404, # Safe to capture
'retry_count': 3, # Safe to capture
'user_email': 'secret@example.com', # Should be in context (not filtered at this level)
'content': 'private content' # Should be in context (not filtered at this level)
}
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Get the actual arguments passed to the mock
args, kwargs = call_args
assert len(args) >= 2, f"Expected at least 2 args, got {len(args)}"
# The second argument should be the context
captured_context = args[1]
# The basic context should be present (this tests the manager, not the provider filtering)
assert 'operation' in captured_context, f"operation not found in {captured_context}"
assert captured_context.get('operation') == 'crawl'
assert captured_context.get('status_code') == 404
assert captured_context.get('retry_count') == 3
@pytest.mark.performance
class TestTelemetryPerformance:
"""Test performance impact of telemetry system."""
def test_decorator_overhead_sync(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test performance overhead of sync telemetry decorator."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
@telemetry_decorator
def test_function():
"""Test function with telemetry decorator."""
time.sleep(0.001) # Simulate small amount of work
return "success"
# Measure time with telemetry
start_time = time.time()
for _ in range(100):
test_function()
telemetry_time = time.time() - start_time
# Telemetry should add minimal overhead
assert telemetry_time < 1.0 # Should complete 100 calls in under 1 second
@pytest.mark.asyncio
async def test_decorator_overhead_async(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test performance overhead of async telemetry decorator."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
@async_telemetry_decorator
async def test_async_function():
"""Test async function with telemetry decorator."""
await asyncio.sleep(0.001) # Simulate small amount of async work
return "success"
# Measure time with telemetry
start_time = time.time()
tasks = [test_async_function() for _ in range(100)]
await asyncio.gather(*tasks)
telemetry_time = time.time() - start_time
# Telemetry should add minimal overhead to async operations
assert telemetry_time < 2.0 # Should complete 100 async calls in under 2 seconds
def test_disabled_telemetry_performance(self, disabled_telemetry_config):
"""Test that disabled telemetry has zero overhead."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
@telemetry_decorator
def test_function():
"""Test function with disabled telemetry."""
time.sleep(0.001)
return "success"
# Measure time with disabled telemetry
start_time = time.time()
for _ in range(100):
test_function()
disabled_time = time.time() - start_time
# Should be very fast when disabled
assert disabled_time < 0.5 # Should be faster than enabled telemetry
def test_telemetry_manager_initialization_performance(self):
"""Test that TelemetryManager initializes quickly."""
start_time = time.time()
# Initialize multiple managers (should use singleton)
for _ in range(10):
TelemetryManager.get_instance()
init_time = time.time() - start_time
# Initialization should be fast
assert init_time < 0.1 # Should initialize in under 100ms
def test_config_loading_performance(self, temp_config_dir):
"""Test that config loading is fast."""
from crawl4ai.telemetry.config import TelemetryConfig
# Create config with some data
config = TelemetryConfig(config_dir=temp_config_dir)
from crawl4ai.telemetry.config import TelemetryConsent
config.set_consent(TelemetryConsent.ALWAYS, email="test@example.com")
start_time = time.time()
# Load config multiple times
for _ in range(100):
new_config = TelemetryConfig(config_dir=temp_config_dir)
new_config.get_consent()
load_time = time.time() - start_time
# Config loading should be fast
assert load_time < 0.5 # Should load 100 times in under 500ms
@pytest.mark.performance
class TestTelemetryScalability:
"""Test telemetry system scalability."""
def test_multiple_exception_capture(self, enabled_telemetry_config, mock_sentry_provider):
"""Test capturing multiple exceptions in sequence."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
start_time = time.time()
# Capture many exceptions
for i in range(50):
exception = ValueError(f"Test error {i}")
manager.capture_exception(exception, {'operation': f'test_{i}'})
capture_time = time.time() - start_time
# Should handle multiple exceptions efficiently
assert capture_time < 1.0 # Should capture 50 exceptions in under 1 second
assert mock_sentry_provider.send_exception.call_count <= 50 # May be less due to consent checks
@pytest.mark.asyncio
async def test_concurrent_exception_capture(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test concurrent exception capture performance."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
async def capture_exception_async(i):
exception = ValueError(f"Concurrent error {i}")
return manager.capture_exception(exception, {'operation': f'concurrent_{i}'})
start_time = time.time()
# Capture exceptions concurrently
tasks = [capture_exception_async(i) for i in range(20)]
await asyncio.gather(*tasks)
capture_time = time.time() - start_time
# Should handle concurrent exceptions efficiently
assert capture_time < 1.0 # Should capture 20 concurrent exceptions in under 1 second
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,241 @@
"""
Tests for Crawl4AI telemetry functionality.
"""
import pytest
import os
import tempfile
from pathlib import Path
import json
from unittest.mock import Mock, patch, MagicMock
from crawl4ai.telemetry import (
TelemetryManager,
capture_exception,
enable,
disable,
status
)
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
from crawl4ai.telemetry.environment import Environment, EnvironmentDetector
from crawl4ai.telemetry.base import NullProvider
from crawl4ai.telemetry.consent import ConsentManager
class TestTelemetryConfig:
"""Test telemetry configuration management."""
def test_config_initialization(self):
"""Test config initialization with custom directory."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
assert config.config_dir == Path(tmpdir)
assert config.get_consent() == TelemetryConsent.NOT_SET
def test_consent_persistence(self):
"""Test that consent is saved and loaded correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
# Set consent
config.set_consent(TelemetryConsent.ALWAYS, email="test@example.com")
# Create new config instance to test persistence
config2 = TelemetryConfig(config_dir=Path(tmpdir))
assert config2.get_consent() == TelemetryConsent.ALWAYS
assert config2.get_email() == "test@example.com"
def test_environment_variable_override(self):
"""Test that environment variables override config."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
config.set_consent(TelemetryConsent.ALWAYS)
# Set environment variable to disable
os.environ['CRAWL4AI_TELEMETRY'] = '0'
try:
config.update_from_env()
assert config.get_consent() == TelemetryConsent.DENIED
finally:
del os.environ['CRAWL4AI_TELEMETRY']
class TestEnvironmentDetection:
"""Test environment detection functionality."""
def test_cli_detection(self):
"""Test CLI environment detection."""
# Mock sys.stdin.isatty
with patch('sys.stdin.isatty', return_value=True):
env = EnvironmentDetector.detect()
# Should detect as CLI in most test environments
assert env in [Environment.CLI, Environment.UNKNOWN]
def test_docker_detection(self):
"""Test Docker environment detection."""
# Mock Docker environment
with patch.dict(os.environ, {'CRAWL4AI_DOCKER': 'true'}):
env = EnvironmentDetector.detect()
assert env == Environment.DOCKER
def test_api_server_detection(self):
"""Test API server detection."""
with patch.dict(os.environ, {'CRAWL4AI_API_SERVER': 'true', 'CRAWL4AI_DOCKER': 'true'}):
env = EnvironmentDetector.detect()
assert env == Environment.API_SERVER
class TestTelemetryManager:
"""Test the main telemetry manager."""
def test_singleton_pattern(self):
"""Test that TelemetryManager is a singleton."""
manager1 = TelemetryManager.get_instance()
manager2 = TelemetryManager.get_instance()
assert manager1 is manager2
def test_exception_capture(self):
"""Test exception capture functionality."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create manager with custom config dir
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = TelemetryConsent.ALWAYS
mock_config.is_enabled.return_value = True
mock_config.should_send_current.return_value = True
mock_config.get_email.return_value = "test@example.com"
mock_config.update_from_env.return_value = None
MockConfig.return_value = mock_config
# Mock the provider setup
with patch('crawl4ai.telemetry.providers.sentry.SentryProvider') as MockSentryProvider:
mock_provider = Mock()
mock_provider.initialize.return_value = True
mock_provider.send_exception.return_value = True
MockSentryProvider.return_value = mock_provider
manager = TelemetryManager()
# Test exception capture
test_exception = ValueError("Test error")
result = manager.capture_exception(test_exception, {'test': 'context'})
# Verify the exception was processed
assert mock_config.should_send_current.called
def test_null_provider_when_disabled(self):
"""Test that NullProvider is used when telemetry is disabled."""
with tempfile.TemporaryDirectory() as tmpdir:
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = TelemetryConsent.DENIED
mock_config.is_enabled.return_value = False
MockConfig.return_value = mock_config
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider)
class TestConsentManager:
"""Test consent management functionality."""
def test_docker_default_enabled(self):
"""Test that Docker environment has telemetry enabled by default."""
with patch('crawl4ai.telemetry.consent.EnvironmentDetector.detect', return_value=Environment.DOCKER):
with patch('os.environ.get') as mock_env_get:
# Mock os.environ.get to return None for CRAWL4AI_TELEMETRY
mock_env_get.return_value = None
config = Mock()
config.get_consent.return_value = TelemetryConsent.NOT_SET
consent_manager = ConsentManager(config)
consent_manager.check_and_prompt()
# Should be enabled by default in Docker
assert config.set_consent.called
assert config.set_consent.call_args[0][0] == TelemetryConsent.ALWAYS
def test_docker_disabled_by_env(self):
"""Test that Docker telemetry can be disabled via environment variable."""
with patch('crawl4ai.telemetry.consent.EnvironmentDetector.detect', return_value=Environment.DOCKER):
with patch.dict(os.environ, {'CRAWL4AI_TELEMETRY': '0'}):
config = Mock()
config.get_consent.return_value = TelemetryConsent.NOT_SET
consent_manager = ConsentManager(config)
consent = consent_manager.check_and_prompt()
# Should be disabled
assert config.set_consent.called
assert config.set_consent.call_args[0][0] == TelemetryConsent.DENIED
class TestPublicAPI:
"""Test the public API functions."""
@patch('crawl4ai.telemetry.get_telemetry')
def test_enable_function(self, mock_get_telemetry):
"""Test the enable() function."""
mock_manager = Mock()
mock_get_telemetry.return_value = mock_manager
enable(email="test@example.com", always=True)
mock_manager.enable.assert_called_once_with(
email="test@example.com",
always=True,
once=False
)
@patch('crawl4ai.telemetry.get_telemetry')
def test_disable_function(self, mock_get_telemetry):
"""Test the disable() function."""
mock_manager = Mock()
mock_get_telemetry.return_value = mock_manager
disable()
mock_manager.disable.assert_called_once()
@patch('crawl4ai.telemetry.get_telemetry')
def test_status_function(self, mock_get_telemetry):
"""Test the status() function."""
mock_manager = Mock()
mock_manager.status.return_value = {
'enabled': True,
'consent': 'always',
'email': 'test@example.com'
}
mock_get_telemetry.return_value = mock_manager
result = status()
assert result['enabled'] is True
assert result['consent'] == 'always'
assert result['email'] == 'test@example.com'
class TestIntegration:
"""Integration tests for telemetry with AsyncWebCrawler."""
@pytest.mark.asyncio
async def test_crawler_exception_capture(self):
"""Test that AsyncWebCrawler captures exceptions."""
from crawl4ai import AsyncWebCrawler
with patch('crawl4ai.telemetry.capture_exception') as mock_capture:
# This should trigger an exception for invalid URL
async with AsyncWebCrawler() as crawler:
try:
# Use an invalid URL that will cause an error
result = await crawler.arun(url="not-a-valid-url")
except Exception:
pass
# Check if exception was captured (may not be called if error is handled)
# This is more of a smoke test to ensure the integration doesn't break
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,175 +0,0 @@
#!/usr/bin/env python3
"""
Final test and demo for HTTPS preservation feature (Issue #1410)
This demonstrates how the preserve_https_for_internal_links flag
prevents HTTPS downgrade when servers redirect to HTTP.
"""
import sys
import os
from urllib.parse import urljoin, urlparse
def demonstrate_issue():
"""Show the problem: HTTPS -> HTTP redirect causes HTTP links"""
print("=" * 60)
print("DEMONSTRATING THE ISSUE")
print("=" * 60)
# Simulate what happens during crawling
original_url = "https://quotes.toscrape.com/tag/deep-thoughts"
redirected_url = "http://quotes.toscrape.com/tag/deep-thoughts/" # Server redirects to HTTP
# Extract a relative link
relative_link = "/author/Albert-Einstein"
# Standard URL joining uses the redirected (HTTP) base
resolved_url = urljoin(redirected_url, relative_link)
print(f"Original URL: {original_url}")
print(f"Redirected to: {redirected_url}")
print(f"Relative link: {relative_link}")
print(f"Resolved link: {resolved_url}")
print(f"\n❌ Problem: Link is now HTTP instead of HTTPS!")
return resolved_url
def demonstrate_solution():
"""Show the solution: preserve HTTPS for internal links"""
print("\n" + "=" * 60)
print("DEMONSTRATING THE SOLUTION")
print("=" * 60)
# Our normalize_url with HTTPS preservation
def normalize_url_with_preservation(href, base_url, preserve_https=False, original_scheme=None):
"""Normalize URL with optional HTTPS preservation"""
# Standard resolution
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only for same-domain links
if parsed_full.scheme == 'http' and parsed_full.netloc == parsed_base.netloc:
full_url = full_url.replace('http://', 'https://', 1)
print(f" → Preserved HTTPS for {parsed_full.netloc}")
return full_url
# Same scenario as before
original_url = "https://quotes.toscrape.com/tag/deep-thoughts"
redirected_url = "http://quotes.toscrape.com/tag/deep-thoughts/"
relative_link = "/author/Albert-Einstein"
# Without preservation (current behavior)
resolved_without = normalize_url_with_preservation(
relative_link, redirected_url,
preserve_https=False, original_scheme='https'
)
print(f"\nWithout preservation:")
print(f" Result: {resolved_without}")
# With preservation (new feature)
resolved_with = normalize_url_with_preservation(
relative_link, redirected_url,
preserve_https=True, original_scheme='https'
)
print(f"\nWith preservation (preserve_https_for_internal_links=True):")
print(f" Result: {resolved_with}")
print(f"\n✅ Solution: Internal link stays HTTPS!")
return resolved_with
def test_edge_cases():
"""Test important edge cases"""
print("\n" + "=" * 60)
print("EDGE CASES")
print("=" * 60)
from urllib.parse import urljoin, urlparse
def preserve_https(href, base_url, original_scheme):
"""Helper to test preservation logic"""
full_url = urljoin(base_url, href)
if original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Fixed: check for protocol-relative URLs
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
return full_url
test_cases = [
# (description, href, base_url, original_scheme, should_be_https)
("External link", "http://other.com/page", "http://example.com", "https", False),
("Already HTTPS", "/page", "https://example.com", "https", True),
("No original HTTPS", "/page", "http://example.com", "http", False),
("Subdomain", "/page", "http://sub.example.com", "https", True),
("Protocol-relative", "//example.com/page", "http://example.com", "https", False),
]
for desc, href, base_url, orig_scheme, should_be_https in test_cases:
result = preserve_https(href, base_url, orig_scheme)
is_https = result.startswith('https://')
status = "" if is_https == should_be_https else ""
print(f"\n{status} {desc}:")
print(f" Input: {href} + {base_url}")
print(f" Result: {result}")
print(f" Expected HTTPS: {should_be_https}, Got: {is_https}")
def usage_example():
"""Show how to use the feature in crawl4ai"""
print("\n" + "=" * 60)
print("USAGE IN CRAWL4AI")
print("=" * 60)
print("""
To enable HTTPS preservation in your crawl4ai code:
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
async with AsyncWebCrawler() as crawler:
config = CrawlerRunConfig(
preserve_https_for_internal_links=True # Enable HTTPS preservation
)
result = await crawler.arun(
url="https://example.com",
config=config
)
# All internal links will maintain HTTPS even if
# the server redirects to HTTP
```
This is especially useful for:
- Sites that redirect HTTPS to HTTP but still support HTTPS
- Security-conscious crawling where you want to stay on HTTPS
- Avoiding mixed content issues in downstream processing
""")
if __name__ == "__main__":
# Run all demonstrations
demonstrate_issue()
demonstrate_solution()
test_edge_cases()
usage_example()
print("\n" + "=" * 60)
print("✅ All tests complete!")
print("=" * 60)

View File

@@ -1,849 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive test suite for URL normalization functions in utils.py
Tests all scenarios and edge cases for the updated normalize_url functions.
"""
import sys
import os
import time
from pathlib import Path
from urllib.parse import urljoin, urlparse, urlunparse, parse_qsl, urlencode
# Add the crawl4ai package to the path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Import only the specific functions we need to test
from crawl4ai.utils import get_base_domain, is_external_url
# ANSI Color codes for beautiful console output
class Colors:
# Basic colors
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
# Bright colors
BRIGHT_RED = '\033[91;1m'
BRIGHT_GREEN = '\033[92;1m'
BRIGHT_YELLOW = '\033[93;1m'
BRIGHT_BLUE = '\033[94;1m'
BRIGHT_MAGENTA = '\033[95;1m'
BRIGHT_CYAN = '\033[96;1m'
BRIGHT_WHITE = '\033[97;1m'
# Background colors
BG_RED = '\033[41m'
BG_GREEN = '\033[42m'
BG_YELLOW = '\033[43m'
BG_BLUE = '\033[44m'
# Text styles
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
RESET = '\033[0m'
# Icons
CHECK = ''
CROSS = ''
WARNING = ''
INFO = ''
STAR = ''
FIRE = '🔥'
ROCKET = '🚀'
TARGET = '🎯'
def colorize(text, color):
"""Apply color to text"""
return f"{color}{text}{Colors.RESET}"
def print_header(title, icon=""):
"""Print a formatted header"""
width = 80
print(f"\n{Colors.BG_BLUE}{Colors.WHITE}{Colors.BOLD}{'=' * width}{Colors.RESET}")
if icon:
print(f"{Colors.BG_BLUE}{Colors.WHITE}{Colors.BOLD}{' ' * ((width - len(title) - len(icon) - 1) // 2)}{icon} {title}{' ' * ((width - len(title) - len(icon) - 1) // 2)}{Colors.RESET}")
else:
print(f"{Colors.BG_BLUE}{Colors.WHITE}{Colors.BOLD}{' ' * ((width - len(title)) // 2)}{title}{' ' * ((width - len(title)) // 2)}{Colors.RESET}")
print(f"{Colors.BG_BLUE}{Colors.WHITE}{Colors.BOLD}{'=' * width}{Colors.RESET}")
def print_section(title, icon=""):
"""Print a formatted section header"""
if icon:
print(f"\n{Colors.CYAN}{Colors.BOLD}{icon} {title}{Colors.RESET}")
else:
print(f"\n{Colors.CYAN}{Colors.BOLD}{title}{Colors.RESET}")
print(f"{Colors.CYAN}{'-' * (len(title) + (len(icon) + 1 if icon else 0))}{Colors.RESET}")
def print_success(message):
"""Print success message"""
print(f"{Colors.GREEN}{Colors.CHECK} {message}{Colors.RESET}")
def print_error(message):
"""Print error message"""
print(f"{Colors.RED}{Colors.CROSS} {message}{Colors.RESET}")
def print_warning(message):
"""Print warning message"""
print(f"{Colors.YELLOW}{Colors.WARNING} {message}{Colors.RESET}")
def print_info(message):
"""Print info message"""
print(f"{Colors.BLUE}{Colors.INFO} {message}{Colors.RESET}")
def print_test_result(test_name, passed, expected=None, actual=None):
"""Print formatted test result"""
if passed:
print(f" {Colors.GREEN}{Colors.CHECK} {test_name}{Colors.RESET}")
else:
print(f" {Colors.RED}{Colors.CROSS} {test_name}{Colors.RESET}")
if expected is not None and actual is not None:
print(f" {Colors.BRIGHT_RED}Expected: {expected}{Colors.RESET}")
print(f" {Colors.BRIGHT_RED}Actual: {actual}{Colors.RESET}")
def print_progress(current, total, test_name=""):
"""Print progress indicator"""
percentage = (current / total) * 100
bar_length = 40
filled_length = int(bar_length * current // total)
bar = '' * filled_length + '' * (bar_length - filled_length)
sys.stdout.write(f'\r{Colors.CYAN}Progress: [{bar}] {percentage:.1f}% ({current}/{total}) {test_name}{Colors.RESET}')
sys.stdout.flush()
if current == total:
print() # New line when complete
# Copy the normalize_url functions directly to avoid import issues
def normalize_url(
href: str,
base_url: str,
*,
drop_query_tracking=True,
sort_query=True,
keep_fragment=False,
extra_drop_params=None,
preserve_https=False,
original_scheme=None
):
"""
Extended URL normalizer with fixes for edge cases - copied from utils.py for testing
"""
if not href or not href.strip():
return None
# Resolve relative paths first
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Parse once, edit parts, then rebuild
parsed = urlparse(full_url)
# ── netloc ──
netloc = parsed.netloc.lower()
# Remove default ports
if ':' in netloc:
host, port = netloc.rsplit(':', 1)
if (parsed.scheme == 'http' and port == '80') or (parsed.scheme == 'https' and port == '443'):
netloc = host
else:
netloc = f"{host}:{port}"
# ── path ──
# Strip duplicate slashes and trailing "/" (except root)
# IMPORTANT: Don't use quote(unquote()) as it mangles + signs in URLs
# The path from urlparse is already properly encoded
path = parsed.path
if path.endswith('/') and path != '/':
path = path.rstrip('/')
# ── query ──
query = parsed.query
if query:
# explode, mutate, then rebuild
params = list(parse_qsl(query, keep_blank_values=True)) # Parse query string into key-value pairs, preserving blank values
if drop_query_tracking:
# Define default tracking parameters to remove for cleaner URLs
default_tracking = {
'utm_source', 'utm_medium', 'utm_campaign', 'utm_term',
'utm_content', 'gclid', 'fbclid', 'ref', 'ref_src'
}
if extra_drop_params:
default_tracking |= {p.lower() for p in extra_drop_params} # Add any extra parameters to drop, case-insensitive
params = [(k, v) for k, v in params if k not in default_tracking] # Filter out tracking parameters
# Normalize parameter keys to lowercase
params = [(k.lower(), v) for k, v in params]
if sort_query:
params.sort(key=lambda kv: kv[0]) # Sort parameters alphabetically by key (now lowercase)
query = urlencode(params, doseq=True) if params else '' # Rebuild query string, handling sequences properly
# ── fragment ──
fragment = parsed.fragment if keep_fragment else ''
# Re-assemble
normalized = urlunparse((
parsed.scheme,
netloc,
path,
parsed.params,
query,
fragment
))
return normalized
def normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_scheme=None):
"""Normalize URLs for deep crawling - copied from utils.py for testing"""
if not href:
return None
# Use urljoin to handle relative URLs
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Parse the URL for normalization
parsed = urlparse(full_url)
# Convert hostname to lowercase
netloc = parsed.netloc.lower()
# Remove fragment entirely
fragment = ''
# Normalize query parameters if needed
query = parsed.query
if query:
# Parse query parameters
params = parse_qsl(query)
# Remove tracking parameters (example - customize as needed)
tracking_params = ['utm_source', 'utm_medium', 'utm_campaign', 'ref', 'fbclid']
params = [(k, v) for k, v in params if k not in tracking_params]
# Rebuild query string, sorted for consistency
query = urlencode(params, doseq=True) if params else ''
# Build normalized URL
normalized = urlunparse((
parsed.scheme,
netloc,
parsed.path.rstrip('/'), # Normalize trailing slash
parsed.params,
query,
fragment
))
return normalized
def efficient_normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_scheme=None):
"""Efficient URL normalization with proper parsing - copied from utils.py for testing"""
if not href:
return None
# Resolve relative URLs
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Use proper URL parsing
parsed = urlparse(full_url)
# Only perform the most critical normalizations
# 1. Lowercase hostname
# 2. Remove fragment
normalized = urlunparse((
parsed.scheme,
parsed.netloc.lower(),
parsed.path.rstrip('/'),
parsed.params,
parsed.query,
'' # Remove fragment
))
return normalized
class URLNormalizationTestSuite:
"""Comprehensive test suite for URL normalization functions"""
def __init__(self):
self.base_url = "https://example.com/path/page.html"
self.https_base_url = "https://example.com/path/page.html"
self.http_base_url = "http://example.com/path/page.html"
self.tests_run = 0
self.tests_passed = 0
self.tests_failed = []
self.test_start_time = None
self.section_stats = {}
self.current_section = None
def start_section(self, section_name, icon=""):
"""Start a new test section"""
self.current_section = section_name
if section_name not in self.section_stats:
self.section_stats[section_name] = {'run': 0, 'passed': 0, 'failed': 0}
print_section(section_name, icon)
def assert_equal(self, actual, expected, test_name):
"""Assert that actual equals expected"""
self.tests_run += 1
if self.current_section:
self.section_stats[self.current_section]['run'] += 1
if actual == expected:
self.tests_passed += 1
if self.current_section:
self.section_stats[self.current_section]['passed'] += 1
print_test_result(test_name, True)
else:
self.tests_failed.append({
'name': test_name,
'expected': expected,
'actual': actual,
'section': self.current_section
})
if self.current_section:
self.section_stats[self.current_section]['failed'] += 1
print_test_result(test_name, False, expected, actual)
def assert_none(self, actual, test_name):
"""Assert that actual is None"""
self.assert_equal(actual, None, test_name)
def test_basic_url_resolution(self):
"""Test basic relative and absolute URL resolution"""
self.start_section("Basic URL Resolution", Colors.TARGET)
# Absolute URLs should remain unchanged
self.assert_equal(
normalize_url("https://other.com/page.html", self.base_url),
"https://other.com/page.html",
"Absolute URL unchanged"
)
# Relative URLs
self.assert_equal(
normalize_url("relative.html", self.base_url),
"https://example.com/path/relative.html",
"Relative URL resolution"
)
self.assert_equal(
normalize_url("./relative.html", self.base_url),
"https://example.com/path/relative.html",
"Relative URL with dot"
)
self.assert_equal(
normalize_url("../relative.html", self.base_url),
"https://example.com/relative.html",
"Parent directory resolution"
)
# Root-relative URLs
self.assert_equal(
normalize_url("/root.html", self.base_url),
"https://example.com/root.html",
"Root-relative URL"
)
# Protocol-relative URLs
self.assert_equal(
normalize_url("//cdn.example.com/asset.js", self.base_url),
"https://cdn.example.com/asset.js",
"Protocol-relative URL"
)
def test_query_parameter_handling(self):
"""Test query parameter sorting and tracking removal"""
self.start_section("Query Parameter Handling", Colors.STAR)
# Basic query parameters
self.assert_equal(
normalize_url("https://example.com?page=1&sort=name", self.base_url),
"https://example.com?page=1&sort=name",
"Basic query parameters sorted"
)
# Tracking parameters removal
self.assert_equal(
normalize_url("https://example.com?utm_source=google&utm_medium=email&page=1", self.base_url),
"https://example.com?page=1",
"Tracking parameters removed"
)
# Mixed tracking and valid parameters
self.assert_equal(
normalize_url("https://example.com?fbclid=123&utm_campaign=test&category=news&id=456", self.base_url),
"https://example.com?category=news&id=456",
"Mixed tracking and valid parameters"
)
# Empty query values
self.assert_equal(
normalize_url("https://example.com?page=&sort=name", self.base_url),
"https://example.com?page=&sort=name",
"Empty query values preserved"
)
# Disable tracking removal
self.assert_equal(
normalize_url("https://example.com?utm_source=google&page=1", self.base_url, drop_query_tracking=False),
"https://example.com?page=1&utm_source=google",
"Tracking parameters preserved when disabled"
)
# Disable sorting
self.assert_equal(
normalize_url("https://example.com?z=1&a=2", self.base_url, sort_query=False),
"https://example.com?z=1&a=2",
"Query parameters not sorted when disabled"
)
def test_fragment_handling(self):
"""Test fragment/hash handling"""
self.start_section("Fragment Handling", Colors.FIRE)
# Fragments removed by default
self.assert_equal(
normalize_url("https://example.com/page.html#section", self.base_url),
"https://example.com/page.html",
"Fragment removed by default"
)
# Fragments preserved when requested
self.assert_equal(
normalize_url("https://example.com/page.html#section", self.base_url, keep_fragment=True),
"https://example.com/page.html#section",
"Fragment preserved when requested"
)
# Fragments with query parameters
self.assert_equal(
normalize_url("https://example.com?page=1#section", self.base_url, keep_fragment=True),
"https://example.com?page=1#section",
"Fragment with query parameters"
)
def test_https_preservation(self):
"""Test HTTPS preservation logic"""
self.start_section("HTTPS Preservation", Colors.ROCKET)
# Same domain HTTP to HTTPS
self.assert_equal(
normalize_url("http://example.com/page.html", self.https_base_url, preserve_https=True, original_scheme='https'),
"https://example.com/page.html",
"HTTP to HTTPS for same domain"
)
# Different domain should not change
self.assert_equal(
normalize_url("http://other.com/page.html", self.https_base_url, preserve_https=True, original_scheme='https'),
"http://other.com/page.html",
"Different domain HTTP unchanged"
)
# Protocol-relative should follow base
self.assert_equal(
normalize_url("//example.com/page.html", self.https_base_url, preserve_https=True, original_scheme='https'),
"https://example.com/page.html",
"Protocol-relative follows base scheme"
)
def test_edge_cases(self):
"""Test edge cases and error conditions"""
self.start_section("Edge Cases", Colors.WARNING)
# None and empty inputs
result = normalize_url(None, self.base_url) # type: ignore
self.assert_none(result, "None input")
self.assert_none(normalize_url("", self.base_url), "Empty string input")
self.assert_none(normalize_url(" ", self.base_url), "Whitespace only input")
# Malformed URLs
try:
normalize_url("not-a-url", "invalid-base")
print("✗ Should have raised ValueError for invalid base URL")
except ValueError:
print("✓ Correctly raised ValueError for invalid base URL")
# Special protocols
self.assert_equal(
normalize_url("mailto:test@example.com", self.base_url),
"mailto:test@example.com",
"Mailto protocol preserved"
)
self.assert_equal(
normalize_url("tel:+1234567890", self.base_url),
"tel:+1234567890",
"Tel protocol preserved"
)
self.assert_equal(
normalize_url("javascript:void(0)", self.base_url),
"javascript:void(0)",
"JavaScript protocol preserved"
)
def test_case_sensitivity(self):
"""Test case sensitivity handling"""
self.start_section("Case Sensitivity", Colors.INFO)
# Domain case normalization
self.assert_equal(
normalize_url("https://EXAMPLE.COM/page.html", self.base_url),
"https://example.com/page.html",
"Domain case normalization"
)
# Mixed case paths
self.assert_equal(
normalize_url("https://example.com/PATH/Page.HTML", self.base_url),
"https://example.com/PATH/Page.HTML",
"Path case preserved"
)
# Query parameter case
self.assert_equal(
normalize_url("https://example.com?PARAM=value", self.base_url),
"https://example.com?param=value",
"Query parameter case normalization"
)
def test_unicode_and_special_chars(self):
"""Test Unicode and special characters"""
self.start_section("Unicode & Special Characters", "🌍")
# Unicode in path
self.assert_equal(
normalize_url("https://example.com/café.html", self.base_url),
"https://example.com/café.html",
"Unicode characters in path"
)
# Encoded characters
self.assert_equal(
normalize_url("https://example.com/caf%C3%A9.html", self.base_url),
"https://example.com/caf%C3%A9.html",
"URL-encoded characters preserved"
)
# Spaces in URLs
self.assert_equal(
normalize_url("https://example.com/page with spaces.html", self.base_url),
"https://example.com/page with spaces.html",
"Spaces in URLs handled"
)
def test_port_numbers(self):
"""Test port number handling"""
self.start_section("Port Numbers", "🔌")
# Default ports
self.assert_equal(
normalize_url("https://example.com:443/page.html", self.base_url),
"https://example.com/page.html",
"Default HTTPS port removed"
)
self.assert_equal(
normalize_url("http://example.com:80/page.html", self.base_url),
"http://example.com/page.html",
"Default HTTP port removed"
)
# Non-default ports
self.assert_equal(
normalize_url("https://example.com:8443/page.html", self.base_url),
"https://example.com:8443/page.html",
"Non-default port preserved"
)
def test_trailing_slashes(self):
"""Test trailing slash normalization"""
self.start_section("Trailing Slashes", "📁")
# Remove trailing slash from paths
self.assert_equal(
normalize_url("https://example.com/path/", self.base_url),
"https://example.com/path",
"Trailing slash removed from path"
)
# Preserve root trailing slash
self.assert_equal(
normalize_url("https://example.com/", self.base_url),
"https://example.com/",
"Root trailing slash preserved"
)
# Multiple trailing slashes
self.assert_equal(
normalize_url("https://example.com/path//", self.base_url),
"https://example.com/path",
"Multiple trailing slashes normalized"
)
def test_deep_crawl_functions(self):
"""Test deep crawl specific normalization functions"""
self.start_section("Deep Crawl Functions", "🔍")
# Test normalize_url_for_deep_crawl
result = normalize_url_for_deep_crawl("https://EXAMPLE.COM/path/?utm_source=test&page=1", self.base_url)
expected = "https://example.com/path?page=1"
self.assert_equal(result, expected, "Deep crawl normalization")
# Test efficient version
result = efficient_normalize_url_for_deep_crawl("https://EXAMPLE.COM/path/#fragment", self.base_url)
expected = "https://example.com/path"
self.assert_equal(result, expected, "Efficient deep crawl normalization")
def test_base_domain_extraction(self):
"""Test base domain extraction"""
self.start_section("Base Domain Extraction", "🏠")
self.assert_equal(
get_base_domain("https://www.example.com/path"),
"example.com",
"WWW prefix removed"
)
self.assert_equal(
get_base_domain("https://sub.example.co.uk/path"),
"example.co.uk",
"Special TLD handled"
)
self.assert_equal(
get_base_domain("https://example.com:8080/path"),
"example.com",
"Port removed"
)
def test_external_url_detection(self):
"""Test external URL detection"""
self.start_section("External URL Detection", "🌐")
self.assert_equal(
is_external_url("https://other.com/page.html", "example.com"),
True,
"Different domain is external"
)
self.assert_equal(
is_external_url("https://www.example.com/page.html", "example.com"),
False,
"Same domain with www is internal"
)
self.assert_equal(
is_external_url("mailto:test@example.com", "example.com"),
True,
"Special protocol is external"
)
def run_all_tests(self):
"""Run all test suites"""
print_header("🚀 URL Normalization Test Suite", Colors.ROCKET)
self.test_start_time = time.time()
# Run all test sections
sections = [
("Basic URL Resolution", Colors.TARGET, self.test_basic_url_resolution),
("Query Parameter Handling", Colors.STAR, self.test_query_parameter_handling),
("Fragment Handling", Colors.FIRE, self.test_fragment_handling),
("HTTPS Preservation", Colors.ROCKET, self.test_https_preservation),
("Edge Cases", Colors.WARNING, self.test_edge_cases),
("Case Sensitivity", Colors.INFO, self.test_case_sensitivity),
("Unicode & Special Characters", "🌍", self.test_unicode_and_special_chars),
("Port Numbers", "🔌", self.test_port_numbers),
("Trailing Slashes", "📁", self.test_trailing_slashes),
("Deep Crawl Functions", "🔍", self.test_deep_crawl_functions),
("Base Domain Extraction", "🏠", self.test_base_domain_extraction),
("External URL Detection", "🌐", self.test_external_url_detection),
]
total_sections = len(sections)
for i, (section_name, icon, test_method) in enumerate(sections, 1):
print_progress(i - 1, total_sections, f"Running {section_name}")
test_method()
print_progress(i, total_sections, f"Completed {section_name}")
# Calculate execution time
execution_time = time.time() - self.test_start_time
# Print comprehensive statistics
self.print_comprehensive_stats(execution_time)
return len(self.tests_failed) == 0
def print_comprehensive_stats(self, execution_time):
"""Print comprehensive test statistics"""
print_header("📊 Test Results Summary", "📈")
# Overall statistics
success_rate = (self.tests_passed / self.tests_run * 100) if self.tests_run > 0 else 0
print(f"{Colors.BOLD}Overall Statistics:{Colors.RESET}")
print(f" Total Tests: {Colors.CYAN}{self.tests_run}{Colors.RESET}")
print(f" Passed: {Colors.GREEN}{self.tests_passed}{Colors.RESET}")
print(f" Failed: {Colors.RED}{len(self.tests_failed)}{Colors.RESET}")
print(f" Success Rate: {Colors.BRIGHT_CYAN}{success_rate:.1f}%{Colors.RESET}")
print(f" Execution Time: {Colors.YELLOW}{execution_time:.2f}s{Colors.RESET}")
# Performance indicator
if success_rate == 100:
print_success("🎉 Perfect! All tests passed!")
elif success_rate >= 90:
print_success("✅ Excellent! Nearly perfect results!")
elif success_rate >= 75:
print_warning("⚠️ Good results, but some improvements needed")
else:
print_error("❌ Significant issues detected - review failures below")
# Section-by-section breakdown
if self.section_stats:
print(f"\n{Colors.BOLD}Section Breakdown:{Colors.RESET}")
for section_name, stats in self.section_stats.items():
section_success_rate = (stats['passed'] / stats['run'] * 100) if stats['run'] > 0 else 0
status_icon = Colors.CHECK if stats['failed'] == 0 else Colors.CROSS
status_color = Colors.GREEN if stats['failed'] == 0 else Colors.RED
print(f" {status_icon} {section_name}: {Colors.CYAN}{stats['run']}{Colors.RESET} tests, "
f"{status_color}{stats['passed']} passed{Colors.RESET}, "
f"{Colors.RED}{stats['failed']} failed{Colors.RESET} "
f"({Colors.BRIGHT_CYAN}{section_success_rate:.1f}%{Colors.RESET})")
# Failed tests details
if self.tests_failed:
print(f"\n{Colors.BOLD}{Colors.RED}Failed Tests Details:{Colors.RESET}")
for i, failure in enumerate(self.tests_failed, 1):
print(f" {Colors.RED}{i}. {failure['name']}{Colors.RESET}")
if 'section' in failure and failure['section']:
print(f" Section: {Colors.YELLOW}{failure['section']}{Colors.RESET}")
print(f" Expected: {Colors.BRIGHT_RED}{failure['expected']}{Colors.RESET}")
print(f" Actual: {Colors.BRIGHT_RED}{failure['actual']}{Colors.RESET}")
print()
# Recommendations
if self.tests_failed:
print(f"{Colors.BOLD}{Colors.YELLOW}Recommendations:{Colors.RESET}")
print(f" • Review the {len(self.tests_failed)} failed test(s) above")
print(" • Check URL normalization logic for edge cases")
print(" • Verify query parameter handling")
print(" • Test with real-world URLs")
else:
print(f"\n{Colors.BOLD}{Colors.GREEN}Recommendations:{Colors.RESET}")
print(" • All tests passed! URL normalization is working correctly")
print(" • Consider adding more edge cases for future robustness")
print(" • Monitor performance with large-scale crawling")
def test_crawling_integration():
"""Test integration with crawling scripts"""
print_section("Crawling Integration Test", "🔗")
# Test URLs that would be encountered in real crawling
test_urls = [
"https://example.com/blog/post?utm_source=newsletter&utm_medium=email",
"https://example.com/products?page=1&sort=price&ref=search",
"/about.html",
"../contact.html",
"//cdn.example.com/js/main.js",
"mailto:support@example.com",
"#top",
"",
None,
]
base_url = "https://example.com/current/page.html"
print("Testing real-world URL scenarios:")
for url in test_urls:
try:
normalized = normalize_url(url, base_url)
print(f" {url} -> {normalized}")
except (ValueError, TypeError) as e:
print(f" {url} -> ERROR: {e}")
if __name__ == "__main__":
print_header("🧪 URL Normalization Comprehensive Test Suite", "🧪")
print_info("Testing URL normalization functions with comprehensive scenarios and edge cases")
print()
# Run the test suite
test_suite = URLNormalizationTestSuite()
success = test_suite.run_all_tests()
# Run integration tests
print()
test_crawling_integration()
# Final summary
print()
print_header("🏁 Final Test Summary", "🏁")
if success:
print_success("🎉 ALL TESTS PASSED! URL normalization is working perfectly!")
print_info("The updated URL normalization functions are ready for production use.")
else:
print_error("❌ SOME TESTS FAILED! Please review the issues above.")
print_warning("URL normalization may have issues that need to be addressed before deployment.")
print()
print_info("Test suite completed. Check the results above for detailed analysis.")
# Exit with appropriate code
sys.exit(0 if success else 1)