Compare commits

..

48 Commits

Author SHA1 Message Date
AHMET YILMAZ
e3467c08f6 #1490 feat(ManagedBrowser): add viewport size configuration for browser launch 2025-09-17 17:40:38 +08:00
Nasrin
3899ac3d3b Merge pull request #1464 from unclecode/fix/proxy_deprecation
Fix/proxy deprecation
2025-09-16 15:48:45 +08:00
Nasrin
23431d8109 Merge pull request #1389 from unclecode/fix/deep-crawl-scoring
fix(deep-crawl): BestFirst priority inversion
2025-09-16 15:45:54 +08:00
AHMET YILMAZ
1717827732 refactor(BrowserConfig): change deprecation warning for 'proxy' parameter to UserWarning 2025-09-12 11:10:38 +08:00
Nasrin
f8eaf01ed1 Merge pull request #1467 from unclecode/fix/request-crawl-stream
Fix: request /crawl with stream: true issue
2025-09-11 17:40:43 +08:00
Nasrin
14b42b1f9a Merge pull request #1471 from unclecode/fix/adaptive-crawler-llm-config
Fix: allow custom LLM providers for adaptive crawler embedding config…
2025-09-09 12:56:33 +08:00
ntohidi
3bc56dd028 fix: allow custom LLM providers for adaptive crawler embedding config. ref: #1291
- Change embedding_llm_config from Dict to Union[LLMConfig, Dict] for type safety
  - Add backward-compatible conversion property _embedding_llm_config_dict
  - Replace all hardcoded OpenAI embedding configs with configurable options
  - Fix LLMConfig object attribute access in query expansion logic
  - Add comprehensive example demonstrating multiple provider configurations
  - Update documentation with both LLMConfig object and dictionary usage patterns

  Users can now specify any LLM provider for query expansion in embedding strategy:
  - New: embedding_llm_config=LLMConfig(provider='anthropic/claude-3', api_token='key')
  - Old: embedding_llm_config={'provider': 'openai/gpt-4', 'api_token': 'key'} (still works)
2025-09-09 12:49:55 +08:00
AHMET YILMAZ
1874a7b8d2 fix: update option labels in request builder for clarity 2025-09-05 17:06:25 +08:00
Nasrin
0482c1eafc Merge pull request #1469 from unclecode/fix/docker-jwt
Fix(auth): Fixed Docker JWT authentication
2025-09-04 15:00:15 +08:00
AHMET YILMAZ
6a3b3e9d38 Commit without API 2025-09-03 17:02:40 +08:00
Nasrin
1eacea1d2d Merge pull request #1432 from unclecode/example/web2api-example
feat: Add comprehensive website to API example with frontend
2025-09-03 16:30:39 +08:00
Nasrin
bc6d8147d2 Merge pull request #1451 from unclecode/fix/remove-python3.9-version
Remove python 3.9 from supported versions and require Python >= 3.10
2025-09-02 16:50:40 +08:00
ntohidi
487839640f fix: raise error on last attempt failure in perform_completion_with_backoff. ref #989 2025-09-02 16:49:01 +08:00
ntohidi
6772134a3a remove: delete unused yoyo snapshot subproject 2025-09-02 12:07:08 +08:00
Nasrin
ae67d66b81 Merge pull request #1454 from nafeqq-1306/docstring-changes
issue #1329: Docs are not detected due to triplequotes not being first line
2025-09-02 11:59:59 +08:00
Nasrin
af28e84a21 Merge pull request #1441 from unclecode/fix/improve-docker-error-handling
Improve docker error handling
2025-09-02 11:56:01 +08:00
Nasrin
5e7fcb17e1 Merge pull request #1448 from unclecode/fix/https-reditrect
feat: add preserve_https_for_internal_links flag to maintain HTTPS during crawling
2025-09-01 16:11:25 +08:00
ntohidi
6e728096fa fix(auth): fixed Docker JWT authentication. ref #1442 2025-09-01 12:48:16 +08:00
Nasrin
2de200c1ba Merge pull request #1433 from Thermofish/fix/excluded_selector
fix(deps): reintroduce cssselect to restore excluded_selector support (#1405)
2025-08-29 16:08:24 +08:00
nafeqq-1306
9749e2832d issue #1329 refactor(crawler): move unwanted properties to CrawlerRunConfig class 2025-08-29 10:20:47 +08:00
Soham Kukreti
70f473b84d fix: drop Python 3.9 support and require Python >=3.10.
The library no longer supports Python 3.9 and so it was important to drop all references to python 3.9.
Following changes have been made:
- pyproject.toml: set requires-python to ">=3.10"; remove 3.9 classifier
- setup.py: set python_requires to ">=3.10"; remove 3.9 classifier
- docs: update Python version mentions
  - deploy/docker/c4ai-doc-context.md: options -> 3.10, 3.11, 3.12, 3.13
2025-08-28 19:31:19 +05:30
ntohidi
bdacf61ca9 feat: update documentation for preserve_https_for_internal_links. ref #1410 2025-08-28 17:48:12 +08:00
ntohidi
f566c5a376 feat: add preserve_https_for_internal_links flag to maintain HTTPS during crawling. Ref #1410
Added a new `preserve_https_for_internal_links` configuration flag that preserves the original HTTPS scheme for same-domain links even when the server redirects to HTTP.
2025-08-28 17:38:40 +08:00
AHMET YILMAZ
4ed33fce9e Remove deprecated test for 'proxy' parameter in BrowserConfig and update .gitignore to include test_scripts directory. 2025-08-28 17:26:10 +08:00
AHMET YILMAZ
f7a3366f72 #1375 : refactor(proxy) Deprecate 'proxy' parameter in BrowserConfig and enhance proxy string parsing
- Updated ProxyConfig.from_string to support multiple proxy formats, including URLs with credentials.
- Deprecated the 'proxy' parameter in BrowserConfig, replacing it with 'proxy_config' for better flexibility.
- Added warnings for deprecated usage and clarified behavior when both parameters are provided.
- Updated documentation and tests to reflect changes in proxy configuration handling.
2025-08-28 17:21:49 +08:00
Nasrin
4e1c4bd24e Merge pull request #1436 from unclecode/fix/docker-filter
fix(docker): resolve filter serialization and JSON encoding errors in deep crawl strategy
2025-08-27 11:08:42 +08:00
Soham Kukreti
2ad3fb5fc8 feat(docker): improve docker error handling
- Return comprehensive error messages along with status codes for api internal errors.
- Fix fit_html property serialization issue in both /crawl and /crawl/stream endpoints
- Add sanitization to ensure fit_html is always JSON-serializable (string or None)
- Add comprehensive error handling test suite.
2025-08-26 23:18:35 +05:30
Nasrin
cce3390a2d Merge pull request #1426 from unclecode/fix/update-quickstart-and-adaptive-strategies-docs
Update Quickstart and Adaptive Strategies documentation
2025-08-26 16:53:47 +08:00
Nasrin
4fe2d01361 Merge pull request #1440 from unclecode/feature/docker-llm-parameters
feat(docker): Add temperature and base_url parameters for LLM configuration
2025-08-26 16:48:17 +08:00
ntohidi
159207b86f feat(docker): Add temperature and base_url parameters for LLM configuration. ref #1035
Implement hierarchical configuration for LLM parameters with support for:
  - Temperature control (0.0-2.0) to adjust response creativity
  - Custom base_url for proxy servers and alternative endpoints
  - 4-tier priority: request params > provider env > global env > defaults

  Add helper functions in utils.py, update API schemas and handlers,
  support environment variables (LLM_TEMPERATURE, OPENAI_TEMPERATURE, etc.),
  and provide comprehensive documentation with examples.
2025-08-26 16:44:07 +08:00
ntohidi
38f3ea42a7 fix(logger): ensure logger is a Logger instance in crawling strategies. ref #1437 2025-08-26 12:06:56 +08:00
ntohidi
102352eac4 fix(docker): resolve filter serialization and JSON encoding errors in deep crawl strategy (ref #1419)
- Fix URLPatternFilter serialization by preventing private __slots__ from being serialized as constructor params
  - Add public attributes to URLPatternFilter to store original constructor parameters for proper serialization
  - Handle property descriptors in CrawlResult.model_dump() to prevent JSON serialization errors
  - Ensure filter chains work correctly with Docker client and REST API

  The issue occurred because:
  1. Private implementation details (_simple_suffixes, etc.) were being serialized and passed as constructor arguments during deserialization
  2. Property descriptors were being included in the serialized output, causing "Object of type property is not JSON serializable" errors

  Changes:
  - async_configs.py: Comment out __slots__ serialization logic (lines 100-109)
  - filters.py: Add patterns, use_glob, reverse to URLPatternFilter __slots__ and store as public attributes
  - models.py: Convert property descriptors to strings in model_dump() instead of including them directly
2025-08-25 14:04:08 +08:00
James T. Wood
f2da460bb9 fix(dependencies): add cssselect to project dependencies
Fixes bug reported in issue #1405
[Bug]: Excluded selector (excluded_selector) doesn't work

This commit reintroduces the cssselect library which was removed by PR (https://github.com/unclecode/crawl4ai/pull/1368) and merged via (437395e490).

Integration tested against 0.7.4 Docker container. Reintroducing cssselector package eliminated errors seen in logs and excluded_selector functionality was restored.

Refs: #1405
2025-08-24 22:12:20 -04:00
Soham Kukreti
b1dff5a4d3 feat: Add comprehensive website to API example with frontend
This commit adds a complete, web scraping API example that demonstrates how to get structured data from any website and use it like an API using the crawl4ai library with a minimalist frontend interface.

Core Functionality
- AI-powered web scraping with plain English queries
- Dual scraping approaches: Schema-based (faster) and LLM-based (flexible)
- Intelligent schema caching for improved performance
- Custom LLM model support with API key management
- Automatic duplicate request prevention

Modern Frontend Interface
- Minimalist black-and-white design inspired by modern web apps
- Responsive layout with smooth animations and transitions
- Three main pages: Scrape Data, Models Management, API Request History
- Real-time results display with JSON formatting
- Copy-to-clipboard functionality for extracted data
- Toast notifications for user feedback
- Auto-scroll to results when scraping starts

Model Management System
- Web-based model configuration interface
- Support for any LLM provider (OpenAI, Gemini, Anthropic, etc.)
- Simplified configuration requiring only provider and API token
- Add, list, and delete model configurations
- Secure storage of API keys in local JSON files

API Request History
- Automatic saving of all API requests and responses
- Display of request history with URL, query, and cURL commands
- Duplicate prevention (same URL + query combinations)
- Request deletion functionality
- Clean, simplified display focusing on essential information

Technical Implementation

Backend (FastAPI)
- RESTful API with comprehensive endpoints
- Pydantic models for request/response validation
- Async web scraping with crawl4ai library
- Error handling with detailed error messages
- File-based storage for models and request history

Frontend (Vanilla JS/CSS/HTML)
- No framework dependencies - pure HTML, CSS, JavaScript
- Modern CSS Grid and Flexbox layouts
- Custom dropdown styling with SVG arrows
- Responsive design for mobile and desktop
- Smooth scrolling and animations

Core Library Integration
- WebScraperAgent class for orchestration
- ModelConfig class for LLM configuration management
- Schema generation and caching system
- LLM extraction strategy support
- Browser configuration with headless mode
2025-08-24 18:52:37 +05:30
ntohidi
40ab287c90 fix(utils): Improve URL normalization by avoiding quote/unquote to preserve '+' signs. ref #1332 2025-08-22 12:05:21 +08:00
Soham Kukreti
c09a57644f docs: update adaptive crawler docs and cache defaults; remove deprecated examples (#1330)
- Replace BaseStrategy with CrawlStrategy in custom strategy examples (DomainSpecificStrategy, HybridStrategy)
- Remove “Custom Link Scoring” and “Caching Strategy” sections no longer aligned with current library
- Revise memory pruning example to use adaptive.get_relevant_content and index-based retention of top 500 docs
- Correct Quickstart note: default cache mode is CacheMode.BYPASS; instruct enabling with CacheMode.ENABLED
2025-08-21 19:11:31 +05:30
ntohidi
90af453506 Merge branch 'develop' of https://github.com/unclecode/crawl4ai into develop 2025-08-21 14:10:01 +08:00
Nasrin
8bb0e68cce Merge pull request #1422 from unclecode/fix/docker-llmEnvFile
fix(docker): Fix LLM API key handling for multi-provider support
2025-08-21 14:05:06 +08:00
ntohidi
95051020f4 fix(docker): Fix LLM API key handling for multi-provider support
Previously, the system incorrectly used OPENAI_API_KEY for all LLM providers
due to a hardcoded api_key_env fallback in config.yml. This caused authentication
errors when using non-OpenAI providers like Gemini.

Changes:
- Remove api_key_env from config.yml to let litellm handle provider-specific env vars
- Simplify get_llm_api_key() to return None, allowing litellm to auto-detect keys
- Update validate_llm_provider() to trust litellm's built-in key detection
- Update documentation to reflect the new automatic key handling

The fix leverages litellm's existing capability to automatically find the correct
environment variable for each provider (OPENAI_API_KEY, GEMINI_API_TOKEN, etc.)
without manual configuration.

ref #1291
2025-08-21 14:01:04 +08:00
ntohidi
69961cf40b Merge branch 'develop' of https://github.com/unclecode/crawl4ai into develop 2025-08-20 16:56:19 +08:00
Nasrin
ef174a4c7a Merge pull request #1104 from emmanuel-ferdman/main
fix(docker-api): migrate to modern datetime library API
2025-08-20 10:57:39 +08:00
Nasrin
f4206d6ba1 Merge pull request #1369 from NezarAli/main
Fix examples in README.md
2025-08-18 14:22:54 +08:00
Nasrin
dad7c51481 Merge pull request #1398 from unclecode/fix/update-url-seeding-docs
Update URL seeding examples to use proper async context managers
2025-08-18 13:00:26 +08:00
Soham Kukreti
ecbe5ffb84 docs: Update URL seeding examples to use proper async context managers
- Wrap all AsyncUrlSeeder usage with async context managers
- Update URL seeding adventure example to use "sitemap+cc" source, focus on course posts, and add stream=True parameter to fix runtime error
2025-08-13 18:16:46 +05:30
ntohidi
88a9fbbb7e fix(deep-crawl): BestFirst priority inversion; remove pre-scoring truncation. ref #1253
Use negative scores in PQ to visit high-score URLs first and drop link cap prior to scoring; add test for ordering.
2025-08-11 18:16:57 +08:00
Nezar Ali
7a8190ecb6 Fix examples in README.md 2025-08-06 11:58:29 +03:00
Emmanuel Ferdman
8e3c411a3e Merge branch 'main' into main 2025-07-29 14:05:35 +03:00
Emmanuel Ferdman
1e1c887a2f fix(docker-api): migrate to modern datetime library API
Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
2025-05-13 00:04:58 -07:00
77 changed files with 4943 additions and 3489 deletions

2
.gitignore vendored
View File

@@ -265,7 +265,7 @@ CLAUDE.md
tests/**/test_site
tests/**/reports
tests/**/benchmark_reports
test_scripts/
docs/**/data
.codecat/

View File

@@ -5,6 +5,16 @@ All notable changes to Crawl4AI will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- **🔒 HTTPS Preservation for Internal Links**: New `preserve_https_for_internal_links` configuration flag
- Maintains HTTPS scheme for internal links even when servers redirect to HTTP
- Prevents security downgrades during deep crawling
- Useful for security-conscious crawling and sites supporting both protocols
- Fully backward compatible with opt-in flag (default: `False`)
- Fixes issue #1410 where HTTPS URLs were being downgraded to HTTP
## [0.7.3] - 2025-08-09
### Added

View File

@@ -1,136 +0,0 @@
# Makefile for Crawl4AI Telemetry Testing
# Usage: make test-telemetry, make test-unit, make test-integration, etc.
.PHONY: help test-all test-telemetry test-unit test-integration test-privacy test-performance test-slow test-coverage test-verbose clean
# Default Python executable
PYTHON := .venv/bin/python
PYTEST := $(PYTHON) -m pytest
help:
@echo "Crawl4AI Telemetry Testing Commands:"
@echo ""
@echo " test-all Run all telemetry tests"
@echo " test-telemetry Run all telemetry tests (same as test-all)"
@echo " test-unit Run unit tests only"
@echo " test-integration Run integration tests only"
@echo " test-privacy Run privacy compliance tests only"
@echo " test-performance Run performance tests only"
@echo " test-slow Run slow tests only"
@echo " test-coverage Run tests with coverage report"
@echo " test-verbose Run tests with verbose output"
@echo " test-specific TEST= Run specific test (e.g., make test-specific TEST=test_telemetry.py::TestTelemetryConfig)"
@echo " clean Clean test artifacts"
@echo ""
@echo "Environment Variables:"
@echo " CRAWL4AI_TELEMETRY_TEST_REAL=1 Enable real telemetry during tests"
@echo " PYTEST_ARGS Additional pytest arguments"
# Run all telemetry tests
test-all test-telemetry:
$(PYTEST) tests/telemetry/ -v
# Run unit tests only
test-unit:
$(PYTEST) tests/telemetry/ -m "unit" -v
# Run integration tests only
test-integration:
$(PYTEST) tests/telemetry/ -m "integration" -v
# Run privacy compliance tests only
test-privacy:
$(PYTEST) tests/telemetry/ -m "privacy" -v
# Run performance tests only
test-performance:
$(PYTEST) tests/telemetry/ -m "performance" -v
# Run slow tests only
test-slow:
$(PYTEST) tests/telemetry/ -m "slow" -v
# Run tests with coverage
test-coverage:
$(PYTEST) tests/telemetry/ --cov=crawl4ai.telemetry --cov-report=html --cov-report=term-missing -v
# Run tests with verbose output
test-verbose:
$(PYTEST) tests/telemetry/ -vvv --tb=long
# Run specific test
test-specific:
$(PYTEST) tests/telemetry/$(TEST) -v
# Run tests excluding slow ones
test-fast:
$(PYTEST) tests/telemetry/ -m "not slow" -v
# Run tests in parallel
test-parallel:
$(PYTEST) tests/telemetry/ -n auto -v
# Clean test artifacts
clean:
rm -rf .pytest_cache/
rm -rf htmlcov/
rm -rf .coverage
find tests/ -name "*.pyc" -delete
find tests/ -name "__pycache__" -type d -exec rm -rf {} +
rm -rf tests/telemetry/__pycache__/
# Lint test files
lint-tests:
$(PYTHON) -m flake8 tests/telemetry/
$(PYTHON) -m pylint tests/telemetry/
# Type check test files
typecheck-tests:
$(PYTHON) -m mypy tests/telemetry/
# Run all quality checks
check-tests: lint-tests typecheck-tests test-unit
# Install test dependencies
install-test-deps:
$(PYTHON) -m pip install pytest pytest-asyncio pytest-mock pytest-cov pytest-xdist
# Setup development environment for testing
setup-dev:
$(PYTHON) -m pip install -e .
$(MAKE) install-test-deps
# Generate test report
test-report:
$(PYTEST) tests/telemetry/ --html=test-report.html --self-contained-html -v
# Run performance benchmarks
benchmark:
$(PYTEST) tests/telemetry/test_privacy_performance.py::TestTelemetryPerformance -v --benchmark-only
# Test different environments
test-docker-env:
CRAWL4AI_DOCKER=true $(PYTEST) tests/telemetry/ -k "docker" -v
test-cli-env:
$(PYTEST) tests/telemetry/ -k "cli" -v
# Validate telemetry implementation
validate:
@echo "Running telemetry validation suite..."
$(MAKE) test-unit
$(MAKE) test-privacy
$(MAKE) test-performance
@echo "Validation complete!"
# Debug failing tests
debug:
$(PYTEST) tests/telemetry/ --pdb -x -v
# Show test markers
show-markers:
$(PYTEST) --markers
# Show test collection (dry run)
show-tests:
$(PYTEST) tests/telemetry/ --collect-only -q

View File

@@ -373,7 +373,7 @@ async def main():
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://docs.micronaut.io/4.7.6/guide/",
url="https://docs.micronaut.io/4.9.9/guide/",
config=run_config
)
print(len(result.markdown.raw_markdown))
@@ -425,7 +425,7 @@ async def main():
"type": "attribute",
"attribute": "src"
}
}
]
}
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)

View File

@@ -1,190 +0,0 @@
# Crawl4AI Telemetry Testing Implementation
## Overview
This document summarizes the comprehensive testing strategy implementation for Crawl4AI's opt-in telemetry system. The implementation provides thorough test coverage across unit tests, integration tests, privacy compliance tests, and performance tests.
## Implementation Summary
### 📊 Test Statistics
- **Total Tests**: 40 tests
- **Success Rate**: 100% (40/40 passing)
- **Test Categories**: 4 categories (Unit, Integration, Privacy, Performance)
- **Code Coverage**: 51% (625 statements, 308 missing)
### 🗂️ Test Structure
#### 1. **Unit Tests** (`tests/telemetry/test_telemetry.py`)
- `TestTelemetryConfig`: Configuration management and persistence
- `TestEnvironmentDetection`: CLI, Docker, API server environment detection
- `TestTelemetryManager`: Singleton pattern and exception capture
- `TestConsentManager`: Docker default behavior and environment overrides
- `TestPublicAPI`: Public enable/disable/status functions
- `TestIntegration`: Crawler exception capture integration
#### 2. **Integration Tests** (`tests/telemetry/test_integration.py`)
- `TestTelemetryCLI`: CLI command testing (status, enable, disable)
- `TestAsyncWebCrawlerIntegration`: Real crawler integration with decorators
- `TestDockerIntegration`: Docker environment-specific behavior
- `TestTelemetryProviderIntegration`: Sentry provider initialization and fallbacks
#### 3. **Privacy & Performance Tests** (`tests/telemetry/test_privacy_performance.py`)
- `TestTelemetryPrivacy`: Data sanitization and PII protection
- `TestTelemetryPerformance`: Decorator overhead measurement
- `TestTelemetryScalability`: Multiple and concurrent exception handling
#### 4. **Hello World Test** (`tests/telemetry/test_hello_world_telemetry.py`)
- Basic telemetry functionality validation
### 🔧 Testing Infrastructure
#### **Pytest Configuration** (`pytest.ini`)
```ini
[pytest]
testpaths = tests/telemetry
markers =
unit: Unit tests
integration: Integration tests
privacy: Privacy compliance tests
performance: Performance tests
asyncio_mode = auto
```
#### **Test Fixtures** (`tests/conftest.py`)
- `temp_config_dir`: Temporary configuration directory
- `enabled_telemetry_config`: Pre-configured enabled telemetry
- `disabled_telemetry_config`: Pre-configured disabled telemetry
- `mock_sentry_provider`: Mocked Sentry provider for testing
#### **Makefile Targets** (`Makefile.telemetry`)
```makefile
test-all: Run all telemetry tests
test-unit: Run unit tests only
test-integration: Run integration tests only
test-privacy: Run privacy tests only
test-performance: Run performance tests only
test-coverage: Run tests with coverage report
test-watch: Run tests in watch mode
test-parallel: Run tests in parallel
```
## 🎯 Key Features Tested
### Privacy Compliance
- ✅ No URLs captured in telemetry data
- ✅ No content captured in telemetry data
- ✅ No PII (personally identifiable information) captured
- ✅ Sanitized context only (error types, stack traces without content)
### Performance Impact
- ✅ Telemetry decorator overhead < 1ms
- ✅ Async decorator overhead < 1ms
- ✅ Disabled telemetry has minimal performance impact
- ✅ Configuration loading performance acceptable
- ✅ Multiple exception capture scalability
- ✅ Concurrent exception capture handling
### Integration Points
- ✅ CLI command integration (status, enable, disable)
- ✅ AsyncWebCrawler decorator integration
- ✅ Docker environment auto-detection
- ✅ Sentry provider initialization
- ✅ Graceful degradation without Sentry
- ✅ Environment variable overrides
### Core Functionality
- ✅ Configuration persistence and loading
- ✅ Consent management (Docker defaults, user prompts)
- ✅ Environment detection (CLI, Docker, Jupyter, etc.)
- ✅ Singleton pattern for TelemetryManager
- ✅ Exception capture and forwarding
- ✅ Provider abstraction (Sentry, Null)
## 🚀 Usage Examples
### Run All Tests
```bash
make -f Makefile.telemetry test-all
```
### Run Specific Test Categories
```bash
# Unit tests only
make -f Makefile.telemetry test-unit
# Integration tests only
make -f Makefile.telemetry test-integration
# Privacy tests only
make -f Makefile.telemetry test-privacy
# Performance tests only
make -f Makefile.telemetry test-performance
```
### Coverage Report
```bash
make -f Makefile.telemetry test-coverage
```
### Parallel Execution
```bash
make -f Makefile.telemetry test-parallel
```
## 📁 File Structure
```
tests/
├── conftest.py # Shared pytest fixtures
└── telemetry/
├── test_hello_world_telemetry.py # Basic functionality test
├── test_telemetry.py # Unit tests
├── test_integration.py # Integration tests
└── test_privacy_performance.py # Privacy & performance tests
# Configuration
pytest.ini # Pytest configuration with markers
Makefile.telemetry # Convenient test execution targets
```
## 🔍 Test Isolation & Mocking
### Environment Isolation
- Tests run in isolated temporary directories
- Environment variables are properly mocked/isolated
- No interference between test runs
- Clean state for each test
### Mock Strategies
- `unittest.mock` for external dependencies
- Temporary file systems for configuration testing
- Subprocess mocking for CLI command testing
- Time measurement for performance testing
## 📈 Coverage Analysis
Current test coverage: **51%** (625 statements)
### Well-Covered Areas:
- Core configuration management (78%)
- Telemetry initialization (69%)
- Environment detection (64%)
### Areas for Future Enhancement:
- Consent management UI (20% - interactive prompts)
- Sentry provider implementation (25% - network calls)
- Base provider abstractions (49% - error handling paths)
## 🎉 Implementation Success
The comprehensive testing strategy has been **successfully implemented** with:
-**100% test pass rate** (40/40 tests passing)
-**Complete test infrastructure** (fixtures, configuration, targets)
-**Privacy compliance verification** (no PII, URLs, or content captured)
-**Performance validation** (minimal overhead confirmed)
-**Integration testing** (CLI, Docker, AsyncWebCrawler)
-**CI/CD ready** (Makefile targets for automation)
The telemetry system now has robust test coverage ensuring reliability, privacy compliance, and performance characteristics while maintaining comprehensive validation of all core functionality.

View File

@@ -19,7 +19,7 @@ import re
from pathlib import Path
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig, LinkPreviewConfig
from crawl4ai.async_configs import CrawlerRunConfig, LinkPreviewConfig, LLMConfig
from crawl4ai.models import Link, CrawlResult
import numpy as np
@@ -178,7 +178,7 @@ class AdaptiveConfig:
# Embedding strategy parameters
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
embedding_llm_config: Optional[Dict] = None # Separate config for embeddings
embedding_llm_config: Optional[Union[LLMConfig, Dict]] = None # Separate config for embeddings
n_query_variations: int = 10
coverage_threshold: float = 0.85
alpha_shape_alpha: float = 0.5
@@ -250,6 +250,30 @@ class AdaptiveConfig:
assert 0 <= self.embedding_quality_max_confidence <= 1, "embedding_quality_max_confidence must be between 0 and 1"
assert self.embedding_quality_scale_factor > 0, "embedding_quality_scale_factor must be positive"
assert 0 <= self.embedding_min_confidence_threshold <= 1, "embedding_min_confidence_threshold must be between 0 and 1"
@property
def _embedding_llm_config_dict(self) -> Optional[Dict]:
"""Convert LLMConfig to dict format for backward compatibility."""
if self.embedding_llm_config is None:
return None
if isinstance(self.embedding_llm_config, dict):
# Already a dict - return as-is for backward compatibility
return self.embedding_llm_config
# Convert LLMConfig object to dict format
return {
'provider': self.embedding_llm_config.provider,
'api_token': self.embedding_llm_config.api_token,
'base_url': getattr(self.embedding_llm_config, 'base_url', None),
'temperature': getattr(self.embedding_llm_config, 'temperature', None),
'max_tokens': getattr(self.embedding_llm_config, 'max_tokens', None),
'top_p': getattr(self.embedding_llm_config, 'top_p', None),
'frequency_penalty': getattr(self.embedding_llm_config, 'frequency_penalty', None),
'presence_penalty': getattr(self.embedding_llm_config, 'presence_penalty', None),
'stop': getattr(self.embedding_llm_config, 'stop', None),
'n': getattr(self.embedding_llm_config, 'n', None),
}
class CrawlStrategy(ABC):
@@ -593,7 +617,7 @@ class StatisticalStrategy(CrawlStrategy):
class EmbeddingStrategy(CrawlStrategy):
"""Embedding-based adaptive crawling using semantic space coverage"""
def __init__(self, embedding_model: str = None, llm_config: Dict = None):
def __init__(self, embedding_model: str = None, llm_config: Union[LLMConfig, Dict] = None):
self.embedding_model = embedding_model or "sentence-transformers/all-MiniLM-L6-v2"
self.llm_config = llm_config
self._embedding_cache = {}
@@ -605,14 +629,24 @@ class EmbeddingStrategy(CrawlStrategy):
self._kb_embeddings_hash = None # Track KB changes
self._validation_embeddings_cache = None # Cache validation query embeddings
self._kb_similarity_threshold = 0.95 # Threshold for deduplication
def _get_embedding_llm_config_dict(self) -> Dict:
"""Get embedding LLM config as dict with fallback to default."""
if hasattr(self, 'config') and self.config:
config_dict = self.config._embedding_llm_config_dict
if config_dict:
return config_dict
# Fallback to default if no config provided
return {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
async def _get_embeddings(self, texts: List[str]) -> Any:
"""Get embeddings using configured method"""
from .utils import get_text_embeddings
embedding_llm_config = {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
embedding_llm_config = self._get_embedding_llm_config_dict()
return await get_text_embeddings(
texts,
embedding_llm_config,
@@ -679,8 +713,20 @@ class EmbeddingStrategy(CrawlStrategy):
Return as a JSON array of strings."""
# Use the LLM for query generation
provider = self.llm_config.get('provider', 'openai/gpt-4o-mini') if self.llm_config else 'openai/gpt-4o-mini'
api_token = self.llm_config.get('api_token') if self.llm_config else None
# Convert LLMConfig to dict if needed
llm_config_dict = None
if self.llm_config:
if isinstance(self.llm_config, dict):
llm_config_dict = self.llm_config
else:
# Convert LLMConfig object to dict
llm_config_dict = {
'provider': self.llm_config.provider,
'api_token': self.llm_config.api_token
}
provider = llm_config_dict.get('provider', 'openai/gpt-4o-mini') if llm_config_dict else 'openai/gpt-4o-mini'
api_token = llm_config_dict.get('api_token') if llm_config_dict else None
# response = perform_completion_with_backoff(
# provider=provider,
@@ -843,10 +889,7 @@ class EmbeddingStrategy(CrawlStrategy):
# Batch embed only uncached links
if texts_to_embed:
embedding_llm_config = {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
embedding_llm_config = self._get_embedding_llm_config_dict()
new_embeddings = await get_text_embeddings(texts_to_embed, embedding_llm_config, self.embedding_model)
# Cache the new embeddings
@@ -1184,10 +1227,7 @@ class EmbeddingStrategy(CrawlStrategy):
return
# Get embeddings for new texts
embedding_llm_config = {
'provider': 'openai/text-embedding-3-small',
'api_token': os.getenv('OPENAI_API_KEY')
}
embedding_llm_config = self._get_embedding_llm_config_dict()
new_embeddings = await get_text_embeddings(new_texts, embedding_llm_config, self.embedding_model)
# Deduplicate embeddings before adding to KB
@@ -1256,10 +1296,12 @@ class AdaptiveCrawler:
if strategy_name == "statistical":
return StatisticalStrategy()
elif strategy_name == "embedding":
return EmbeddingStrategy(
strategy = EmbeddingStrategy(
embedding_model=self.config.embedding_model,
llm_config=self.config.embedding_llm_config
)
strategy.config = self.config # Pass config to strategy
return strategy
else:
raise ValueError(f"Unknown strategy: {strategy_name}")

View File

@@ -1,5 +1,6 @@
import os
from typing import Union
import warnings
from .config import (
DEFAULT_PROVIDER,
DEFAULT_PROVIDER_API_KEY,
@@ -97,13 +98,16 @@ def to_serializable_dict(obj: Any, ignore_default_value : bool = False) -> Dict:
if value != param.default and not ignore_default_value:
current_values[name] = to_serializable_dict(value)
if hasattr(obj, '__slots__'):
for slot in obj.__slots__:
if slot.startswith('_'): # Handle private slots
attr_name = slot[1:] # Remove leading '_'
value = getattr(obj, slot, None)
if value is not None:
current_values[attr_name] = to_serializable_dict(value)
# Don't serialize private __slots__ - they're internal implementation details
# not constructor parameters. This was causing URLPatternFilter to fail
# because _simple_suffixes was being serialized as 'simple_suffixes'
# if hasattr(obj, '__slots__'):
# for slot in obj.__slots__:
# if slot.startswith('_'): # Handle private slots
# attr_name = slot[1:] # Remove leading '_'
# value = getattr(obj, slot, None)
# if value is not None:
# current_values[attr_name] = to_serializable_dict(value)
@@ -254,24 +258,39 @@ class ProxyConfig:
@staticmethod
def from_string(proxy_str: str) -> "ProxyConfig":
"""Create a ProxyConfig from a string in the format 'ip:port:username:password'."""
parts = proxy_str.split(":")
if len(parts) == 4: # ip:port:username:password
"""Create a ProxyConfig from a string.
Supported formats:
- 'http://username:password@ip:port'
- 'http://ip:port'
- 'socks5://ip:port'
- 'ip:port:username:password'
- 'ip:port'
"""
s = (proxy_str or "").strip()
# URL with credentials
if "@" in s and "://" in s:
auth_part, server_part = s.split("@", 1)
protocol, credentials = auth_part.split("://", 1)
if ":" in credentials:
username, password = credentials.split(":", 1)
return ProxyConfig(
server=f"{protocol}://{server_part}",
username=username,
password=password,
)
# URL without credentials (keep scheme)
if "://" in s and "@" not in s:
return ProxyConfig(server=s)
# Colon separated forms
parts = s.split(":")
if len(parts) == 4:
ip, port, username, password = parts
return ProxyConfig(
server=f"http://{ip}:{port}",
username=username,
password=password,
ip=ip
)
elif len(parts) == 2: # ip:port only
return ProxyConfig(server=f"http://{ip}:{port}", username=username, password=password)
if len(parts) == 2:
ip, port = parts
return ProxyConfig(
server=f"http://{ip}:{port}",
ip=ip
)
else:
raise ValueError(f"Invalid proxy string format: {proxy_str}")
return ProxyConfig(server=f"http://{ip}:{port}")
raise ValueError(f"Invalid proxy string format: {proxy_str}")
@staticmethod
def from_dict(proxy_dict: Dict) -> "ProxyConfig":
@@ -435,6 +454,7 @@ class BrowserConfig:
host: str = "localhost",
enable_stealth: bool = False,
):
self.browser_type = browser_type
self.headless = headless
self.browser_mode = browser_mode
@@ -447,13 +467,22 @@ class BrowserConfig:
if self.browser_type in ["firefox", "webkit"]:
self.channel = ""
self.chrome_channel = ""
if proxy:
warnings.warn("The 'proxy' parameter is deprecated and will be removed in a future release. Use 'proxy_config' instead.", UserWarning)
self.proxy = proxy
self.proxy_config = proxy_config
if isinstance(self.proxy_config, dict):
self.proxy_config = ProxyConfig.from_dict(self.proxy_config)
if isinstance(self.proxy_config, str):
self.proxy_config = ProxyConfig.from_string(self.proxy_config)
if self.proxy and self.proxy_config:
warnings.warn("Both 'proxy' and 'proxy_config' are provided. 'proxy_config' will take precedence.", UserWarning)
self.proxy = None
elif self.proxy:
# Convert proxy string to ProxyConfig if proxy_config is not provided
self.proxy_config = ProxyConfig.from_string(self.proxy)
self.proxy = None
self.viewport_width = viewport_width
self.viewport_height = viewport_height
@@ -831,12 +860,6 @@ class HTTPCrawlerConfig:
return HTTPCrawlerConfig.from_kwargs(config)
class CrawlerRunConfig():
_UNWANTED_PROPS = {
'disable_cache' : 'Instead, use cache_mode=CacheMode.DISABLED',
'bypass_cache' : 'Instead, use cache_mode=CacheMode.BYPASS',
'no_cache_read' : 'Instead, use cache_mode=CacheMode.WRITE_ONLY',
'no_cache_write' : 'Instead, use cache_mode=CacheMode.READ_ONLY',
}
"""
Configuration class for controlling how the crawler runs each crawl operation.
@@ -1043,6 +1066,12 @@ class CrawlerRunConfig():
url: str = None # This is not a compulsory parameter
"""
_UNWANTED_PROPS = {
'disable_cache' : 'Instead, use cache_mode=CacheMode.DISABLED',
'bypass_cache' : 'Instead, use cache_mode=CacheMode.BYPASS',
'no_cache_read' : 'Instead, use cache_mode=CacheMode.WRITE_ONLY',
'no_cache_write' : 'Instead, use cache_mode=CacheMode.READ_ONLY',
}
def __init__(
self,
@@ -1121,6 +1150,7 @@ class CrawlerRunConfig():
exclude_domains: list = None,
exclude_internal_links: bool = False,
score_links: bool = False,
preserve_https_for_internal_links: bool = False,
# Debugging and Logging Parameters
verbose: bool = True,
log_console: bool = False,
@@ -1244,6 +1274,7 @@ class CrawlerRunConfig():
self.exclude_domains = exclude_domains or []
self.exclude_internal_links = exclude_internal_links
self.score_links = score_links
self.preserve_https_for_internal_links = preserve_https_for_internal_links
# Debugging and Logging Parameters
self.verbose = verbose
@@ -1517,6 +1548,7 @@ class CrawlerRunConfig():
exclude_domains=kwargs.get("exclude_domains", []),
exclude_internal_links=kwargs.get("exclude_internal_links", False),
score_links=kwargs.get("score_links", False),
preserve_https_for_internal_links=kwargs.get("preserve_https_for_internal_links", False),
# Debugging and Logging Parameters
verbose=kwargs.get("verbose", True),
log_console=kwargs.get("log_console", False),
@@ -1623,6 +1655,7 @@ class CrawlerRunConfig():
"exclude_domains": self.exclude_domains,
"exclude_internal_links": self.exclude_internal_links,
"score_links": self.score_links,
"preserve_https_for_internal_links": self.preserve_https_for_internal_links,
"verbose": self.verbose,
"log_console": self.log_console,
"capture_network_requests": self.capture_network_requests,

View File

@@ -49,9 +49,6 @@ from .utils import (
preprocess_html_for_schema,
)
# Import telemetry
from .telemetry import capture_exception, telemetry_decorator, async_telemetry_decorator
class AsyncWebCrawler:
"""
@@ -204,7 +201,6 @@ class AsyncWebCrawler:
"""异步空上下文管理器"""
yield
@async_telemetry_decorator
async def arun(
self,
url: str,
@@ -358,6 +354,7 @@ class AsyncWebCrawler:
###############################################################
# Process the HTML content, Call CrawlerStrategy.process_html #
###############################################################
from urllib.parse import urlparse
crawl_result: CrawlResult = await self.aprocess_html(
url=url,
html=html,
@@ -368,6 +365,7 @@ class AsyncWebCrawler:
verbose=config.verbose,
is_raw_html=True if url.startswith("raw:") else False,
redirected_url=async_response.redirected_url,
original_scheme=urlparse(url).scheme,
**kwargs,
)
@@ -434,7 +432,6 @@ class AsyncWebCrawler:
)
)
@async_telemetry_decorator
async def aprocess_html(
self,
url: str,

View File

@@ -15,6 +15,7 @@ from .js_snippet import load_js_script
from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig
from .utils import get_chromium_path
import warnings
BROWSER_DISABLE_OPTIONS = [
@@ -368,6 +369,9 @@ class ManagedBrowser:
]
if self.headless:
flags.append("--headless=new")
# Add viewport flag if specified in config
if self.browser_config.viewport_height and self.browser_config.viewport_width:
flags.append(f"--window-size={self.browser_config.viewport_width},{self.browser_config.viewport_height}")
# merge common launch flags
flags.extend(self.build_browser_flags(self.browser_config))
elif self.browser_type == "firefox":
@@ -741,17 +745,18 @@ class BrowserManager:
)
os.makedirs(browser_args["downloads_path"], exist_ok=True)
if self.config.proxy or self.config.proxy_config:
if self.config.proxy:
warnings.warn(
"BrowserConfig.proxy is deprecated and ignored. Use proxy_config instead.",
DeprecationWarning,
)
if self.config.proxy_config:
from playwright.async_api import ProxySettings
proxy_settings = (
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
proxy_settings = ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
browser_args["proxy"] = proxy_settings

View File

@@ -1385,97 +1385,6 @@ def profiles_cmd():
# Run interactive profile manager
anyio.run(manage_profiles)
@cli.group("telemetry")
def telemetry_cmd():
"""Manage telemetry settings for Crawl4AI
Telemetry helps improve Crawl4AI by sending anonymous crash reports.
No personal data or crawled content is ever collected.
"""
pass
@telemetry_cmd.command("enable")
@click.option("--email", "-e", help="Optional email for follow-up on critical issues")
@click.option("--always/--once", default=True, help="Always send errors (default) or just once")
def telemetry_enable_cmd(email: Optional[str], always: bool):
"""Enable telemetry to help improve Crawl4AI
Examples:
crwl telemetry enable # Enable telemetry
crwl telemetry enable --email me@ex.com # Enable with email
crwl telemetry enable --once # Send only next error
"""
from crawl4ai.telemetry import enable
try:
enable(email=email, always=always, once=not always)
console.print("[green]✅ Telemetry enabled successfully[/green]")
if email:
console.print(f" Email: {email}")
console.print(f" Mode: {'Always send errors' if always else 'Send next error only'}")
except Exception as e:
console.print(f"[red]❌ Failed to enable telemetry: {e}[/red]")
sys.exit(1)
@telemetry_cmd.command("disable")
def telemetry_disable_cmd():
"""Disable telemetry
Stop sending anonymous crash reports to help improve Crawl4AI.
"""
from crawl4ai.telemetry import disable
try:
disable()
console.print("[green]✅ Telemetry disabled successfully[/green]")
except Exception as e:
console.print(f"[red]❌ Failed to disable telemetry: {e}[/red]")
sys.exit(1)
@telemetry_cmd.command("status")
def telemetry_status_cmd():
"""Show current telemetry status
Display whether telemetry is enabled and current settings.
"""
from crawl4ai.telemetry import status
try:
info = status()
# Create status table
table = Table(title="Telemetry Status", show_header=False)
table.add_column("Setting", style="cyan")
table.add_column("Value")
# Status emoji
status_icon = "" if info['enabled'] else ""
table.add_row("Status", f"{status_icon} {'Enabled' if info['enabled'] else 'Disabled'}")
table.add_row("Consent", info['consent'].replace('_', ' ').title())
if info['email']:
table.add_row("Email", info['email'])
table.add_row("Environment", info['environment'])
table.add_row("Provider", info['provider'])
if info['errors_sent'] > 0:
table.add_row("Errors Sent", str(info['errors_sent']))
console.print(table)
# Add helpful messages
if not info['enabled']:
console.print("\n[yellow] Telemetry is disabled. Enable it to help improve Crawl4AI:[/yellow]")
console.print(" [dim]crwl telemetry enable[/dim]")
except Exception as e:
console.print(f"[red]❌ Failed to get telemetry status: {e}[/red]")
sys.exit(1)
@cli.command(name="")
@click.argument("url", required=False)
@click.option("--example", is_flag=True, help="Show usage examples")

View File

@@ -258,7 +258,11 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
continue
try:
normalized_href = normalize_url(href, url)
normalized_href = normalize_url(
href, url,
preserve_https=kwargs.get('preserve_https_for_internal_links', False),
original_scheme=kwargs.get('original_scheme')
)
link_data = {
"href": normalized_href,
"text": link.text_content().strip(),

View File

@@ -47,7 +47,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
self.url_scorer = url_scorer
self.include_external = include_external
self.max_pages = max_pages
self.logger = logger or logging.getLogger(__name__)
# self.logger = logger or logging.getLogger(__name__)
# Ensure logger is always a Logger instance, not a dict from serialization
if isinstance(logger, logging.Logger):
self.logger = logger
else:
# Create a new logger if logger is None, dict, or any other non-Logger type
self.logger = logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0
@@ -116,11 +122,6 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
valid_links.append(base_url)
# If we have more valid links than capacity, limit them
if len(valid_links) > remaining_capacity:
valid_links = valid_links[:remaining_capacity]
self.logger.info(f"Limiting to {remaining_capacity} URLs due to max_pages limit")
# Record the new depths and add to next_links
for url in valid_links:
depths[url] = new_depth
@@ -140,7 +141,8 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
"""
queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
# Push the initial URL with score 0 and depth 0.
await queue.put((0, 0, start_url, None))
initial_score = self.url_scorer.score(start_url) if self.url_scorer else 0
await queue.put((-initial_score, 0, start_url, None))
visited: Set[str] = set()
depths: Dict[str, int] = {start_url: 0}
@@ -187,7 +189,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
result.metadata = result.metadata or {}
result.metadata["depth"] = depth
result.metadata["parent_url"] = parent_url
result.metadata["score"] = score
result.metadata["score"] = -score
# Count only successful crawls toward max_pages limit
if result.success:
@@ -208,7 +210,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
for new_url, new_parent in new_links:
new_depth = depths.get(new_url, depth + 1)
new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
await queue.put((new_score, new_depth, new_url, new_parent))
await queue.put((-new_score, new_depth, new_url, new_parent))
# End of crawl.

View File

@@ -38,7 +38,13 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self.include_external = include_external
self.score_threshold = score_threshold
self.max_pages = max_pages
self.logger = logger or logging.getLogger(__name__)
# self.logger = logger or logging.getLogger(__name__)
# Ensure logger is always a Logger instance, not a dict from serialization
if isinstance(logger, logging.Logger):
self.logger = logger
else:
# Create a new logger if logger is None, dict, or any other non-Logger type
self.logger = logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0

View File

@@ -120,6 +120,9 @@ class URLPatternFilter(URLFilter):
"""Pattern filter balancing speed and completeness"""
__slots__ = (
"patterns", # Store original patterns for serialization
"use_glob", # Store original use_glob for serialization
"reverse", # Store original reverse for serialization
"_simple_suffixes",
"_simple_prefixes",
"_domain_patterns",
@@ -142,6 +145,11 @@ class URLPatternFilter(URLFilter):
reverse: bool = False,
):
super().__init__()
# Store original constructor params for serialization
self.patterns = patterns
self.use_glob = use_glob
self.reverse = reverse
self._reverse = reverse
patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns

View File

@@ -253,6 +253,16 @@ class CrawlResult(BaseModel):
requirements change, this is where you would update the logic.
"""
result = super().model_dump(*args, **kwargs)
# Remove any property descriptors that might have been included
# These deprecated properties should not be in the serialized output
for key in ['fit_html', 'fit_markdown', 'markdown_v2']:
if key in result and isinstance(result[key], property):
# del result[key]
# Nasrin: I decided to convert it to string instead of removing it.
result[key] = str(result[key])
# Add the markdown field properly
if self._markdown is not None:
result["markdown"] = self._markdown.model_dump()
return result

View File

@@ -1,440 +0,0 @@
"""
Crawl4AI Telemetry Module.
Provides opt-in error tracking to improve stability.
"""
import os
import sys
import functools
import traceback
from typing import Optional, Any, Dict, Callable, Type
from contextlib import contextmanager, asynccontextmanager
from .base import TelemetryProvider, NullProvider
from .config import TelemetryConfig, TelemetryConsent
from .consent import ConsentManager
from .environment import Environment, EnvironmentDetector
class TelemetryManager:
"""
Main telemetry manager for Crawl4AI.
Coordinates provider, config, and consent management.
"""
_instance: Optional['TelemetryManager'] = None
def __init__(self):
"""Initialize telemetry manager."""
self.config = TelemetryConfig()
self.consent_manager = ConsentManager(self.config)
self.environment = EnvironmentDetector.detect()
self._provider: Optional[TelemetryProvider] = None
self._initialized = False
self._error_count = 0
self._max_errors = 100 # Prevent telemetry spam
# Load provider based on config
self._setup_provider()
@classmethod
def get_instance(cls) -> 'TelemetryManager':
"""
Get singleton instance of telemetry manager.
Returns:
TelemetryManager instance
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _setup_provider(self) -> None:
"""Setup telemetry provider based on configuration."""
# Update config from environment
self.config.update_from_env()
# Check if telemetry is enabled
if not self.config.is_enabled():
self._provider = NullProvider()
return
# Try to load Sentry provider
try:
from .providers.sentry import SentryProvider
# Get Crawl4AI version for release tracking
try:
from crawl4ai import __version__
release = f"crawl4ai@{__version__}"
except ImportError:
release = "crawl4ai@unknown"
self._provider = SentryProvider(
environment=self.environment.value,
release=release
)
# Initialize provider
if not self._provider.initialize():
# Fallback to null provider if init fails
self._provider = NullProvider()
except ImportError:
# Sentry not installed - use null provider
self._provider = NullProvider()
self._initialized = True
def capture_exception(
self,
exception: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture and send an exception.
Args:
exception: The exception to capture
context: Optional additional context
Returns:
True if exception was sent
"""
# Check error count limit
if self._error_count >= self._max_errors:
return False
# Check consent on first error
if self._error_count == 0:
consent = self.consent_manager.check_and_prompt()
# Update provider if consent changed
if consent == TelemetryConsent.DENIED:
self._provider = NullProvider()
return False
elif consent in [TelemetryConsent.ONCE, TelemetryConsent.ALWAYS]:
if isinstance(self._provider, NullProvider):
self._setup_provider()
# Check if we should send this error
if not self.config.should_send_current():
return False
# Prepare context
full_context = EnvironmentDetector.get_environment_context()
if context:
full_context.update(context)
# Add user email if available
email = self.config.get_email()
if email:
full_context['email'] = email
# Add source info
full_context['source'] = 'crawl4ai'
# Send exception
try:
if self._provider:
success = self._provider.send_exception(exception, full_context)
if success:
self._error_count += 1
return success
except Exception:
# Telemetry itself failed - ignore
pass
return False
def capture_message(
self,
message: str,
level: str = 'info',
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture a message event.
Args:
message: Message to send
level: Message level (info, warning, error)
context: Optional context
Returns:
True if message was sent
"""
if not self.config.is_enabled():
return False
payload = {
'level': level,
'message': message
}
if context:
payload.update(context)
try:
if self._provider:
return self._provider.send_event(message, payload)
except Exception:
pass
return False
def enable(
self,
email: Optional[str] = None,
always: bool = True,
once: bool = False
) -> None:
"""
Enable telemetry.
Args:
email: Optional email for follow-up
always: If True, always send errors
once: If True, send only next error
"""
if once:
consent = TelemetryConsent.ONCE
elif always:
consent = TelemetryConsent.ALWAYS
else:
consent = TelemetryConsent.ALWAYS
self.config.set_consent(consent, email)
self._setup_provider()
print("✅ Telemetry enabled")
if email:
print(f" Email: {email}")
print(f" Mode: {'once' if once else 'always'}")
def disable(self) -> None:
"""Disable telemetry."""
self.config.set_consent(TelemetryConsent.DENIED)
self._provider = NullProvider()
print("✅ Telemetry disabled")
def status(self) -> Dict[str, Any]:
"""
Get telemetry status.
Returns:
Dictionary with status information
"""
return {
'enabled': self.config.is_enabled(),
'consent': self.config.get_consent().value,
'email': self.config.get_email(),
'environment': self.environment.value,
'provider': type(self._provider).__name__ if self._provider else 'None',
'errors_sent': self._error_count
}
def flush(self) -> None:
"""Flush any pending telemetry data."""
if self._provider:
self._provider.flush()
def shutdown(self) -> None:
"""Shutdown telemetry."""
if self._provider:
self._provider.shutdown()
# Global instance
_telemetry_manager: Optional[TelemetryManager] = None
def get_telemetry() -> TelemetryManager:
"""
Get global telemetry manager instance.
Returns:
TelemetryManager instance
"""
global _telemetry_manager
if _telemetry_manager is None:
_telemetry_manager = TelemetryManager.get_instance()
return _telemetry_manager
def capture_exception(
exception: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Capture an exception for telemetry.
Args:
exception: Exception to capture
context: Optional context
Returns:
True if sent successfully
"""
try:
return get_telemetry().capture_exception(exception, context)
except Exception:
return False
def telemetry_decorator(func: Callable) -> Callable:
"""
Decorator to capture exceptions from a function.
Args:
func: Function to wrap
Returns:
Wrapped function
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# Capture exception
capture_exception(e, {
'function': func.__name__,
'module': func.__module__
})
# Re-raise the exception
raise
return wrapper
def async_telemetry_decorator(func: Callable) -> Callable:
"""
Decorator to capture exceptions from an async function.
Args:
func: Async function to wrap
Returns:
Wrapped async function
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
# Capture exception
capture_exception(e, {
'function': func.__name__,
'module': func.__module__
})
# Re-raise the exception
raise
return wrapper
@contextmanager
def telemetry_context(operation: str):
"""
Context manager for capturing exceptions.
Args:
operation: Name of the operation
Example:
with telemetry_context("web_crawl"):
# Your code here
pass
"""
try:
yield
except Exception as e:
capture_exception(e, {'operation': operation})
raise
@asynccontextmanager
async def async_telemetry_context(operation: str):
"""
Async context manager for capturing exceptions in async code.
Args:
operation: Name of the operation
Example:
async with async_telemetry_context("async_crawl"):
# Your async code here
await something()
"""
try:
yield
except Exception as e:
capture_exception(e, {'operation': operation})
raise
def install_exception_handler():
"""Install global exception handler for uncaught exceptions."""
original_hook = sys.excepthook
def telemetry_exception_hook(exc_type, exc_value, exc_traceback):
"""Custom exception hook with telemetry."""
# Don't capture KeyboardInterrupt
if not issubclass(exc_type, KeyboardInterrupt):
capture_exception(exc_value, {
'uncaught': True,
'type': exc_type.__name__
})
# Call original hook
original_hook(exc_type, exc_value, exc_traceback)
sys.excepthook = telemetry_exception_hook
# Public API
def enable(email: Optional[str] = None, always: bool = True, once: bool = False) -> None:
"""
Enable telemetry.
Args:
email: Optional email for follow-up
always: If True, always send errors (default)
once: If True, send only the next error
"""
get_telemetry().enable(email=email, always=always, once=once)
def disable() -> None:
"""Disable telemetry."""
get_telemetry().disable()
def status() -> Dict[str, Any]:
"""
Get telemetry status.
Returns:
Dictionary with status information
"""
return get_telemetry().status()
# Auto-install exception handler on import
# (Only for main library usage, not for Docker/API)
if EnvironmentDetector.detect() not in [Environment.DOCKER, Environment.API_SERVER]:
install_exception_handler()
__all__ = [
'TelemetryManager',
'get_telemetry',
'capture_exception',
'telemetry_decorator',
'async_telemetry_decorator',
'telemetry_context',
'async_telemetry_context',
'enable',
'disable',
'status',
]

View File

@@ -1,140 +0,0 @@
"""
Base telemetry provider interface for Crawl4AI.
Provides abstraction for different telemetry backends.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Union
import traceback
class TelemetryProvider(ABC):
"""Abstract base class for telemetry providers."""
def __init__(self, **kwargs):
"""Initialize the provider with optional configuration."""
self.config = kwargs
self._initialized = False
@abstractmethod
def initialize(self) -> bool:
"""
Initialize the telemetry provider.
Returns True if initialization successful, False otherwise.
"""
pass
@abstractmethod
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send an exception to the telemetry backend.
Args:
exc: The exception to report
context: Optional context data (email, environment, etc.)
Returns:
True if sent successfully, False otherwise
"""
pass
@abstractmethod
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send a generic telemetry event.
Args:
event_name: Name of the event
payload: Optional event data
Returns:
True if sent successfully, False otherwise
"""
pass
@abstractmethod
def flush(self) -> None:
"""Flush any pending telemetry data."""
pass
@abstractmethod
def shutdown(self) -> None:
"""Clean shutdown of the provider."""
pass
def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Remove sensitive information from telemetry data.
Override in subclasses for custom sanitization.
Args:
data: Raw data dictionary
Returns:
Sanitized data dictionary
"""
# Default implementation - remove common sensitive fields
sensitive_keys = {
'password', 'token', 'api_key', 'secret', 'credential',
'auth', 'authorization', 'cookie', 'session'
}
def _sanitize_dict(d: Dict) -> Dict:
sanitized = {}
for key, value in d.items():
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_keys):
sanitized[key] = '[REDACTED]'
elif isinstance(value, dict):
sanitized[key] = _sanitize_dict(value)
elif isinstance(value, list):
sanitized[key] = [
_sanitize_dict(item) if isinstance(item, dict) else item
for item in value
]
else:
sanitized[key] = value
return sanitized
return _sanitize_dict(data) if isinstance(data, dict) else data
class NullProvider(TelemetryProvider):
"""No-op provider for when telemetry is disabled."""
def initialize(self) -> bool:
"""No initialization needed for null provider."""
self._initialized = True
return True
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""No-op exception sending."""
return True
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""No-op event sending."""
return True
def flush(self) -> None:
"""No-op flush."""
pass
def shutdown(self) -> None:
"""No-op shutdown."""
pass

View File

@@ -1,196 +0,0 @@
"""
Configuration management for Crawl4AI telemetry.
Handles user preferences and persistence.
"""
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
from enum import Enum
class TelemetryConsent(Enum):
"""Telemetry consent levels."""
NOT_SET = "not_set"
DENIED = "denied"
ONCE = "once" # Send current error only
ALWAYS = "always" # Send all errors
class TelemetryConfig:
"""Manages telemetry configuration and persistence."""
def __init__(self, config_dir: Optional[Path] = None):
"""
Initialize configuration manager.
Args:
config_dir: Optional custom config directory
"""
if config_dir:
self.config_dir = config_dir
else:
# Default to ~/.crawl4ai/
self.config_dir = Path.home() / '.crawl4ai'
self.config_file = self.config_dir / 'config.json'
self._config: Dict[str, Any] = {}
self._load_config()
def _ensure_config_dir(self) -> None:
"""Ensure configuration directory exists."""
self.config_dir.mkdir(parents=True, exist_ok=True)
def _load_config(self) -> None:
"""Load configuration from disk."""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
self._config = json.load(f)
except (json.JSONDecodeError, IOError):
# Corrupted or inaccessible config - start fresh
self._config = {}
else:
self._config = {}
def _save_config(self) -> bool:
"""
Save configuration to disk.
Returns:
True if saved successfully
"""
try:
self._ensure_config_dir()
# Write to temporary file first
temp_file = self.config_file.with_suffix('.tmp')
with open(temp_file, 'w') as f:
json.dump(self._config, f, indent=2)
# Atomic rename
temp_file.replace(self.config_file)
return True
except (IOError, OSError):
return False
def get_telemetry_settings(self) -> Dict[str, Any]:
"""
Get current telemetry settings.
Returns:
Dictionary with telemetry settings
"""
return self._config.get('telemetry', {
'consent': TelemetryConsent.NOT_SET.value,
'email': None
})
def get_consent(self) -> TelemetryConsent:
"""
Get current consent status.
Returns:
TelemetryConsent enum value
"""
settings = self.get_telemetry_settings()
consent_value = settings.get('consent', TelemetryConsent.NOT_SET.value)
# Handle legacy boolean values
if isinstance(consent_value, bool):
consent_value = TelemetryConsent.ALWAYS.value if consent_value else TelemetryConsent.DENIED.value
try:
return TelemetryConsent(consent_value)
except ValueError:
return TelemetryConsent.NOT_SET
def set_consent(
self,
consent: TelemetryConsent,
email: Optional[str] = None
) -> bool:
"""
Set telemetry consent and optional email.
Args:
consent: Consent level
email: Optional email for follow-up
Returns:
True if saved successfully
"""
if 'telemetry' not in self._config:
self._config['telemetry'] = {}
self._config['telemetry']['consent'] = consent.value
# Only update email if provided
if email is not None:
self._config['telemetry']['email'] = email
return self._save_config()
def get_email(self) -> Optional[str]:
"""
Get stored email if any.
Returns:
Email address or None
"""
settings = self.get_telemetry_settings()
return settings.get('email')
def is_enabled(self) -> bool:
"""
Check if telemetry is enabled.
Returns:
True if telemetry should send data
"""
consent = self.get_consent()
return consent in [TelemetryConsent.ONCE, TelemetryConsent.ALWAYS]
def should_send_current(self) -> bool:
"""
Check if current error should be sent.
Used for one-time consent.
Returns:
True if current error should be sent
"""
consent = self.get_consent()
if consent == TelemetryConsent.ONCE:
# After sending once, reset to NOT_SET
self.set_consent(TelemetryConsent.NOT_SET)
return True
return consent == TelemetryConsent.ALWAYS
def clear(self) -> bool:
"""
Clear all telemetry settings.
Returns:
True if cleared successfully
"""
if 'telemetry' in self._config:
del self._config['telemetry']
return self._save_config()
return True
def update_from_env(self) -> None:
"""Update configuration from environment variables."""
# Check for telemetry disable flag
if os.environ.get('CRAWL4AI_TELEMETRY') == '0':
self.set_consent(TelemetryConsent.DENIED)
# Check for email override
env_email = os.environ.get('CRAWL4AI_TELEMETRY_EMAIL')
if env_email and self.is_enabled():
current_settings = self.get_telemetry_settings()
self.set_consent(
TelemetryConsent(current_settings['consent']),
email=env_email
)

View File

@@ -1,314 +0,0 @@
"""
User consent handling for Crawl4AI telemetry.
Provides interactive prompts for different environments.
"""
import sys
from typing import Optional, Tuple
from .config import TelemetryConsent, TelemetryConfig
from .environment import Environment, EnvironmentDetector
class ConsentManager:
"""Manages user consent for telemetry."""
def __init__(self, config: Optional[TelemetryConfig] = None):
"""
Initialize consent manager.
Args:
config: Optional TelemetryConfig instance
"""
self.config = config or TelemetryConfig()
self.environment = EnvironmentDetector.detect()
def check_and_prompt(self) -> TelemetryConsent:
"""
Check consent status and prompt if needed.
Returns:
Current consent status
"""
current_consent = self.config.get_consent()
# If already set, return current value
if current_consent != TelemetryConsent.NOT_SET:
return current_consent
# Docker/API server: default enabled (check env var)
if self.environment in [Environment.DOCKER, Environment.API_SERVER]:
return self._handle_docker_consent()
# Interactive environments: prompt user
if EnvironmentDetector.is_interactive():
return self._prompt_for_consent()
# Non-interactive: default disabled
return TelemetryConsent.DENIED
def _handle_docker_consent(self) -> TelemetryConsent:
"""
Handle consent in Docker environment.
Default enabled unless disabled via env var.
"""
import os
if os.environ.get('CRAWL4AI_TELEMETRY') == '0':
self.config.set_consent(TelemetryConsent.DENIED)
return TelemetryConsent.DENIED
# Default enabled for Docker
self.config.set_consent(TelemetryConsent.ALWAYS)
return TelemetryConsent.ALWAYS
def _prompt_for_consent(self) -> TelemetryConsent:
"""
Prompt user for consent based on environment.
Returns:
User's consent choice
"""
if self.environment == Environment.CLI:
return self._cli_prompt()
elif self.environment in [Environment.JUPYTER, Environment.COLAB]:
return self._notebook_prompt()
else:
return TelemetryConsent.DENIED
def _cli_prompt(self) -> TelemetryConsent:
"""
Show CLI prompt for consent.
Returns:
User's consent choice
"""
print("\n" + "="*60)
print("🚨 Crawl4AI Error Detection")
print("="*60)
print("\nWe noticed an error occurred. Help improve Crawl4AI by")
print("sending anonymous crash reports?")
print("\n[1] Yes, send this error only")
print("[2] Yes, always send errors")
print("[3] No, don't send")
print("\n" + "-"*60)
# Get choice
while True:
try:
choice = input("Your choice (1/2/3): ").strip()
if choice == '1':
consent = TelemetryConsent.ONCE
break
elif choice == '2':
consent = TelemetryConsent.ALWAYS
break
elif choice == '3':
consent = TelemetryConsent.DENIED
break
else:
print("Please enter 1, 2, or 3")
except (KeyboardInterrupt, EOFError):
# User cancelled - treat as denial
consent = TelemetryConsent.DENIED
break
# Optional email
email = None
if consent != TelemetryConsent.DENIED:
print("\nOptional: Enter email for follow-up (or press Enter to skip):")
try:
email_input = input("Email: ").strip()
if email_input and '@' in email_input:
email = email_input
except (KeyboardInterrupt, EOFError):
pass
# Save choice
self.config.set_consent(consent, email)
if consent != TelemetryConsent.DENIED:
print("\n✅ Thank you for helping improve Crawl4AI!")
else:
print("\n✅ Telemetry disabled. You can enable it anytime with:")
print(" crawl4ai telemetry enable")
print("="*60 + "\n")
return consent
def _notebook_prompt(self) -> TelemetryConsent:
"""
Show notebook prompt for consent.
Uses widgets if available, falls back to print + code.
Returns:
User's consent choice
"""
if EnvironmentDetector.supports_widgets():
return self._widget_prompt()
else:
return self._notebook_fallback_prompt()
def _widget_prompt(self) -> TelemetryConsent:
"""
Show interactive widget prompt in Jupyter/Colab.
Returns:
User's consent choice
"""
try:
import ipywidgets as widgets
from IPython.display import display, HTML
# Create styled HTML
html = HTML("""
<div style="padding: 15px; border: 2px solid #ff6b6b; border-radius: 8px; background: #fff5f5;">
<h3 style="color: #c92a2a; margin-top: 0;">🚨 Crawl4AI Error Detected</h3>
<p style="color: #495057;">Help us improve by sending anonymous crash reports?</p>
</div>
""")
display(html)
# Create buttons
btn_once = widgets.Button(
description='Send this error',
button_style='info',
icon='check'
)
btn_always = widgets.Button(
description='Always send',
button_style='success',
icon='check-circle'
)
btn_never = widgets.Button(
description='Don\'t send',
button_style='danger',
icon='times'
)
# Email input
email_input = widgets.Text(
placeholder='Optional: your@email.com',
description='Email:',
style={'description_width': 'initial'}
)
# Output area for feedback
output = widgets.Output()
# Container
button_box = widgets.HBox([btn_once, btn_always, btn_never])
container = widgets.VBox([button_box, email_input, output])
# Variable to store choice
consent_choice = {'value': None}
def on_button_click(btn):
"""Handle button click."""
with output:
output.clear_output()
if btn == btn_once:
consent_choice['value'] = TelemetryConsent.ONCE
print("✅ Sending this error only")
elif btn == btn_always:
consent_choice['value'] = TelemetryConsent.ALWAYS
print("✅ Always sending errors")
else:
consent_choice['value'] = TelemetryConsent.DENIED
print("✅ Telemetry disabled")
# Save with email if provided
email = email_input.value.strip() if email_input.value else None
self.config.set_consent(consent_choice['value'], email)
# Disable buttons after choice
btn_once.disabled = True
btn_always.disabled = True
btn_never.disabled = True
email_input.disabled = True
# Attach handlers
btn_once.on_click(on_button_click)
btn_always.on_click(on_button_click)
btn_never.on_click(on_button_click)
# Display widget
display(container)
# Wait for user choice (in notebook, this is non-blocking)
# Return NOT_SET for now, actual choice will be saved via callback
return consent_choice.get('value', TelemetryConsent.NOT_SET)
except Exception:
# Fallback if widgets fail
return self._notebook_fallback_prompt()
def _notebook_fallback_prompt(self) -> TelemetryConsent:
"""
Fallback prompt for notebooks without widget support.
Returns:
User's consent choice (defaults to DENIED)
"""
try:
from IPython.display import display, Markdown
markdown_content = """
### 🚨 Crawl4AI Error Detected
Help us improve by sending anonymous crash reports.
**Telemetry is currently OFF.** To enable, run:
```python
import crawl4ai
crawl4ai.telemetry.enable(email="your@email.com", always=True)
```
To send just this error:
```python
crawl4ai.telemetry.enable(once=True)
```
To keep telemetry disabled:
```python
crawl4ai.telemetry.disable()
```
"""
display(Markdown(markdown_content))
except ImportError:
# Pure print fallback
print("\n" + "="*60)
print("🚨 Crawl4AI Error Detected")
print("="*60)
print("\nTelemetry is OFF. To enable, run:")
print("\nimport crawl4ai")
print('crawl4ai.telemetry.enable(email="you@example.com", always=True)')
print("\n" + "="*60)
# Default to disabled in fallback mode
return TelemetryConsent.DENIED
def force_prompt(self) -> Tuple[TelemetryConsent, Optional[str]]:
"""
Force a consent prompt regardless of current settings.
Used for manual telemetry configuration.
Returns:
Tuple of (consent choice, optional email)
"""
# Temporarily reset consent to force prompt
original_consent = self.config.get_consent()
self.config.set_consent(TelemetryConsent.NOT_SET)
try:
new_consent = self._prompt_for_consent()
email = self.config.get_email()
return new_consent, email
except Exception:
# Restore original on error
self.config.set_consent(original_consent)
raise

View File

@@ -1,199 +0,0 @@
"""
Environment detection for Crawl4AI telemetry.
Detects whether we're running in CLI, Docker, Jupyter, etc.
"""
import os
import sys
from enum import Enum
from typing import Optional
class Environment(Enum):
"""Detected runtime environment."""
CLI = "cli"
DOCKER = "docker"
JUPYTER = "jupyter"
COLAB = "colab"
API_SERVER = "api_server"
UNKNOWN = "unknown"
class EnvironmentDetector:
"""Detects the current runtime environment."""
@staticmethod
def detect() -> Environment:
"""
Detect current runtime environment.
Returns:
Environment enum value
"""
# Check for Docker
if EnvironmentDetector._is_docker():
# Further check if it's API server
if EnvironmentDetector._is_api_server():
return Environment.API_SERVER
return Environment.DOCKER
# Check for Google Colab
if EnvironmentDetector._is_colab():
return Environment.COLAB
# Check for Jupyter
if EnvironmentDetector._is_jupyter():
return Environment.JUPYTER
# Check for CLI
if EnvironmentDetector._is_cli():
return Environment.CLI
return Environment.UNKNOWN
@staticmethod
def _is_docker() -> bool:
"""Check if running inside Docker container."""
# Check for Docker-specific files
if os.path.exists('/.dockerenv'):
return True
# Check cgroup for docker signature
try:
with open('/proc/1/cgroup', 'r') as f:
return 'docker' in f.read()
except (IOError, OSError):
pass
# Check environment variable (if set in Dockerfile)
return os.environ.get('CRAWL4AI_DOCKER', '').lower() == 'true'
@staticmethod
def _is_api_server() -> bool:
"""Check if running as API server."""
# Check for API server indicators
return (
os.environ.get('CRAWL4AI_API_SERVER', '').lower() == 'true' or
'deploy/docker/server.py' in ' '.join(sys.argv) or
'deploy/docker/api.py' in ' '.join(sys.argv)
)
@staticmethod
def _is_jupyter() -> bool:
"""Check if running in Jupyter notebook."""
try:
# Check for IPython
from IPython import get_ipython
ipython = get_ipython()
if ipython is None:
return False
# Check for notebook kernel
if 'IPKernelApp' in ipython.config:
return True
# Check for Jupyter-specific attributes
if hasattr(ipython, 'kernel'):
return True
except (ImportError, AttributeError):
pass
return False
@staticmethod
def _is_colab() -> bool:
"""Check if running in Google Colab."""
try:
import google.colab
return True
except ImportError:
pass
# Alternative check
return 'COLAB_GPU' in os.environ or 'COLAB_TPU_ADDR' in os.environ
@staticmethod
def _is_cli() -> bool:
"""Check if running from command line."""
# Check if we have a terminal
return (
hasattr(sys, 'ps1') or
sys.stdin.isatty() or
bool(os.environ.get('TERM'))
)
@staticmethod
def is_interactive() -> bool:
"""
Check if environment supports interactive prompts.
Returns:
True if interactive prompts are supported
"""
env = EnvironmentDetector.detect()
# Docker/API server are non-interactive
if env in [Environment.DOCKER, Environment.API_SERVER]:
return False
# CLI with TTY is interactive
if env == Environment.CLI:
return sys.stdin.isatty()
# Jupyter/Colab can be interactive with widgets
if env in [Environment.JUPYTER, Environment.COLAB]:
return True
return False
@staticmethod
def supports_widgets() -> bool:
"""
Check if environment supports IPython widgets.
Returns:
True if widgets are supported
"""
env = EnvironmentDetector.detect()
if env not in [Environment.JUPYTER, Environment.COLAB]:
return False
try:
import ipywidgets
from IPython.display import display
return True
except ImportError:
return False
@staticmethod
def get_environment_context() -> dict:
"""
Get environment context for telemetry.
Returns:
Dictionary with environment information
"""
env = EnvironmentDetector.detect()
context = {
'environment_type': env.value,
'python_version': f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
'platform': sys.platform,
}
# Add environment-specific context
if env == Environment.DOCKER:
context['docker'] = True
context['container_id'] = os.environ.get('HOSTNAME', 'unknown')
elif env == Environment.COLAB:
context['colab'] = True
context['gpu'] = bool(os.environ.get('COLAB_GPU'))
elif env == Environment.JUPYTER:
context['jupyter'] = True
return context

View File

@@ -1,15 +0,0 @@
"""
Telemetry providers for Crawl4AI.
"""
from ..base import TelemetryProvider, NullProvider
__all__ = ['TelemetryProvider', 'NullProvider']
# Try to import Sentry provider if available
try:
from .sentry import SentryProvider
__all__.append('SentryProvider')
except ImportError:
# Sentry SDK not installed
pass

View File

@@ -1,234 +0,0 @@
"""
Sentry telemetry provider for Crawl4AI.
"""
import os
from typing import Dict, Any, Optional
from ..base import TelemetryProvider
# Hardcoded DSN for Crawl4AI project
# This is safe to embed as it's the public part of the DSN
# TODO: Replace with actual Crawl4AI Sentry project DSN before release
# Format: "https://<public_key>@<organization>.ingest.sentry.io/<project_id>"
DEFAULT_SENTRY_DSN = "https://your-public-key@sentry.io/your-project-id"
class SentryProvider(TelemetryProvider):
"""Sentry implementation of telemetry provider."""
def __init__(self, dsn: Optional[str] = None, **kwargs):
"""
Initialize Sentry provider.
Args:
dsn: Optional DSN override (for testing/development)
**kwargs: Additional Sentry configuration
"""
super().__init__(**kwargs)
# Allow DSN override via environment variable or parameter
self.dsn = (
dsn or
os.environ.get('CRAWL4AI_SENTRY_DSN') or
DEFAULT_SENTRY_DSN
)
self._sentry_sdk = None
self.environment = kwargs.get('environment', 'production')
self.release = kwargs.get('release', None)
def initialize(self) -> bool:
"""Initialize Sentry SDK."""
try:
import sentry_sdk
from sentry_sdk.integrations.stdlib import StdlibIntegration
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
# Initialize Sentry with minimal integrations
sentry_sdk.init(
dsn=self.dsn,
environment=self.environment,
release=self.release,
# Performance monitoring disabled by default
traces_sample_rate=0.0,
# Only capture errors, not transactions
# profiles_sample_rate=0.0,
# Minimal integrations
integrations=[
StdlibIntegration(),
ExcepthookIntegration(always_run=False),
],
# Privacy settings
send_default_pii=False,
attach_stacktrace=True,
# Before send hook for additional sanitization
before_send=self._before_send,
# Disable automatic breadcrumbs
max_breadcrumbs=0,
# Disable request data collection
# request_bodies='never',
# # Custom transport options
# transport_options={
# 'keepalive': True,
# },
)
self._sentry_sdk = sentry_sdk
self._initialized = True
return True
except ImportError:
# Sentry SDK not installed
return False
except Exception:
# Initialization failed silently
return False
def _before_send(self, event: Dict[str, Any], hint: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Process event before sending to Sentry.
Provides additional privacy protection.
"""
# Remove sensitive data
if 'request' in event:
event['request'] = self._sanitize_request(event['request'])
# Remove local variables that might contain sensitive data
if 'exception' in event and 'values' in event['exception']:
for exc in event['exception']['values']:
if 'stacktrace' in exc and 'frames' in exc['stacktrace']:
for frame in exc['stacktrace']['frames']:
# Remove local variables from frames
frame.pop('vars', None)
# Apply general sanitization
event = self.sanitize_data(event)
return event
def _sanitize_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize request data to remove sensitive information."""
sanitized = request_data.copy()
# Remove sensitive fields
sensitive_fields = ['cookies', 'headers', 'data', 'query_string', 'env']
for field in sensitive_fields:
if field in sanitized:
sanitized[field] = '[REDACTED]'
# Keep only safe fields
safe_fields = ['method', 'url']
return {k: v for k, v in sanitized.items() if k in safe_fields}
def send_exception(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send exception to Sentry.
Args:
exc: Exception to report
context: Optional context (email, environment info)
Returns:
True if sent successfully
"""
if not self._initialized:
if not self.initialize():
return False
try:
if self._sentry_sdk:
with self._sentry_sdk.push_scope() as scope:
# Add user context if email provided
if context and 'email' in context:
scope.set_user({'email': context['email']})
# Add additional context
if context:
for key, value in context.items():
if key != 'email':
scope.set_context(key, value)
# Add tags for filtering
scope.set_tag('source', context.get('source', 'unknown'))
scope.set_tag('environment_type', context.get('environment_type', 'unknown'))
# Capture the exception
self._sentry_sdk.capture_exception(exc)
return True
except Exception:
# Silently fail - telemetry should never crash the app
return False
return False
def send_event(
self,
event_name: str,
payload: Optional[Dict[str, Any]] = None
) -> bool:
"""
Send custom event to Sentry.
Args:
event_name: Name of the event
payload: Event data
Returns:
True if sent successfully
"""
if not self._initialized:
if not self.initialize():
return False
try:
if self._sentry_sdk:
# Sanitize payload
safe_payload = self.sanitize_data(payload) if payload else {}
# Send as a message with extra data
self._sentry_sdk.capture_message(
event_name,
level='info',
extras=safe_payload
)
return True
except Exception:
return False
return False
def flush(self) -> None:
"""Flush pending events to Sentry."""
if self._initialized and self._sentry_sdk:
try:
self._sentry_sdk.flush(timeout=2.0)
except Exception:
pass
def shutdown(self) -> None:
"""Shutdown Sentry client."""
if self._initialized and self._sentry_sdk:
try:
self._sentry_sdk.flush(timeout=2.0)
# Note: sentry_sdk doesn't have a shutdown method
# Flush is sufficient for cleanup
except Exception:
pass
finally:
self._initialized = False

View File

@@ -1790,6 +1790,10 @@ def perform_completion_with_backoff(
except RateLimitError as e:
print("Rate limit error:", str(e))
if attempt == max_attempts - 1:
# Last attempt failed, raise the error.
raise
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
@@ -2146,7 +2150,9 @@ def normalize_url(
drop_query_tracking=True,
sort_query=True,
keep_fragment=False,
extra_drop_params=None
extra_drop_params=None,
preserve_https=False,
original_scheme=None
):
"""
Extended URL normalizer
@@ -2176,6 +2182,17 @@ def normalize_url(
# Resolve relative paths first
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Parse once, edit parts, then rebuild
parsed = urlparse(full_url)
@@ -2184,8 +2201,10 @@ def normalize_url(
netloc = parsed.netloc.lower()
# ── path ──
# Strip duplicate slashes and trailing “/” (except root)
path = quote(unquote(parsed.path))
# Strip duplicate slashes and trailing "/" (except root)
# IMPORTANT: Don't use quote(unquote()) as it mangles + signs in URLs
# The path from urlparse is already properly encoded
path = parsed.path
if path.endswith('/') and path != '/':
path = path.rstrip('/')
@@ -2225,7 +2244,7 @@ def normalize_url(
return normalized
def normalize_url_for_deep_crawl(href, base_url):
def normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_scheme=None):
"""Normalize URLs to ensure consistent format"""
from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
@@ -2236,6 +2255,17 @@ def normalize_url_for_deep_crawl(href, base_url):
# Use urljoin to handle relative URLs
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Parse the URL for normalization
parsed = urlparse(full_url)
@@ -2273,7 +2303,7 @@ def normalize_url_for_deep_crawl(href, base_url):
return normalized
@lru_cache(maxsize=10000)
def efficient_normalize_url_for_deep_crawl(href, base_url):
def efficient_normalize_url_for_deep_crawl(href, base_url, preserve_https=False, original_scheme=None):
"""Efficient URL normalization with proper parsing"""
from urllib.parse import urljoin
@@ -2283,6 +2313,17 @@ def efficient_normalize_url_for_deep_crawl(href, base_url):
# Resolve relative URLs
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested and original scheme was HTTPS
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only preserve HTTPS for same-domain links (not protocol-relative URLs)
# Protocol-relative URLs (//example.com) should follow the base URL's scheme
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
# Use proper URL parsing
parsed = urlparse(full_url)

View File

@@ -10,4 +10,23 @@ GEMINI_API_TOKEN=your_gemini_key_here
# Optional: Override the default LLM provider
# Examples: "openai/gpt-4", "anthropic/claude-3-opus", "deepseek/chat", etc.
# If not set, uses the provider specified in config.yml (default: openai/gpt-4o-mini)
# LLM_PROVIDER=anthropic/claude-3-opus
# LLM_PROVIDER=anthropic/claude-3-opus
# Optional: Global LLM temperature setting (0.0-2.0)
# Controls randomness in responses. Lower = more focused, Higher = more creative
# LLM_TEMPERATURE=0.7
# Optional: Global custom API base URL
# Use this to point to custom endpoints or proxy servers
# LLM_BASE_URL=https://api.custom.com/v1
# Optional: Provider-specific temperature overrides
# These take precedence over the global LLM_TEMPERATURE
# OPENAI_TEMPERATURE=0.5
# ANTHROPIC_TEMPERATURE=0.3
# GROQ_TEMPERATURE=0.8
# Optional: Provider-specific base URL overrides
# Use for provider-specific proxy endpoints
# OPENAI_BASE_URL=https://custom-openai.company.com/v1
# GROQ_BASE_URL=https://custom-groq.company.com/v1

View File

@@ -692,8 +692,7 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini" # Can be overridden by LLM_PROVIDER env var
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored
# api_key: sk-... # If you pass the API key directly (not recommended)
# Redis Configuration (Used by internal Redis server managed by supervisord)
redis:

View File

@@ -4,7 +4,7 @@ import asyncio
from typing import List, Tuple, Dict
from functools import partial
from uuid import uuid4
from datetime import datetime
from datetime import datetime, timezone
from base64 import b64encode
import logging
@@ -42,7 +42,9 @@ from utils import (
should_cleanup_task,
decode_redis_hash,
get_llm_api_key,
validate_llm_provider
validate_llm_provider,
get_llm_temperature,
get_llm_base_url
)
import psutil, time
@@ -96,7 +98,9 @@ async def handle_llm_qa(
response = perform_completion_with_backoff(
provider=config["llm"]["provider"],
prompt_with_variables=prompt,
api_token=get_llm_api_key(config)
api_token=get_llm_api_key(config), # Returns None to let litellm handle it
temperature=get_llm_temperature(config),
base_url=get_llm_base_url(config)
)
return response.choices[0].message.content
@@ -115,7 +119,9 @@ async def process_llm_extraction(
instruction: str,
schema: Optional[str] = None,
cache: str = "0",
provider: Optional[str] = None
provider: Optional[str] = None,
temperature: Optional[float] = None,
base_url: Optional[str] = None
) -> None:
"""Process LLM extraction in background."""
try:
@@ -127,11 +133,13 @@ async def process_llm_extraction(
"error": error_msg
})
return
api_key = get_llm_api_key(config, provider)
api_key = get_llm_api_key(config, provider) # Returns None to let litellm handle it
llm_strategy = LLMExtractionStrategy(
llm_config=LLMConfig(
provider=provider or config["llm"]["provider"],
api_token=api_key
api_token=api_key,
temperature=temperature or get_llm_temperature(config, provider),
base_url=base_url or get_llm_base_url(config, provider)
),
instruction=instruction,
schema=json.loads(schema) if schema else None,
@@ -178,7 +186,9 @@ async def handle_markdown_request(
query: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None
provider: Optional[str] = None,
temperature: Optional[float] = None,
base_url: Optional[str] = None
) -> str:
"""Handle markdown generation requests."""
try:
@@ -203,7 +213,9 @@ async def handle_markdown_request(
FilterType.LLM: LLMContentFilter(
llm_config=LLMConfig(
provider=provider or config["llm"]["provider"],
api_token=get_llm_api_key(config, provider),
api_token=get_llm_api_key(config, provider), # Returns None to let litellm handle it
temperature=temperature or get_llm_temperature(config, provider),
base_url=base_url or get_llm_base_url(config, provider)
),
instruction=query or "Extract main content"
)
@@ -248,7 +260,9 @@ async def handle_llm_request(
schema: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None
provider: Optional[str] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
) -> JSONResponse:
"""Handle LLM extraction requests."""
base_url = get_base_url(request)
@@ -279,7 +293,9 @@ async def handle_llm_request(
cache,
base_url,
config,
provider
provider,
temperature,
api_base_url
)
except Exception as e:
@@ -324,7 +340,9 @@ async def create_new_task(
cache: str,
base_url: str,
config: dict,
provider: Optional[str] = None
provider: Optional[str] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
) -> JSONResponse:
"""Create and initialize a new task."""
decoded_url = unquote(input_path)
@@ -349,7 +367,9 @@ async def create_new_task(
query,
schema,
cache,
provider
provider,
temperature,
api_base_url
)
return JSONResponse({
@@ -393,6 +413,9 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
server_memory_mb = _get_memory_mb()
result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb
# Ensure fit_html is JSON-serializable
if "fit_html" in result_dict and not (result_dict["fit_html"] is None or isinstance(result_dict["fit_html"], str)):
result_dict["fit_html"] = None
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
@@ -473,6 +496,9 @@ async def handle_crawl_request(
processed_results = []
for result in results:
result_dict = result.model_dump()
# if fit_html is not a string, set it to None to avoid serialization errors
if "fit_html" in result_dict and not (result_dict["fit_html"] is None or isinstance(result_dict["fit_html"], str)):
result_dict["fit_html"] = None
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
@@ -576,7 +602,7 @@ async def handle_crawl_job(
task_id = f"crawl_{uuid4().hex[:8]}"
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.PROCESSING, # <-- keep enum values consistent
"created_at": datetime.utcnow().isoformat(),
"created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
"url": json.dumps(urls), # store list as JSON string
"result": "",
"error": "",

View File

@@ -28,25 +28,43 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -
signing_key = get_jwk_from_secret(SECRET_KEY)
return instance.encode(to_encode, signing_key, alg='HS256')
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
def verify_token(credentials: HTTPAuthorizationCredentials) -> Dict:
"""Verify the JWT token from the Authorization header."""
if credentials is None:
return None
if not credentials or not credentials.credentials:
raise HTTPException(
status_code=401,
detail="No token provided",
headers={"WWW-Authenticate": "Bearer"}
)
token = credentials.credentials
verifying_key = get_jwk_from_secret(SECRET_KEY)
try:
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
return payload
except Exception:
raise HTTPException(status_code=401, detail="Invalid or expired token")
except Exception as e:
raise HTTPException(
status_code=401,
detail=f"Invalid or expired token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"}
)
def get_token_dependency(config: Dict):
"""Return the token dependency if JWT is enabled, else a function that returns None."""
if config.get("security", {}).get("jwt_enabled", False):
return verify_token
def jwt_required(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
"""Enforce JWT authentication when enabled."""
if credentials is None:
raise HTTPException(
status_code=401,
detail="Authentication required. Please provide a valid Bearer token.",
headers={"WWW-Authenticate": "Bearer"}
)
return verify_token(credentials)
return jwt_required
else:
return lambda: None

View File

@@ -7520,17 +7520,18 @@ class BrowserManager:
)
os.makedirs(browser_args["downloads_path"], exist_ok=True)
if self.config.proxy or self.config.proxy_config:
if self.config.proxy:
warnings.warn(
"BrowserConfig.proxy is deprecated and ignored. Use proxy_config instead.",
DeprecationWarning,
)
if self.config.proxy_config:
from playwright.async_api import ProxySettings
proxy_settings = (
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
proxy_settings = ProxySettings(
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
browser_args["proxy"] = proxy_settings

View File

@@ -2241,7 +2241,7 @@ docker build -t crawl4ai
| Argument | Description | Default | Options |
|----------|-------------|---------|----------|
| PYTHON_VERSION | Python version | 3.10 | 3.8, 3.9, 3.10 |
| PYTHON_VERSION | Python version | 3.10 | 3.10, 3.11, 3.12, 3.13 |
| INSTALL_TYPE | Feature set | default | default, all, torch, transformer |
| ENABLE_GPU | GPU support | false | true, false |
| APP_HOME | Install path | /app | any valid path |

View File

@@ -11,8 +11,7 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini"
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored
# api_key: sk-... # If you pass the API key directly (not recommended)
# Redis Configuration
redis:
@@ -39,8 +38,8 @@ rate_limiting:
# Security Configuration
security:
enabled: false
jwt_enabled: false
enabled: false
jwt_enabled: false
https_redirect: false
trusted_hosts: ["*"]
headers:

View File

@@ -37,6 +37,8 @@ class LlmJobPayload(BaseModel):
schema: Optional[str] = None
cache: bool = False
provider: Optional[str] = None
temperature: Optional[float] = None
base_url: Optional[str] = None
class CrawlJobPayload(BaseModel):
@@ -63,6 +65,8 @@ async def llm_job_enqueue(
cache=payload.cache,
config=_config,
provider=payload.provider,
temperature=payload.temperature,
api_base_url=payload.base_url,
)
@@ -72,7 +76,7 @@ async def llm_job_status(
task_id: str,
_td: Dict = Depends(lambda: _token_dep())
):
return await handle_task_status(_redis, task_id)
return await handle_task_status(_redis, task_id, base_url=str(request.base_url))
# ---------- CRAWL job -------------------------------------------------------

View File

@@ -15,4 +15,3 @@ PyJWT==2.10.1
mcp>=1.6.0
websockets>=15.0.1
httpx[http2]>=0.27.2
sentry-sdk>=2.0.0

View File

@@ -16,6 +16,8 @@ class MarkdownRequest(BaseModel):
q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters")
c: Optional[str] = Field("0", description="Cachebust / revision counter")
provider: Optional[str] = Field(None, description="LLM provider override (e.g., 'anthropic/claude-3-opus')")
temperature: Optional[float] = Field(None, description="LLM temperature override (0.0-2.0)")
base_url: Optional[str] = Field(None, description="LLM API base URL override")
class RawCode(BaseModel):

View File

@@ -74,32 +74,6 @@ setup_logging(config)
__version__ = "0.5.1-d1"
# ───────────────────── telemetry setup ────────────────────────
# Docker/API server telemetry: enabled by default unless CRAWL4AI_TELEMETRY=0
import os as _os
if _os.environ.get('CRAWL4AI_TELEMETRY') != '0':
# Set environment variable to indicate we're in API server mode
_os.environ['CRAWL4AI_API_SERVER'] = 'true'
# Import and enable telemetry for Docker/API environment
from crawl4ai.telemetry import enable as enable_telemetry
from crawl4ai.telemetry import capture_exception
# Enable telemetry automatically in Docker mode
enable_telemetry(always=True)
import logging
telemetry_logger = logging.getLogger("telemetry")
telemetry_logger.info("✅ Telemetry enabled for Docker/API server")
else:
# Define no-op for capture_exception if telemetry is disabled
def capture_exception(exc, context=None):
pass
import logging
telemetry_logger = logging.getLogger("telemetry")
telemetry_logger.info("❌ Telemetry disabled via CRAWL4AI_TELEMETRY=0")
# ── global page semaphore (hard cap) ─────────────────────────
MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30)
GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES)
@@ -267,7 +241,8 @@ async def get_markdown(
raise HTTPException(
400, "Invalid URL format. Must start with http://, https://, or for raw HTML (raw:, raw://)")
markdown = await handle_markdown_request(
body.url, body.f, body.q, body.c, config, body.provider
body.url, body.f, body.q, body.c, config, body.provider,
body.temperature, body.base_url
)
return JSONResponse({
"url": body.url,
@@ -292,12 +267,26 @@ async def generate_html(
Use when you need sanitized HTML structures for building schemas or further processing.
"""
cfg = CrawlerRunConfig()
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
raw_html = results[0].html
from crawl4ai.utils import preprocess_html_for_schema
processed_html = preprocess_html_for_schema(raw_html)
return JSONResponse({"html": processed_html, "url": body.url, "success": True})
try:
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
# Check if the crawl was successful
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
raw_html = results[0].html
from crawl4ai.utils import preprocess_html_for_schema
processed_html = preprocess_html_for_schema(raw_html)
return JSONResponse({"html": processed_html, "url": body.url, "success": True})
except Exception as e:
# Log and raise as HTTP 500 for other exceptions
raise HTTPException(
status_code=500,
detail=str(e)
)
# Screenshot endpoint
@@ -315,18 +304,29 @@ async def generate_screenshot(
Use when you need an image snapshot of the rendered page. Its recommened to provide an output path to save the screenshot.
Then in result instead of the screenshot you will get a path to the saved file.
"""
cfg = CrawlerRunConfig(
screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
screenshot_data = results[0].screenshot
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(base64.b64decode(screenshot_data))
return {"success": True, "path": abs_path}
return {"success": True, "screenshot": screenshot_data}
try:
cfg = CrawlerRunConfig(
screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
screenshot_data = results[0].screenshot
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(base64.b64decode(screenshot_data))
return {"success": True, "path": abs_path}
return {"success": True, "screenshot": screenshot_data}
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
# PDF endpoint
@@ -344,17 +344,28 @@ async def generate_pdf(
Use when you need a printable or archivable snapshot of the page. It is recommended to provide an output path to save the PDF.
Then in result instead of the PDF you will get a path to the saved file.
"""
cfg = CrawlerRunConfig(pdf=True)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
pdf_data = results[0].pdf
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(pdf_data)
return {"success": True, "path": abs_path}
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
try:
cfg = CrawlerRunConfig(pdf=True)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
pdf_data = results[0].pdf
if body.output_path:
abs_path = os.path.abspath(body.output_path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "wb") as f:
f.write(pdf_data)
return {"success": True, "path": abs_path}
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
@app.post("/execute_js")
@@ -410,12 +421,23 @@ async def execute_js(
```
"""
cfg = CrawlerRunConfig(js_code=body.scripts)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
# Return JSON-serializable dict of the first CrawlResult
data = results[0].model_dump()
return JSONResponse(data)
try:
cfg = CrawlerRunConfig(js_code=body.scripts)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
# Return JSON-serializable dict of the first CrawlResult
data = results[0].model_dump()
return JSONResponse(data)
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
@app.get("/llm/{url:path}")
@@ -460,16 +482,24 @@ async def crawl(
):
"""
Crawl a list of URLs and return the results as JSON.
For streaming responses, use /crawl/stream endpoint.
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
res = await handle_crawl_request(
# Check whether it is a redirection for a streaming request
crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config)
if crawler_config.stream:
return await stream_process(crawl_request=crawl_request)
results = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
return JSONResponse(res)
# check if all of the results are not successful
if all(not result["success"] for result in results["results"]):
raise HTTPException(500, f"Crawl request failed: {results['results'][0]['error_message']}")
return JSONResponse(results)
@app.post("/crawl/stream")
@@ -481,12 +511,16 @@ async def crawl_stream(
):
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
return await stream_process(crawl_request=crawl_request)
async def stream_process(crawl_request: CrawlRequest):
crawler, gen = await handle_stream_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
)
return StreamingResponse(
stream_results(crawler, gen),
media_type="application/x-ndjson",

View File

@@ -371,7 +371,7 @@
<div class="flex items-center">
<input id="st-stream" type="checkbox" class="mr-2">
<label for="st-stream" class="text-sm">Use /crawl/stream</label>
<label for="st-stream" class="text-sm">Enable streaming mode</label>
<button id="st-run"
class="ml-auto bg-accent text-dark px-4 py-2 rounded hover:bg-opacity-90 font-medium">
Run Stress Test
@@ -596,6 +596,14 @@
forceHighlightElement(curlCodeEl);
}
// Detect if stream is requested inside payload
function shouldUseStream(payload) {
const toBool = (v) => v === true || (typeof v === 'string' && v.toLowerCase() === 'true');
const fromCrawler = payload && payload.crawler_config && payload.crawler_config.params && payload.crawler_config.params.stream;
const direct = payload && payload.stream;
return toBool(fromCrawler) || toBool(direct);
}
// Main run function
async function runCrawl() {
const endpoint = document.getElementById('endpoint').value;
@@ -611,16 +619,24 @@
: { browser_config: cfgJson };
}
} catch (err) {
updateStatus('error');
document.querySelector('#response-content code').textContent =
JSON.stringify({ error: err.message }, null, 2);
forceHighlightElement(document.querySelector('#response-content code'));
return; // stop run
const codeText = cm.getValue();
const streamFlag = /stream\s*=\s*True/i.test(codeText);
const isCrawlEndpoint = document.getElementById('endpoint').value === 'crawl';
if (isCrawlEndpoint && streamFlag) {
// Fallback: proceed with minimal config only for stream
advConfig = { crawler_config: { stream: true } };
} else {
updateStatus('error');
document.querySelector('#response-content code').textContent =
JSON.stringify({ error: err.message }, null, 2);
forceHighlightElement(document.querySelector('#response-content code'));
return; // stop run
}
}
const endpointMap = {
crawl: '/crawl',
// crawl_stream: '/crawl/stream',
crawl_stream: '/crawl/stream', // Keep for backward compatibility
md: '/md',
llm: '/llm'
};
@@ -647,7 +663,7 @@
// This will be handled directly in the fetch below
payload = null;
} else {
// Default payload for /crawl and /crawl/stream
// Default payload for /crawl (supports both streaming and batch modes)
payload = {
urls,
...advConfig
@@ -659,6 +675,7 @@
try {
const startTime = performance.now();
let response, responseData;
const useStreamOverride = (endpoint === 'crawl') && shouldUseStream(payload);
if (endpoint === 'llm') {
// Special handling for LLM endpoint which uses URL pattern: /llm/{encoded_url}?q={query}
@@ -681,8 +698,8 @@
document.querySelector('#response-content code').textContent = JSON.stringify(responseData, null, 2);
document.querySelector('#response-content code').className = 'json hljs';
forceHighlightElement(document.querySelector('#response-content code'));
} else if (endpoint === 'crawl_stream') {
// Stream processing
} else if (endpoint === 'crawl_stream' || useStreamOverride) {
// Stream processing - now handled directly by /crawl endpoint
response = await fetch(api, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -757,6 +774,7 @@
const question = document.getElementById('llm-question').value.trim() || "What is this page about?";
generateSnippets(`${api}/${encodedUrl}?q=${encodeURIComponent(question)}`, null, 'GET');
} else {
// Use the same API endpoint for both streaming and non-streaming
generateSnippets(api, payload);
}
} catch (error) {
@@ -786,7 +804,7 @@
document.getElementById('stress-avg-time').textContent = '0';
document.getElementById('stress-peak-mem').textContent = '0';
const api = useStream ? '/crawl/stream' : '/crawl';
const api = '/crawl'; // Always use /crawl - backend handles streaming internally
const urls = Array.from({ length: total }, (_, i) => `https://httpbin.org/anything/stress-${i}-${Date.now()}`);
const chunks = [];

View File

@@ -71,7 +71,7 @@ def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> str:
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[str]:
"""Get the appropriate API key based on the LLM provider.
Args:
@@ -79,19 +79,14 @@ def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> str:
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The API key for the provider, or empty string if not found
The API key if directly configured, otherwise None to let litellm handle it
"""
# Use provided provider or fall back to config
if not provider:
provider = config["llm"]["provider"]
# Check if direct API key is configured
# Check if direct API key is configured (for backward compatibility)
if "api_key" in config["llm"]:
return config["llm"]["api_key"]
# Fall back to the configured api_key_env if no match
return os.environ.get(config["llm"].get("api_key_env", ""), "")
# Return None - litellm will automatically find the right environment variable
return None
def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple[bool, str]:
@@ -104,19 +99,78 @@ def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple
Returns:
Tuple of (is_valid, error_message)
"""
# Use provided provider or fall back to config
if not provider:
provider = config["llm"]["provider"]
# Get the API key for this provider
api_key = get_llm_api_key(config, provider)
if not api_key:
return False, f"No API key found for provider '{provider}'. Please set the appropriate environment variable."
# If a direct API key is configured, validation passes
if "api_key" in config["llm"]:
return True, ""
# Otherwise, trust that litellm will find the appropriate environment variable
# We can't easily validate this without reimplementing litellm's logic
return True, ""
def get_llm_temperature(config: Dict, provider: Optional[str] = None) -> Optional[float]:
"""Get temperature setting based on the LLM provider.
Priority order:
1. Provider-specific environment variable (e.g., OPENAI_TEMPERATURE)
2. Global LLM_TEMPERATURE environment variable
3. None (to use litellm/provider defaults)
Args:
config: The application configuration dictionary
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The temperature setting if configured, otherwise None
"""
# Check provider-specific temperature first
if provider:
provider_name = provider.split('/')[0].upper()
provider_temp = os.environ.get(f"{provider_name}_TEMPERATURE")
if provider_temp:
try:
return float(provider_temp)
except ValueError:
logging.warning(f"Invalid temperature value for {provider_name}: {provider_temp}")
# Check global LLM_TEMPERATURE
global_temp = os.environ.get("LLM_TEMPERATURE")
if global_temp:
try:
return float(global_temp)
except ValueError:
logging.warning(f"Invalid global temperature value: {global_temp}")
# Return None to use litellm/provider defaults
return None
def get_llm_base_url(config: Dict, provider: Optional[str] = None) -> Optional[str]:
"""Get base URL setting based on the LLM provider.
Priority order:
1. Provider-specific environment variable (e.g., OPENAI_BASE_URL)
2. Global LLM_BASE_URL environment variable
3. None (to use default endpoints)
Args:
config: The application configuration dictionary
provider: Optional provider override (e.g., "openai/gpt-4")
Returns:
The base URL if configured, otherwise None
"""
# Check provider-specific base URL first
if provider:
provider_name = provider.split('/')[0].upper()
provider_url = os.environ.get(f"{provider_name}_BASE_URL")
if provider_url:
return provider_url
# Check global LLM_BASE_URL
return os.environ.get("LLM_BASE_URL")
def verify_email_domain(email: str) -> bool:
try:
domain = email.split('@')[1]

View File

@@ -0,0 +1,154 @@
import asyncio
import os
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig, LLMConfig
async def test_configuration(name: str, config: AdaptiveConfig, url: str, query: str):
"""Test a specific configuration"""
print(f"\n{'='*60}")
print(f"Configuration: {name}")
print(f"{'='*60}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
result = await adaptive.digest(start_url=url, query=query)
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
print(f"\n{'='*50}")
print(f"Pages crawled: {len(result.crawled_urls)}")
print(f"Final confidence: {adaptive.confidence:.1%}")
print(f"Stopped reason: {result.metrics.get('stopped_reason', 'max_pages')}")
if result.metrics.get('is_irrelevant', False):
print("⚠️ Query detected as irrelevant!")
return result
async def llm_embedding():
"""Demonstrate various embedding configurations"""
print("EMBEDDING STRATEGY CONFIGURATION EXAMPLES")
print("=" * 60)
# Base URL and query for testing
test_url = "https://docs.python.org/3/library/asyncio.html"
openai_llm_config = LLMConfig(
provider='openai/text-embedding-3-small',
api_token=os.getenv('OPENAI_API_KEY'),
temperature=0.7,
max_tokens=2000
)
config_openai = AdaptiveConfig(
strategy="embedding",
max_pages=10,
# Use OpenAI embeddings
embedding_llm_config=openai_llm_config,
# embedding_llm_config={
# 'provider': 'openai/text-embedding-3-small',
# 'api_token': os.getenv('OPENAI_API_KEY')
# },
# OpenAI embeddings are high quality, can be stricter
embedding_k_exp=4.0,
n_query_variations=12
)
await test_configuration(
"OpenAI Embeddings",
config_openai,
test_url,
# "event-driven architecture patterns"
"async await context managers coroutines"
)
return
async def basic_adaptive_crawling():
"""Basic adaptive crawling example"""
# Initialize the crawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Create an adaptive crawler with default settings (statistical strategy)
adaptive = AdaptiveCrawler(crawler)
# Note: You can also use embedding strategy for semantic understanding:
# from crawl4ai import AdaptiveConfig
# config = AdaptiveConfig(strategy="embedding")
# adaptive = AdaptiveCrawler(crawler, config)
# Start adaptive crawling
print("Starting adaptive crawl for Python async programming information...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/library/asyncio.html",
query="async await context managers coroutines"
)
# Display crawl statistics
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
# Show final confidence
print(f"\n{'='*50}")
print(f"Final Confidence: {adaptive.confidence:.2%}")
print(f"Total Pages Crawled: {len(result.crawled_urls)}")
print(f"Knowledge Base Size: {len(adaptive.state.knowledge_base)} documents")
if adaptive.confidence >= 0.8:
print("✓ High confidence - can answer detailed questions about async Python")
elif adaptive.confidence >= 0.6:
print("~ Moderate confidence - can answer basic questions")
else:
print("✗ Low confidence - need more information")
if __name__ == "__main__":
asyncio.run(llm_embedding())
# asyncio.run(basic_adaptive_crawling())

221
docs/examples/website-to-api/.gitignore vendored Normal file
View File

@@ -0,0 +1,221 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
#pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
#directories
models
schemas
saved_requests

View File

@@ -0,0 +1,252 @@
# Web Scraper API with Custom Model Support
A powerful web scraping API that converts any website into structured data using AI. Features a beautiful minimalist frontend interface and support for custom LLM models!
## Features
- **AI-Powered Scraping**: Provide a URL and plain English query to extract structured data
- **Beautiful Frontend**: Modern minimalist black-and-white interface with smooth UX
- **Custom Model Support**: Use any LLM provider (OpenAI, Gemini, Anthropic, etc.) with your own API keys
- **Model Management**: Save, list, and manage multiple model configurations via web interface
- **Dual Scraping Approaches**: Choose between Schema-based (faster) or LLM-based (more flexible) extraction
- **API Request History**: Automatic saving and display of all API requests with cURL commands
- **Schema Caching**: Intelligent caching of generated schemas for faster subsequent requests
- **Duplicate Prevention**: Avoids saving duplicate requests (same URL + query)
- **RESTful API**: Easy-to-use HTTP endpoints for all operations
## Quick Start
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
### 2. Start the API Server
```bash
python app.py
```
The server will start on `http://localhost:8000` with a beautiful web interface!
### 3. Using the Web Interface
Once the server is running, open your browser and go to `http://localhost:8000` to access the modern web interface!
#### Pages:
- **Scrape Data**: Enter URLs and queries to extract structured data
- **Models**: Manage your AI model configurations (add, list, delete)
- **API Requests**: View history of all scraping requests with cURL commands
#### Features:
- **Minimalist Design**: Clean black-and-white theme inspired by modern web apps
- **Real-time Results**: See extracted data in formatted JSON
- **Copy to Clipboard**: Easy copying of results
- **Toast Notifications**: User-friendly feedback
- **Dual Scraping Modes**: Choose between Schema-based and LLM-based approaches
## Model Management
### Adding Models via Web Interface
1. Go to the **Models** page
2. Enter your model details:
- **Provider**: LLM provider (e.g., `gemini/gemini-2.5-flash`, `openai/gpt-4o`)
- **API Token**: Your API key for the provider
3. Click "Add Model"
### API Usage for Model Management
#### Save a Model Configuration
```bash
curl -X POST "http://localhost:8000/models" \
-H "Content-Type: application/json" \
-d '{
"provider": "gemini/gemini-2.5-flash",
"api_token": "your-api-key-here"
}'
```
#### List Saved Models
```bash
curl -X GET "http://localhost:8000/models"
```
#### Delete a Model Configuration
```bash
curl -X DELETE "http://localhost:8000/models/my-gemini"
```
## Scraping Approaches
### 1. Schema-based Scraping (Faster)
- Generates CSS selectors for targeted extraction
- Caches schemas for repeated requests
- Faster execution for structured websites
### 2. LLM-based Scraping (More Flexible)
- Direct LLM extraction without schema generation
- More flexible for complex or dynamic content
- Better for unstructured data extraction
## Supported LLM Providers
The API supports any LLM provider that crawl4ai supports, including:
- **Google Gemini**: `gemini/gemini-2.5-flash`, `gemini/gemini-pro`
- **OpenAI**: `openai/gpt-4`, `openai/gpt-3.5-turbo`
- **Anthropic**: `anthropic/claude-3-opus`, `anthropic/claude-3-sonnet`
- **And more...**
## API Endpoints
### Core Endpoints
- `POST /scrape` - Schema-based scraping
- `POST /scrape-with-llm` - LLM-based scraping
- `GET /schemas` - List cached schemas
- `POST /clear-cache` - Clear schema cache
- `GET /health` - Health check
### Model Management Endpoints
- `GET /models` - List saved model configurations
- `POST /models` - Save a new model configuration
- `DELETE /models/{model_name}` - Delete a model configuration
### API Request History
- `GET /saved-requests` - List all saved API requests
- `DELETE /saved-requests/{request_id}` - Delete a saved request
## Request/Response Examples
### Scrape Request
```json
{
"url": "https://example.com",
"query": "Extract the product name, price, and description",
"model_name": "my-custom-model"
}
```
### Scrape Response
```json
{
"success": true,
"url": "https://example.com",
"query": "Extract the product name, price, and description",
"extracted_data": {
"product_name": "Example Product",
"price": "$99.99",
"description": "This is an example product description"
},
"schema_used": { ... },
"timestamp": "2024-01-01T12:00:00Z"
}
```
### Model Configuration Request
```json
{
"provider": "gemini/gemini-2.5-flash",
"api_token": "your-api-key-here"
}
```
## Testing
Run the test script to verify the model management functionality:
```bash
python test_models.py
```
## File Structure
```
parse_example/
├── api_server.py # FastAPI server with all endpoints
├── web_scraper_lib.py # Core scraping library
├── test_models.py # Test script for model management
├── requirements.txt # Dependencies
├── static/ # Frontend files
│ ├── index.html # Main HTML interface
│ ├── styles.css # CSS styles (minimalist theme)
│ └── script.js # JavaScript functionality
├── schemas/ # Cached schemas
├── models/ # Saved model configurations
├── saved_requests/ # API request history
└── README.md # This file
```
## Advanced Usage
### Using the Library Directly
```python
from web_scraper_lib import WebScraperAgent
# Initialize agent
agent = WebScraperAgent()
# Save a model configuration
agent.save_model_config(
model_name="my-model",
provider="openai/gpt-4",
api_token="your-api-key"
)
# Schema-based scraping
result = await agent.scrape_data(
url="https://example.com",
query="Extract product information",
model_name="my-model"
)
# LLM-based scraping
result = await agent.scrape_data_with_llm(
url="https://example.com",
query="Extract product information",
model_name="my-model"
)
```
### Schema Caching
The system automatically caches generated schemas based on URL and query combinations:
- **First request**: Generates schema using AI
- **Subsequent requests**: Uses cached schema for faster extraction
### API Request History
All API requests are automatically saved with:
- Request details (URL, query, model used)
- Response data
- Timestamp
- cURL command for re-execution
### Duplicate Prevention
The system prevents saving duplicate requests:
- Same URL + query combinations are not saved multiple times
- Returns existing request ID for duplicates
- Keeps the API request history clean
## Error Handling
The API provides detailed error messages for common issues:
- Invalid URLs
- Missing model configurations
- API key errors
- Network timeouts
- Parsing errors

View File

@@ -0,0 +1,363 @@
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import Dict, Any, Optional, Union, List
import uvicorn
import asyncio
import os
import json
from datetime import datetime
from web_scraper_lib import WebScraperAgent, scrape_website
app = FastAPI(
title="Web Scraper API",
description="Convert any website into a structured data API. Provide a URL and tell AI what data you need in plain English.",
version="1.0.0"
)
# Mount static files
if os.path.exists("static"):
app.mount("/static", StaticFiles(directory="static"), name="static")
# Mount assets directory
if os.path.exists("assets"):
app.mount("/assets", StaticFiles(directory="assets"), name="assets")
# Initialize the scraper agent
scraper_agent = WebScraperAgent()
# Create directory for saved API requests
os.makedirs("saved_requests", exist_ok=True)
class ScrapeRequest(BaseModel):
url: HttpUrl
query: str
model_name: Optional[str] = None
class ModelConfigRequest(BaseModel):
model_name: str
provider: str
api_token: str
class ScrapeResponse(BaseModel):
success: bool
url: str
query: str
extracted_data: Union[Dict[str, Any], list]
schema_used: Optional[Dict[str, Any]] = None
timestamp: Optional[str] = None
error: Optional[str] = None
class SavedApiRequest(BaseModel):
id: str
endpoint: str
method: str
headers: Dict[str, str]
body: Dict[str, Any]
timestamp: str
response: Optional[Dict[str, Any]] = None
def save_api_request(endpoint: str, method: str, headers: Dict[str, str], body: Dict[str, Any], response: Optional[Dict[str, Any]] = None) -> str:
"""Save an API request to a JSON file."""
# Check for duplicate requests (same URL and query)
if endpoint in ["/scrape", "/scrape-with-llm"] and "url" in body and "query" in body:
existing_requests = get_saved_requests()
for existing_request in existing_requests:
if (existing_request.endpoint == endpoint and
existing_request.body.get("url") == body["url"] and
existing_request.body.get("query") == body["query"]):
print(f"Duplicate request found for URL: {body['url']} and query: {body['query']}")
return existing_request.id # Return existing request ID instead of creating new one
request_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
saved_request = SavedApiRequest(
id=request_id,
endpoint=endpoint,
method=method,
headers=headers,
body=body,
timestamp=datetime.now().isoformat(),
response=response
)
file_path = os.path.join("saved_requests", f"{request_id}.json")
with open(file_path, "w") as f:
json.dump(saved_request.dict(), f, indent=2)
return request_id
def get_saved_requests() -> List[SavedApiRequest]:
"""Get all saved API requests."""
requests = []
if os.path.exists("saved_requests"):
for filename in os.listdir("saved_requests"):
if filename.endswith('.json'):
file_path = os.path.join("saved_requests", filename)
try:
with open(file_path, "r") as f:
data = json.load(f)
requests.append(SavedApiRequest(**data))
except Exception as e:
print(f"Error loading saved request {filename}: {e}")
# Sort by timestamp (newest first)
requests.sort(key=lambda x: x.timestamp, reverse=True)
return requests
@app.get("/")
async def root():
"""Serve the frontend interface."""
if os.path.exists("static/index.html"):
return FileResponse("static/index.html")
else:
return {
"message": "Web Scraper API",
"description": "Convert any website into structured data with AI",
"endpoints": {
"/scrape": "POST - Scrape data from a website",
"/schemas": "GET - List cached schemas",
"/clear-cache": "POST - Clear schema cache",
"/models": "GET - List saved model configurations",
"/models": "POST - Save a new model configuration",
"/models/{model_name}": "DELETE - Delete a model configuration",
"/saved-requests": "GET - List saved API requests"
}
}
@app.post("/scrape", response_model=ScrapeResponse)
async def scrape_website_endpoint(request: ScrapeRequest):
"""
Scrape structured data from any website.
This endpoint:
1. Takes a URL and plain English query
2. Generates a custom scraper using AI
3. Returns structured data
"""
try:
# Save the API request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
result = await scraper_agent.scrape_data(
url=str(request.url),
query=request.query,
model_name=request.model_name
)
response_data = ScrapeResponse(
success=True,
url=result["url"],
query=result["query"],
extracted_data=result["extracted_data"],
schema_used=result["schema_used"],
timestamp=result["timestamp"]
)
# Save the request with response
save_api_request(
endpoint="/scrape",
method="POST",
headers=headers,
body=body,
response=response_data.dict()
)
return response_data
except Exception as e:
# Save the failed request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
save_api_request(
endpoint="/scrape",
method="POST",
headers=headers,
body=body,
response={"error": str(e)}
)
raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
@app.post("/scrape-with-llm", response_model=ScrapeResponse)
async def scrape_website_endpoint_with_llm(request: ScrapeRequest):
"""
Scrape structured data from any website using a custom LLM model.
"""
try:
# Save the API request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
result = await scraper_agent.scrape_data_with_llm(
url=str(request.url),
query=request.query,
model_name=request.model_name
)
response_data = ScrapeResponse(
success=True,
url=result["url"],
query=result["query"],
extracted_data=result["extracted_data"],
timestamp=result["timestamp"]
)
# Save the request with response
save_api_request(
endpoint="/scrape-with-llm",
method="POST",
headers=headers,
body=body,
response=response_data.dict()
)
return response_data
except Exception as e:
# Save the failed request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
save_api_request(
endpoint="/scrape-with-llm",
method="POST",
headers=headers,
body=body,
response={"error": str(e)}
)
raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
@app.get("/saved-requests")
async def list_saved_requests():
"""List all saved API requests."""
try:
requests = get_saved_requests()
return {
"success": True,
"requests": [req.dict() for req in requests],
"count": len(requests)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list saved requests: {str(e)}")
@app.delete("/saved-requests/{request_id}")
async def delete_saved_request(request_id: str):
"""Delete a saved API request."""
try:
file_path = os.path.join("saved_requests", f"{request_id}.json")
if os.path.exists(file_path):
os.remove(file_path)
return {
"success": True,
"message": f"Saved request '{request_id}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Saved request '{request_id}' not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete saved request: {str(e)}")
@app.get("/schemas")
async def list_cached_schemas():
"""List all cached schemas."""
try:
schemas = await scraper_agent.get_cached_schemas()
return {
"success": True,
"cached_schemas": schemas,
"count": len(schemas)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list schemas: {str(e)}")
@app.post("/clear-cache")
async def clear_schema_cache():
"""Clear all cached schemas."""
try:
scraper_agent.clear_cache()
return {
"success": True,
"message": "Schema cache cleared successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")
@app.get("/models")
async def list_models():
"""List all saved model configurations."""
try:
models = scraper_agent.list_saved_models()
return {
"success": True,
"models": models,
"count": len(models)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list models: {str(e)}")
@app.post("/models")
async def save_model_config(request: ModelConfigRequest):
"""Save a new model configuration."""
try:
success = scraper_agent.save_model_config(
model_name=request.model_name,
provider=request.provider,
api_token=request.api_token
)
if success:
return {
"success": True,
"message": f"Model configuration '{request.model_name}' saved successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to save model configuration")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save model: {str(e)}")
@app.delete("/models/{model_name}")
async def delete_model_config(model_name: str):
"""Delete a model configuration."""
try:
success = scraper_agent.delete_model_config(model_name)
if success:
return {
"success": True,
"message": f"Model configuration '{model_name}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Model configuration '{model_name}' not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete model: {str(e)}")
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "web-scraper-api"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""
Startup script for the Web Scraper API with frontend interface.
"""
import os
import sys
import uvicorn
from pathlib import Path
def main():
# Check if static directory exists
static_dir = Path("static")
if not static_dir.exists():
print("❌ Static directory not found!")
print("Please make sure the 'static' directory exists with the frontend files.")
sys.exit(1)
# Check if required frontend files exist
required_files = ["index.html", "styles.css", "script.js"]
missing_files = []
for file in required_files:
if not (static_dir / file).exists():
missing_files.append(file)
if missing_files:
print(f"❌ Missing frontend files: {', '.join(missing_files)}")
print("Please make sure all frontend files are present in the static directory.")
sys.exit(1)
print("🚀 Starting Web Scraper API with Frontend Interface")
print("=" * 50)
print("📁 Static files found and ready to serve")
print("🌐 Frontend will be available at: http://localhost:8000")
print("🔌 API endpoints available at: http://localhost:8000/docs")
print("=" * 50)
# Start the server
uvicorn.run(
"api_server:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
if __name__ == "__main__":
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.8 KiB

View File

@@ -0,0 +1,5 @@
crawl4ai
fastapi
uvicorn
pydantic
litellm

View File

@@ -0,0 +1,201 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Web2API Example</title>
<link rel="stylesheet" href="/static/styles.css">
<link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
</head>
<body>
<!-- Header -->
<header class="header">
<div class="header-content">
<div class="logo">
<img src="/assets/crawl4ai_logo.jpg" alt="Crawl4AI Logo" class="logo-image">
<span>Web2API Example</span>
</div>
<nav class="nav-links">
<a href="#" class="nav-link active" data-page="scrape">Scrape</a>
<a href="#" class="nav-link" data-page="models">Models</a>
<a href="#" class="nav-link" data-page="requests">API Requests</a>
</nav>
</div>
</header>
<!-- Main Content -->
<main class="main-content">
<!-- Scrape Page -->
<div id="scrape-page" class="page active">
<div class="hero-section">
<h1 class="hero-title">Turn Any Website Into An API</h1>
<p class="hero-subtitle">This example shows how to turn any website into an API using Crawl4AI.</p>
</div>
<!-- Workflow Demonstration -->
<div class="workflow-demo">
<div class="workflow-step">
<h3 class="step-title">1. Your Request</h3>
<div class="request-box">
<div class="input-group">
<label>URL:</label>
<input type="url" id="url" name="url" placeholder="https://example-bookstore.com/new-releases" required>
</div>
<div class="input-group">
<label>QUERY:</label>
<textarea id="query" name="query" placeholder="Extract all the book titles, their authors, and the biography of the author" required></textarea>
</div>
<div class="form-options">
<div class="option-group">
<label for="scraping-approach">Approach:</label>
<select id="scraping-approach" name="scraping_approach">
<option value="llm">LLM-based (More Flexible)</option>
<option value="schema">Schema-based (Uses LLM once!)</option>
</select>
</div>
<div class="option-group">
<label for="model-select">Model:</label>
<select id="model-select" name="model_name" required>
<option value="">Select a Model</option>
</select>
</div>
</div>
<button type="submit" id="extract-btn" class="extract-btn">
<i class="fas fa-magic"></i>
Extract Data
</button>
</div>
</div>
<div class="workflow-arrow"></div>
<div class="workflow-step">
<h3 class="step-title">2. Your Instant API & Data</h3>
<div class="response-container">
<div class="api-request-box">
<label>API Request (cURL):</label>
<pre id="curl-example">curl -X POST http://localhost:8000/scrape -H "Content-Type: application/json" -d '{"url": "...", "query": "..."}'
# Or for LLM-based approach:
curl -X POST http://localhost:8000/scrape-with-llm -H "Content-Type: application/json" -d '{"url": "...", "query": "..."}'</pre>
</div>
<div class="json-response-box">
<label>JSON Response:</label>
<pre id="json-output">{
"success": true,
"extracted_data": [
{
"title": "Example Book",
"author": "John Doe",
"description": "A great book..."
}
]
}</pre>
</div>
</div>
</div>
</div>
<!-- Results Section -->
<div id="results-section" class="results-section" style="display: none;">
<div class="results-header">
<h2>Extracted Data</h2>
<button id="copy-json" class="copy-btn">
<i class="fas fa-copy"></i>
Copy JSON
</button>
</div>
<div class="results-content">
<div class="result-info">
<div class="info-item">
<span class="label">URL:</span>
<span id="result-url" class="value"></span>
</div>
<div class="info-item">
<span class="label">Query:</span>
<span id="result-query" class="value"></span>
</div>
<div class="info-item">
<span class="label">Model Used:</span>
<span id="result-model" class="value"></span>
</div>
</div>
<div class="json-display">
<pre id="actual-json-output"></pre>
</div>
</div>
</div>
<!-- Loading State -->
<div id="loading" class="loading" style="display: none;">
<div class="spinner"></div>
<p>AI is analyzing the website and extracting data...</p>
</div>
</div>
<!-- Models Page -->
<div id="models-page" class="page">
<div class="models-header">
<h1>Model Configuration</h1>
<p>Configure and manage your AI model configurations</p>
</div>
<div class="models-container">
<!-- Add New Model Form -->
<div class="model-form-section">
<h3>Add New Model</h3>
<form id="model-form" class="model-form">
<div class="form-row">
<div class="input-group">
<label for="model-name">Model Name:</label>
<input type="text" id="model-name" name="model_name" placeholder="my-gemini" required>
</div>
<div class="input-group">
<label for="provider">Provider:</label>
<input type="text" id="provider" name="provider" placeholder="gemini/gemini-2.5-flash" required>
</div>
</div>
<div class="input-group">
<label for="api-token">API Token:</label>
<input type="password" id="api-token" name="api_token" placeholder="Enter your API token" required>
</div>
<button type="submit" class="save-btn">
<i class="fas fa-save"></i>
Save Model
</button>
</form>
</div>
<!-- Saved Models List -->
<div class="saved-models-section">
<h3>Saved Models</h3>
<div id="models-list" class="models-list">
<!-- Models will be loaded here -->
</div>
</div>
</div>
</div>
<!-- API Requests Page -->
<div id="requests-page" class="page">
<div class="requests-header">
<h1>Saved API Requests</h1>
<p>View and manage your previous API requests</p>
</div>
<div class="requests-container">
<div class="requests-list" id="requests-list">
<!-- Saved requests will be loaded here -->
</div>
</div>
</div>
</main>
<!-- Toast Notifications -->
<div id="toast-container" class="toast-container"></div>
<script src="/static/script.js"></script>
</body>
</html>

View File

@@ -0,0 +1,401 @@
// API Configuration
const API_BASE_URL = 'http://localhost:8000';
// DOM Elements
const navLinks = document.querySelectorAll('.nav-link');
const pages = document.querySelectorAll('.page');
const scrapeForm = document.getElementById('scrape-form');
const modelForm = document.getElementById('model-form');
const modelSelect = document.getElementById('model-select');
const modelsList = document.getElementById('models-list');
const resultsSection = document.getElementById('results-section');
const loadingSection = document.getElementById('loading');
const copyJsonBtn = document.getElementById('copy-json');
// Navigation
navLinks.forEach(link => {
link.addEventListener('click', (e) => {
e.preventDefault();
const targetPage = link.dataset.page;
// Update active nav link
navLinks.forEach(l => l.classList.remove('active'));
link.classList.add('active');
// Show target page
pages.forEach(page => page.classList.remove('active'));
document.getElementById(`${targetPage}-page`).classList.add('active');
// Load data for the page
if (targetPage === 'models') {
loadModels();
} else if (targetPage === 'requests') {
loadSavedRequests();
}
});
});
// Scrape Form Handler
document.getElementById('extract-btn').addEventListener('click', async (e) => {
e.preventDefault();
// Scroll to results section immediately when button is clicked
document.getElementById('results-section').scrollIntoView({
behavior: 'smooth',
block: 'start'
});
const url = document.getElementById('url').value;
const query = document.getElementById('query').value;
const headless = true; // Always use headless mode
const model_name = document.getElementById('model-select').value || null;
const scraping_approach = document.getElementById('scraping-approach').value;
if (!url || !query) {
showToast('Please fill in both URL and query fields', 'error');
return;
}
if (!model_name) {
showToast('Please select a model from the dropdown or add one from the Models page', 'error');
return;
}
const data = {
url: url,
query: query,
headless: headless,
model_name: model_name
};
// Show loading state
showLoading(true);
hideResults();
try {
// Choose endpoint based on scraping approach
const endpoint = scraping_approach === 'llm' ? '/scrape-with-llm' : '/scrape';
const response = await fetch(`${API_BASE_URL}${endpoint}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
if (response.ok) {
displayResults(result);
showToast(`Data extracted successfully using ${scraping_approach === 'llm' ? 'LLM-based' : 'Schema-based'} approach!`, 'success');
} else {
throw new Error(result.detail || 'Failed to extract data');
}
} catch (error) {
console.error('Scraping error:', error);
showToast(`Error: ${error.message}`, 'error');
} finally {
showLoading(false);
}
});
// Model Form Handler
modelForm.addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData(modelForm);
const data = {
model_name: formData.get('model_name'),
provider: formData.get('provider'),
api_token: formData.get('api_token')
};
try {
const response = await fetch(`${API_BASE_URL}/models`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
if (response.ok) {
showToast('Model saved successfully!', 'success');
modelForm.reset();
loadModels();
loadModelSelect();
} else {
throw new Error(result.detail || 'Failed to save model');
}
} catch (error) {
console.error('Model save error:', error);
showToast(`Error: ${error.message}`, 'error');
}
});
// Copy JSON Button
copyJsonBtn.addEventListener('click', () => {
const actualJsonOutput = document.getElementById('actual-json-output');
const textToCopy = actualJsonOutput.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
showToast('JSON copied to clipboard!', 'success');
}).catch(() => {
showToast('Failed to copy JSON', 'error');
});
});
// Load Models
async function loadModels() {
try {
const response = await fetch(`${API_BASE_URL}/models`);
const result = await response.json();
if (response.ok) {
displayModels(result.models);
} else {
throw new Error(result.detail || 'Failed to load models');
}
} catch (error) {
console.error('Load models error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Display Models
function displayModels(models) {
if (models.length === 0) {
modelsList.innerHTML = '<p style="text-align: center; color: #7f8c8d; padding: 2rem;">No models saved yet. Add your first model above!</p>';
return;
}
modelsList.innerHTML = models.map(model => `
<div class="model-card">
<div class="model-info">
<div class="model-name">${model}</div>
<div class="model-provider">Model Configuration</div>
</div>
<div class="model-actions">
<button class="btn btn-danger" onclick="deleteModel('${model}')">
<i class="fas fa-trash"></i>
Delete
</button>
</div>
</div>
`).join('');
}
// Delete Model
async function deleteModel(modelName) {
if (!confirm(`Are you sure you want to delete the model "${modelName}"?`)) {
return;
}
try {
const response = await fetch(`${API_BASE_URL}/models/${modelName}`, {
method: 'DELETE'
});
const result = await response.json();
if (response.ok) {
showToast('Model deleted successfully!', 'success');
loadModels();
loadModelSelect();
} else {
throw new Error(result.detail || 'Failed to delete model');
}
} catch (error) {
console.error('Delete model error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Load Model Select Options
async function loadModelSelect() {
try {
const response = await fetch(`${API_BASE_URL}/models`);
const result = await response.json();
if (response.ok) {
// Clear existing options
modelSelect.innerHTML = '<option value="">Select a Model</option>';
// Add model options
result.models.forEach(model => {
const option = document.createElement('option');
option.value = model;
option.textContent = model;
modelSelect.appendChild(option);
});
}
} catch (error) {
console.error('Load model select error:', error);
}
}
// Display Results
function displayResults(result) {
// Update result info
document.getElementById('result-url').textContent = result.url;
document.getElementById('result-query').textContent = result.query;
document.getElementById('result-model').textContent = result.model_name || 'Default Model';
// Display JSON in the actual results section
const actualJsonOutput = document.getElementById('actual-json-output');
actualJsonOutput.textContent = JSON.stringify(result.extracted_data, null, 2);
// Don't update the sample JSON in the workflow demo - keep it as example
// Update the cURL example based on the approach used
const scraping_approach = document.getElementById('scraping-approach').value;
const endpoint = scraping_approach === 'llm' ? '/scrape-with-llm' : '/scrape';
const curlExample = document.getElementById('curl-example');
curlExample.textContent = `curl -X POST http://localhost:8000${endpoint} -H "Content-Type: application/json" -d '{"url": "${result.url}", "query": "${result.query}"}'`;
// Show results section
resultsSection.style.display = 'block';
resultsSection.scrollIntoView({ behavior: 'smooth' });
}
// Show/Hide Loading
function showLoading(show) {
loadingSection.style.display = show ? 'block' : 'none';
}
// Hide Results
function hideResults() {
resultsSection.style.display = 'none';
}
// Toast Notifications
function showToast(message, type = 'info') {
const toastContainer = document.getElementById('toast-container');
const toast = document.createElement('div');
toast.className = `toast ${type}`;
const icon = type === 'success' ? 'fas fa-check-circle' :
type === 'error' ? 'fas fa-exclamation-circle' :
'fas fa-info-circle';
toast.innerHTML = `
<i class="${icon}"></i>
<span>${message}</span>
`;
toastContainer.appendChild(toast);
// Auto remove after 5 seconds
setTimeout(() => {
toast.remove();
}, 5000);
}
// Load Saved Requests
async function loadSavedRequests() {
try {
const response = await fetch(`${API_BASE_URL}/saved-requests`);
const result = await response.json();
if (response.ok) {
displaySavedRequests(result.requests);
} else {
throw new Error(result.detail || 'Failed to load saved requests');
}
} catch (error) {
console.error('Load saved requests error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Display Saved Requests
function displaySavedRequests(requests) {
const requestsList = document.getElementById('requests-list');
if (requests.length === 0) {
requestsList.innerHTML = '<p style="text-align: center; color: #CCCCCC; padding: 2rem;">No saved API requests yet. Make your first request from the Scrape page!</p>';
return;
}
requestsList.innerHTML = requests.map(request => {
const url = request.body.url;
const query = request.body.query;
const model = request.body.model_name || 'Default Model';
const endpoint = request.endpoint;
// Create curl command
const curlCommand = `curl -X POST http://localhost:8000${endpoint} \\
-H "Content-Type: application/json" \\
-d '{
"url": "${url}",
"query": "${query}",
"model_name": "${model}"
}'`;
return `
<div class="request-card">
<div class="request-header">
<div class="request-info">
<div class="request-url">${url}</div>
<div class="request-query">${query}</div>
</div>
<div class="request-actions">
<button class="btn-danger" onclick="deleteSavedRequest('${request.id}')">
<i class="fas fa-trash"></i>
Delete
</button>
</div>
</div>
<div class="request-curl">
<h4>cURL Command:</h4>
<pre>${curlCommand}</pre>
</div>
</div>
`;
}).join('');
}
// Delete Saved Request
async function deleteSavedRequest(requestId) {
if (!confirm('Are you sure you want to delete this saved request?')) {
return;
}
try {
const response = await fetch(`${API_BASE_URL}/saved-requests/${requestId}`, {
method: 'DELETE'
});
const result = await response.json();
if (response.ok) {
showToast('Saved request deleted successfully!', 'success');
loadSavedRequests();
} else {
throw new Error(result.detail || 'Failed to delete saved request');
}
} catch (error) {
console.error('Delete saved request error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Initialize
document.addEventListener('DOMContentLoaded', () => {
loadModelSelect();
// Check if API is available
fetch(`${API_BASE_URL}/health`)
.then(response => {
if (!response.ok) {
showToast('Warning: API server might not be running', 'error');
}
})
.catch(() => {
showToast('Warning: Cannot connect to API server. Make sure it\'s running on localhost:8000', 'error');
});
});

View File

@@ -0,0 +1,765 @@
/* Reset and Base Styles */
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #000000;
color: #FFFFFF;
line-height: 1.6;
font-size: 16px;
}
/* Header */
.header {
border-bottom: 1px solid #333;
padding: 1rem 0;
background: #000000;
position: sticky;
top: 0;
z-index: 100;
}
.header-content {
max-width: 1200px;
margin: 0 auto;
padding: 0 2rem;
display: flex;
justify-content: space-between;
align-items: center;
}
.logo {
display: flex;
align-items: center;
gap: 0.5rem;
font-size: 1.5rem;
font-weight: 600;
color: #FFFFFF;
}
.logo-image {
width: 40px;
height: 40px;
border-radius: 4px;
object-fit: contain;
}
.nav-links {
display: flex;
gap: 2rem;
}
.nav-link {
color: #CCCCCC;
text-decoration: none;
font-weight: 500;
transition: color 0.2s ease;
}
.nav-link:hover,
.nav-link.active {
color: #FFFFFF;
}
/* Main Content */
.main-content {
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
}
.page {
display: none;
}
.page.active {
display: block;
}
/* Hero Section */
.hero-section {
text-align: center;
margin-bottom: 4rem;
padding: 2rem 0;
}
.hero-title {
font-size: 3rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
line-height: 1.2;
}
.hero-subtitle {
font-size: 1.25rem;
color: #CCCCCC;
max-width: 600px;
margin: 0 auto;
}
/* Workflow Demo */
.workflow-demo {
display: grid;
grid-template-columns: 1fr auto 1fr;
gap: 2rem;
align-items: start;
margin-bottom: 4rem;
}
.workflow-step {
display: flex;
flex-direction: column;
gap: 1rem;
}
.step-title {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
text-align: center;
margin-bottom: 1rem;
}
.workflow-arrow {
font-size: 2rem;
font-weight: 700;
color: #09b5a5;
display: flex;
align-items: center;
justify-content: center;
margin-top: 20rem;
}
/* Request Box */
.request-box {
border: 2px solid #333;
border-radius: 8px;
padding: 2rem;
background: #111111;
}
.input-group {
margin-bottom: 1.5rem;
}
.input-group label {
display: block;
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}
.input-group input,
.input-group textarea,
.input-group select {
width: 100%;
padding: 0.75rem;
border: 1px solid #333;
border-radius: 4px;
font-family: 'Courier New', monospace;
font-size: 0.9rem;
background: #1A1A1A;
color: #FFFFFF;
transition: border-color 0.2s ease;
}
.input-group input:focus,
.input-group textarea:focus,
.input-group select:focus {
outline: none;
border-color: #09b5a5;
}
.input-group textarea {
min-height: 80px;
resize: vertical;
}
.form-options {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
margin-bottom: 1.5rem;
}
.option-group {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.option-group label {
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
font-size: 0.9rem;
}
.option-group input[type="checkbox"] {
width: auto;
margin-right: 0.5rem;
}
.extract-btn {
width: 100%;
padding: 1rem;
background: #09b5a5;
color: #000000;
border: none;
border-radius: 4px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.extract-btn:hover {
background: #09b5a5;
}
/* Dropdown specific styling */
select,
.input-group select,
.option-group select {
cursor: pointer !important;
appearance: none !important;
-webkit-appearance: none !important;
-moz-appearance: none !important;
-ms-appearance: none !important;
background-image: url("data:image/svg+xml;charset=UTF-8,%3csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 24 24' fill='none' stroke='%23FFFFFF' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3e%3cpolyline points='6,9 12,15 18,9'%3e%3c/polyline%3e%3c/svg%3e") !important;
background-repeat: no-repeat !important;
background-position: right 0.75rem center !important;
background-size: 1rem !important;
padding-right: 2.5rem !important;
border: 1px solid #333 !important;
border-radius: 4px !important;
font-family: 'Courier New', monospace !important;
font-size: 0.9rem !important;
background-color: #1A1A1A !important;
color: #FFFFFF !important;
}
select:hover,
.input-group select:hover,
.option-group select:hover {
border-color: #09b5a5 !important;
}
select:focus,
.input-group select:focus,
.option-group select:focus {
outline: none !important;
border-color: #09b5a5 !important;
}
select option,
.input-group select option,
.option-group select option {
background: #1A1A1A !important;
color: #FFFFFF !important;
padding: 0.5rem !important;
}
/* Response Container */
.response-container {
display: flex;
flex-direction: column;
gap: 1rem;
}
.api-request-box,
.json-response-box {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
background: #111111;
}
.api-request-box label,
.json-response-box label {
display: block;
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}
.api-request-box pre,
.json-response-box pre {
font-family: 'Courier New', monospace;
font-size: 0.85rem;
line-height: 1.5;
color: #FFFFFF;
background: #1A1A1A;
padding: 1rem;
border-radius: 4px;
overflow-x: auto;
white-space: pre-wrap;
word-break: break-all;
}
/* Results Section */
.results-section {
border: 2px solid #333;
border-radius: 8px;
overflow: hidden;
margin-top: 2rem;
background: #111111;
}
.results-header {
background: #1A1A1A;
color: #FFFFFF;
padding: 1rem 1.5rem;
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 1px solid #333;
}
.results-header h2 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
}
.copy-btn {
background: #09b5a5;
color: #000000;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
display: flex;
align-items: center;
gap: 0.5rem;
transition: background-color 0.2s ease;
}
.copy-btn:hover {
background: #09b5a5;
}
.results-content {
padding: 1.5rem;
}
.result-info {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 1rem;
margin-bottom: 1.5rem;
padding: 1rem;
background: #1A1A1A;
border-radius: 4px;
border: 1px solid #333;
}
.info-item {
display: flex;
flex-direction: column;
gap: 0.25rem;
}
.info-item .label {
font-weight: 600;
color: #FFFFFF;
font-size: 0.9rem;
}
.info-item .value {
color: #CCCCCC;
word-break: break-all;
}
.json-display {
background: #1A1A1A;
border-radius: 4px;
overflow: hidden;
border: 1px solid #333;
}
.json-display pre {
color: #FFFFFF;
padding: 1.5rem;
margin: 0;
overflow-x: auto;
font-family: 'Courier New', monospace;
font-size: 0.9rem;
line-height: 1.5;
}
/* Loading State */
.loading {
text-align: center;
padding: 3rem;
}
.spinner {
width: 40px;
height: 40px;
border: 3px solid #333;
border-top: 3px solid #09b5a5;
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 0 auto 1rem;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* Models Page */
.models-header {
text-align: center;
margin-bottom: 3rem;
}
.models-header h1 {
font-size: 2.5rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
}
.models-header p {
font-size: 1.1rem;
color: #CCCCCC;
}
/* API Requests Page */
.requests-header {
text-align: center;
margin-bottom: 3rem;
}
.requests-header h1 {
font-size: 2.5rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
}
.requests-header p {
font-size: 1.1rem;
color: #CCCCCC;
}
.requests-container {
max-width: 1200px;
margin: 0 auto;
}
.requests-list {
display: grid;
gap: 1.5rem;
}
.request-card {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
background: #111111;
transition: border-color 0.2s ease;
}
.request-card:hover {
border-color: #09b5a5;
}
.request-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
padding-bottom: 1rem;
border-bottom: 1px solid #333;
}
.request-info {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.request-url {
font-family: 'Courier New', monospace;
font-weight: 600;
color: #09b5a5;
font-size: 1.1rem;
word-break: break-all;
}
.request-query {
color: #CCCCCC;
font-size: 0.9rem;
margin-top: 0.5rem;
word-break: break-all;
}
.request-actions {
display: flex;
gap: 0.5rem;
}
.request-curl {
background: #1A1A1A;
border: 1px solid #333;
border-radius: 4px;
padding: 1rem;
margin-top: 1rem;
}
.request-curl h4 {
color: #FFFFFF;
font-size: 0.9rem;
font-weight: 600;
margin-bottom: 0.5rem;
font-family: 'Courier New', monospace;
}
.request-curl pre {
color: #CCCCCC;
font-size: 0.8rem;
line-height: 1.4;
overflow-x: auto;
white-space: pre-wrap;
word-break: break-all;
background: #111111;
padding: 0.75rem;
border-radius: 4px;
border: 1px solid #333;
}
.models-container {
max-width: 800px;
margin: 0 auto;
}
.model-form-section {
border: 2px solid #333;
border-radius: 8px;
padding: 2rem;
margin-bottom: 2rem;
background: #111111;
}
.model-form-section h3 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 1.5rem;
}
.model-form {
display: flex;
flex-direction: column;
gap: 1.5rem;
}
.form-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
}
.save-btn {
padding: 1rem;
background: #09b5a5;
color: #000000;
border: none;
border-radius: 4px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.save-btn:hover {
background: #09b5a5;
}
.saved-models-section h3 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 1.5rem;
}
.models-list {
display: grid;
gap: 1rem;
}
.model-card {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
display: flex;
justify-content: space-between;
align-items: center;
transition: border-color 0.2s ease;
background: #111111;
}
.model-card:hover {
border-color: #09b5a5;
}
.model-info {
flex: 1;
}
.model-name {
font-weight: 600;
color: #FFFFFF;
font-size: 1.1rem;
margin-bottom: 0.5rem;
}
.model-provider {
color: #CCCCCC;
font-size: 0.9rem;
}
.model-actions {
display: flex;
gap: 0.5rem;
}
.btn-danger {
background: #FF4444;
color: #FFFFFF;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
gap: 0.5rem;
}
.btn-danger:hover {
background: #CC3333;
}
/* Toast Notifications */
.toast-container {
position: fixed;
top: 20px;
right: 20px;
z-index: 1000;
}
.toast {
background: #111111;
border: 2px solid #333;
border-radius: 4px;
padding: 1rem 1.5rem;
margin-bottom: 0.5rem;
display: flex;
align-items: center;
gap: 0.5rem;
animation: slideIn 0.3s ease;
max-width: 400px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3);
color: #FFFFFF;
}
.toast.success {
border-color: #09b5a5;
background: #0A1A1A;
}
.toast.error {
border-color: #FF4444;
background: #1A0A0A;
}
.toast.info {
border-color: #09b5a5;
background: #0A1A1A;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}
/* Responsive Design */
@media (max-width: 768px) {
.header-content {
padding: 0 1rem;
}
.main-content {
padding: 1rem;
}
.hero-title {
font-size: 2rem;
}
.workflow-demo {
grid-template-columns: 1fr;
gap: 1rem;
}
.workflow-arrow {
transform: rotate(90deg);
margin: 1rem 0;
}
.form-options {
grid-template-columns: 1fr;
}
.form-row {
grid-template-columns: 1fr;
}
.result-info {
grid-template-columns: 1fr;
}
.model-card {
flex-direction: column;
gap: 1rem;
text-align: center;
}
.model-actions {
width: 100%;
justify-content: center;
}
}

View File

@@ -0,0 +1,28 @@
import asyncio
from web_scraper_lib import scrape_website
import os
async def test_library():
"""Test the mini library directly."""
print("=== Testing Mini Library ===")
# Test 1: Scrape with a custom model
url = "https://marketplace.mainstreet.co.in/collections/adidas-yeezy/products/adidas-yeezy-boost-350-v2-yecheil-non-reflective"
query = "Extract the following data: Product name, Product price, Product description, Product size. DO NOT EXTRACT ANYTHING ELSE."
if os.path.exists("models"):
model_name = os.listdir("models")[0].split(".")[0]
else:
raise Exception("No models found in models directory")
print(f"Scraping: {url}")
print(f"Query: {query}")
try:
result = await scrape_website(url, query, model_name)
print("✅ Library test successful!")
print(f"Extracted data: {result['extracted_data']}")
except Exception as e:
print(f"❌ Library test failed: {e}")
if __name__ == "__main__":
asyncio.run(test_library())

View File

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
Test script for the new model management functionality.
This script demonstrates how to save and use custom model configurations.
"""
import asyncio
import requests
import json
# API base URL
BASE_URL = "http://localhost:8000"
def test_model_management():
"""Test the model management endpoints."""
print("=== Testing Model Management ===")
# 1. List current models
print("\n1. Listing current models:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 2. Save another model configuration (OpenAI example)
print("\n2. Saving OpenAI model configuration:")
openai_config = {
"model_name": "my-openai",
"provider": "openai",
"api_token": "your-openai-api-key-here"
}
response = requests.post(f"{BASE_URL}/models", json=openai_config)
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 3. List models again to see the new ones
print("\n3. Listing models after adding new ones:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 4. Delete a model configuration
print("\n4. Deleting a model configuration:")
response = requests.delete(f"{BASE_URL}/models/my-openai")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 5. Final list of models
print("\n5. Final list of models:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
if __name__ == "__main__":
print("Model Management Test Script")
print("Make sure the API server is running on http://localhost:8000")
print("=" * 50)
try:
test_model_management()
except requests.exceptions.ConnectionError:
print("Error: Could not connect to the API server.")
print("Make sure the server is running with: python api_server.py")
except Exception as e:
print(f"Error: {e}")

View File

@@ -0,0 +1,397 @@
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CacheMode,
CrawlerRunConfig,
LLMConfig,
JsonCssExtractionStrategy,
LLMExtractionStrategy
)
import os
import json
import hashlib
from typing import Dict, Any, Optional, List
from litellm import completion
class ModelConfig:
"""Configuration for LLM models."""
def __init__(self, provider: str, api_token: str):
self.provider = provider
self.api_token = api_token
def to_dict(self) -> Dict[str, Any]:
return {
"provider": self.provider,
"api_token": self.api_token
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ModelConfig':
return cls(
provider=data["provider"],
api_token=data["api_token"]
)
class WebScraperAgent:
"""
A mini library that converts any website into a structured data API.
Features:
1. Provide a URL and tell AI what data you need in plain English
2. Generate: Agent reverse-engineers the site and deploys custom scraper
3. Integrate: Use private API endpoint to get structured data
4. Support for custom LLM models and API keys
"""
def __init__(self, schemas_dir: str = "schemas", models_dir: str = "models"):
self.schemas_dir = schemas_dir
self.models_dir = models_dir
os.makedirs(self.schemas_dir, exist_ok=True)
os.makedirs(self.models_dir, exist_ok=True)
def _generate_schema_key(self, url: str, query: str) -> str:
"""Generate a unique key for schema caching based on URL and query."""
content = f"{url}:{query}"
return hashlib.md5(content.encode()).hexdigest()
def save_model_config(self, model_name: str, provider: str, api_token: str) -> bool:
"""
Save a model configuration for later use.
Args:
model_name: User-friendly name for the model
provider: LLM provider (e.g., 'gemini', 'openai', 'anthropic')
api_token: API token for the provider
Returns:
True if saved successfully
"""
try:
model_config = ModelConfig(provider, api_token)
config_path = os.path.join(self.models_dir, f"{model_name}.json")
with open(config_path, "w") as f:
json.dump(model_config.to_dict(), f, indent=2)
print(f"Model configuration saved: {model_name}")
return True
except Exception as e:
print(f"Failed to save model configuration: {e}")
return False
def load_model_config(self, model_name: str) -> Optional[ModelConfig]:
"""
Load a saved model configuration.
Args:
model_name: Name of the saved model configuration
Returns:
ModelConfig object or None if not found
"""
try:
config_path = os.path.join(self.models_dir, f"{model_name}.json")
if not os.path.exists(config_path):
return None
with open(config_path, "r") as f:
data = json.load(f)
return ModelConfig.from_dict(data)
except Exception as e:
print(f"Failed to load model configuration: {e}")
return None
def list_saved_models(self) -> List[str]:
"""List all saved model configurations."""
models = []
for filename in os.listdir(self.models_dir):
if filename.endswith('.json'):
models.append(filename[:-5]) # Remove .json extension
return models
def delete_model_config(self, model_name: str) -> bool:
"""
Delete a saved model configuration.
Args:
model_name: Name of the model configuration to delete
Returns:
True if deleted successfully
"""
try:
config_path = os.path.join(self.models_dir, f"{model_name}.json")
if os.path.exists(config_path):
os.remove(config_path)
print(f"Model configuration deleted: {model_name}")
return True
return False
except Exception as e:
print(f"Failed to delete model configuration: {e}")
return False
async def _load_or_generate_schema(self, url: str, query: str, session_id: str = "schema_generator", model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Loads schema from cache if exists, otherwise generates using AI.
This is the "Generate" step - our agent reverse-engineers the site.
Args:
url: URL to scrape
query: Query for data extraction
session_id: Session identifier
model_name: Name of saved model configuration to use
"""
schema_key = self._generate_schema_key(url, query)
schema_path = os.path.join(self.schemas_dir, f"{schema_key}.json")
if os.path.exists(schema_path):
print(f"Schema found in cache for {url}")
with open(schema_path, "r") as f:
return json.load(f)
print(f"Generating new schema for {url}")
print(f"Query: {query}")
query += """
IMPORTANT:
GENERATE THE SCHEMA WITH ONLY THE FIELDS MENTIONED IN THE QUERY. MAKE SURE THE NUMBER OF FIELDS IN THE SCHEME MATCH THE NUMBER OF FIELDS IN THE QUERY.
"""
# Step 1: Fetch the page HTML
async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
session_id=session_id,
simulate_user=True,
remove_overlay_elements=True,
delay_before_return_html=5,
)
)
html = result.fit_html
# Step 2: Generate schema using AI with custom model if specified
print("AI is analyzing the page structure...")
# Use custom model configuration if provided
if model_name:
model_config = self.load_model_config(model_name)
if model_config:
llm_config = LLMConfig(
provider=model_config.provider,
api_token=model_config.api_token
)
print(f"Using custom model: {model_name}")
else:
raise ValueError(f"Model configuration '{model_name}' not found. Please add it from the Models page.")
else:
# Require a model to be specified
raise ValueError("No model specified. Please select a model from the dropdown or add one from the Models page.")
schema = JsonCssExtractionStrategy.generate_schema(
html=html,
llm_config=llm_config,
query=query
)
# Step 3: Cache the generated schema
print(f"Schema generated and cached: {json.dumps(schema, indent=2)}")
with open(schema_path, "w") as f:
json.dump(schema, f, indent=2)
return schema
def _generate_llm_schema(self, query: str, llm_config: LLMConfig) -> Dict[str, Any]:
"""
Generate a schema for a given query using a custom LLM model.
Args:
query: Plain English description of what data to extract
model_config: Model configuration to use
"""
# ask the model to generate a schema for the given query in the form of a json.
prompt = f"""
IDENTIFY THE FIELDS FOR EXTRACTION MENTIONED IN THE QUERY and GENERATE A JSON SCHEMA FOR THE FIELDS.
eg.
{{
"name": "str",
"age": "str",
"email": "str",
"product_name": "str",
"product_price": "str",
"product_description": "str",
"product_image": "str",
"product_url": "str",
"product_rating": "str",
"product_reviews": "str",
}}
Here is the query:
{query}
IMPORTANT:
THE RESULT SHOULD BE A JSON OBJECT.
MAKE SURE THE NUMBER OF FIELDS IN THE RESULT MATCH THE NUMBER OF FIELDS IN THE QUERY.
THE RESULT SHOULD BE A JSON OBJECT.
"""
response = completion(
model=llm_config.provider,
messages=[{"role": "user", "content": prompt}],
api_key=llm_config.api_token,
result_type="json"
)
return response.json()["choices"][0]["message"]["content"]
async def scrape_data_with_llm(self, url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Scrape structured data from any website using a custom LLM model.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
"""
if model_name:
model_config = self.load_model_config(model_name)
if model_config:
llm_config = LLMConfig(
provider=model_config.provider,
api_token=model_config.api_token
)
print(f"Using custom model: {model_name}")
else:
raise ValueError(f"Model configuration '{model_name}' not found. Please add it from the Models page.")
else:
# Require a model to be specified
raise ValueError("No model specified. Please select a model from the dropdown or add one from the Models page.")
query += """\n
IMPORTANT:
THE RESULT SHOULD BE A JSON OBJECT WITH THE ONLY THE FIELDS MENTIONED IN THE QUERY.
MAKE SURE THE NUMBER OF FIELDS IN THE RESULT MATCH THE NUMBER OF FIELDS IN THE QUERY.
THE RESULT SHOULD BE A JSON OBJECT.
"""
schema = self._generate_llm_schema(query, llm_config)
print(f"Schema: {schema}")
llm_extraction_strategy = LLMExtractionStrategy(
llm_config=llm_config,
instruction=query,
result_type="json",
schema=schema
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
simulate_user=True,
extraction_strategy=llm_extraction_strategy,
)
)
extracted_data = result.extracted_content
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If it's not valid JSON, keep it as string
pass
return {
"url": url,
"query": query,
"extracted_data": extracted_data,
"timestamp": result.timestamp if hasattr(result, 'timestamp') else None
}
async def scrape_data(self, url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Main method to scrape structured data from any website.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
Returns:
Structured data extracted from the website
"""
# Step 1: Generate or load schema (reverse-engineer the site)
schema = await self._load_or_generate_schema(url=url, query=query, model_name=model_name)
# Step 2: Deploy custom high-speed scraper
print(f"Deploying custom scraper for {url}")
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
run_config = CrawlerRunConfig(
extraction_strategy=JsonCssExtractionStrategy(schema=schema),
)
result = await crawler.arun(url=url, config=run_config)
# Step 3: Return structured data
# Parse extracted_content if it's a JSON string
extracted_data = result.extracted_content
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If it's not valid JSON, keep it as string
pass
return {
"url": url,
"query": query,
"extracted_data": extracted_data,
"schema_used": schema,
"timestamp": result.timestamp if hasattr(result, 'timestamp') else None
}
async def get_cached_schemas(self) -> Dict[str, str]:
"""Get list of cached schemas."""
schemas = {}
for filename in os.listdir(self.schemas_dir):
if filename.endswith('.json'):
schema_key = filename[:-5] # Remove .json extension
schemas[schema_key] = filename
return schemas
def clear_cache(self):
"""Clear all cached schemas."""
import shutil
if os.path.exists(self.schemas_dir):
shutil.rmtree(self.schemas_dir)
os.makedirs(self.schemas_dir, exist_ok=True)
print("Schema cache cleared")
# Convenience function for simple usage
async def scrape_website(url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Simple function to scrape any website with plain English instructions.
Args:
url: Website URL
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
Returns:
Extracted structured data
"""
agent = WebScraperAgent()
return await agent.scrape_data(url, query, model_name)
async def scrape_website_with_llm(url: str, query: str, model_name: Optional[str] = None):
"""
Scrape structured data from any website using a custom LLM model.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
"""
agent = WebScraperAgent()
return await agent.scrape_data_with_llm(url, query, model_name)

View File

@@ -126,30 +126,6 @@ Factors:
- URL depth (fewer slashes = higher authority)
- Clean URL structure
### Custom Link Scoring
```python
class CustomLinkScorer:
def score(self, link: Link, query: str, state: CrawlState) -> float:
# Prioritize specific URL patterns
if "/api/reference/" in link.href:
return 2.0 # Double the score
# Deprioritize certain sections
if "/archive/" in link.href:
return 0.1 # Reduce score by 90%
# Default scoring
return 1.0
# Use with adaptive crawler
adaptive = AdaptiveCrawler(
crawler,
config=config,
link_scorer=CustomLinkScorer()
)
```
## Domain-Specific Configurations
### Technical Documentation
@@ -230,8 +206,12 @@ config = AdaptiveConfig(
# Periodically clean state
if len(state.knowledge_base) > 1000:
# Keep only most relevant
state.knowledge_base = get_top_relevant(state.knowledge_base, 500)
# Keep only the top 500 most relevant docs
top_content = adaptive.get_relevant_content(top_k=500)
keep_indices = {d["index"] for d in top_content}
state.knowledge_base = [
doc for i, doc in enumerate(state.knowledge_base) if i in keep_indices
]
```
### Parallel Processing
@@ -252,18 +232,6 @@ tasks = [
results = await asyncio.gather(*tasks)
```
### Caching Strategy
```python
# Enable caching for repeated crawls
async with AsyncWebCrawler(
config=BrowserConfig(
cache_mode=CacheMode.ENABLED
)
) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
```
## Debugging & Analysis
### Enable Verbose Logging
@@ -322,9 +290,9 @@ with open("crawl_analysis.json", "w") as f:
### Implementing a Custom Strategy
```python
from crawl4ai.adaptive_crawler import BaseStrategy
from crawl4ai.adaptive_crawler import CrawlStrategy
class DomainSpecificStrategy(BaseStrategy):
class DomainSpecificStrategy(CrawlStrategy):
def calculate_coverage(self, state: CrawlState) -> float:
# Custom coverage calculation
# e.g., weight certain terms more heavily
@@ -351,7 +319,7 @@ adaptive = AdaptiveCrawler(
### Combining Strategies
```python
class HybridStrategy(BaseStrategy):
class HybridStrategy(CrawlStrategy):
def __init__(self):
self.strategies = [
TechnicalDocStrategy(),

View File

@@ -7,13 +7,13 @@ Simple proxy configuration with `BrowserConfig`:
```python
from crawl4ai.async_configs import BrowserConfig
# Using proxy URL
browser_config = BrowserConfig(proxy="http://proxy.example.com:8080")
# Using HTTP proxy
browser_config = BrowserConfig(proxy_config={"server": "http://proxy.example.com:8080"})
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
# Using SOCKS proxy
browser_config = BrowserConfig(proxy="socks5://proxy.example.com:1080")
browser_config = BrowserConfig(proxy_config={"server": "socks5://proxy.example.com:1080"})
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
```
@@ -25,7 +25,11 @@ Use an authenticated proxy with `BrowserConfig`:
```python
from crawl4ai.async_configs import BrowserConfig
browser_config = BrowserConfig(proxy="http://[username]:[password]@[host]:[port]")
browser_config = BrowserConfig(proxy_config={
"server": "http://[host]:[port]",
"username": "[username]",
"password": "[password]",
})
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
```

View File

@@ -23,7 +23,7 @@ browser_cfg = BrowserConfig(
| **`headless`** | `bool` (default: `True`) | Headless means no visible UI. `False` is handy for debugging. |
| **`viewport_width`** | `int` (default: `1080`) | Initial page width (in px). Useful for testing responsive layouts. |
| **`viewport_height`** | `int` (default: `600`) | Initial page height (in px). |
| **`proxy`** | `str` (default: `None`) | Single-proxy URL if you want all traffic to go through it, e.g. `"http://user:pass@proxy:8080"`. |
| **`proxy`** | `str` (deprecated) | Deprecated. Use `proxy_config` instead. If set, it will be auto-converted internally. |
| **`proxy_config`** | `dict` (default: `None`) | For advanced or multi-proxy needs, specify details like `{"server": "...", "username": "...", ...}`. |
| **`use_persistent_context`** | `bool` (default: `False`) | If `True`, uses a **persistent** browser context (keep cookies, sessions across runs). Also sets `use_managed_browser=True`. |
| **`user_data_dir`** | `str or None` (default: `None`) | Directory to store user data (profiles, cookies). Must be set if you want permanent sessions. |
@@ -155,6 +155,7 @@ If your page is a single-page app with repeated JS updates, set `js_only=True` i
| **`exclude_external_links`** | `bool` (False) | Removes all links pointing outside the current domain. |
| **`exclude_social_media_links`** | `bool` (False) | Strips links specifically to social sites (like Facebook or Twitter). |
| **`exclude_domains`** | `list` ([]) | Provide a custom list of domains to exclude (like `["ads.com", "trackers.io"]`). |
| **`preserve_https_for_internal_links`** | `bool` (False) | If `True`, preserves HTTPS scheme for internal links even when the server redirects to HTTP. Useful for security-conscious crawling. |
Use these for link-level content filtering (often to keep crawls “internal” or to remove spammy domains).

View File

@@ -108,7 +108,19 @@ config = AdaptiveConfig(
embedding_min_confidence_threshold=0.1 # Stop if completely irrelevant
)
# With custom embedding provider (e.g., OpenAI)
# With custom LLM provider for query expansion (recommended)
from crawl4ai import LLMConfig
config = AdaptiveConfig(
strategy="embedding",
embedding_llm_config=LLMConfig(
provider='openai/text-embedding-3-small',
api_token='your-api-key',
temperature=0.7
)
)
# Alternative: Dictionary format (backward compatible)
config = AdaptiveConfig(
strategy="embedding",
embedding_llm_config={

View File

@@ -472,6 +472,17 @@ Note that for BestFirstCrawlingStrategy, score_threshold is not needed since pag
5.**Balance breadth vs. depth.** Choose your strategy wisely - BFS for comprehensive coverage, DFS for deep exploration, BestFirst for focused relevance-based crawling.
6.**Preserve HTTPS for security.** If crawling HTTPS sites that redirect to HTTP, use `preserve_https_for_internal_links=True` to maintain secure connections:
```python
config = CrawlerRunConfig(
deep_crawl_strategy=BFSDeepCrawlStrategy(max_depth=2),
preserve_https_for_internal_links=True # Keep HTTPS even if server redirects to HTTP
)
```
This is especially useful for security-conscious crawling or when dealing with sites that support both protocols.
---
## 10. Summary & Next Steps

View File

@@ -89,6 +89,16 @@ ANTHROPIC_API_KEY=your-anthropic-key
# TOGETHER_API_KEY=your-together-key
# MISTRAL_API_KEY=your-mistral-key
# GEMINI_API_TOKEN=your-gemini-token
# Optional: Global LLM settings
# LLM_PROVIDER=openai/gpt-4o-mini
# LLM_TEMPERATURE=0.7
# LLM_BASE_URL=https://api.custom.com/v1
# Optional: Provider-specific overrides
# OPENAI_TEMPERATURE=0.5
# OPENAI_BASE_URL=https://custom-openai.com/v1
# ANTHROPIC_TEMPERATURE=0.3
EOL
```
> 🔑 **Note**: Keep your API keys secure! Never commit `.llm.env` to version control.
@@ -156,27 +166,43 @@ cp deploy/docker/.llm.env.example .llm.env
**Flexible LLM Provider Configuration:**
The Docker setup now supports flexible LLM provider configuration through three methods:
The Docker setup now supports flexible LLM provider configuration through a hierarchical system:
1. **Environment Variable** (Highest Priority): Set `LLM_PROVIDER` to override the default
```bash
export LLM_PROVIDER="anthropic/claude-3-opus"
# Or in your .llm.env file:
# LLM_PROVIDER=anthropic/claude-3-opus
```
2. **API Request Parameter**: Specify provider per request
1. **API Request Parameters** (Highest Priority): Specify per request
```json
{
"url": "https://example.com",
"f": "llm",
"provider": "groq/mixtral-8x7b"
"provider": "groq/mixtral-8x7b",
"temperature": 0.7,
"base_url": "https://api.custom.com/v1"
}
```
3. **Config File Default**: Falls back to `config.yml` (default: `openai/gpt-4o-mini`)
2. **Provider-Specific Environment Variables**: Override for specific providers
```bash
# In your .llm.env file:
OPENAI_TEMPERATURE=0.5
OPENAI_BASE_URL=https://custom-openai.com/v1
ANTHROPIC_TEMPERATURE=0.3
```
The system automatically selects the appropriate API key based on the configured `api_key_env` in the config file.
3. **Global Environment Variables**: Set defaults for all providers
```bash
# In your .llm.env file:
LLM_PROVIDER=anthropic/claude-3-opus
LLM_TEMPERATURE=0.7
LLM_BASE_URL=https://api.proxy.com/v1
```
4. **Config File Default**: Falls back to `config.yml` (default: `openai/gpt-4o-mini`)
The system automatically selects the appropriate API key based on the provider. LiteLLM handles finding the correct environment variable for each provider (e.g., OPENAI_API_KEY for OpenAI, GEMINI_API_TOKEN for Google Gemini, etc.).
**Supported LLM Parameters:**
- `provider`: LLM provider and model (e.g., "openai/gpt-4", "anthropic/claude-3-opus")
- `temperature`: Controls randomness (0.0-2.0, lower = more focused, higher = more creative)
- `base_url`: Custom API endpoint for proxy servers or alternative endpoints
#### 3. Build and Run with Compose
@@ -555,6 +581,101 @@ Crucially, when sending configurations directly via JSON, they **must** follow t
**LLM Extraction Strategy** *(Keep example, ensure schema uses type/value wrapper)*
*(Keep Deep Crawler Example)*
### LLM Configuration Examples
The Docker API supports dynamic LLM configuration through multiple levels:
#### Temperature Control
Temperature affects the randomness of LLM responses (0.0 = deterministic, 2.0 = very creative):
```python
import requests
# Low temperature for factual extraction
response = requests.post(
"http://localhost:11235/md",
json={
"url": "https://example.com",
"f": "llm",
"q": "Extract all dates and numbers from this page",
"temperature": 0.2 # Very focused, deterministic
}
)
# High temperature for creative tasks
response = requests.post(
"http://localhost:11235/md",
json={
"url": "https://example.com",
"f": "llm",
"q": "Write a creative summary of this content",
"temperature": 1.2 # More creative, varied responses
}
)
```
#### Custom API Endpoints
Use custom base URLs for proxy servers or alternative API endpoints:
```python
# Using a local LLM server
response = requests.post(
"http://localhost:11235/md",
json={
"url": "https://example.com",
"f": "llm",
"q": "Extract key information",
"provider": "ollama/llama2",
"base_url": "http://localhost:11434/v1"
}
)
```
#### Dynamic Provider Selection
Switch between providers based on task requirements:
```python
async def smart_extraction(url: str, content_type: str):
"""Select provider and temperature based on content type"""
configs = {
"technical": {
"provider": "openai/gpt-4",
"temperature": 0.3,
"query": "Extract technical specifications and code examples"
},
"creative": {
"provider": "anthropic/claude-3-opus",
"temperature": 0.9,
"query": "Create an engaging narrative summary"
},
"quick": {
"provider": "groq/mixtral-8x7b",
"temperature": 0.5,
"query": "Quick summary in bullet points"
}
}
config = configs.get(content_type, configs["quick"])
response = await httpx.post(
"http://localhost:11235/md",
json={
"url": url,
"f": "llm",
"q": config["query"],
"provider": config["provider"],
"temperature": config["temperature"]
}
)
return response.json()
```
### REST API Examples
Update URLs to use port `11235`.
@@ -693,8 +814,8 @@ app:
# Default LLM Configuration
llm:
provider: "openai/gpt-4o-mini" # Can be overridden by LLM_PROVIDER env var
api_key_env: "OPENAI_API_KEY"
# api_key: sk-... # If you pass the API key directly then api_key_env will be ignored
# api_key: sk-... # If you pass the API key directly (not recommended)
# temperature and base_url are controlled via environment variables or request parameters
# Redis Configuration (Used by internal Redis server managed by supervisord)
redis:

View File

@@ -79,7 +79,7 @@ if __name__ == "__main__":
asyncio.run(main())
```
> IMPORTANT: By default cache mode is set to `CacheMode.ENABLED`. So to have fresh content, you need to set it to `CacheMode.BYPASS`
> IMPORTANT: By default cache mode is set to `CacheMode.BYPASS` to have fresh content. Set `CacheMode.ENABLED` to enable caching.
Well explore more advanced config in later tutorials (like enabling proxies, PDF output, multi-tab sessions, etc.). For now, just note how you pass these objects to manage crawling.

View File

@@ -1,242 +0,0 @@
# Telemetry
Crawl4AI includes **opt-in telemetry** to help improve stability by capturing anonymous crash reports. No personal data or crawled content is ever collected.
!!! info "Privacy First"
Telemetry is completely optional and respects your privacy. Only exception information is collected - no URLs, no personal data, no crawled content.
## Overview
- **Privacy-first**: Only exceptions and crashes are reported
- **Opt-in by default**: You control when telemetry is enabled (except in Docker where it's on by default)
- **No PII**: No URLs, request data, or personal information is collected
- **Provider-agnostic**: Currently uses Sentry, but designed to support multiple backends
## Installation
Telemetry requires the optional Sentry SDK:
```bash
# Install with telemetry support
pip install crawl4ai[telemetry]
# Or install Sentry SDK separately
pip install sentry-sdk>=2.0.0
```
## Environments
### 1. Python Library & CLI
On first exception, you'll see an interactive prompt:
```
🚨 Crawl4AI Error Detection
==============================================================
We noticed an error occurred. Help improve Crawl4AI by
sending anonymous crash reports?
[1] Yes, send this error only
[2] Yes, always send errors
[3] No, don't send
Your choice (1/2/3):
```
Control via CLI:
```bash
# Enable telemetry
crwl telemetry enable
crwl telemetry enable --email you@example.com
# Disable telemetry
crwl telemetry disable
# Check status
crwl telemetry status
```
### 2. Docker / API Server
!!! warning "Default Enabled in Docker"
Telemetry is **enabled by default** in Docker environments to help identify container-specific issues. This is different from the CLI where it's opt-in.
To disable:
```bash
# Via environment variable
docker run -e CRAWL4AI_TELEMETRY=0 ...
# In docker-compose.yml
environment:
- CRAWL4AI_TELEMETRY=0
```
### 3. Jupyter / Google Colab
In notebooks, you'll see an interactive widget (if available) or a code snippet:
```python
import crawl4ai
# Enable telemetry
crawl4ai.telemetry.enable(email="you@example.com", always=True)
# Send only next error
crawl4ai.telemetry.enable(once=True)
# Disable telemetry
crawl4ai.telemetry.disable()
# Check status
crawl4ai.telemetry.status()
```
## Python API
### Basic Usage
```python
from crawl4ai import telemetry
# Enable/disable telemetry
telemetry.enable(email="optional@email.com", always=True)
telemetry.disable()
# Check current status
status = telemetry.status()
print(f"Telemetry enabled: {status['enabled']}")
print(f"Consent: {status['consent']}")
```
### Manual Exception Capture
```python
from crawl4ai.telemetry import capture_exception
try:
# Your code here
risky_operation()
except Exception as e:
# Manually capture exception with context
capture_exception(e, {
'operation': 'custom_crawler',
'url': 'https://example.com' # Will be sanitized
})
raise
```
### Decorator Pattern
```python
from crawl4ai.telemetry import telemetry_decorator
@telemetry_decorator
def my_crawler_function():
# Exceptions will be automatically captured
pass
```
### Context Manager
```python
from crawl4ai.telemetry import telemetry_context
with telemetry_context("data_extraction"):
# Any exceptions in this block will be captured
result = extract_data(html)
```
## Configuration
Settings are stored in `~/.crawl4ai/config.json`:
```json
{
"telemetry": {
"consent": "always",
"email": "user@example.com"
}
}
```
Consent levels:
- `"not_set"` - No decision made yet
- `"denied"` - Telemetry disabled
- `"once"` - Send current error only
- `"always"` - Always send errors
## Environment Variables
- `CRAWL4AI_TELEMETRY=0` - Disable telemetry (overrides config)
- `CRAWL4AI_TELEMETRY_EMAIL=email@example.com` - Set email for follow-up
- `CRAWL4AI_SENTRY_DSN=https://...` - Override default DSN (for maintainers)
## What's Collected
### Collected ✅
- Exception type and traceback
- Crawl4AI version
- Python version
- Operating system
- Environment type (CLI, Docker, Jupyter)
- Optional email (if provided)
### NOT Collected ❌
- URLs being crawled
- HTML content
- Request/response data
- Cookies or authentication tokens
- IP addresses
- Any personally identifiable information
## Provider Architecture
Telemetry is designed to be provider-agnostic:
```python
from crawl4ai.telemetry.base import TelemetryProvider
class CustomProvider(TelemetryProvider):
def send_exception(self, exc, context=None):
# Your implementation
pass
```
## FAQ
### Q: Can I completely disable telemetry?
A: Yes! Use `crwl telemetry disable` or set `CRAWL4AI_TELEMETRY=0`
### Q: Is telemetry required?
A: No, it's completely optional (except enabled by default in Docker)
### Q: What if I don't install sentry-sdk?
A: Telemetry will gracefully degrade to a no-op state
### Q: Can I see what's being sent?
A: Yes, check the source code in `crawl4ai/telemetry/`
### Q: How do I remove my email?
A: Delete `~/.crawl4ai/config.json` or edit it to remove the email field
## Privacy Commitment
1. **Transparency**: All telemetry code is open source
2. **Control**: You can enable/disable at any time
3. **Minimal**: Only crash data, no user content
4. **Secure**: Data transmitted over HTTPS to Sentry
5. **Anonymous**: No tracking or user identification
## Contributing
Help improve telemetry:
- Report issues with telemetry itself
- Suggest privacy improvements
- Add new provider backends
## Support
If you have concerns about telemetry:
- Open an issue on GitHub
- Email the maintainers
- Review the code in `crawl4ai/telemetry/`

View File

@@ -102,16 +102,16 @@ async def smart_blog_crawler():
# Step 2: Configure discovery - let's find all blog posts
config = SeedingConfig(
source="sitemap", # Use the website's sitemap
pattern="*/blog/*.html", # Only blog posts
source="sitemap+cc", # Use the website's sitemap+cc
pattern="*/courses/*", # Only courses related posts
extract_head=True, # Get page metadata
max_urls=100 # Limit for this example
)
# Step 3: Discover URLs from the Python blog
print("🔍 Discovering blog posts...")
print("🔍 Discovering course posts...")
urls = await seeder.urls("realpython.com", config)
print(f"✅ Found {len(urls)} blog posts")
print(f"✅ Found {len(urls)} course posts")
# Step 4: Filter for Python tutorials (using metadata!)
tutorials = [
@@ -134,7 +134,8 @@ async def smart_blog_crawler():
async with AsyncWebCrawler() as crawler:
config = CrawlerRunConfig(
only_text=True,
word_count_threshold=300 # Only substantial articles
word_count_threshold=300, # Only substantial articles
stream=True
)
# Extract URLs and crawl them
@@ -155,7 +156,7 @@ asyncio.run(smart_blog_crawler())
**What just happened?**
1. We discovered all blog URLs from the sitemap
1. We discovered all blog URLs from the sitemap+cc
2. We filtered using metadata (no crawling needed!)
3. We crawled only the relevant tutorials
4. We saved tons of time and bandwidth
@@ -282,8 +283,8 @@ config = SeedingConfig(
live_check=True, # Verify each URL is accessible
concurrency=20 # Check 20 URLs in parallel
)
urls = await seeder.urls("example.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("example.com", config)
# Now you can filter by status
live_urls = [u for u in urls if u["status"] == "valid"]
@@ -311,8 +312,8 @@ This is where URL seeding gets really powerful. Instead of crawling entire pages
config = SeedingConfig(
extract_head=True # Extract metadata from <head> section
)
urls = await seeder.urls("example.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("example.com", config)
# Now each URL has rich metadata
for url in urls[:3]:
@@ -387,8 +388,8 @@ config = SeedingConfig(
scoring_method="bm25",
score_threshold=0.3
)
urls = await seeder.urls("example.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("example.com", config)
# URLs are scored based on:
# 1. Domain parts matching (e.g., 'python' in python.example.com)
@@ -429,8 +430,8 @@ config = SeedingConfig(
extract_head=True,
live_check=True
)
urls = await seeder.urls("blog.example.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("blog.example.com", config)
# Analyze the results
for url in urls[:5]:
@@ -488,8 +489,8 @@ config = SeedingConfig(
scoring_method="bm25", # Use BM25 algorithm
score_threshold=0.3 # Minimum relevance score
)
urls = await seeder.urls("realpython.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("realpython.com", config)
# Results are automatically sorted by relevance!
for url in urls[:5]:
@@ -511,8 +512,8 @@ config = SeedingConfig(
score_threshold=0.5,
max_urls=20
)
urls = await seeder.urls("docs.example.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("docs.example.com", config)
# The highest scoring URLs will be API docs!
```
@@ -529,8 +530,8 @@ config = SeedingConfig(
score_threshold=0.4,
pattern="*/product/*" # Combine with pattern matching
)
urls = await seeder.urls("shop.example.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("shop.example.com", config)
# Filter further by price (from metadata)
affordable = [
@@ -550,8 +551,8 @@ config = SeedingConfig(
scoring_method="bm25",
score_threshold=0.35
)
urls = await seeder.urls("technews.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("technews.com", config)
# Filter by date
from datetime import datetime, timedelta
@@ -591,8 +592,8 @@ for query in queries:
score_threshold=0.4,
max_urls=10 # Top 10 per topic
)
urls = await seeder.urls("learning-platform.com", config)
async with AsyncUrlSeeder() as seeder:
urls = await seeder.urls("learning-platform.com", config)
all_tutorials.extend(urls)
# Remove duplicates while preserving order
@@ -625,7 +626,8 @@ config = SeedingConfig(
)
# Returns a dictionary: {domain: [urls]}
results = await seeder.many_urls(domains, config)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(domains, config)
# Process results
for domain, urls in results.items():
@@ -654,8 +656,8 @@ config = SeedingConfig(
pattern="*/blog/*",
max_urls=100
)
results = await seeder.many_urls(competitors, config)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(competitors, config)
# Analyze content types
for domain, urls in results.items():
@@ -690,8 +692,8 @@ config = SeedingConfig(
score_threshold=0.3,
max_urls=20 # Per site
)
results = await seeder.many_urls(educational_sites, config)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(educational_sites, config)
# Find the best beginner tutorials
all_tutorials = []
@@ -731,8 +733,8 @@ config = SeedingConfig(
score_threshold=0.5, # High threshold for relevance
max_urls=10
)
results = await seeder.many_urls(news_sites, config)
async with AsyncUrlSeeder() as seeder:
results = await seeder.many_urls(news_sites, config)
# Collect all mentions
mentions = []

View File

@@ -35,7 +35,6 @@ nav:
- "Page Interaction": "core/page-interaction.md"
- "Content Selection": "core/content-selection.md"
- "Cache Modes": "core/cache-modes.md"
- "Telemetry": "core/telemetry.md"
- "Local Files & Raw HTML": "core/local-files.md"
- "Link & Media": "core/link-media.md"
- Advanced:

View File

@@ -7,7 +7,7 @@ name = "Crawl4AI"
dynamic = ["version"]
description = "🚀🤖 Crawl4AI: Open-source LLM Friendly Web Crawler & scraper"
readme = "README.md"
requires-python = ">=3.9"
requires-python = ">=3.10"
license = "Apache-2.0"
authors = [
{name = "Unclecode", email = "unclecode@kidocode.com"}
@@ -36,6 +36,7 @@ dependencies = [
"PyYAML>=6.0",
"nltk>=3.9.1",
"rich>=13.9.4",
"cssselect>=1.2.0",
"httpx>=0.27.2",
"httpx[http2]>=0.27.2",
"fake-useragent>=2.0.3",
@@ -51,7 +52,6 @@ classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
@@ -64,7 +64,6 @@ torch = ["torch", "nltk", "scikit-learn"]
transformer = ["transformers", "tokenizers", "sentence-transformers"]
cosine = ["torch", "transformers", "nltk", "sentence-transformers"]
sync = ["selenium"]
telemetry = ["sentry-sdk>=2.0.0", "ipywidgets>=8.0.0"]
all = [
"PyPDF2",
"torch",
@@ -73,9 +72,7 @@ all = [
"transformers",
"tokenizers",
"sentence-transformers",
"selenium",
"sentry-sdk>=2.0.0",
"ipywidgets>=8.0.0"
"selenium"
]
[project.scripts]

View File

@@ -1,16 +0,0 @@
[pytest]
testpaths = tests
python_paths = .
addopts = --maxfail=1 --disable-warnings -q --tb=short -v
asyncio_mode = auto
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
privacy: marks tests related to privacy compliance
performance: marks tests related to performance
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
env =
CRAWL4AI_TEST_MODE=1

View File

@@ -24,6 +24,7 @@ psutil>=6.1.1
PyYAML>=6.0
nltk>=3.9.1
rich>=13.9.4
cssselect>=1.2.0
chardet>=5.2.0
brotli>=1.1.0
httpx[http2]>=0.27.2

View File

@@ -56,11 +56,10 @@ setup(
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
],
python_requires=">=3.9",
python_requires=">=3.10",
)

View File

@@ -0,0 +1,154 @@
import asyncio
import os
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig, LLMConfig
async def test_configuration(name: str, config: AdaptiveConfig, url: str, query: str):
"""Test a specific configuration"""
print(f"\n{'='*60}")
print(f"Configuration: {name}")
print(f"{'='*60}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
result = await adaptive.digest(start_url=url, query=query)
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
print(f"\n{'='*50}")
print(f"Pages crawled: {len(result.crawled_urls)}")
print(f"Final confidence: {adaptive.confidence:.1%}")
print(f"Stopped reason: {result.metrics.get('stopped_reason', 'max_pages')}")
if result.metrics.get('is_irrelevant', False):
print("⚠️ Query detected as irrelevant!")
return result
async def llm_embedding():
"""Demonstrate various embedding configurations"""
print("EMBEDDING STRATEGY CONFIGURATION EXAMPLES")
print("=" * 60)
# Base URL and query for testing
test_url = "https://docs.python.org/3/library/asyncio.html"
openai_llm_config = LLMConfig(
provider='openai/text-embedding-3-small',
api_token=os.getenv('OPENAI_API_KEY'),
temperature=0.7,
max_tokens=2000
)
config_openai = AdaptiveConfig(
strategy="embedding",
max_pages=10,
# Use OpenAI embeddings
embedding_llm_config=openai_llm_config,
# embedding_llm_config={
# 'provider': 'openai/text-embedding-3-small',
# 'api_token': os.getenv('OPENAI_API_KEY')
# },
# OpenAI embeddings are high quality, can be stricter
embedding_k_exp=4.0,
n_query_variations=12
)
await test_configuration(
"OpenAI Embeddings",
config_openai,
test_url,
# "event-driven architecture patterns"
"async await context managers coroutines"
)
return
async def basic_adaptive_crawling():
"""Basic adaptive crawling example"""
# Initialize the crawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Create an adaptive crawler with default settings (statistical strategy)
adaptive = AdaptiveCrawler(crawler)
# Note: You can also use embedding strategy for semantic understanding:
# from crawl4ai import AdaptiveConfig
# config = AdaptiveConfig(strategy="embedding")
# adaptive = AdaptiveCrawler(crawler, config)
# Start adaptive crawling
print("Starting adaptive crawl for Python async programming information...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/library/asyncio.html",
query="async await context managers coroutines"
)
# Display crawl statistics
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
# Show final confidence
print(f"\n{'='*50}")
print(f"Final Confidence: {adaptive.confidence:.2%}")
print(f"Total Pages Crawled: {len(result.crawled_urls)}")
print(f"Knowledge Base Size: {len(adaptive.state.knowledge_base)} documents")
if adaptive.confidence >= 0.8:
print("✓ High confidence - can answer detailed questions about async Python")
elif adaptive.confidence >= 0.6:
print("~ Moderate confidence - can answer basic questions")
else:
print("✗ Low confidence - need more information")
if __name__ == "__main__":
asyncio.run(llm_embedding())
# asyncio.run(basic_adaptive_crawling())

View File

@@ -112,7 +112,7 @@ async def test_proxy_settings():
headless=True,
verbose=False,
user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
proxy="http://127.0.0.1:8080", # Assuming local proxy server for test
proxy_config={"server": "http://127.0.0.1:8080"}, # Assuming local proxy server for test
use_managed_browser=False,
use_persistent_context=False,
) as crawler:

View File

@@ -1,151 +0,0 @@
"""
Shared pytest fixtures for Crawl4AI tests.
"""
import pytest
import tempfile
import os
from pathlib import Path
from unittest.mock import Mock, patch
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
from crawl4ai.telemetry.environment import Environment
@pytest.fixture
def temp_config_dir():
"""Provide a temporary directory for telemetry config testing."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def mock_telemetry_config(temp_config_dir):
"""Provide a mocked telemetry config for testing."""
config = TelemetryConfig(config_dir=temp_config_dir)
yield config
@pytest.fixture
def clean_environment():
"""Clean environment variables before and after test."""
# Store original environment
original_env = os.environ.copy()
# Clean telemetry-related env vars
telemetry_vars = [
'CRAWL4AI_TELEMETRY',
'CRAWL4AI_DOCKER',
'CRAWL4AI_API_SERVER',
'CRAWL4AI_TEST_MODE'
]
for var in telemetry_vars:
if var in os.environ:
del os.environ[var]
# Set test mode
os.environ['CRAWL4AI_TEST_MODE'] = '1'
yield
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
@pytest.fixture
def mock_sentry_provider():
"""Provide a mocked Sentry provider for testing."""
with patch('crawl4ai.telemetry.providers.sentry.SentryProvider') as mock:
provider_instance = Mock()
provider_instance.initialize.return_value = True
provider_instance.send_exception.return_value = True
provider_instance.is_initialized = True
mock.return_value = provider_instance
yield provider_instance
@pytest.fixture
def enabled_telemetry_config(temp_config_dir): # noqa: F811
"""Provide a telemetry config with telemetry enabled."""
config = Mock()
config.get_consent.return_value = TelemetryConsent.ALWAYS
config.is_enabled.return_value = True
config.should_send_current.return_value = True
config.get_email.return_value = "test@example.com"
config.update_from_env.return_value = None
yield config
@pytest.fixture
def disabled_telemetry_config(temp_config_dir): # noqa: F811
"""Provide a telemetry config with telemetry disabled."""
config = Mock()
config.get_consent.return_value = TelemetryConsent.DENIED
config.is_enabled.return_value = False
config.should_send_current.return_value = False
config.update_from_env.return_value = None
yield config
@pytest.fixture
def docker_environment():
"""Mock Docker environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
yield
@pytest.fixture
def cli_environment():
"""Mock CLI environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.CLI):
with patch('sys.stdin.isatty', return_value=True):
yield
@pytest.fixture
def jupyter_environment():
"""Mock Jupyter environment detection."""
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.JUPYTER):
yield
@pytest.fixture(autouse=True)
def reset_telemetry_singleton():
"""Reset telemetry singleton between tests."""
from crawl4ai.telemetry import TelemetryManager
# Reset the singleton instance
if hasattr(TelemetryManager, '_instance'):
TelemetryManager._instance = None # noqa: SLF001
yield
# Clean up after test
if hasattr(TelemetryManager, '_instance'):
TelemetryManager._instance = None # noqa: SLF001
@pytest.fixture
def sample_exception():
"""Provide a sample exception for testing."""
try:
raise ValueError("Test exception for telemetry")
except ValueError as e:
return e
@pytest.fixture
def privacy_test_data():
"""Provide test data that should NOT be captured by telemetry."""
return {
'url': 'https://example.com/private-page',
'content': 'This is private content that should not be sent',
'user_data': {
'email': 'user@private.com',
'password': 'secret123',
'api_key': 'sk-1234567890abcdef'
},
'pii': {
'ssn': '123-45-6789',
'phone': '+1-555-123-4567',
'address': '123 Main St, Anytown, USA'
}
}

View File

@@ -0,0 +1,201 @@
"""
Test the complete fix for both the filter serialization and JSON serialization issues.
"""
import asyncio
import httpx
from crawl4ai import BrowserConfig, CacheMode, CrawlerRunConfig
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, FilterChain, URLPatternFilter
BASE_URL = "http://localhost:11234/" # Adjust port as needed
async def test_with_docker_client():
"""Test using the Docker client (same as 1419.py)."""
from crawl4ai.docker_client import Crawl4aiDockerClient
print("=" * 60)
print("Testing with Docker Client")
print("=" * 60)
try:
async with Crawl4aiDockerClient(
base_url=BASE_URL,
verbose=True,
) as client:
# Create filter chain - testing the serialization fix
filter_chain = [
URLPatternFilter(
# patterns=["*about*", "*privacy*", "*terms*"],
patterns=["*advanced*"],
reverse=True
),
]
crawler_config = CrawlerRunConfig(
deep_crawl_strategy=BFSDeepCrawlStrategy(
max_depth=2, # Keep it shallow for testing
# max_pages=5, # Limit pages for testing
filter_chain=FilterChain(filter_chain)
),
cache_mode=CacheMode.BYPASS,
)
print("\n1. Testing crawl with filters...")
results = await client.crawl(
["https://docs.crawl4ai.com"], # Simple test page
browser_config=BrowserConfig(headless=True),
crawler_config=crawler_config,
)
if results:
print(f"✅ Crawl succeeded! Type: {type(results)}")
if hasattr(results, 'success'):
print(f"✅ Results success: {results.success}")
# Test that we can iterate results without JSON errors
if hasattr(results, '__iter__'):
for i, result in enumerate(results):
if hasattr(result, 'url'):
print(f" Result {i}: {result.url[:50]}...")
else:
print(f" Result {i}: {str(result)[:50]}...")
else:
# Handle list of results
print(f"✅ Got {len(results)} results")
for i, result in enumerate(results[:3]): # Show first 3
print(f" Result {i}: {result.url[:50]}...")
else:
print("❌ Crawl failed - no results returned")
return False
print("\n✅ Docker client test completed successfully!")
return True
except Exception as e:
print(f"❌ Docker client test failed: {e}")
import traceback
traceback.print_exc()
return False
async def test_with_rest_api():
"""Test using REST API directly."""
print("\n" + "=" * 60)
print("Testing with REST API")
print("=" * 60)
# Create filter configuration
deep_crawl_strategy_payload = {
"type": "BFSDeepCrawlStrategy",
"params": {
"max_depth": 2,
# "max_pages": 5,
"filter_chain": {
"type": "FilterChain",
"params": {
"filters": [
{
"type": "URLPatternFilter",
"params": {
"patterns": ["*advanced*"],
"reverse": True
}
}
]
}
}
}
}
crawl_payload = {
"urls": ["https://docs.crawl4ai.com"],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"deep_crawl_strategy": deep_crawl_strategy_payload,
"cache_mode": "bypass"
}
}
}
try:
async with httpx.AsyncClient() as client:
print("\n1. Sending crawl request to REST API...")
response = await client.post(
f"{BASE_URL}crawl",
json=crawl_payload,
timeout=30
)
if response.status_code == 200:
print(f"✅ REST API returned 200 OK")
data = response.json()
if data.get("success"):
results = data.get("results", [])
print(f"✅ Got {len(results)} results")
for i, result in enumerate(results[:3]):
print(f" Result {i}: {result.get('url', 'unknown')[:50]}...")
else:
print(f"❌ Crawl not successful: {data}")
return False
else:
print(f"❌ REST API returned {response.status_code}")
print(f" Response: {response.text[:500]}")
return False
print("\n✅ REST API test completed successfully!")
return True
except Exception as e:
print(f"❌ REST API test failed: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests."""
print("\n🧪 TESTING COMPLETE FIX FOR DOCKER FILTER AND JSON ISSUES")
print("=" * 60)
print("Make sure the server is running with the updated code!")
print("=" * 60)
results = []
# Test 1: Docker client
docker_passed = await test_with_docker_client()
results.append(("Docker Client", docker_passed))
# Test 2: REST API
rest_passed = await test_with_rest_api()
results.append(("REST API", rest_passed))
# Summary
print("\n" + "=" * 60)
print("FINAL TEST SUMMARY")
print("=" * 60)
all_passed = True
for test_name, passed in results:
status = "✅ PASSED" if passed else "❌ FAILED"
print(f"{test_name:20} {status}")
if not passed:
all_passed = False
print("=" * 60)
if all_passed:
print("🎉 ALL TESTS PASSED! Both issues are fully resolved!")
print("\nThe fixes:")
print("1. Filter serialization: Fixed by not serializing private __slots__")
print("2. JSON serialization: Fixed by removing property descriptors from model_dump()")
else:
print("⚠️ Some tests failed. Please check the server logs for details.")
return 0 if all_passed else 1
if __name__ == "__main__":
import sys
sys.exit(asyncio.run(main()))

349
tests/docker/test_llm_params.py Executable file
View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Test script for LLM temperature and base_url parameters in Crawl4AI Docker API.
This demonstrates the new hierarchical configuration system:
1. Request-level parameters (highest priority)
2. Provider-specific environment variables
3. Global environment variables
4. System defaults (lowest priority)
"""
import asyncio
import httpx
import json
import os
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
console = Console()
# Configuration
BASE_URL = "http://localhost:11235" # Docker API endpoint
TEST_URL = "https://httpbin.org/html" # Simple test page
# --- Helper Functions ---
async def check_server_health(client: httpx.AsyncClient) -> bool:
"""Check if the server is healthy."""
console.print("[bold cyan]Checking server health...[/]", end="")
try:
response = await client.get("/health", timeout=10.0)
response.raise_for_status()
console.print(" [bold green]✓ Server is healthy![/]")
return True
except Exception as e:
console.print(f"\n[bold red]✗ Server health check failed: {e}[/]")
console.print(f"Is the server running at {BASE_URL}?")
return False
def print_request(endpoint: str, payload: dict, title: str = "Request"):
"""Pretty print the request."""
syntax = Syntax(json.dumps(payload, indent=2), "json", theme="monokai")
console.print(Panel.fit(
f"[cyan]POST {endpoint}[/cyan]\n{syntax}",
title=f"[bold blue]{title}[/]",
border_style="blue"
))
def print_response(response: dict, title: str = "Response"):
"""Pretty print relevant parts of the response."""
# Extract only the relevant parts
relevant = {}
if "markdown" in response:
relevant["markdown"] = response["markdown"][:200] + "..." if len(response.get("markdown", "")) > 200 else response.get("markdown", "")
if "success" in response:
relevant["success"] = response["success"]
if "url" in response:
relevant["url"] = response["url"]
if "filter" in response:
relevant["filter"] = response["filter"]
console.print(Panel.fit(
Syntax(json.dumps(relevant, indent=2), "json", theme="monokai"),
title=f"[bold green]{title}[/]",
border_style="green"
))
# --- Test Functions ---
async def test_default_no_params(client: httpx.AsyncClient):
"""Test 1: No temperature or base_url specified - uses defaults"""
console.rule("[bold yellow]Test 1: Default Configuration (No Parameters)[/]")
payload = {
"url": TEST_URL,
"f": "llm",
"q": "What is the main heading of this page? Answer in exactly 5 words."
}
print_request("/md", payload, "Request without temperature/base_url")
try:
response = await client.post("/md", json=payload, timeout=30.0)
response.raise_for_status()
data = response.json()
print_response(data, "Response (using system defaults)")
console.print("[dim]→ This used system defaults or environment variables if set[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_request_temperature(client: httpx.AsyncClient):
"""Test 2: Request-level temperature (highest priority)"""
console.rule("[bold yellow]Test 2: Request-Level Temperature[/]")
# Test with low temperature (more focused)
payload_low = {
"url": TEST_URL,
"f": "llm",
"q": "What is the main heading? Be creative and poetic.",
"temperature": 0.1 # Very low - should be less creative
}
print_request("/md", payload_low, "Low Temperature (0.1)")
try:
response = await client.post("/md", json=payload_low, timeout=30.0)
response.raise_for_status()
data_low = response.json()
print_response(data_low, "Response with Low Temperature")
console.print("[dim]→ Low temperature (0.1) should produce focused, less creative output[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
console.print()
# Test with high temperature (more creative)
payload_high = {
"url": TEST_URL,
"f": "llm",
"q": "What is the main heading? Be creative and poetic.",
"temperature": 1.5 # High - should be more creative
}
print_request("/md", payload_high, "High Temperature (1.5)")
try:
response = await client.post("/md", json=payload_high, timeout=30.0)
response.raise_for_status()
data_high = response.json()
print_response(data_high, "Response with High Temperature")
console.print("[dim]→ High temperature (1.5) should produce more creative, varied output[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_provider_override(client: httpx.AsyncClient):
"""Test 3: Provider override with temperature"""
console.rule("[bold yellow]Test 3: Provider Override with Temperature[/]")
provider = "gemini/gemini-2.5-flash-lite"
payload = {
"url": TEST_URL,
"f": "llm",
"q": "Summarize this page in one sentence.",
"provider": provider, # Explicitly set provider
"temperature": 0.7
}
print_request("/md", payload, "Provider + Temperature Override")
try:
response = await client.post("/md", json=payload, timeout=30.0)
response.raise_for_status()
data = response.json()
print_response(data, "Response with Provider Override")
console.print(f"[dim]→ This explicitly uses {provider} with temperature 0.7[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_base_url_custom(client: httpx.AsyncClient):
"""Test 4: Custom base_url (will fail unless you have a custom endpoint)"""
console.rule("[bold yellow]Test 4: Custom Base URL (Demo Only)[/]")
payload = {
"url": TEST_URL,
"f": "llm",
"q": "What is this page about?",
"base_url": "https://api.custom-endpoint.com/v1", # Custom endpoint
"temperature": 0.5
}
print_request("/md", payload, "Custom Base URL Request")
console.print("[yellow]Note: This will fail unless you have a custom endpoint set up[/]")
try:
response = await client.post("/md", json=payload, timeout=10.0)
response.raise_for_status()
data = response.json()
print_response(data, "Response from Custom Endpoint")
except httpx.HTTPStatusError as e:
console.print(f"[yellow]Expected failure (no custom endpoint): Status {e.response.status_code}[/]")
except Exception as e:
console.print(f"[yellow]Expected error: {e}[/]")
async def test_llm_job_endpoint(client: httpx.AsyncClient):
"""Test 5: Test the /llm/job endpoint with temperature and base_url"""
console.rule("[bold yellow]Test 5: LLM Job Endpoint with Parameters[/]")
payload = {
"url": TEST_URL,
"q": "Extract the main title and any key information",
"temperature": 0.3,
# "base_url": "https://api.openai.com/v1" # Optional
}
print_request("/llm/job", payload, "LLM Job with Temperature")
try:
# Submit the job
response = await client.post("/llm/job", json=payload, timeout=30.0)
response.raise_for_status()
job_data = response.json()
if "task_id" in job_data:
task_id = job_data["task_id"]
console.print(f"[green]Job created with task_id: {task_id}[/]")
# Poll for result (simplified - in production use proper polling)
await asyncio.sleep(3)
status_response = await client.get(f"/llm/job/{task_id}")
status_data = status_response.json()
if status_data.get("status") == "completed":
console.print("[green]Job completed successfully![/]")
if "result" in status_data:
console.print(Panel.fit(
Syntax(json.dumps(status_data["result"], indent=2), "json", theme="monokai"),
title="Extraction Result",
border_style="green"
))
else:
console.print(f"[yellow]Job status: {status_data.get('status', 'unknown')}[/]")
else:
console.print(f"[red]Unexpected response: {job_data}[/]")
except Exception as e:
console.print(f"[red]Error: {e}[/]")
async def test_llm_endpoint(client: httpx.AsyncClient):
"""
Quick QA round-trip with /llm.
Asks a trivial question against SIMPLE_URL just to show wiring.
"""
import time
import urllib.parse
page_url = "https://kidocode.com"
question = "What is the title of this page?"
enc = urllib.parse.quote_plus(page_url, safe="")
console.print(f"GET /llm/{enc}?q={question}")
try:
t0 = time.time()
resp = await client.get(f"/llm/{enc}", params={"q": question})
dt = time.time() - t0
console.print(
f"Response Status: [bold {'green' if resp.is_success else 'red'}]{resp.status_code}[/] (took {dt:.2f}s)")
resp.raise_for_status()
answer = resp.json().get("answer", "")
console.print(Panel(answer or "No answer returned",
title="LLM answer", border_style="magenta", expand=False))
except Exception as e:
console.print(f"[bold red]Error hitting /llm:[/] {e}")
async def show_environment_info():
"""Display current environment configuration"""
console.rule("[bold cyan]Current Environment Configuration[/]")
table = Table(title="LLM Environment Variables", show_header=True, header_style="bold magenta")
table.add_column("Variable", style="cyan", width=30)
table.add_column("Value", style="yellow")
table.add_column("Description", style="dim")
env_vars = [
("LLM_PROVIDER", "Global default provider"),
("LLM_TEMPERATURE", "Global default temperature"),
("LLM_BASE_URL", "Global custom API endpoint"),
("OPENAI_API_KEY", "OpenAI API key"),
("OPENAI_TEMPERATURE", "OpenAI-specific temperature"),
("OPENAI_BASE_URL", "OpenAI-specific endpoint"),
("ANTHROPIC_API_KEY", "Anthropic API key"),
("ANTHROPIC_TEMPERATURE", "Anthropic-specific temperature"),
("GROQ_API_KEY", "Groq API key"),
("GROQ_TEMPERATURE", "Groq-specific temperature"),
]
for var, desc in env_vars:
value = os.environ.get(var, "[not set]")
if "API_KEY" in var and value != "[not set]":
# Mask API keys for security
value = value[:10] + "..." if len(value) > 10 else "***"
table.add_row(var, value, desc)
console.print(table)
console.print()
# --- Main Test Runner ---
async def main():
"""Run all tests"""
console.print(Panel.fit(
"[bold cyan]Crawl4AI LLM Parameters Test Suite[/]\n" +
"Testing temperature and base_url configuration hierarchy",
border_style="cyan"
))
# Show current environment
# await show_environment_info()
# Create HTTP client
async with httpx.AsyncClient(base_url=BASE_URL, timeout=60.0) as client:
# Check server health
if not await check_server_health(client):
console.print("[red]Server is not available. Please ensure the Docker container is running.[/]")
return
# Run tests
tests = [
("Default Configuration", test_default_no_params),
("Request Temperature", test_request_temperature),
("Provider Override", test_provider_override),
("Custom Base URL", test_base_url_custom),
("LLM Job Endpoint", test_llm_job_endpoint),
("LLM Endpoint", test_llm_endpoint),
]
for i, (name, test_func) in enumerate(tests, 1):
if i > 1:
console.print() # Add spacing between tests
try:
await test_func(client)
except Exception as e:
console.print(f"[red]Test '{name}' failed with error: {e}[/]")
console.print_exception(show_locals=False)
console.rule("[bold green]All Tests Complete![/]", style="green")
# Summary
console.print("\n[bold cyan]Configuration Hierarchy Summary:[/]")
console.print("1. [yellow]Request parameters[/] - Highest priority (temperature, base_url in API call)")
console.print("2. [yellow]Provider-specific env[/] - e.g., OPENAI_TEMPERATURE, GROQ_BASE_URL")
console.print("3. [yellow]Global env variables[/] - LLM_TEMPERATURE, LLM_BASE_URL")
console.print("4. [yellow]System defaults[/] - Lowest priority (provider/litellm defaults)")
console.print()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
console.print("\n[yellow]Tests interrupted by user.[/]")
except Exception as e:
console.print(f"\n[bold red]An error occurred:[/]")
console.print_exception(show_locals=False)

View File

@@ -143,7 +143,40 @@ class TestCrawlEndpoints:
assert "<h1>Herman Melville - Moby-Dick</h1>" in result["html"]
# We don't specify a markdown generator in this test, so don't make assumptions about markdown field
# It might be null, missing, or populated depending on the server's default behavior
async def test_crawl_with_stream_direct(self, async_client: httpx.AsyncClient):
"""Test that /crawl endpoint handles stream=True directly without redirect."""
payload = {
"urls": [SIMPLE_HTML_URL],
"browser_config": {
"type": "BrowserConfig",
"params": {
"headless": True,
}
},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"stream": True, # Set stream to True for direct streaming
"screenshot": False,
"cache_mode": CacheMode.BYPASS.value
}
}
}
# Send a request to the /crawl endpoint - should handle streaming directly
async with async_client.stream("POST", "/crawl", json=payload) as response:
assert response.status_code == 200
assert response.headers["content-type"] == "application/x-ndjson"
assert response.headers.get("x-stream-status") == "active"
results = await process_streaming_response(response)
assert len(results) == 1
result = results[0]
await assert_crawl_result_structure(result)
assert result["success"] is True
assert result["url"] == SIMPLE_HTML_URL
assert "<h1>Herman Melville - Moby-Dick</h1>" in result["html"]
async def test_simple_crawl_single_url_streaming(self, async_client: httpx.AsyncClient):
"""Test /crawl/stream with a single URL and simple config values."""
payload = {
@@ -635,7 +668,209 @@ class TestCrawlEndpoints:
pytest.fail(f"LLM extracted content parsing or validation failed: {e}\nContent: {result['extracted_content']}")
except Exception as e: # Catch any other unexpected error
pytest.fail(f"An unexpected error occurred during LLM result processing: {e}\nContent: {result['extracted_content']}")
# 7. Error Handling Tests
async def test_invalid_url_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for invalid URLs."""
payload = {
"urls": ["invalid-url", "https://nonexistent-domain-12345.com"],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {"type": "CrawlerRunConfig", "params": {"cache_mode": CacheMode.BYPASS.value}}
}
response = await async_client.post("/crawl", json=payload)
# Should return 200 with failed results, not 500
print(f"Status code: {response.status_code}")
print(f"Response: {response.text}")
assert response.status_code == 500
data = response.json()
assert data["detail"].startswith("Crawl request failed:")
async def test_mixed_success_failure_urls(self, async_client: httpx.AsyncClient):
"""Test handling of mixed success/failure URLs."""
payload = {
"urls": [
SIMPLE_HTML_URL, # Should succeed
"https://nonexistent-domain-12345.com", # Should fail
"https://invalid-url-with-special-chars-!@#$%^&*()", # Should fail
],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"cache_mode": CacheMode.BYPASS.value,
"markdown_generator": {
"type": "DefaultMarkdownGenerator",
"params": {
"content_filter": {
"type": "PruningContentFilter",
"params": {"threshold": 0.5}
}
}
}
}
}
}
response = await async_client.post("/crawl", json=payload)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert len(data["results"]) == 3
success_count = 0
failure_count = 0
for result in data["results"]:
if result["success"]:
success_count += 1
else:
failure_count += 1
assert "error_message" in result
assert len(result["error_message"]) > 0
assert success_count >= 1 # At least one should succeed
assert failure_count >= 1 # At least one should fail
async def test_streaming_mixed_urls(self, async_client: httpx.AsyncClient):
"""Test streaming with mixed success/failure URLs."""
payload = {
"urls": [
SIMPLE_HTML_URL, # Should succeed
"https://nonexistent-domain-12345.com", # Should fail
],
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"stream": True,
"cache_mode": CacheMode.BYPASS.value
}
}
}
async with async_client.stream("POST", "/crawl/stream", json=payload) as response:
response.raise_for_status()
results = await process_streaming_response(response)
assert len(results) == 2
success_count = 0
failure_count = 0
for result in results:
if result["success"]:
success_count += 1
assert result["url"] == SIMPLE_HTML_URL
else:
failure_count += 1
assert "error_message" in result
assert result["error_message"] is not None
assert success_count == 1
assert failure_count == 1
async def test_markdown_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for markdown endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url", "f": "fit"}
response = await async_client.post("/md", json=invalid_payload)
# Should return 400 for invalid URL format
assert response.status_code == 400
# Test non-existent URL
nonexistent_payload = {"url": "https://nonexistent-domain-12345.com", "f": "fit"}
response = await async_client.post("/md", json=nonexistent_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_html_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for HTML endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url"}
response = await async_client.post("/html", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_screenshot_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for screenshot endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url"}
response = await async_client.post("/screenshot", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_pdf_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for PDF endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url"}
response = await async_client.post("/pdf", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_execute_js_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for execute_js endpoint."""
# Test invalid URL
invalid_payload = {"url": "invalid-url", "scripts": ["return document.title;"]}
response = await async_client.post("/execute_js", json=invalid_payload)
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_llm_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for LLM endpoint."""
# Test missing query parameter
response = await async_client.get("/llm/https://example.com")
assert response.status_code == 422 # FastAPI validation error, not 400
# Test invalid URL
response = await async_client.get("/llm/invalid-url?q=test")
# Should return 500 for crawl failure
assert response.status_code == 500
async def test_ask_endpoint_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for ask endpoint."""
# Test invalid context_type
response = await async_client.get("/ask?context_type=invalid")
assert response.status_code == 422 # Validation error
# Test invalid score_ratio
response = await async_client.get("/ask?score_ratio=2.0") # > 1.0
assert response.status_code == 422 # Validation error
# Test invalid max_results
response = await async_client.get("/ask?max_results=0") # < 1
assert response.status_code == 422 # Validation error
async def test_config_dump_error_handling(self, async_client: httpx.AsyncClient):
"""Test error handling for config dump endpoint."""
# Test invalid code
invalid_payload = {"code": "invalid_code"}
response = await async_client.post("/config/dump", json=invalid_payload)
assert response.status_code == 400
# Test nested function calls (not allowed)
nested_payload = {"code": "CrawlerRunConfig(BrowserConfig())"}
response = await async_client.post("/config/dump", json=nested_payload)
assert response.status_code == 400
async def test_malformed_request_handling(self, async_client: httpx.AsyncClient):
"""Test handling of malformed requests."""
# Test missing required fields
malformed_payload = {"urls": []} # Missing browser_config and crawler_config
response = await async_client.post("/crawl", json=malformed_payload)
print(f"Response: {response.text}")
assert response.status_code == 422 # Validation error
# Test empty URLs list
empty_urls_payload = {
"urls": [],
"browser_config": {"type": "BrowserConfig", "params": {}},
"crawler_config": {"type": "CrawlerRunConfig", "params": {}}
}
response = await async_client.post("/crawl", json=empty_urls_payload)
assert response.status_code == 422 # "At least one URL required"
if __name__ == "__main__":
# Define arguments for pytest programmatically
# -v: verbose output

View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""
Simple test to verify BestFirstCrawlingStrategy fixes.
This test crawls a real website and shows that:
1. Higher-scoring pages are crawled first (priority queue fix)
2. Links are scored before truncation (link discovery fix)
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.deep_crawling import BestFirstCrawlingStrategy
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
async def test_best_first_strategy():
"""Test BestFirstCrawlingStrategy with keyword scoring"""
print("=" * 70)
print("Testing BestFirstCrawlingStrategy with Real URL")
print("=" * 70)
print("\nThis test will:")
print("1. Crawl Python.org documentation")
print("2. Score pages based on keywords: 'tutorial', 'guide', 'reference'")
print("3. Show that higher-scoring pages are crawled first")
print("-" * 70)
# Create a keyword scorer that prioritizes tutorial/guide pages
scorer = KeywordRelevanceScorer(
keywords=["tutorial", "guide", "reference", "documentation"],
weight=1.0,
case_sensitive=False
)
# Create the strategy with scoring
strategy = BestFirstCrawlingStrategy(
max_depth=2, # Crawl 2 levels deep
max_pages=10, # Limit to 10 pages total
url_scorer=scorer, # Use keyword scoring
include_external=False # Only internal links
)
# Configure browser and crawler
browser_config = BrowserConfig(
headless=True, # Run in background
verbose=False # Reduce output noise
)
crawler_config = CrawlerRunConfig(
deep_crawl_strategy=strategy,
verbose=False
)
print("\nStarting crawl of https://docs.python.org/3/")
print("Looking for pages with keywords: tutorial, guide, reference, documentation")
print("-" * 70)
crawled_urls = []
async with AsyncWebCrawler(config=browser_config) as crawler:
# Crawl and collect results
results = await crawler.arun(
url="https://docs.python.org/3/",
config=crawler_config
)
# Process results
if isinstance(results, list):
for result in results:
score = result.metadata.get('score', 0) if result.metadata else 0
depth = result.metadata.get('depth', 0) if result.metadata else 0
crawled_urls.append({
'url': result.url,
'score': score,
'depth': depth,
'success': result.success
})
print("\n" + "=" * 70)
print("CRAWL RESULTS (in order of crawling)")
print("=" * 70)
for i, item in enumerate(crawled_urls, 1):
status = "" if item['success'] else ""
# Highlight high-scoring pages
if item['score'] > 0.5:
print(f"{i:2}. [{status}] Score: {item['score']:.2f} | Depth: {item['depth']} | {item['url']}")
print(f" ^ HIGH SCORE - Contains keywords!")
else:
print(f"{i:2}. [{status}] Score: {item['score']:.2f} | Depth: {item['depth']} | {item['url']}")
print("\n" + "=" * 70)
print("ANALYSIS")
print("=" * 70)
# Check if higher scores appear early in the crawl
scores = [item['score'] for item in crawled_urls[1:]] # Skip initial URL
high_score_indices = [i for i, s in enumerate(scores) if s > 0.3]
if high_score_indices and high_score_indices[0] < len(scores) / 2:
print("✅ SUCCESS: Higher-scoring pages (with keywords) were crawled early!")
print(" This confirms the priority queue fix is working.")
else:
print("⚠️ Check the crawl order above - higher scores should appear early")
# Show score distribution
print(f"\nScore Statistics:")
print(f" - Total pages crawled: {len(crawled_urls)}")
print(f" - Average score: {sum(item['score'] for item in crawled_urls) / len(crawled_urls):.2f}")
print(f" - Max score: {max(item['score'] for item in crawled_urls):.2f}")
print(f" - Pages with keywords: {sum(1 for item in crawled_urls if item['score'] > 0.3)}")
print("\n" + "=" * 70)
print("TEST COMPLETE")
print("=" * 70)
if __name__ == "__main__":
print("\n🔍 BestFirstCrawlingStrategy Simple Test\n")
asyncio.run(test_best_first_strategy())

View File

@@ -24,7 +24,7 @@ CASES = [
# --- BrowserConfig variants ---
"BrowserConfig()",
"BrowserConfig(headless=False, extra_args=['--disable-gpu'])",
"BrowserConfig(browser_mode='builtin', proxy='http://1.2.3.4:8080')",
"BrowserConfig(browser_mode='builtin', proxy_config={'server': 'http://1.2.3.4:8080'})",
]
for code in CASES:

View File

@@ -0,0 +1,42 @@
import warnings
import pytest
from crawl4ai.async_configs import BrowserConfig, ProxyConfig
def test_browser_config_proxy_string_emits_deprecation_and_autoconverts():
warnings.simplefilter("always", DeprecationWarning)
proxy_str = "23.95.150.145:6114:username:password"
with warnings.catch_warnings(record=True) as caught:
cfg = BrowserConfig(proxy=proxy_str, headless=True)
dep_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
assert dep_warnings, "Expected DeprecationWarning when using BrowserConfig(proxy=...)"
assert cfg.proxy is None, "cfg.proxy should be None after auto-conversion"
assert isinstance(cfg.proxy_config, ProxyConfig), "cfg.proxy_config should be ProxyConfig instance"
assert cfg.proxy_config.username == "username"
assert cfg.proxy_config.password == "password"
assert cfg.proxy_config.server.startswith("http://")
assert cfg.proxy_config.server.endswith(":6114")
def test_browser_config_with_proxy_config_emits_no_deprecation():
warnings.simplefilter("always", DeprecationWarning)
with warnings.catch_warnings(record=True) as caught:
cfg = BrowserConfig(
headless=True,
proxy_config={
"server": "http://127.0.0.1:8080",
"username": "u",
"password": "p",
},
)
dep_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
assert not dep_warnings, "Did not expect DeprecationWarning when using proxy_config"
assert cfg.proxy is None
assert isinstance(cfg.proxy_config, ProxyConfig)

View File

@@ -1,64 +0,0 @@
"""
Test configuration and utilities for telemetry testing.
"""
import os
import pytest
def pytest_configure(config): # noqa: ARG001
"""Configure pytest for telemetry tests."""
# Add custom markers
config.addinivalue_line("markers", "unit: Unit tests")
config.addinivalue_line("markers", "integration: Integration tests")
config.addinivalue_line("markers", "privacy: Privacy compliance tests")
config.addinivalue_line("markers", "performance: Performance tests")
config.addinivalue_line("markers", "slow: Slow running tests")
def pytest_collection_modifyitems(config, items): # noqa: ARG001
"""Modify test collection to add markers automatically."""
for item in items:
# Add markers based on test location and name
if "telemetry" in str(item.fspath):
if "integration" in item.name or "test_integration" in str(item.fspath):
item.add_marker(pytest.mark.integration)
elif "privacy" in item.name or "performance" in item.name:
if "privacy" in item.name:
item.add_marker(pytest.mark.privacy)
if "performance" in item.name:
item.add_marker(pytest.mark.performance)
else:
item.add_marker(pytest.mark.unit)
# Mark slow tests
if "slow" in item.name or any(mark.name == "slow" for mark in item.iter_markers()):
item.add_marker(pytest.mark.slow)
@pytest.fixture(autouse=True)
def setup_test_environment():
"""Set up test environment variables."""
# Ensure we're in test mode
os.environ['CRAWL4AI_TEST_MODE'] = '1'
# Disable actual telemetry during tests unless explicitly enabled
if 'CRAWL4AI_TELEMETRY_TEST_REAL' not in os.environ:
os.environ['CRAWL4AI_TELEMETRY'] = '0'
yield
# Clean up after tests
test_vars = ['CRAWL4AI_TEST_MODE', 'CRAWL4AI_TELEMETRY_TEST_REAL']
for var in test_vars:
if var in os.environ:
del os.environ[var]
def pytest_report_header(config): # noqa: ARG001
"""Add information to pytest header."""
return [
"Crawl4AI Telemetry Tests",
f"Test mode: {'ENABLED' if os.environ.get('CRAWL4AI_TEST_MODE') else 'DISABLED'}",
f"Real telemetry: {'ENABLED' if os.environ.get('CRAWL4AI_TELEMETRY_TEST_REAL') else 'DISABLED'}"
]

View File

@@ -1,216 +0,0 @@
"""
Integration tests for telemetry CLI commands.
"""
import pytest
import subprocess
import sys
import os
from unittest.mock import patch, Mock
@pytest.mark.integration
class TestTelemetryCLI:
"""Test telemetry CLI commands integration."""
def test_telemetry_status_command(self, clean_environment, temp_config_dir):
"""Test the telemetry status CLI command."""
# Import with mocked config
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = 'not_set'
mock_config.is_enabled.return_value = False
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test status command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'status']):
try:
main()
except SystemExit:
pass # CLI commands often call sys.exit()
def test_telemetry_enable_command(self, clean_environment, temp_config_dir):
"""Test the telemetry enable CLI command."""
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test enable command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'enable', '--email', 'test@example.com']):
try:
main()
except SystemExit:
pass
def test_telemetry_disable_command(self, clean_environment, temp_config_dir):
"""Test the telemetry disable CLI command."""
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
MockConfig.return_value = mock_config
from crawl4ai.cli import main
# Test disable command
with patch('sys.argv', ['crawl4ai', 'telemetry', 'disable']):
try:
main()
except SystemExit:
pass
@pytest.mark.slow
def test_cli_subprocess_integration(self, temp_config_dir):
"""Test CLI commands as subprocess calls."""
env = os.environ.copy()
env['CRAWL4AI_CONFIG_DIR'] = str(temp_config_dir)
# Test status command via subprocess
try:
result = subprocess.run(
[sys.executable, '-m', 'crawl4ai.cli', 'telemetry', 'status'],
env=env,
capture_output=True,
text=True,
timeout=10
)
# Should not crash, regardless of exit code
assert result.returncode in [0, 1] # May return 1 if not configured
except subprocess.TimeoutExpired:
pytest.skip("CLI command timed out")
except FileNotFoundError:
pytest.skip("CLI module not found")
@pytest.mark.integration
class TestAsyncWebCrawlerIntegration:
"""Test AsyncWebCrawler telemetry integration."""
@pytest.mark.asyncio
async def test_crawler_telemetry_decorator(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that AsyncWebCrawler methods are decorated with telemetry."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
from crawl4ai import AsyncWebCrawler
# Check if the arun method has telemetry decoration
crawler = AsyncWebCrawler()
assert hasattr(crawler.arun, '__wrapped__') or callable(crawler.arun)
@pytest.mark.asyncio
async def test_crawler_exception_capture_integration(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that exceptions in AsyncWebCrawler are captured."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
with patch('crawl4ai.telemetry.capture_exception') as _mock_capture:
from crawl4ai import AsyncWebCrawler
async with AsyncWebCrawler() as crawler:
try:
# This should cause an exception
await crawler.arun(url="invalid://url")
except Exception:
pass # We expect this to fail
# The decorator should have attempted to capture the exception
# Note: This might not always be called depending on where the exception occurs
@pytest.mark.asyncio
async def test_crawler_with_disabled_telemetry(self, disabled_telemetry_config):
"""Test that AsyncWebCrawler works normally with disabled telemetry."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
from crawl4ai import AsyncWebCrawler
# Should work normally even with telemetry disabled
async with AsyncWebCrawler() as crawler:
assert crawler is not None
@pytest.mark.integration
class TestDockerIntegration:
"""Test Docker environment telemetry integration."""
def test_docker_environment_detection(self, docker_environment, temp_config_dir):
"""Test that Docker environment is detected correctly."""
from crawl4ai.telemetry.environment import EnvironmentDetector
env = EnvironmentDetector.detect()
from crawl4ai.telemetry.environment import Environment
assert env == Environment.DOCKER
def test_docker_default_telemetry_enabled(self, temp_config_dir):
"""Test that telemetry is enabled by default in Docker."""
from crawl4ai.telemetry.environment import Environment
# Clear any existing environment variables that might interfere
with patch.dict(os.environ, {}, clear=True):
# Set only the Docker environment variable
os.environ['CRAWL4AI_DOCKER'] = 'true'
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
from crawl4ai.telemetry.consent import ConsentManager
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
config = TelemetryConfig(config_dir=temp_config_dir)
consent_manager = ConsentManager(config)
# Should set consent to ALWAYS for Docker
consent_manager.check_and_prompt()
assert config.get_consent() == TelemetryConsent.ALWAYS
def test_docker_telemetry_can_be_disabled(self, temp_config_dir):
"""Test that Docker telemetry can be disabled via environment variable."""
from crawl4ai.telemetry.environment import Environment
with patch.dict(os.environ, {'CRAWL4AI_TELEMETRY': '0', 'CRAWL4AI_DOCKER': 'true'}):
with patch('crawl4ai.telemetry.environment.EnvironmentDetector.detect', return_value=Environment.DOCKER):
from crawl4ai.telemetry.consent import ConsentManager
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
config = TelemetryConfig(config_dir=temp_config_dir)
consent_manager = ConsentManager(config)
# Should set consent to DENIED when env var is 0
consent_manager.check_and_prompt()
assert config.get_consent() == TelemetryConsent.DENIED
@pytest.mark.integration
class TestTelemetryProviderIntegration:
"""Test telemetry provider integration."""
def test_sentry_provider_initialization(self, enabled_telemetry_config):
"""Test that Sentry provider initializes correctly."""
try:
from crawl4ai.telemetry.providers.sentry import SentryProvider
provider = SentryProvider()
# Should not crash during initialization
assert provider is not None
except ImportError:
pytest.skip("Sentry provider not available")
def test_null_provider_fallback(self, disabled_telemetry_config):
"""Test that NullProvider is used when telemetry is disabled."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
from crawl4ai.telemetry import TelemetryManager
from crawl4ai.telemetry.base import NullProvider
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider) # noqa: SLF001
def test_graceful_degradation_without_sentry(self, enabled_telemetry_config):
"""Test graceful degradation when sentry-sdk is not available."""
with patch.dict('sys.modules', {'sentry_sdk': None}):
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
from crawl4ai.telemetry import TelemetryManager
from crawl4ai.telemetry.base import NullProvider
# Should fall back to NullProvider when Sentry is not available
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider) # noqa: SLF001
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,283 +0,0 @@
"""
Privacy and performance tests for telemetry system.
"""
import pytest
import time
import asyncio
from unittest.mock import patch
from crawl4ai.telemetry import telemetry_decorator, async_telemetry_decorator, TelemetryManager
@pytest.mark.privacy
class TestTelemetryPrivacy:
"""Test privacy compliance of telemetry system."""
def test_no_url_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that URLs are not captured in telemetry data."""
# Ensure config is properly set for sending
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
# Mock the provider directly in the manager
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
# Create exception with URL in context
exception = ValueError("Test error")
context = {'url': privacy_test_data['url']}
manager.capture_exception(exception, context)
# Verify that the provider was called
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that context was passed to the provider (filtering happens in provider)
assert len(call_args) >= 2
def test_no_content_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that crawled content is not captured."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = {
'content': privacy_test_data['content'],
'html': '<html><body>Private content</body></html>',
'text': 'Extracted private text'
}
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that the provider was called (actual filtering would happen in provider)
assert len(call_args) >= 2
def test_no_pii_captured(self, enabled_telemetry_config, mock_sentry_provider, privacy_test_data):
"""Test that PII is not captured in telemetry."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = privacy_test_data['user_data'].copy()
context.update(privacy_test_data['pii'])
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Verify that the provider was called (actual filtering would happen in provider)
assert len(call_args) >= 2
def test_sanitized_context_captured(self, enabled_telemetry_config, mock_sentry_provider):
"""Test that only safe context is captured."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
exception = ValueError("Test error")
context = {
'operation': 'crawl', # Safe to capture
'status_code': 404, # Safe to capture
'retry_count': 3, # Safe to capture
'user_email': 'secret@example.com', # Should be in context (not filtered at this level)
'content': 'private content' # Should be in context (not filtered at this level)
}
manager.capture_exception(exception, context)
mock_sentry_provider.send_exception.assert_called_once()
call_args = mock_sentry_provider.send_exception.call_args
# Get the actual arguments passed to the mock
args, kwargs = call_args
assert len(args) >= 2, f"Expected at least 2 args, got {len(args)}"
# The second argument should be the context
captured_context = args[1]
# The basic context should be present (this tests the manager, not the provider filtering)
assert 'operation' in captured_context, f"operation not found in {captured_context}"
assert captured_context.get('operation') == 'crawl'
assert captured_context.get('status_code') == 404
assert captured_context.get('retry_count') == 3
@pytest.mark.performance
class TestTelemetryPerformance:
"""Test performance impact of telemetry system."""
def test_decorator_overhead_sync(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test performance overhead of sync telemetry decorator."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
@telemetry_decorator
def test_function():
"""Test function with telemetry decorator."""
time.sleep(0.001) # Simulate small amount of work
return "success"
# Measure time with telemetry
start_time = time.time()
for _ in range(100):
test_function()
telemetry_time = time.time() - start_time
# Telemetry should add minimal overhead
assert telemetry_time < 1.0 # Should complete 100 calls in under 1 second
@pytest.mark.asyncio
async def test_decorator_overhead_async(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test performance overhead of async telemetry decorator."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
@async_telemetry_decorator
async def test_async_function():
"""Test async function with telemetry decorator."""
await asyncio.sleep(0.001) # Simulate small amount of async work
return "success"
# Measure time with telemetry
start_time = time.time()
tasks = [test_async_function() for _ in range(100)]
await asyncio.gather(*tasks)
telemetry_time = time.time() - start_time
# Telemetry should add minimal overhead to async operations
assert telemetry_time < 2.0 # Should complete 100 async calls in under 2 seconds
def test_disabled_telemetry_performance(self, disabled_telemetry_config):
"""Test that disabled telemetry has zero overhead."""
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=disabled_telemetry_config):
@telemetry_decorator
def test_function():
"""Test function with disabled telemetry."""
time.sleep(0.001)
return "success"
# Measure time with disabled telemetry
start_time = time.time()
for _ in range(100):
test_function()
disabled_time = time.time() - start_time
# Should be very fast when disabled
assert disabled_time < 0.5 # Should be faster than enabled telemetry
def test_telemetry_manager_initialization_performance(self):
"""Test that TelemetryManager initializes quickly."""
start_time = time.time()
# Initialize multiple managers (should use singleton)
for _ in range(10):
TelemetryManager.get_instance()
init_time = time.time() - start_time
# Initialization should be fast
assert init_time < 0.1 # Should initialize in under 100ms
def test_config_loading_performance(self, temp_config_dir):
"""Test that config loading is fast."""
from crawl4ai.telemetry.config import TelemetryConfig
# Create config with some data
config = TelemetryConfig(config_dir=temp_config_dir)
from crawl4ai.telemetry.config import TelemetryConsent
config.set_consent(TelemetryConsent.ALWAYS, email="test@example.com")
start_time = time.time()
# Load config multiple times
for _ in range(100):
new_config = TelemetryConfig(config_dir=temp_config_dir)
new_config.get_consent()
load_time = time.time() - start_time
# Config loading should be fast
assert load_time < 0.5 # Should load 100 times in under 500ms
@pytest.mark.performance
class TestTelemetryScalability:
"""Test telemetry system scalability."""
def test_multiple_exception_capture(self, enabled_telemetry_config, mock_sentry_provider):
"""Test capturing multiple exceptions in sequence."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
start_time = time.time()
# Capture many exceptions
for i in range(50):
exception = ValueError(f"Test error {i}")
manager.capture_exception(exception, {'operation': f'test_{i}'})
capture_time = time.time() - start_time
# Should handle multiple exceptions efficiently
assert capture_time < 1.0 # Should capture 50 exceptions in under 1 second
assert mock_sentry_provider.send_exception.call_count <= 50 # May be less due to consent checks
@pytest.mark.asyncio
async def test_concurrent_exception_capture(self, enabled_telemetry_config, mock_sentry_provider): # noqa: ARG002
"""Test concurrent exception capture performance."""
# Ensure config is properly set
enabled_telemetry_config.is_enabled.return_value = True
enabled_telemetry_config.should_send_current.return_value = True
with patch('crawl4ai.telemetry.TelemetryConfig', return_value=enabled_telemetry_config):
manager = TelemetryManager()
manager._provider = mock_sentry_provider # noqa: SLF001
manager._initialized = True # noqa: SLF001
async def capture_exception_async(i):
exception = ValueError(f"Concurrent error {i}")
return manager.capture_exception(exception, {'operation': f'concurrent_{i}'})
start_time = time.time()
# Capture exceptions concurrently
tasks = [capture_exception_async(i) for i in range(20)]
await asyncio.gather(*tasks)
capture_time = time.time() - start_time
# Should handle concurrent exceptions efficiently
assert capture_time < 1.0 # Should capture 20 concurrent exceptions in under 1 second
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,241 +0,0 @@
"""
Tests for Crawl4AI telemetry functionality.
"""
import pytest
import os
import tempfile
from pathlib import Path
import json
from unittest.mock import Mock, patch, MagicMock
from crawl4ai.telemetry import (
TelemetryManager,
capture_exception,
enable,
disable,
status
)
from crawl4ai.telemetry.config import TelemetryConfig, TelemetryConsent
from crawl4ai.telemetry.environment import Environment, EnvironmentDetector
from crawl4ai.telemetry.base import NullProvider
from crawl4ai.telemetry.consent import ConsentManager
class TestTelemetryConfig:
"""Test telemetry configuration management."""
def test_config_initialization(self):
"""Test config initialization with custom directory."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
assert config.config_dir == Path(tmpdir)
assert config.get_consent() == TelemetryConsent.NOT_SET
def test_consent_persistence(self):
"""Test that consent is saved and loaded correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
# Set consent
config.set_consent(TelemetryConsent.ALWAYS, email="test@example.com")
# Create new config instance to test persistence
config2 = TelemetryConfig(config_dir=Path(tmpdir))
assert config2.get_consent() == TelemetryConsent.ALWAYS
assert config2.get_email() == "test@example.com"
def test_environment_variable_override(self):
"""Test that environment variables override config."""
with tempfile.TemporaryDirectory() as tmpdir:
config = TelemetryConfig(config_dir=Path(tmpdir))
config.set_consent(TelemetryConsent.ALWAYS)
# Set environment variable to disable
os.environ['CRAWL4AI_TELEMETRY'] = '0'
try:
config.update_from_env()
assert config.get_consent() == TelemetryConsent.DENIED
finally:
del os.environ['CRAWL4AI_TELEMETRY']
class TestEnvironmentDetection:
"""Test environment detection functionality."""
def test_cli_detection(self):
"""Test CLI environment detection."""
# Mock sys.stdin.isatty
with patch('sys.stdin.isatty', return_value=True):
env = EnvironmentDetector.detect()
# Should detect as CLI in most test environments
assert env in [Environment.CLI, Environment.UNKNOWN]
def test_docker_detection(self):
"""Test Docker environment detection."""
# Mock Docker environment
with patch.dict(os.environ, {'CRAWL4AI_DOCKER': 'true'}):
env = EnvironmentDetector.detect()
assert env == Environment.DOCKER
def test_api_server_detection(self):
"""Test API server detection."""
with patch.dict(os.environ, {'CRAWL4AI_API_SERVER': 'true', 'CRAWL4AI_DOCKER': 'true'}):
env = EnvironmentDetector.detect()
assert env == Environment.API_SERVER
class TestTelemetryManager:
"""Test the main telemetry manager."""
def test_singleton_pattern(self):
"""Test that TelemetryManager is a singleton."""
manager1 = TelemetryManager.get_instance()
manager2 = TelemetryManager.get_instance()
assert manager1 is manager2
def test_exception_capture(self):
"""Test exception capture functionality."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create manager with custom config dir
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = TelemetryConsent.ALWAYS
mock_config.is_enabled.return_value = True
mock_config.should_send_current.return_value = True
mock_config.get_email.return_value = "test@example.com"
mock_config.update_from_env.return_value = None
MockConfig.return_value = mock_config
# Mock the provider setup
with patch('crawl4ai.telemetry.providers.sentry.SentryProvider') as MockSentryProvider:
mock_provider = Mock()
mock_provider.initialize.return_value = True
mock_provider.send_exception.return_value = True
MockSentryProvider.return_value = mock_provider
manager = TelemetryManager()
# Test exception capture
test_exception = ValueError("Test error")
result = manager.capture_exception(test_exception, {'test': 'context'})
# Verify the exception was processed
assert mock_config.should_send_current.called
def test_null_provider_when_disabled(self):
"""Test that NullProvider is used when telemetry is disabled."""
with tempfile.TemporaryDirectory() as tmpdir:
with patch('crawl4ai.telemetry.TelemetryConfig') as MockConfig:
mock_config = Mock()
mock_config.get_consent.return_value = TelemetryConsent.DENIED
mock_config.is_enabled.return_value = False
MockConfig.return_value = mock_config
manager = TelemetryManager()
assert isinstance(manager._provider, NullProvider)
class TestConsentManager:
"""Test consent management functionality."""
def test_docker_default_enabled(self):
"""Test that Docker environment has telemetry enabled by default."""
with patch('crawl4ai.telemetry.consent.EnvironmentDetector.detect', return_value=Environment.DOCKER):
with patch('os.environ.get') as mock_env_get:
# Mock os.environ.get to return None for CRAWL4AI_TELEMETRY
mock_env_get.return_value = None
config = Mock()
config.get_consent.return_value = TelemetryConsent.NOT_SET
consent_manager = ConsentManager(config)
consent_manager.check_and_prompt()
# Should be enabled by default in Docker
assert config.set_consent.called
assert config.set_consent.call_args[0][0] == TelemetryConsent.ALWAYS
def test_docker_disabled_by_env(self):
"""Test that Docker telemetry can be disabled via environment variable."""
with patch('crawl4ai.telemetry.consent.EnvironmentDetector.detect', return_value=Environment.DOCKER):
with patch.dict(os.environ, {'CRAWL4AI_TELEMETRY': '0'}):
config = Mock()
config.get_consent.return_value = TelemetryConsent.NOT_SET
consent_manager = ConsentManager(config)
consent = consent_manager.check_and_prompt()
# Should be disabled
assert config.set_consent.called
assert config.set_consent.call_args[0][0] == TelemetryConsent.DENIED
class TestPublicAPI:
"""Test the public API functions."""
@patch('crawl4ai.telemetry.get_telemetry')
def test_enable_function(self, mock_get_telemetry):
"""Test the enable() function."""
mock_manager = Mock()
mock_get_telemetry.return_value = mock_manager
enable(email="test@example.com", always=True)
mock_manager.enable.assert_called_once_with(
email="test@example.com",
always=True,
once=False
)
@patch('crawl4ai.telemetry.get_telemetry')
def test_disable_function(self, mock_get_telemetry):
"""Test the disable() function."""
mock_manager = Mock()
mock_get_telemetry.return_value = mock_manager
disable()
mock_manager.disable.assert_called_once()
@patch('crawl4ai.telemetry.get_telemetry')
def test_status_function(self, mock_get_telemetry):
"""Test the status() function."""
mock_manager = Mock()
mock_manager.status.return_value = {
'enabled': True,
'consent': 'always',
'email': 'test@example.com'
}
mock_get_telemetry.return_value = mock_manager
result = status()
assert result['enabled'] is True
assert result['consent'] == 'always'
assert result['email'] == 'test@example.com'
class TestIntegration:
"""Integration tests for telemetry with AsyncWebCrawler."""
@pytest.mark.asyncio
async def test_crawler_exception_capture(self):
"""Test that AsyncWebCrawler captures exceptions."""
from crawl4ai import AsyncWebCrawler
with patch('crawl4ai.telemetry.capture_exception') as mock_capture:
# This should trigger an exception for invalid URL
async with AsyncWebCrawler() as crawler:
try:
# Use an invalid URL that will cause an error
result = await crawler.arun(url="not-a-valid-url")
except Exception:
pass
# Check if exception was captured (may not be called if error is handled)
# This is more of a smoke test to ensure the integration doesn't break
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""
Final test and demo for HTTPS preservation feature (Issue #1410)
This demonstrates how the preserve_https_for_internal_links flag
prevents HTTPS downgrade when servers redirect to HTTP.
"""
import sys
import os
from urllib.parse import urljoin, urlparse
def demonstrate_issue():
"""Show the problem: HTTPS -> HTTP redirect causes HTTP links"""
print("=" * 60)
print("DEMONSTRATING THE ISSUE")
print("=" * 60)
# Simulate what happens during crawling
original_url = "https://quotes.toscrape.com/tag/deep-thoughts"
redirected_url = "http://quotes.toscrape.com/tag/deep-thoughts/" # Server redirects to HTTP
# Extract a relative link
relative_link = "/author/Albert-Einstein"
# Standard URL joining uses the redirected (HTTP) base
resolved_url = urljoin(redirected_url, relative_link)
print(f"Original URL: {original_url}")
print(f"Redirected to: {redirected_url}")
print(f"Relative link: {relative_link}")
print(f"Resolved link: {resolved_url}")
print(f"\n❌ Problem: Link is now HTTP instead of HTTPS!")
return resolved_url
def demonstrate_solution():
"""Show the solution: preserve HTTPS for internal links"""
print("\n" + "=" * 60)
print("DEMONSTRATING THE SOLUTION")
print("=" * 60)
# Our normalize_url with HTTPS preservation
def normalize_url_with_preservation(href, base_url, preserve_https=False, original_scheme=None):
"""Normalize URL with optional HTTPS preservation"""
# Standard resolution
full_url = urljoin(base_url, href.strip())
# Preserve HTTPS if requested
if preserve_https and original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Only for same-domain links
if parsed_full.scheme == 'http' and parsed_full.netloc == parsed_base.netloc:
full_url = full_url.replace('http://', 'https://', 1)
print(f" → Preserved HTTPS for {parsed_full.netloc}")
return full_url
# Same scenario as before
original_url = "https://quotes.toscrape.com/tag/deep-thoughts"
redirected_url = "http://quotes.toscrape.com/tag/deep-thoughts/"
relative_link = "/author/Albert-Einstein"
# Without preservation (current behavior)
resolved_without = normalize_url_with_preservation(
relative_link, redirected_url,
preserve_https=False, original_scheme='https'
)
print(f"\nWithout preservation:")
print(f" Result: {resolved_without}")
# With preservation (new feature)
resolved_with = normalize_url_with_preservation(
relative_link, redirected_url,
preserve_https=True, original_scheme='https'
)
print(f"\nWith preservation (preserve_https_for_internal_links=True):")
print(f" Result: {resolved_with}")
print(f"\n✅ Solution: Internal link stays HTTPS!")
return resolved_with
def test_edge_cases():
"""Test important edge cases"""
print("\n" + "=" * 60)
print("EDGE CASES")
print("=" * 60)
from urllib.parse import urljoin, urlparse
def preserve_https(href, base_url, original_scheme):
"""Helper to test preservation logic"""
full_url = urljoin(base_url, href)
if original_scheme == 'https':
parsed_full = urlparse(full_url)
parsed_base = urlparse(base_url)
# Fixed: check for protocol-relative URLs
if (parsed_full.scheme == 'http' and
parsed_full.netloc == parsed_base.netloc and
not href.strip().startswith('//')):
full_url = full_url.replace('http://', 'https://', 1)
return full_url
test_cases = [
# (description, href, base_url, original_scheme, should_be_https)
("External link", "http://other.com/page", "http://example.com", "https", False),
("Already HTTPS", "/page", "https://example.com", "https", True),
("No original HTTPS", "/page", "http://example.com", "http", False),
("Subdomain", "/page", "http://sub.example.com", "https", True),
("Protocol-relative", "//example.com/page", "http://example.com", "https", False),
]
for desc, href, base_url, orig_scheme, should_be_https in test_cases:
result = preserve_https(href, base_url, orig_scheme)
is_https = result.startswith('https://')
status = "" if is_https == should_be_https else ""
print(f"\n{status} {desc}:")
print(f" Input: {href} + {base_url}")
print(f" Result: {result}")
print(f" Expected HTTPS: {should_be_https}, Got: {is_https}")
def usage_example():
"""Show how to use the feature in crawl4ai"""
print("\n" + "=" * 60)
print("USAGE IN CRAWL4AI")
print("=" * 60)
print("""
To enable HTTPS preservation in your crawl4ai code:
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
async with AsyncWebCrawler() as crawler:
config = CrawlerRunConfig(
preserve_https_for_internal_links=True # Enable HTTPS preservation
)
result = await crawler.arun(
url="https://example.com",
config=config
)
# All internal links will maintain HTTPS even if
# the server redirects to HTTP
```
This is especially useful for:
- Sites that redirect HTTPS to HTTP but still support HTTPS
- Security-conscious crawling where you want to stay on HTTPS
- Avoiding mixed content issues in downstream processing
""")
if __name__ == "__main__":
# Run all demonstrations
demonstrate_issue()
demonstrate_solution()
test_edge_cases()
usage_example()
print("\n" + "=" * 60)
print("✅ All tests complete!")
print("=" * 60)