crawl4ai/crawl4ai/async_dispatcher.py
Nasrin 7cac008c10 Release/v0.7.6 (#1556)
* fix(docker-api): migrate to modern datetime library API

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>

* Fix examples in README.md

* feat(docker): add user-provided hooks support to Docker API

Implements comprehensive hooks functionality allowing users to provide custom Python
functions as strings that execute at specific points in the crawling pipeline.

Key Features:
- Support for all 8 crawl4ai hook points:
  • on_browser_created: Initialize browser settings
  • on_page_context_created: Configure page context
  • before_goto: Pre-navigation setup
  • after_goto: Post-navigation processing
  • on_user_agent_updated: User agent modification handling
  • on_execution_started: Crawl execution initialization
  • before_retrieve_html: Pre-extraction processing
  • before_return_html: Final HTML processing

Implementation Details:
- Created UserHookManager for validation, compilation, and safe execution
- Added IsolatedHookWrapper for error isolation and timeout protection
- AST-based validation ensures code structure correctness
- Sandboxed execution with restricted builtins for security
- Configurable timeout (1-120 seconds) prevents infinite loops
- Comprehensive error handling ensures hooks don't crash main process
- Execution tracking with detailed statistics and logging

API Changes:
- Added HookConfig schema with code and timeout fields
- Extended CrawlRequest with optional hooks parameter
- Added /hooks/info endpoint for hook discovery
- Updated /crawl and /crawl/stream endpoints to support hooks

Safety Features:
- Malformed hooks return clear validation errors
- Hook errors are isolated and reported without stopping crawl
- Execution statistics track success/failure/timeout rates
- All hook results are JSON-serializable

Testing:
- Comprehensive test suite covering all 8 hooks
- Error handling and timeout scenarios validated
- Authentication, performance, and content extraction examples
- 100% success rate in production testing

Documentation:
- Added extensive hooks section to docker-deployment.md
- Security warnings about user-provided code risks
- Real-world examples using httpbin.org, GitHub, BBC
- Best practices and troubleshooting guide

ref #1377

* fix(deep-crawl): BestFirst priority inversion; remove pre-scoring truncation. ref #1253

  Use negative scores in PQ to visit high-score URLs first and drop link cap prior to scoring; add test for ordering.
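
  The negated-score trick can be illustrated with a small standalone sketch. It uses `heapq` rather than the library's own queue, and `best_first_order` is a hypothetical name chosen for the example:

```python
import heapq
from typing import List, Tuple


def best_first_order(scored_urls: List[Tuple[float, str]]) -> List[str]:
    """Return URLs ordered from highest to lowest score using a min-heap."""
    heap: List[Tuple[float, str]] = []
    for score, url in scored_urls:
        # heapq always pops the smallest item, so store the negated score
        # to make the highest-scoring URL come out first.
        heapq.heappush(heap, (-score, url))

    ordered: List[str] = []
    while heap:
        _neg_score, url = heapq.heappop(heap)
        ordered.append(url)
    return ordered


if __name__ == "__main__":
    print(best_first_order([(0.2, "a"), (0.9, "b"), (0.5, "c")]))
```

  Pushing the raw score instead of its negation would visit the lowest-scoring URL first, which is the inversion this fix addresses.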

* docs: Update URL seeding examples to use proper async context managers
- Wrap all AsyncUrlSeeder usage with async context managers
- Update URL seeding adventure example to use "sitemap+cc" source, focus on course posts, and add stream=True parameter to fix runtime error

* fix(crawler): Removed the incorrect reference in browser_config variable #1310

* docs: update Docker instructions to use the latest release tag

* fix(docker): Fix LLM API key handling for multi-provider support

Previously, the system incorrectly used OPENAI_API_KEY for all LLM providers
due to a hardcoded api_key_env fallback in config.yml. This caused authentication
errors when using non-OpenAI providers like Gemini.

Changes:
- Remove api_key_env from config.yml to let litellm handle provider-specific env vars
- Simplify get_llm_api_key() to return None, allowing litellm to auto-detect keys
- Update validate_llm_provider() to trust litellm's built-in key detection
- Update documentation to reflect the new automatic key handling

The fix leverages litellm's existing capability to automatically find the correct
environment variable for each provider (OPENAI_API_KEY, GEMINI_API_TOKEN, etc.)
without manual configuration.

ref #1291

* docs: update adaptive crawler docs and cache defaults; remove deprecated examples (#1330)
- Replace BaseStrategy with CrawlStrategy in custom strategy examples (DomainSpecificStrategy, HybridStrategy)
- Remove “Custom Link Scoring” and “Caching Strategy” sections no longer aligned with current library
- Revise memory pruning example to use adaptive.get_relevant_content and index-based retention of top 500 docs
- Correct Quickstart note: default cache mode is CacheMode.BYPASS; instruct enabling with CacheMode.ENABLED

* fix(utils): Improve URL normalization by avoiding quote/unquote to preserve '+' signs. ref #1332

* feat: Add comprehensive website to API example with frontend

This commit adds a complete web-scraping API example that demonstrates how to extract structured data from any website and use it like an API, using the crawl4ai library with a minimalist frontend interface.

Core Functionality
- AI-powered web scraping with plain English queries
- Dual scraping approaches: Schema-based (faster) and LLM-based (flexible)
- Intelligent schema caching for improved performance
- Custom LLM model support with API key management
- Automatic duplicate request prevention

Modern Frontend Interface
- Minimalist black-and-white design inspired by modern web apps
- Responsive layout with smooth animations and transitions
- Three main pages: Scrape Data, Models Management, API Request History
- Real-time results display with JSON formatting
- Copy-to-clipboard functionality for extracted data
- Toast notifications for user feedback
- Auto-scroll to results when scraping starts

Model Management System
- Web-based model configuration interface
- Support for any LLM provider (OpenAI, Gemini, Anthropic, etc.)
- Simplified configuration requiring only provider and API token
- Add, list, and delete model configurations
- Secure storage of API keys in local JSON files

API Request History
- Automatic saving of all API requests and responses
- Display of request history with URL, query, and cURL commands
- Duplicate prevention (same URL + query combinations)
- Request deletion functionality
- Clean, simplified display focusing on essential information

Technical Implementation

Backend (FastAPI)
- RESTful API with comprehensive endpoints
- Pydantic models for request/response validation
- Async web scraping with crawl4ai library
- Error handling with detailed error messages
- File-based storage for models and request history

Frontend (Vanilla JS/CSS/HTML)
- No framework dependencies - pure HTML, CSS, JavaScript
- Modern CSS Grid and Flexbox layouts
- Custom dropdown styling with SVG arrows
- Responsive design for mobile and desktop
- Smooth scrolling and animations

Core Library Integration
- WebScraperAgent class for orchestration
- ModelConfig class for LLM configuration management
- Schema generation and caching system
- LLM extraction strategy support
- Browser configuration with headless mode

* fix(dependencies): add cssselect to project dependencies

Fixes bug reported in issue #1405
[Bug]: Excluded selector (excluded_selector) doesn't work

This commit reintroduces the cssselect library which was removed by PR (https://github.com/unclecode/crawl4ai/pull/1368) and merged via (437395e490).

Integration tested against the 0.7.4 Docker container. Reintroducing the cssselect package eliminated the errors seen in the logs and restored excluded_selector functionality.

Refs: #1405

* fix(docker): resolve filter serialization and JSON encoding errors in deep crawl strategy (ref #1419)

  - Fix URLPatternFilter serialization by preventing private __slots__ from being serialized as constructor params
  - Add public attributes to URLPatternFilter to store original constructor parameters for proper serialization
  - Handle property descriptors in CrawlResult.model_dump() to prevent JSON serialization errors
  - Ensure filter chains work correctly with Docker client and REST API

  The issue occurred because:
  1. Private implementation details (_simple_suffixes, etc.) were being serialized and passed as constructor arguments during deserialization
  2. Property descriptors were being included in the serialized output, causing "Object of type property is not JSON serializable" errors

  Changes:
  - async_configs.py: Comment out __slots__ serialization logic (lines 100-109)
  - filters.py: Add patterns, use_glob, reverse to URLPatternFilter __slots__ and store as public attributes
  - models.py: Convert property descriptors to strings in model_dump() instead of including them directly

* fix(logger): ensure logger is a Logger instance in crawling strategies. ref #1437

* feat(docker): Add temperature and base_url parameters for LLM configuration. ref #1035

  Implement hierarchical configuration for LLM parameters with support for:
  - Temperature control (0.0-2.0) to adjust response creativity
  - Custom base_url for proxy servers and alternative endpoints
  - 4-tier priority: request params > provider env > global env > defaults

  Add helper functions in utils.py, update API schemas and handlers,
  support environment variables (LLM_TEMPERATURE, OPENAI_TEMPERATURE, etc.),
  and provide comprehensive documentation with examples.
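
  The 4-tier priority order could be resolved along these lines. This is a sketch under assumptions: `resolve_temperature` is a hypothetical helper (not the function added to utils.py), and deriving the provider-specific variable name from the part of the provider string before the slash is a guess based on the `OPENAI_TEMPERATURE` example above.

```python
import os
from typing import Mapping, Optional


def resolve_temperature(
    provider: str,
    request_value: Optional[float] = None,
    env: Optional[Mapping[str, str]] = None,
    default: Optional[float] = None,
) -> Optional[float]:
    """Pick a temperature: request > provider env > global env > default.

    Hypothetical helper; the env-variable naming scheme is an assumption.
    """
    env = os.environ if env is None else env

    # 1. An explicit request parameter always wins.
    if request_value is not None:
        return request_value

    # 2. Provider-specific variable, e.g. OPENAI_TEMPERATURE for "openai/gpt-4o".
    prefix = provider.split("/", 1)[0].upper()
    provider_value = env.get(f"{prefix}_TEMPERATURE")
    if provider_value is not None:
        return float(provider_value)

    # 3. Global variable shared by all providers.
    global_value = env.get("LLM_TEMPERATURE")
    if global_value is not None:
        return float(global_value)

    # 4. Fall back to the default (None leaves the choice to the LLM client).
    return default


if __name__ == "__main__":
    demo_env = {"OPENAI_TEMPERATURE": "0.3", "LLM_TEMPERATURE": "0.7"}
    print(resolve_temperature("openai/gpt-4o", None, demo_env))
    print(resolve_temperature("gemini/gemini-pro", None, demo_env))
```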

* feat(docker): improve docker error handling
- Return comprehensive error messages along with status codes for internal API errors.
- Fix fit_html property serialization issue in both /crawl and /crawl/stream endpoints
- Add sanitization to ensure fit_html is always JSON-serializable (string or None)
- Add comprehensive error handling test suite.

* #1375 : refactor(proxy) Deprecate 'proxy' parameter in BrowserConfig and enhance proxy string parsing

- Updated ProxyConfig.from_string to support multiple proxy formats, including URLs with credentials.
- Deprecated the 'proxy' parameter in BrowserConfig, replacing it with 'proxy_config' for better flexibility.
- Added warnings for deprecated usage and clarified behavior when both parameters are provided.
- Updated documentation and tests to reflect changes in proxy configuration handling.

* Remove deprecated test for 'proxy' parameter in BrowserConfig and update .gitignore to include test_scripts directory.

* feat: add preserve_https_for_internal_links flag to maintain HTTPS during crawling. Ref #1410

Added a new `preserve_https_for_internal_links` configuration flag that preserves the original HTTPS scheme for same-domain links even when the server redirects to HTTP.

* feat: update documentation for preserve_https_for_internal_links. ref #1410

* fix: drop Python 3.9 support and require Python >=3.10.
The library no longer supports Python 3.9, so all references to Python 3.9 have been removed.
The following changes have been made:
- pyproject.toml: set requires-python to ">=3.10"; remove 3.9 classifier
- setup.py: set python_requires to ">=3.10"; remove 3.9 classifier
- docs: update Python version mentions
  - deploy/docker/c4ai-doc-context.md: options -> 3.10, 3.11, 3.12, 3.13

* issue #1329 refactor(crawler): move unwanted properties to CrawlerRunConfig class

* fix(auth): fixed Docker JWT authentication. ref #1442

* remove: delete unused yoyo snapshot subproject

* fix: raise error on last attempt failure in perform_completion_with_backoff. ref #989

* Commit without API

* fix: update option labels in request builder for clarity

* fix: allow custom LLM providers for adaptive crawler embedding config. ref: #1291

  - Change embedding_llm_config from Dict to Union[LLMConfig, Dict] for type safety
  - Add backward-compatible conversion property _embedding_llm_config_dict
  - Replace all hardcoded OpenAI embedding configs with configurable options
  - Fix LLMConfig object attribute access in query expansion logic
  - Add comprehensive example demonstrating multiple provider configurations
  - Update documentation with both LLMConfig object and dictionary usage patterns

  Users can now specify any LLM provider for query expansion in embedding strategy:
  - New: embedding_llm_config=LLMConfig(provider='anthropic/claude-3', api_token='key')
  - Old: embedding_llm_config={'provider': 'openai/gpt-4', 'api_token': 'key'} (still works)

* refactor(BrowserConfig): change deprecation warning for 'proxy' parameter to UserWarning

* feat(StealthAdapter): fix stealth features for Playwright integration. ref #1481

* #1505 fix(api): update config handling to only set base config if not provided by user

* fix(docker-deployment): replace console.log with print for metadata extraction

* Release v0.7.5: The Update

- Updated version to 0.7.5
- Added comprehensive demo and release notes
- Updated documentation

* refactor(release): remove memory management section for cleaner documentation. ref #1443

* feat(docs): add brand book and page copy functionality

- Add comprehensive brand book with color system, typography, components
- Add page copy dropdown with markdown copy/view functionality
- Update mkdocs.yml with new assets and branding navigation
- Use terminal-style ASCII icons and condensed menu design

* Update gitignore add local scripts folder

* fix: remove this import as it causes python to treat "json" as a variable in the except block

* fix: always return a list, even if we catch an exception

* feat(marketplace): Add Crawl4AI marketplace with secure configuration

- Implement marketplace frontend and admin dashboard
- Add FastAPI backend with environment-based configuration
- Use .env file for secrets management
- Include data generation scripts
- Add proper CORS configuration
- Remove hardcoded password from admin login
- Update gitignore for security

* fix(marketplace): Update URLs to use /marketplace path and relative API endpoints

- Change API_BASE to relative '/api' for production
- Move marketplace to /marketplace instead of /marketplace/frontend
- Update MkDocs navigation
- Fix logo path in marketplace index

* fix(docs): hide copy menu on non-markdown pages

* feat(marketplace): add sponsor logo uploads

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>

* feat(docs): add chatgpt quick link to page actions

* fix(marketplace): align admin api with backend endpoints

* fix(marketplace): isolate api under marketplace prefix

* fix(marketplace): resolve app detail page routing and styling issues

- Fixed JavaScript errors from missing HTML elements (install-code, usage-code, integration-code)
- Added missing CSS classes for tabs, overview layout, sidebar, and integration content
- Fixed tab navigation to display horizontally in single line
- Added proper padding to tab content sections (removed from container, added to content)
- Fixed tab selector from .nav-tab to .tab-btn to match HTML structure
- Added sidebar styling with stats grid and metadata display
- Improved responsive design with mobile-friendly tab scrolling
- Fixed code block positioning for copy buttons
- Removed margin from first headings to prevent extra spacing
- Added null checks for DOM elements in JavaScript to prevent errors

These changes resolve the routing issue where clicking on apps caused page redirects,
and fix the broken layout where CSS was not properly applied to the app detail page.

* fix(marketplace): prevent hero image overflow and secondary card stretching

- Fixed hero image to 200px height with min/max constraints
- Added object-fit: cover to hero-image img elements
- Changed secondary-featured align-items from stretch to flex-start
- Fixed secondary-card height to 118px (no flex: 1 stretching)
- Updated responsive grid layouts for wider screens
- Added flex: 1 to hero-content for better content distribution

These changes ensure a rigid, predictable layout that prevents:
1. Large images from pushing text content down
2. Single secondary cards from stretching to fill entire height

* feat: Add hooks utility for function-based hooks with Docker client integration. ref #1377

   Add hooks_to_string() utility function that converts Python function objects
   to string representations for the Docker API, enabling developers to write hooks
   as regular Python functions instead of strings.

   Core Changes:
   - New hooks_to_string() utility in crawl4ai/utils.py using inspect.getsource()
   - Docker client now accepts both function objects and strings for hooks
   - Automatic detection and conversion in Crawl4aiDockerClient._prepare_request()
   - New hooks and hooks_timeout parameters in client.crawl() method

   Documentation:
   - Docker client examples with function-based hooks (docs/examples/docker_client_hooks_example.py)
   - Updated main Docker deployment guide with comprehensive hooks section
   - Added unit tests for hooks utility (tests/docker/test_hooks_utility.py)

* fix(docs): clarify Docker Hooks System with function-based API in README

* docs: Add demonstration files for v0.7.5 release, showcasing the new Docker Hooks System and all other features.

* docs: Update 0.7.5 video walkthrough

* docs: add complete SDK reference documentation

Add comprehensive single-page SDK reference combining:
- Installation & Setup
- Quick Start
- Core API (AsyncWebCrawler, arun, arun_many, CrawlResult)
- Configuration (BrowserConfig, CrawlerConfig, Parameters)
- Crawling Patterns
- Content Processing (Markdown, Fit Markdown, Selection, Interaction, Link & Media)
- Extraction Strategies (LLM and No-LLM)
- Advanced Features (Session Management, Hooks & Auth)

Generated using scripts/generate_sdk_docs.py in ultra-dense mode
optimized for AI assistant consumption.

Stats: 23K words, 185 code blocks, 220KB

* feat: add AI assistant skill package for Crawl4AI

- Create comprehensive skill package for AI coding assistants
- Include complete SDK reference (23K words, v0.7.4)
- Add three extraction scripts (basic, batch, pipeline)
- Implement version tracking in skill and scripts
- Add prominent download section on homepage
- Place skill in docs/assets for web distribution

The skill enables AI assistants like Claude, Cursor, and Windsurf
to effectively use Crawl4AI with optimized workflows for markdown
generation and data extraction.

* fix: remove non-existent wiki link and clarify skill usage instructions

* fix: update Crawl4AI skill with corrected parameters and examples

- Fixed CrawlerConfig → CrawlerRunConfig throughout
- Fixed parameter names (timeout → page_timeout, store_html removed)
- Fixed schema format (selector → baseSelector)
- Corrected proxy configuration (in BrowserConfig, not CrawlerRunConfig)
- Fixed fit_markdown usage with content filters
- Added comprehensive references to docs/examples/ directory
- Created safe packaging script to avoid root directory pollution
- All scripts tested and verified working

* fix: thoroughly verify and fix all Crawl4AI skill examples

- Cross-checked every section against actual docs
- Fixed BM25ContentFilter parameters (user_query, bm25_threshold)
- Removed incorrect wait_for selector from basic example
- Added comprehensive test suite (4 test files)
- All examples now tested and verified working
- Tests validate: basic crawling, markdown generation, data extraction, advanced patterns
- Package size: 76.6 KB (includes tests for future validation)

* feat(ci): split release pipeline and add Docker caching

- Split release.yml into PyPI/GitHub release and Docker workflows
- Add GitHub Actions cache for Docker builds (10-15x faster rebuilds)
- Implement dual-trigger for docker-release.yml (auto + manual)
- Add comprehensive workflow documentation in .github/workflows/docs/
- Backup original workflow as release.yml.backup

* feat: add webhook notifications for crawl job completion

Implements webhook support for the crawl job API to eliminate polling requirements.

Changes:
- Added WebhookConfig and WebhookPayload schemas to schemas.py
- Created webhook.py with WebhookDeliveryService class
- Integrated webhook notifications in api.py handle_crawl_job
- Updated job.py CrawlJobPayload to accept webhook_config
- Added webhook configuration section to config.yml
- Included comprehensive usage examples in WEBHOOK_EXAMPLES.md

Features:
- Webhook notifications on job completion (success/failure)
- Configurable data inclusion in webhook payload
- Custom webhook headers support
- Global default webhook URL configuration
- Exponential backoff retry logic (5 attempts: 1s, 2s, 4s, 8s, 16s)
- 30-second timeout per webhook call
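
The retry schedule above can be reproduced with a short sketch. It is not the WebhookDeliveryService implementation: `backoff_delays` and `deliver_with_retry` are hypothetical names, `send` stands in for the real HTTP call, and sleeping only between attempts (not after the final one) is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List


def backoff_delays(max_attempts: int = 5, initial_delay: float = 1.0) -> List[float]:
    """Delay for each attempt index: 1s, 2s, 4s, 8s, 16s by default."""
    return [initial_delay * (2 ** attempt) for attempt in range(max_attempts)]


async def deliver_with_retry(
    send: Callable[[], Awaitable[bool]],
    max_attempts: int = 5,
    initial_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Call `send` until it reports success or the attempts are exhausted."""
    delays = backoff_delays(max_attempts, initial_delay)
    for attempt, delay in enumerate(delays):
        if await send():
            return True
        # Assumption: wait only when another attempt will follow.
        if attempt < max_attempts - 1:
            await sleep(delay)
    return False


if __name__ == "__main__":
    print(backoff_delays())
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting; a production version would also apply the per-call timeout to `send`.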

Usage:
POST /crawl/job with optional webhook_config:
- webhook_url: URL to receive notifications
- webhook_data_in_payload: include full results (default: false)
- webhook_headers: custom headers for authentication

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add webhook documentation to Docker README

Added comprehensive webhook section to README.md including:
- Overview of asynchronous job queue with webhooks
- Benefits and use cases
- Quick start examples
- Webhook authentication
- Global webhook configuration
- Job status polling alternative

Updated table of contents and summary to include webhook feature.
Maintains consistent tone and style with rest of README.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add webhook example for Docker deployment

Added docker_webhook_example.py demonstrating:
- Submitting crawl jobs with webhook configuration
- Flask-based webhook receiver implementation
- Three usage patterns:
  1. Webhook notification only (fetch data separately)
  2. Webhook with full data in payload
  3. Traditional polling approach for comparison

Includes comprehensive comments explaining:
- Webhook payload structure
- Authentication headers setup
- Error handling
- Production deployment tips

Example is fully functional and ready to run with Flask installed.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* test: add webhook implementation validation tests

Added comprehensive test suite to validate webhook implementation:
- Module import verification
- WebhookDeliveryService initialization
- Pydantic model validation (WebhookConfig)
- Payload construction logic
- Exponential backoff calculation
- API integration checks

All tests pass (6/6), confirming implementation is correct.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* test: add comprehensive webhook feature test script

Added end-to-end test script that automates webhook feature testing:

Script Features (test_webhook_feature.sh):
- Automatic branch switching and dependency installation
- Redis and server startup/shutdown management
- Webhook receiver implementation
- Integration test for webhook notifications
- Comprehensive cleanup and error handling
- Returns to original branch after completion

Test Flow:
1. Fetch and checkout webhook feature branch
2. Activate venv and install dependencies
3. Start Redis and Crawl4AI server
4. Submit crawl job with webhook config
5. Verify webhook delivery and payload
6. Clean up all processes and return to original branch

Documentation:
- WEBHOOK_TEST_README.md with usage instructions
- Troubleshooting guide
- Exit codes and safety features

Usage: ./tests/test_webhook_feature.sh

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: properly serialize Pydantic HttpUrl in webhook config

Use model_dump(mode='json') instead of deprecated dict() method to ensure
Pydantic special types (HttpUrl, UUID, etc.) are properly serialized to
JSON-compatible native Python types.

This fixes webhook delivery failures caused by HttpUrl objects remaining
as Pydantic types in the webhook_config dict, which caused JSON
serialization errors and httpx request failures.

Also update mcp requirement to >=1.18.0 for compatibility.

* feat: add webhook support for /llm/job endpoint

Add comprehensive webhook notification support for the /llm/job endpoint,
following the same pattern as the existing /crawl/job implementation.

Changes:
- Add webhook_config field to LlmJobPayload model (job.py)
- Implement webhook notifications in process_llm_extraction() with 4
  notification points: success, provider validation failure, extraction
  failure, and general exceptions (api.py)
- Store webhook_config in Redis task data for job tracking
- Initialize WebhookDeliveryService with exponential backoff retry logic

Documentation:
- Add Example 6 to WEBHOOK_EXAMPLES.md showing LLM extraction with webhooks
- Update Flask webhook handler to support both crawl and llm_extraction tasks
- Add TypeScript client examples for LLM jobs
- Add comprehensive examples to docker_webhook_example.py with schema support
- Clarify data structure differences between webhook and API responses

Testing:
- Add test_llm_webhook_feature.py with 7 validation tests (all passing)
- Verify pattern consistency with /crawl/job implementation
- Add implementation guide (WEBHOOK_LLM_JOB_IMPLEMENTATION.md)

* fix: remove duplicate comma in webhook_config parameter

* fix: update Crawl4AI Docker container port from 11234 to 11235

* Release v0.7.6: The 0.7.6 Update

- Updated version to 0.7.6
- Added comprehensive demo and release notes
- Updated all documentation
- Update the version in Dockerfile to 0.7.6

---------

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
Co-authored-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
Co-authored-by: Nezar Ali <abu5sohaib@gmail.com>
Co-authored-by: Soham Kukreti <kukretisoham@gmail.com>
Co-authored-by: James T. Wood <jamesthomaswood@gmail.com>
Co-authored-by: AHMET YILMAZ <tawfik@kidocode.com>
Co-authored-by: nafeqq-1306 <nafiquee@yahoo.com>
Co-authored-by: unclecode <unclecode@kidocode.com>
Co-authored-by: Martin Sjöborg <martin.sjoborg@quartr.se>
Co-authored-by: Martin Sjöborg <martin@sjoborg.org>
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
2025-10-22 20:41:06 +08:00

771 lines
30 KiB
Python

from typing import Dict, Optional, List, Tuple, Union
from .async_configs import CrawlerRunConfig
from .models import (
    CrawlResult,
    CrawlerTaskResult,
    CrawlStatus,
    DomainState,
)
from .components.crawler_monitor import CrawlerMonitor
from .types import AsyncWebCrawler
from collections.abc import AsyncGenerator
import time
import psutil
import asyncio
import uuid
from urllib.parse import urlparse
import random
from abc import ABC, abstractmethod
from .utils import get_true_memory_usage_percent


class RateLimiter:
    def __init__(
        self,
        base_delay: Tuple[float, float] = (1.0, 3.0),
        max_delay: float = 60.0,
        max_retries: int = 3,
        rate_limit_codes: List[int] = None,
    ):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_retries = max_retries
        self.rate_limit_codes = rate_limit_codes or [429, 503]
        self.domains: Dict[str, DomainState] = {}

    def get_domain(self, url: str) -> str:
        return urlparse(url).netloc

    async def wait_if_needed(self, url: str) -> None:
        domain = self.get_domain(url)
        state = self.domains.get(domain)

        if not state:
            self.domains[domain] = DomainState()
            state = self.domains[domain]

        now = time.time()
        if state.last_request_time:
            wait_time = max(0, state.current_delay - (now - state.last_request_time))
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        # Random delay within base range if no current delay
        if state.current_delay == 0:
            state.current_delay = random.uniform(*self.base_delay)

        state.last_request_time = time.time()

    def update_delay(self, url: str, status_code: int) -> bool:
        domain = self.get_domain(url)
        state = self.domains[domain]

        if status_code in self.rate_limit_codes:
            state.fail_count += 1
            if state.fail_count > self.max_retries:
                return False

            # Exponential backoff with random jitter
            state.current_delay = min(
                state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
            )
        else:
            # Gradually reduce delay on success
            state.current_delay = max(
                random.uniform(*self.base_delay), state.current_delay * 0.75
            )
            state.fail_count = 0

        return True


class BaseDispatcher(ABC):
    def __init__(
        self,
        rate_limiter: Optional[RateLimiter] = None,
        monitor: Optional[CrawlerMonitor] = None,
    ):
        self.crawler = None
        self._domain_last_hit: Dict[str, float] = {}
        self.concurrent_sessions = 0
        self.rate_limiter = rate_limiter
        self.monitor = monitor

    def select_config(self, url: str, configs: Union[CrawlerRunConfig, List[CrawlerRunConfig]]) -> Optional[CrawlerRunConfig]:
        """Select the appropriate config for a given URL.

        Args:
            url: The URL to match against
            configs: Single config or list of configs to choose from

        Returns:
            The matching config, or None if no match found
        """
        # Single config - return as is
        if isinstance(configs, CrawlerRunConfig):
            return configs

        # Empty list - return None
        if not configs:
            return None

        # Find first matching config
        for config in configs:
            if config.is_match(url):
                return config

        # No match found - return None to indicate URL should be skipped
        return None

    @abstractmethod
    async def crawl_url(
        self,
        url: str,
        config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
        task_id: str,
        monitor: Optional[CrawlerMonitor] = None,
    ) -> CrawlerTaskResult:
        pass

    @abstractmethod
    async def run_urls(
        self,
        urls: List[str],
        crawler: AsyncWebCrawler,  # noqa: F821
        config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
        monitor: Optional[CrawlerMonitor] = None,
    ) -> List[CrawlerTaskResult]:
        pass
class MemoryAdaptiveDispatcher(BaseDispatcher):
def __init__(
self,
memory_threshold_percent: float = 90.0,
critical_threshold_percent: float = 95.0, # New critical threshold
recovery_threshold_percent: float = 85.0, # New recovery threshold
check_interval: float = 1.0,
max_session_permit: int = 20,
fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs
memory_wait_timeout: Optional[float] = 600.0,
rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None,
):
super().__init__(rate_limiter, monitor)
self.memory_threshold_percent = memory_threshold_percent
self.critical_threshold_percent = critical_threshold_percent
self.recovery_threshold_percent = recovery_threshold_percent
self.check_interval = check_interval
self.max_session_permit = max_session_permit
self.fairness_timeout = fairness_timeout
self.memory_wait_timeout = memory_wait_timeout
self.result_queue = asyncio.Queue()
self.task_queue = asyncio.PriorityQueue() # Priority queue for better management
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode
self.current_memory_percent = 0.0 # Track current memory usage
self._high_memory_start_time: Optional[float] = None
async def _memory_monitor_task(self):
"""Background task to continuously monitor memory usage and update state"""
while True:
self.current_memory_percent = get_true_memory_usage_percent()
# Enter memory pressure mode if we cross the threshold
if self.current_memory_percent >= self.memory_threshold_percent:
if not self.memory_pressure_mode:
self.memory_pressure_mode = True
self._high_memory_start_time = time.time()
if self.monitor:
self.monitor.update_memory_status("PRESSURE")
else:
if self._high_memory_start_time is None:
self._high_memory_start_time = time.time()
if (
self.memory_wait_timeout is not None
and self._high_memory_start_time is not None
and time.time() - self._high_memory_start_time >= self.memory_wait_timeout
):
raise MemoryError(
"Memory usage exceeded threshold for"
f" {self.memory_wait_timeout} seconds"
)
# Exit memory pressure mode if we go below recovery threshold
elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent:
self.memory_pressure_mode = False
self._high_memory_start_time = None
if self.monitor:
self.monitor.update_memory_status("NORMAL")
elif self.current_memory_percent < self.memory_threshold_percent:
self._high_memory_start_time = None
# In critical mode, we might need to take more drastic action
if self.current_memory_percent >= self.critical_threshold_percent:
if self.monitor:
self.monitor.update_memory_status("CRITICAL")
# We could implement additional memory-saving measures here
await asyncio.sleep(self.check_interval)
def _get_priority_score(self, wait_time: float, retry_count: int) -> float:
        """Calculate priority score (a lower score means higher priority).
        - URLs waiting longer than fairness_timeout get a negative score, so they are served first
        - More retry attempts decrease priority
        """
if wait_time > self.fairness_timeout:
# High priority for long-waiting URLs
return -wait_time
# Standard priority based on retries
return retry_count
async def crawl_url(
self,
url: str,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
task_id: str,
retry_count: int = 0,
) -> CrawlerTaskResult:
start_time = time.time()
error_message = ""
memory_usage = peak_memory = 0.0
# Select appropriate config for this URL
selected_config = self.select_config(url, config)
# If no config matches, return failed result
if selected_config is None:
error_message = f"No matching configuration found for URL: {url}"
if self.monitor:
self.monitor.update_task(
task_id,
status=CrawlStatus.FAILED,
error_message=error_message
)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=CrawlResult(
url=url,
html="",
metadata={"status": "no_config_match"},
success=False,
error_message=error_message
),
memory_usage=0,
peak_memory=0,
start_time=start_time,
end_time=time.time(),
error_message=error_message,
retry_count=retry_count
)
# Get starting memory for accurate measurement
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
try:
if self.monitor:
self.monitor.update_task(
task_id,
status=CrawlStatus.IN_PROGRESS,
start_time=start_time,
retry_count=retry_count
)
self.concurrent_sessions += 1
if self.rate_limiter:
await self.rate_limiter.wait_if_needed(url)
# Check if we're in critical memory state
if self.current_memory_percent >= self.critical_threshold_percent:
                # Requeue this task with an incremented retry count; a higher retry
                # count yields a larger score, so the task is retried at lower priority
enqueue_time = time.time()
priority = self._get_priority_score(enqueue_time - start_time, retry_count + 1)
await self.task_queue.put((priority, (url, task_id, retry_count + 1, enqueue_time)))
# Update monitoring
if self.monitor:
self.monitor.update_task(
task_id,
status=CrawlStatus.QUEUED,
error_message="Requeued due to critical memory pressure"
)
# Return placeholder result with requeued status
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=CrawlResult(
url=url, html="", metadata={"status": "requeued"},
success=False, error_message="Requeued due to critical memory pressure"
),
memory_usage=0,
peak_memory=0,
start_time=start_time,
end_time=time.time(),
error_message="Requeued due to critical memory pressure",
retry_count=retry_count + 1
)
# Execute the crawl with selected config
result = await self.crawler.arun(url, config=selected_config, session_id=task_id)
# Measure memory usage
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
# Handle rate limiting
if self.rate_limiter and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
# Update status based on result
if not result.success:
error_message = result.error_message
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
elif self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
except Exception as e:
error_message = str(e)
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
result = CrawlResult(
url=url, html="", metadata={}, success=False, error_message=str(e)
)
finally:
end_time = time.time()
if self.monitor:
self.monitor.update_task(
task_id,
end_time=end_time,
memory_usage=memory_usage,
peak_memory=peak_memory,
error_message=error_message,
retry_count=retry_count
)
self.concurrent_sessions -= 1
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=end_time,
error_message=error_message,
retry_count=retry_count
)
async def run_urls(
self,
urls: List[str],
crawler: AsyncWebCrawler,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
) -> List[CrawlerTaskResult]:
self.crawler = crawler
# Start the memory monitor task
memory_monitor = asyncio.create_task(self._memory_monitor_task())
if self.monitor:
self.monitor.start()
results = []
try:
# Initialize task queue
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
# Add to queue with initial priority 0, retry count 0, and current time
await self.task_queue.put((0, (url, task_id, 0, time.time())))
active_tasks = []
            # Process until the task queue is empty and no tasks are still active
while not self.task_queue.empty() or active_tasks:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, greedily fill all available slots
if not self.memory_pressure_mode:
slots = self.max_session_permit - len(active_tasks)
while slots > 0:
try:
# Use get_nowait() to immediately get tasks without blocking
priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait()
# Create and start the task
task = asyncio.create_task(
self.crawl_url(url, config, task_id, retry_count)
)
active_tasks.append(task)
# Update waiting time in monitor
if self.monitor:
wait_time = time.time() - enqueue_time
self.monitor.update_task(
task_id,
wait_time=wait_time,
status=CrawlStatus.IN_PROGRESS
)
slots -= 1
except asyncio.QueueEmpty:
# No more tasks in queue, exit the loop
break
# Wait for completion even if queue is starved
if active_tasks:
done, pending = await asyncio.wait(
active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
)
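                    # Process completed tasks, skipping requeue placeholders: a requeued
                    # URL is back in the queue and will produce its real result later
                    for completed_task in done:
                        result = await completed_task
                        if "requeued" not in (result.error_message or "").lower():
                            results.append(result)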
# Update active tasks list
active_tasks = list(pending)
else:
# If no active tasks but still waiting, sleep briefly
await asyncio.sleep(self.check_interval / 2)
# Update priorities for waiting tasks if needed
await self._update_queue_priorities()
except Exception as e:
if self.monitor:
self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}")
finally:
# Clean up
memory_monitor.cancel()
if self.monitor:
self.monitor.stop()
return results
async def _update_queue_priorities(self):
"""Periodically update priorities of items in the queue to prevent starvation"""
# Skip if queue is empty
if self.task_queue.empty():
return
# Use a drain-and-refill approach to update all priorities
temp_items = []
# Drain the queue (with a safety timeout to prevent blocking)
try:
drain_start = time.time()
while not self.task_queue.empty() and time.time() - drain_start < 5.0: # 5 second safety timeout
try:
# Get item from queue with timeout
priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
self.task_queue.get(), timeout=0.1
)
# Calculate new priority based on current wait time
current_time = time.time()
wait_time = current_time - enqueue_time
new_priority = self._get_priority_score(wait_time, retry_count)
# Store with updated priority
temp_items.append((new_priority, (url, task_id, retry_count, enqueue_time)))
# Update monitoring stats for this task
if self.monitor and task_id in self.monitor.stats:
self.monitor.update_task(task_id, wait_time=wait_time)
except asyncio.TimeoutError:
# Queue might be empty or very slow
break
        except Exception as e:
            # If anything goes wrong, make sure we refill the queue with what we've got
            if self.monitor:
                self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}")
# Calculate queue statistics
if temp_items and self.monitor:
total_queued = len(temp_items)
            # Each queued item stores its enqueue timestamp; derive wait times from it
            enqueue_times = [item[1][3] for item in temp_items]
            now = time.time()
            highest_wait_time = now - min(enqueue_times)
            avg_wait_time = sum(now - t for t in enqueue_times) / len(enqueue_times)
# Update queue statistics in monitor
self.monitor.update_queue_statistics(
total_queued=total_queued,
highest_wait_time=highest_wait_time,
avg_wait_time=avg_wait_time
)
# Sort by priority (lowest number = highest priority)
temp_items.sort(key=lambda x: x[0])
# Refill the queue with updated priorities
for item in temp_items:
await self.task_queue.put(item)
async def run_urls_stream(
self,
urls: List[str],
crawler: AsyncWebCrawler,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
) -> AsyncGenerator[CrawlerTaskResult, None]:
self.crawler = crawler
# Start the memory monitor task
memory_monitor = asyncio.create_task(self._memory_monitor_task())
if self.monitor:
self.monitor.start()
try:
# Initialize task queue
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
# Add to queue with initial priority 0, retry count 0, and current time
await self.task_queue.put((0, (url, task_id, 0, time.time())))
active_tasks = []
completed_count = 0
total_urls = len(urls)
while completed_count < total_urls:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, greedily fill all available slots
if not self.memory_pressure_mode:
slots = self.max_session_permit - len(active_tasks)
while slots > 0:
try:
# Use get_nowait() to immediately get tasks without blocking
priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait()
# Create and start the task
task = asyncio.create_task(
self.crawl_url(url, config, task_id, retry_count)
)
active_tasks.append(task)
# Update waiting time in monitor
if self.monitor:
wait_time = time.time() - enqueue_time
self.monitor.update_task(
task_id,
wait_time=wait_time,
status=CrawlStatus.IN_PROGRESS
)
slots -= 1
except asyncio.QueueEmpty:
# No more tasks in queue, exit the loop
break
# Process completed tasks and yield results
if active_tasks:
done, pending = await asyncio.wait(
active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
)
for completed_task in done:
result = await completed_task
                        # Only count as completed if it wasn't requeued. The requeue
                        # message starts with a capital "R", so compare case-insensitively
                        if "requeued" not in (result.error_message or "").lower():
                            completed_count += 1
                            yield result
# Update active tasks list
active_tasks = list(pending)
else:
# If no active tasks but still waiting, sleep briefly
await asyncio.sleep(self.check_interval / 2)
# Update priorities for waiting tasks if needed
await self._update_queue_priorities()
finally:
# Clean up
memory_monitor.cancel()
if self.monitor:
self.monitor.stop()
class SemaphoreDispatcher(BaseDispatcher):
def __init__(
self,
semaphore_count: int = 5,
max_session_permit: int = 20,
rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None,
):
super().__init__(rate_limiter, monitor)
self.semaphore_count = semaphore_count
self.max_session_permit = max_session_permit
async def crawl_url(
self,
url: str,
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
task_id: str,
        semaphore: Optional[asyncio.Semaphore] = None,
) -> CrawlerTaskResult:
start_time = time.time()
error_message = ""
memory_usage = peak_memory = 0.0
# Select appropriate config for this URL
selected_config = self.select_config(url, config)
# If no config matches, return failed result
if selected_config is None:
error_message = f"No matching configuration found for URL: {url}"
if self.monitor:
self.monitor.update_task(
task_id,
status=CrawlStatus.FAILED,
error_message=error_message
)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=CrawlResult(
url=url,
html="",
metadata={"status": "no_config_match"},
success=False,
error_message=error_message
),
memory_usage=0,
peak_memory=0,
start_time=start_time,
end_time=time.time(),
error_message=error_message
)
try:
if self.monitor:
self.monitor.update_task(
task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time
)
if self.rate_limiter:
await self.rate_limiter.wait_if_needed(url)
async with semaphore:
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
result = await self.crawler.arun(url, config=selected_config, session_id=task_id)
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
if self.rate_limiter and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=time.time(),
error_message=error_message,
)
if not result.success:
error_message = result.error_message
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
elif self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
except Exception as e:
error_message = str(e)
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
result = CrawlResult(
url=url, html="", metadata={}, success=False, error_message=str(e)
)
finally:
end_time = time.time()
if self.monitor:
self.monitor.update_task(
task_id,
end_time=end_time,
memory_usage=memory_usage,
peak_memory=peak_memory,
error_message=error_message,
)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=end_time,
error_message=error_message,
)
async def run_urls(
self,
crawler: AsyncWebCrawler, # noqa: F821
urls: List[str],
config: Union[CrawlerRunConfig, List[CrawlerRunConfig]],
) -> List[CrawlerTaskResult]:
self.crawler = crawler
if self.monitor:
self.monitor.start()
try:
semaphore = asyncio.Semaphore(self.semaphore_count)
tasks = []
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
task = asyncio.create_task(
self.crawl_url(url, config, task_id, semaphore)
)
tasks.append(task)
return await asyncio.gather(*tasks, return_exceptions=True)
finally:
if self.monitor:
self.monitor.stop()