Compare commits

..

1 Commits

Author SHA1 Message Date
Soham Kukreti
6eb3baed50 feat: Add ConfigHealthMonitor for automated crawler configuration health monitoring
Implement a comprehensive health monitoring system for crawler configurations
that automatically detects failures and applies resolution strategies.

Features

- **Continuous Health Monitoring**: Periodic health checks for multiple crawler
  configurations with configurable check intervals
- **Automatic Failure Detection**: Detects failures based on HTTP status codes,
  empty HTML responses, and logger errors
- **Resolution Strategies**: Built-in and custom resolution strategies that
  automatically attempt to fix failing configurations
- **Resolution Chains**: Support for sequential resolution strategies that
  validate each step before proceeding
- **Metrics Collection**: Comprehensive metrics tracking including success rates,
  response times, resolution attempts, and uptime statistics
- **Graceful Shutdown**: Robust cleanup mechanism that waits for active health
  checks to complete before shutting down
- **Error Tracking**: Integrated logger error tracking to detect non-critical
  errors that don't fail HTTP requests but indicate issues

Implementation Details

- New module `crawl4ai/config_health_monitor.py` containing:
  - `ConfigHealthMonitor`: Main monitoring class
  - `ConfigHealthState`: Health state tracking dataclass
  - `ResolutionResult`: Resolution strategy result dataclass
  - `ResolutionStrategy`: Type alias for resolution callables
  - `_ErrorTrackingLogger`: Proxy logger for error event tracking

- Key capabilities:
  - Register/unregister configurations for monitoring
  - Manual and automatic health checks
  - Config-specific or global resolution strategies
  - Thread-safe state management with asyncio locks
  - Per-config and global metrics reporting
  - Context manager support for automatic cleanup

Testing

- Comprehensive test suite in `tests/general/test_config_health_monitor.py`:
  - Basic functionality tests (initialization, registration)
  - Lifecycle management tests (start/stop, context manager)
  - Health checking tests (success/failure scenarios)
  - Resolution strategy tests
  - Metrics and status query tests
  - Property validation tests

Examples

- Example usage in `docs/examples/config_health_monitor_example.py`:
  - Demonstrates monitor initialization and configuration
  - Shows custom resolution strategies (incremental backoff, magic mode toggle)
  - Implements resolution chains with validation
  - Displays metrics reporting and status monitoring
  - Includes context manager usage pattern

Technical Notes

- Uses `copy.deepcopy()` for safe configuration mutation
- Implements `_ErrorTrackingLogger` to capture logger errors during health checks
- Tracks active health check tasks for graceful shutdown
- Uses `CacheMode.BYPASS` for health check configs to ensure fresh data
- Minimum check interval enforced at 10 seconds

This feature enables production-grade monitoring of crawler configurations,
automatically detecting and resolving issues before they impact crawling
operations.
2025-11-25 23:49:15 +05:30
12 changed files with 1920 additions and 84 deletions

View File

@@ -1792,10 +1792,7 @@ class LLMConfig:
frequency_penalty: Optional[float] = None,
presence_penalty: Optional[float] = None,
stop: Optional[List[str]] = None,
n: Optional[int] = None,
backoff_base_delay: Optional[int] = None,
backoff_max_attempts: Optional[int] = None,
backoff_exponential_factor: Optional[int] = None,
n: Optional[int] = None,
):
"""Configuration class for LLM provider and API token."""
self.provider = provider
@@ -1824,9 +1821,6 @@ class LLMConfig:
self.presence_penalty = presence_penalty
self.stop = stop
self.n = n
self.backoff_base_delay = backoff_base_delay if backoff_base_delay is not None else 2
self.backoff_max_attempts = backoff_max_attempts if backoff_max_attempts is not None else 3
self.backoff_exponential_factor = backoff_exponential_factor if backoff_exponential_factor is not None else 2
@staticmethod
def from_kwargs(kwargs: dict) -> "LLMConfig":
@@ -1840,10 +1834,7 @@ class LLMConfig:
frequency_penalty=kwargs.get("frequency_penalty"),
presence_penalty=kwargs.get("presence_penalty"),
stop=kwargs.get("stop"),
n=kwargs.get("n"),
backoff_base_delay=kwargs.get("backoff_base_delay"),
backoff_max_attempts=kwargs.get("backoff_max_attempts"),
backoff_exponential_factor=kwargs.get("backoff_exponential_factor")
n=kwargs.get("n")
)
def to_dict(self):
@@ -1857,10 +1848,7 @@ class LLMConfig:
"frequency_penalty": self.frequency_penalty,
"presence_penalty": self.presence_penalty,
"stop": self.stop,
"n": self.n,
"backoff_base_delay": self.backoff_base_delay,
"backoff_max_attempts": self.backoff_max_attempts,
"backoff_exponential_factor": self.backoff_exponential_factor
"n": self.n
}
def clone(self, **kwargs):

File diff suppressed because it is too large Load Diff

View File

@@ -980,9 +980,6 @@ class LLMContentFilter(RelevantContentFilter):
prompt,
api_token,
base_url=base_url,
base_delay=self.llm_config.backoff_base_delay,
max_attempts=self.llm_config.backoff_max_attempts,
exponential_factor=self.llm_config.backoff_exponential_factor,
extra_args=extra_args,
)

View File

@@ -649,9 +649,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
base_url=self.llm_config.base_url,
json_response=self.force_json_response,
extra_args=self.extra_args,
base_delay=self.llm_config.backoff_base_delay,
max_attempts=self.llm_config.backoff_max_attempts,
exponential_factor=self.llm_config.backoff_exponential_factor
) # , json_response=self.extract_type == "schema")
# Track usage
usage = TokenUsage(
@@ -849,9 +846,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
base_url=self.llm_config.base_url,
json_response=self.force_json_response,
extra_args=self.extra_args,
base_delay=self.llm_config.backoff_base_delay,
max_attempts=self.llm_config.backoff_max_attempts,
exponential_factor=self.llm_config.backoff_exponential_factor
)
# Track usage
usage = TokenUsage(

View File

@@ -795,9 +795,6 @@ Return only a JSON array of extracted tables following the specified format."""
api_token=self.llm_config.api_token,
base_url=self.llm_config.base_url,
json_response=True,
base_delay=self.llm_config.backoff_base_delay,
max_attempts=self.llm_config.backoff_max_attempts,
exponential_factor=self.llm_config.backoff_exponential_factor,
extra_args=self.extra_args
)
@@ -1119,9 +1116,6 @@ Return only a JSON array of extracted tables following the specified format."""
api_token=self.llm_config.api_token,
base_url=self.llm_config.base_url,
json_response=True,
base_delay=self.llm_config.backoff_base_delay,
max_attempts=self.llm_config.backoff_max_attempts,
exponential_factor=self.llm_config.backoff_exponential_factor,
extra_args=self.extra_args
)

View File

@@ -1745,9 +1745,6 @@ def perform_completion_with_backoff(
api_token,
json_response=False,
base_url=None,
base_delay=2,
max_attempts=3,
exponential_factor=2,
**kwargs,
):
"""
@@ -1764,9 +1761,6 @@ def perform_completion_with_backoff(
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
base_delay (int): The base delay in seconds. Defaults to 2.
max_attempts (int): The maximum number of attempts. Defaults to 3.
exponential_factor (int): The exponential factor. Defaults to 2.
**kwargs: Additional arguments for the API request.
Returns:
@@ -1776,6 +1770,9 @@ def perform_completion_with_backoff(
from litellm import completion
from litellm.exceptions import RateLimitError
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {"temperature": 0.01, "api_key": api_token, "base_url": base_url}
if json_response:
extra_args["response_format"] = {"type": "json_object"}
@@ -1801,7 +1798,7 @@ def perform_completion_with_backoff(
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (exponential_factor**attempt) # Exponential backoff formula
delay = base_delay * (2**attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
time.sleep(delay)
else:
@@ -1834,9 +1831,6 @@ async def aperform_completion_with_backoff(
api_token,
json_response=False,
base_url=None,
base_delay=2,
max_attempts=3,
exponential_factor=2,
**kwargs,
):
"""
@@ -1853,9 +1847,6 @@ async def aperform_completion_with_backoff(
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
base_delay (int): The base delay in seconds. Defaults to 2.
max_attempts (int): The maximum number of attempts. Defaults to 3.
exponential_factor (int): The exponential factor. Defaults to 2.
**kwargs: Additional arguments for the API request.
Returns:
@@ -1866,6 +1857,9 @@ async def aperform_completion_with_backoff(
from litellm.exceptions import RateLimitError
import asyncio
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {"temperature": 0.01, "api_key": api_token, "base_url": base_url}
if json_response:
extra_args["response_format"] = {"type": "json_object"}
@@ -1891,7 +1885,7 @@ async def aperform_completion_with_backoff(
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (exponential_factor**attempt) # Exponential backoff formula
delay = base_delay * (2**attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
await asyncio.sleep(delay)
else:

View File

@@ -108,10 +108,7 @@ async def handle_llm_qa(
prompt_with_variables=prompt,
api_token=get_llm_api_key(config), # Returns None to let litellm handle it
temperature=get_llm_temperature(config),
base_url=get_llm_base_url(config),
base_delay=config["llm"].get("backoff_base_delay", 2),
max_attempts=config["llm"].get("backoff_max_attempts", 3),
exponential_factor=config["llm"].get("backoff_exponential_factor", 2)
base_url=get_llm_base_url(config)
)
return response.choices[0].message.content

View File

@@ -0,0 +1,378 @@
"""
Example: Using ConfigHealthMonitor for Crawler Configuration Health Monitoring
This example demonstrates how to:
1. Initialize a ConfigHealthMonitor
2. Register multiple crawler configurations
3. Set up custom resolution strategies
4. Monitor health status and metrics
5. Handle configuration failures automatically
"""
import asyncio
import copy
from crawl4ai.config_health_monitor import ConfigHealthMonitor, ResolutionResult, ConfigHealthState
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_configs import CacheMode
# ============================================================================
# Custom Resolution Strategies
# ============================================================================
async def incremental_backoff_strategy(
    state: ConfigHealthState,
    monitor: ConfigHealthMonitor
) -> ResolutionResult:
    """Relax the timing of a failing config.

    Works on a deep copy of ``state.config``: ``page_timeout`` is doubled and,
    when ``delay_before_return_html`` is set to a truthy value, that delay is
    extended by two seconds. ``state.config`` itself is left untouched; the
    relaxed copy is handed back through ``ResolutionResult.modified_config``.

    Args:
        state: Health state of the failing configuration.
        monitor: The monitor invoking the strategy (not used here).

    Returns:
        A successful ``ResolutionResult`` carrying the relaxed config and the
        old/new ``page_timeout`` values in its metadata.
    """
    current = state.config
    print(f" Applying incremental backoff for '{state.config_id}'...")

    relaxed = copy.deepcopy(current)
    # Doubling == "increase by 100%"; int() keeps the timeout integral.
    relaxed.page_timeout = int(current.page_timeout * 2)

    html_delay = current.delay_before_return_html
    if html_delay:
        relaxed.delay_before_return_html = html_delay + 2.0

    print(f" -> Increased page_timeout to {relaxed.page_timeout}ms")

    timeout_change = {
        "old_timeout": current.page_timeout,
        "new_timeout": relaxed.page_timeout,
    }
    return ResolutionResult(
        success=True,
        action="timeout_increased",
        modified_config=relaxed,
        metadata=timeout_change,
    )
async def toggle_magic_mode_strategy(
    state: ConfigHealthState,
    monitor: ConfigHealthMonitor
) -> ResolutionResult:
    """Flip the ``magic`` flag on a copy of the failing config.

    Args:
        state: Health state of the failing configuration.
        monitor: The monitor invoking the strategy (not used here).

    Returns:
        A successful ``ResolutionResult`` whose action is ``"magic_enabled"``
        or ``"magic_disabled"`` and whose ``modified_config`` is the flipped
        deep copy of ``state.config``.
    """
    print(f" Toggling magic mode for '{state.config_id}'...")

    flipped = copy.deepcopy(state.config)
    flipped.magic = not state.config.magic

    if flipped.magic:
        outcome = "enabled"
    else:
        outcome = "disabled"

    print(f" -> Magic mode now: {flipped.magic}")
    return ResolutionResult(
        success=True,
        action=f"magic_{outcome}",
        modified_config=flipped,
    )
async def log_and_alert_strategy(
    state: ConfigHealthState,
    monitor: ConfigHealthMonitor
) -> ResolutionResult:
    """Report a failing config without attempting to repair it.

    Prints the config id, last error, consecutive-failure count and the number
    of resolution attempts. A real deployment would forward the same details
    to an alerting channel (Slack, email, PagerDuty, ...) at this point.

    Args:
        state: Health state of the failing configuration.
        monitor: The monitor invoking the strategy (not used here).

    Returns:
        A ``ResolutionResult`` with ``success=False`` (nothing was fixed),
        action ``"alerted"`` and ``{"alert_sent": True}`` as metadata.
    """
    alert_lines = [
        f" ALERT: Config '{state.config_id}' has failed!",
        f" → Error: {state.last_error}",
        f" → Consecutive failures: {state.consecutive_failures}",
        f" → Resolution attempts: {state.resolution_attempts}",
    ]
    print("\n".join(alert_lines))

    # Hook point for real notifications (Slack, email, PagerDuty, etc.).

    return ResolutionResult(
        success=False,
        action="alerted",
        metadata={"alert_sent": True},
    )
def create_resolution_chain(strategies):
    """Build a resolution strategy that tries ``strategies`` one after another.

    After each strategy that reports success, any ``modified_config`` it
    returned is written to ``state.config`` and a validation health check is
    run through ``monitor._perform_health_check(state)``. The chain stops at
    the first strategy whose validation passes; otherwise it moves on to the
    next strategy until the chain is exhausted.

    Args:
        strategies: Iterable of async callables ``(state, monitor) ->
            ResolutionResult``. It is snapshotted when the chain is created,
            so generators are accepted and later mutation of the caller's
            list does not change the chain. ``None`` or an empty iterable
            yields a chain that always reports ``"no_strategies_configured"``.

    Returns:
        An async strategy ``chained_strategy(state, monitor)`` returning a
        ``ResolutionResult``. Its metadata holds a ``"steps"`` list with one
        entry per executed strategy (step number, strategy name, success,
        action, metadata and, when validated, ``validation_passed``).
    """
    # Snapshot once: supports one-shot iterables and isolates the chain from
    # later changes to the caller's list.
    chain = tuple(strategies or ())

    def _strategy_name(strategy):
        # functools.partial objects and callable instances have no __name__.
        return getattr(strategy, "__name__", None) or repr(strategy)

    async def chained_strategy(
        state: ConfigHealthState,
        monitor: ConfigHealthMonitor
    ) -> ResolutionResult:
        if not chain:
            return ResolutionResult(success=False, action="no_strategies_configured")

        print(f"\nStarting resolution chain for '{state.config_id}'")
        steps_metadata = []
        total_steps = len(chain)

        for i, strategy in enumerate(chain, 1):
            strategy_name = _strategy_name(strategy)
            print(f"\n Step {i}/{total_steps}: {strategy_name}")

            result = await strategy(state, monitor)
            step_record = {
                "step": i,
                "strategy": strategy_name,
                "success": result.success,
                "action": result.action,
                "metadata": result.metadata
            }
            steps_metadata.append(step_record)

            if not result.success:
                print(f" Resolution failed: {result.action}")
                continue

            action_label = result.action or strategy_name
            print(f" Resolution applied: {action_label}")

            # Apply the proposed config before validating so the health check
            # exercises the change. It stays in place even if validation
            # fails, so later strategies build on it.
            if result.modified_config is not None:
                state.config = result.modified_config

            print(" Running validation health check...")
            try:
                validation_passed = await monitor._perform_health_check(state)
            except Exception as exc:
                # A crashing validation counts as a failed validation; the
                # chain continues with the next strategy.
                print(f" Validation error: {exc}")
                validation_passed = False

            step_record["validation_passed"] = validation_passed

            if validation_passed:
                print(" Validation succeeded. Resolution chain complete.")
                return ResolutionResult(
                    success=True,
                    action=action_label,
                    modified_config=state.config,
                    metadata={"steps": steps_metadata}
                )

            print(" Validation failed. Trying next strategy...")

        print("\n All resolution strategies failed")
        return ResolutionResult(
            success=False,
            action="all_strategies_failed",
            metadata={"steps": steps_metadata}
        )

    return chained_strategy
# ============================================================================
# Main Example
# ============================================================================
async def main():
    """Run the full ConfigHealthMonitor walkthrough.

    Starts a monitor, registers three crawler configurations (two expected to
    stay healthy, one with a deliberately tiny timeout), runs a manual health
    check on each, lets the background loop run for about 60 seconds while
    printing status snapshots, and finally prints the collected metrics.

    The monitor is stopped in a ``finally`` block so that an exception in any
    step after ``start()`` does not leave it running.
    """
    print("=" * 70)
    print("ConfigHealthMonitor Example")
    print("=" * 70)

    # Initialize monitor
    print("\nInitializing ConfigHealthMonitor...")
    monitor = ConfigHealthMonitor(
        browser_config=BrowserConfig(
            headless=True,
            verbose=False
        ),
        check_interval=15.0,  # Check every 15 seconds
        failure_threshold=2,  # Trigger resolution after 2 failures
        resolution_retry_limit=2,  # Try resolution twice max
        enable_metrics=True
    )
    await monitor.start()

    try:
        print(f" Monitor started (check_interval={monitor.check_interval}s)")

        # ====================================================================
        # Register Configurations
        # ====================================================================
        print("\nRegistering configurations...")

        # Config 1: Reliable website (should stay healthy)
        config_1_id = monitor.register_config(
            config=CrawlerRunConfig(
                page_timeout=30000,
                cache_mode=CacheMode.BYPASS,
                magic=True,
            ),
            test_url="https://www.olly.com/",
            config_id="olly_scraper",
            resolution_strategy=create_resolution_chain([
                toggle_magic_mode_strategy,
            ])
        )
        print(f" Registered: {config_1_id} with resolution chain")

        # Config 2: Another reliable website
        config_2_id = monitor.register_config(
            config=CrawlerRunConfig(
                page_timeout=20000,
                magic=True,
            ),
            test_url="https://example.com",
            config_id="example_scraper"
        )
        print(f" Registered: {config_2_id}")

        # Config 3: Intentionally problematic (very short timeout).
        # This will trigger resolution strategies.
        config_3_id = monitor.register_config(
            config=CrawlerRunConfig(
                page_timeout=100,  # 100ms - will likely timeout
                cache_mode=CacheMode.BYPASS,
            ),
            test_url="https://httpbin.org/delay/5",  # Delays response by 5 seconds
            config_id="impossible_scraper",
            resolution_strategy=create_resolution_chain([
                incremental_backoff_strategy,
                toggle_magic_mode_strategy,
                log_and_alert_strategy
            ])
        )
        print(f" Registered: {config_3_id} (with resolution chain)")
        print(f"\n Total configs registered: {monitor.registered_count}")

        # ====================================================================
        # Perform Manual Health Checks
        # ====================================================================
        print("\nPerforming initial health checks...")
        for config_id in [config_1_id, config_2_id, config_3_id]:
            is_healthy = await monitor.check_health(config_id)
            status = monitor.get_health_status(config_id)
            status_label = "healthy" if is_healthy else "unhealthy"
            print(f" {config_id}: {status.status} ({status_label})")
            if not is_healthy:
                print(f" Error: {status.last_error}")

        # ====================================================================
        # Monitor for a Period
        # ====================================================================
        print("\nMonitoring for 60 seconds (background loop running)...")
        print(" The monitor will automatically check all configs every 15s")
        print(" and apply resolution strategies when failures are detected.\n")

        # Three snapshots, 20 seconds apart (60 seconds in total).
        for i in range(3):
            await asyncio.sleep(20)
            print(f"\nStatus Check #{i+1}")
            print("-" * 70)
            all_statuses = monitor.get_health_status()
            for config_id, state in all_statuses.items():
                print(f"\n{config_id}")
                print(f" Status: {state.status}")
                print(f" Consecutive failures: {state.consecutive_failures}")
                print(f" Consecutive successes: {state.consecutive_successes}")
                print(f" Resolution attempts: {state.resolution_attempts}")
                if state.last_check_time:
                    print(f" Last checked: {state.last_check_time.strftime('%H:%M:%S')}")
                if state.last_success_time:
                    print(f" Last success: {state.last_success_time.strftime('%H:%M:%S')}")
                if state.last_error:
                    print(f" Last error: {state.last_error[:100]}...")

        # ====================================================================
        # Final Metrics Report
        # ====================================================================
        print("\n" + "=" * 70)
        print("Final Metrics Report")
        print("=" * 70)
        metrics = monitor.get_metrics()

        # Global metrics
        print("\nGlobal Metrics:")
        print(f" Total checks: {metrics['total_checks']}")
        print(f" Successful checks: {metrics['successful_checks']}")
        print(f" Failed checks: {metrics['failed_checks']}")
        print(f" Success rate: {metrics['success_rate']:.1%}")
        print(f" Total resolutions: {metrics['total_resolutions']}")
        print(f" Successful resolutions: {metrics['successful_resolutions']}")
        if metrics['total_resolutions'] > 0:
            print(f" Resolution success rate: {metrics['resolution_success_rate']:.1%}")
        print(f" Uptime: {metrics['uptime_seconds']:.1f}s")

        # Per-config metrics
        print("\nPer-Config Metrics:")
        for config_id, config_metrics in metrics['configs'].items():
            print(f"\n {config_id}:")
            print(f" Status: {config_metrics['status']}")
            print(f" Uptime: {config_metrics['uptime_percent']:.1f}%")
            print(f" Avg response time: {config_metrics['avg_response_time']:.3f}s")
            print(f" Total checks: {config_metrics['total_checks']}")
            print(f" Successful: {config_metrics['successful_checks']}")
            print(f" Failed: {config_metrics['failed_checks']}")
            print(f" Resolution attempts: {config_metrics['resolution_attempts']}")
    finally:
        # ====================================================================
        # Cleanup (runs even when a step above raised)
        # ====================================================================
        print("\nStopping monitor...")
        await monitor.stop()
        print(" Monitor stopped successfully")

    print("\n" + "=" * 70)
    print("Example completed!")
    print("=" * 70)
# ============================================================================
# Alternative: Using Context Manager
# ============================================================================
async def example_with_context_manager():
    """Shorter variant that relies on ``async with`` for cleanup.

    Registers one config, sleeps for ten seconds inside the ``async with``
    block, prints the config's health status, and leaves the block so the
    monitor's async context-manager exit runs.
    """
    print("\nExample: Using Context Manager\n")

    health_monitor = ConfigHealthMonitor(
        browser_config=BrowserConfig(headless=True, verbose=False),
        check_interval=30.0,
        failure_threshold=3,
    )
    async with health_monitor as monitor:
        # Register a single config to watch.
        run_config = CrawlerRunConfig(page_timeout=30000)
        monitor.register_config(
            config=run_config,
            test_url="https://httpbin.org/html",
            config_id="example",
        )

        # Stay inside the context for a while before reading the status.
        print("Monitor running...")
        await asyncio.sleep(10)

        snapshot = monitor.get_health_status("example")
        print(f"Status: {snapshot.status}")

    # Reached only after the context manager has exited.
    print("Monitor automatically stopped")
if __name__ == "__main__":
    # Script entry point: run the full walkthrough.
    asyncio.run(main())

    # To try the context-manager variant as well, uncomment the line below.
    # asyncio.run(example_with_context_manager())

View File

@@ -439,19 +439,10 @@ LLMConfig is useful to pass LLM provider config to strategies and functions that
| **`provider`** | `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)* | Which LLM provider to use.
| **`api_token`** |1.Optional. When not provided explicitly, api_token will be read from environment variables based on provider. For example: If a gemini model is passed as provider then,`"GEMINI_API_KEY"` will be read from environment variables <br/> 2. API token of LLM provider <br/> eg: `api_token = "gsk_1ClHGGJ7Lpn4WGybR7vNWGdyb3FY7zXEw3SCiy0BAVM9lL8CQv"` <br/> 3. Environment variable - use with prefix "env:" <br/> eg:`api_token = "env: GROQ_API_KEY"` | API token to use for the given provider
| **`base_url`** |Optional. Custom API endpoint | If your provider has a custom endpoint
| **`backoff_base_delay`** |Optional. `int` *(default: `2`)* | Seconds to wait before the first retry when the provider throttles a request.
| **`backoff_max_attempts`** |Optional. `int` *(default: `3`)* | Total tries (initial call + retries) before surfacing an error.
| **`backoff_exponential_factor`** |Optional. `int` *(default: `2`)* | Multiplier that increases the wait time for each retry (`delay = base_delay * factor^attempt`).
## 3.2 Example Usage
```python
llm_config = LLMConfig(
provider="openai/gpt-4o-mini",
api_token=os.getenv("OPENAI_API_KEY"),
backoff_base_delay=1, # optional
backoff_max_attempts=5, # optional
backoff_exponential_factor=3, # optional
)
llm_config = LLMConfig(provider="openai/gpt-4o-mini", api_token=os.getenv("OPENAI_API_KEY"))
```
## 4. Putting It All Together

View File

@@ -1593,20 +1593,8 @@ The `clone()` method:
- Environment variable - use with prefix "env:" <br/> eg:`api_token = "env: GROQ_API_KEY"`
3. **`base_url`**:
- If your provider has a custom endpoint
4. **Backoff controls** *(optional)*:
- `backoff_base_delay` *(default `2` seconds)*: how long to pause before the first retry if the provider rate-limits you.
- `backoff_max_attempts` *(default `3`)*: total tries for the same prompt (initial call + retries).
- `backoff_exponential_factor` *(default `2`)*: how quickly the pause grows between retries. A factor of 2 yields waits like 2s → 4s → 8s.
- Because these plug into Crawl4AI's retry helper, every LLM strategy automatically follows the pacing you define here.
```python
llm_config = LLMConfig(
provider="openai/gpt-4o-mini",
api_token=os.getenv("OPENAI_API_KEY"),
backoff_base_delay=1, # optional
backoff_max_attempts=5, # optional
backoff_exponential_factor=3, # optional
)
llm_config = LLMConfig(provider="openai/gpt-4o-mini", api_token=os.getenv("OPENAI_API_KEY"))
```
## 4. Putting It All Together
In a typical scenario, you define **one** `BrowserConfig` for your crawler session, then create **one or more** `CrawlerRunConfig` & `LLMConfig` depending on each call's needs:

View File

@@ -308,20 +308,8 @@ The `clone()` method:
3.**`base_url`**:
- If your provider has a custom endpoint
4.**Retry/backoff controls** *(optional)*:
- `backoff_base_delay` *(default `2` seconds)*: base delay inserted before the first retry when the provider returns a rate-limit response.
- `backoff_max_attempts` *(default `3`)*: total number of attempts (initial call plus retries) before the request is surfaced as an error.
- `backoff_exponential_factor` *(default `2`)*: growth rate for the retry delay (`delay = base_delay * factor^attempt`).
- These values are forwarded to the shared `perform_completion_with_backoff` helper, ensuring every strategy that consumes your `LLMConfig` honors the same throttling policy.
```python
llm_config = LLMConfig(
provider="openai/gpt-4o-mini",
api_token=os.getenv("OPENAI_API_KEY"),
backoff_base_delay=1, # optional
backoff_max_attempts=5, # optional
backoff_exponential_factor=3, #optional
)
llm_config = LLMConfig(provider="openai/gpt-4o-mini", api_token=os.getenv("OPENAI_API_KEY"))
```
## 4. Putting It All Together

View File

@@ -0,0 +1,455 @@
"""
Tests for ConfigHealthMonitor class.
This test suite validates the health monitoring functionality for crawler configurations.
"""
import pytest
import asyncio
from crawl4ai.config_health_monitor import ConfigHealthMonitor, ResolutionResult, ConfigHealthState
from crawl4ai import BrowserConfig, CrawlerRunConfig
class TestConfigHealthMonitorBasic:
    """Construction, registration and unregistration of configs."""

    @pytest.mark.asyncio
    async def test_initialization(self):
        """A default monitor is idle, empty and respects its lower bounds."""
        fresh = ConfigHealthMonitor()

        assert fresh.check_interval >= 10.0  # Minimum enforced
        assert fresh.failure_threshold >= 1
        assert fresh.resolution_retry_limit >= 0
        assert fresh.registered_count == 0
        assert not fresh.is_running
        assert fresh.uptime is None

    @pytest.mark.asyncio
    async def test_initialization_with_config(self):
        """Explicit constructor arguments are exposed unchanged."""
        configured = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False),
            check_interval=30.0,
            failure_threshold=2,
            resolution_retry_limit=3,
            enable_metrics=True,
        )

        assert configured.check_interval == 30.0
        assert configured.failure_threshold == 2
        assert configured.resolution_retry_limit == 3
        assert configured.enable_metrics is True

    @pytest.mark.asyncio
    async def test_register_config(self):
        """Registering with an explicit ID returns that ID."""
        monitor = ConfigHealthMonitor()

        returned_id = monitor.register_config(
            config=CrawlerRunConfig(page_timeout=30000),
            test_url="https://example.com",
            config_id="test_config",
        )

        assert returned_id == "test_config"
        assert monitor.registered_count == 1

    @pytest.mark.asyncio
    async def test_register_config_auto_id(self):
        """Omitting the ID yields a generated one prefixed with ``config_``."""
        monitor = ConfigHealthMonitor()

        generated_id = monitor.register_config(
            config=CrawlerRunConfig(page_timeout=30000),
            test_url="https://example.com",
        )

        assert generated_id.startswith("config_")
        assert monitor.registered_count == 1

    @pytest.mark.asyncio
    async def test_register_duplicate_config_id(self):
        """Re-using a config ID is rejected with ``ValueError``."""
        monitor = ConfigHealthMonitor()
        registration = {
            "config": CrawlerRunConfig(page_timeout=30000),
            "test_url": "https://example.com",
            "config_id": "duplicate",
        }

        monitor.register_config(**registration)

        with pytest.raises(ValueError, match="already registered"):
            monitor.register_config(**registration)

    @pytest.mark.asyncio
    async def test_register_empty_url(self):
        """An empty test URL is rejected with ``ValueError``."""
        monitor = ConfigHealthMonitor()
        blank_url_config = CrawlerRunConfig()

        with pytest.raises(ValueError, match="cannot be empty"):
            monitor.register_config(config=blank_url_config, test_url="")

    @pytest.mark.asyncio
    async def test_unregister_config(self):
        """Unregistering a known ID returns True and empties the registry."""
        monitor = ConfigHealthMonitor()
        target_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="to_remove",
        )
        assert monitor.registered_count == 1

        removed = monitor.unregister_config(target_id)

        assert removed is True
        assert monitor.registered_count == 0

    @pytest.mark.asyncio
    async def test_unregister_nonexistent_config(self):
        """Unregistering an unknown ID returns False."""
        monitor = ConfigHealthMonitor()

        removed = monitor.unregister_config("nonexistent")

        assert removed is False
class TestConfigHealthMonitorLifecycle:
    """Lifecycle management tests (start/stop, context manager).

    Tests that start a monitor explicitly stop it in a ``finally`` block so a
    failing assertion does not leave a started monitor behind for later tests.
    """

    @pytest.mark.asyncio
    async def test_start_stop(self):
        """Test monitor start and stop."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        assert not monitor.is_running

        await monitor.start()
        try:
            assert monitor.is_running
            assert monitor.uptime is not None
        finally:
            await monitor.stop()

        assert not monitor.is_running

    @pytest.mark.asyncio
    async def test_context_manager(self):
        """Test monitor as async context manager."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            assert monitor.is_running

            # Register a config; the returned ID is not needed here.
            monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com"
            )
            assert monitor.registered_count == 1

        # After context exit, should be stopped
        assert not monitor.is_running

    @pytest.mark.asyncio
    async def test_double_start(self):
        """Test that double start is handled gracefully."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )

        await monitor.start()
        try:
            await monitor.start()  # Should log warning but not fail
            assert monitor.is_running
        finally:
            await monitor.stop()

    @pytest.mark.asyncio
    async def test_stop_without_start(self):
        """Test that stop without start is handled gracefully."""
        monitor = ConfigHealthMonitor()
        await monitor.stop()  # Should log warning but not fail
class TestConfigHealthMonitorHealthChecks:
    """Manual health checks against live URLs (these tests need network access)."""

    @pytest.mark.asyncio
    async def test_manual_health_check_success(self):
        """A reachable URL is reported healthy and the state reflects one success."""
        browser = BrowserConfig(headless=True, verbose=False)
        async with ConfigHealthMonitor(browser_config=browser) as monitor:
            registered_id = monitor.register_config(
                config=CrawlerRunConfig(page_timeout=30000),
                test_url="https://example.com",
                config_id="example_test",
            )

            outcome = await monitor.check_health(registered_id)
            assert outcome is True

            # The recorded state must agree with the returned verdict.
            state = monitor.get_health_status(registered_id)
            assert state.status == "healthy"
            assert state.consecutive_failures == 0
            assert state.consecutive_successes == 1
            assert state.last_check_time is not None
            assert state.last_success_time is not None

    @pytest.mark.asyncio
    async def test_manual_health_check_failure(self):
        """An unresolvable domain is reported unhealthy and records the error."""
        browser = BrowserConfig(headless=True, verbose=False)
        async with ConfigHealthMonitor(browser_config=browser) as monitor:
            registered_id = monitor.register_config(
                config=CrawlerRunConfig(page_timeout=10000),
                test_url="https://this-domain-definitely-does-not-exist-12345.com",
                config_id="failing_test",
            )

            outcome = await monitor.check_health(registered_id)
            assert outcome is False

            # One failure is counted and its error message is kept.
            state = monitor.get_health_status(registered_id)
            assert state.consecutive_failures == 1
            assert state.last_error is not None

    @pytest.mark.asyncio
    async def test_health_check_nonexistent_config(self):
        """Checking an unknown config ID raises ``ValueError``."""
        browser = BrowserConfig(headless=True, verbose=False)
        async with ConfigHealthMonitor(browser_config=browser) as monitor:
            with pytest.raises(ValueError, match="not registered"):
                await monitor.check_health("nonexistent")
class TestConfigHealthMonitorResolution:
    """Tests for how resolution strategies are stored on the monitor.

    None of these tests start the monitor or run a strategy; they only
    inspect where ``set_resolution_strategy`` / ``register_config`` put the
    callable.
    """

    @pytest.mark.asyncio
    async def test_set_default_resolution_strategy(self):
        """A strategy set without a config id becomes the default strategy."""
        health_monitor = ConfigHealthMonitor()

        async def dummy_strategy(state, monitor):
            return ResolutionResult(success=True, action="dummy")

        health_monitor.set_resolution_strategy(dummy_strategy)

        stored = health_monitor._default_resolution_strategy
        assert stored is dummy_strategy

    @pytest.mark.asyncio
    async def test_set_config_specific_resolution_strategy(self):
        """A strategy set with a config id is stored under that id."""
        health_monitor = ConfigHealthMonitor()
        registered_id = health_monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="with_strategy",
        )

        async def custom_strategy(state, monitor):
            return ResolutionResult(success=True, action="custom")

        health_monitor.set_resolution_strategy(custom_strategy, registered_id)

        stored = health_monitor._resolution_strategies[registered_id]
        assert stored is custom_strategy

    @pytest.mark.asyncio
    async def test_set_strategy_for_nonexistent_config(self):
        """Setting a strategy for an unregistered config id raises ``ValueError``."""
        health_monitor = ConfigHealthMonitor()

        async def dummy_strategy(state, monitor):
            return ResolutionResult(success=True, action="dummy")

        with pytest.raises(ValueError, match="not registered"):
            health_monitor.set_resolution_strategy(dummy_strategy, "nonexistent")

    @pytest.mark.asyncio
    async def test_register_with_resolution_strategy(self):
        """A strategy passed to ``register_config`` is stored under the returned id."""
        health_monitor = ConfigHealthMonitor()

        async def custom_strategy(state, monitor):
            return ResolutionResult(success=True, action="custom")

        # No explicit config_id here: the id comes from register_config().
        registered_id = health_monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            resolution_strategy=custom_strategy,
        )

        stored = health_monitor._resolution_strategies[registered_id]
        assert stored is custom_strategy
class TestConfigHealthMonitorMetrics:
    """Tests for ``get_health_status`` and ``get_metrics`` queries."""

    @pytest.mark.asyncio
    async def test_get_health_status_single(self):
        """Querying one config id returns its ``ConfigHealthState``."""
        health_monitor = ConfigHealthMonitor()
        registered_id = health_monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="status_test",
        )

        state = health_monitor.get_health_status(registered_id)

        assert isinstance(state, ConfigHealthState)
        assert state.config_id == registered_id
        # No check has been run yet; a fresh registration reports "healthy".
        assert state.status == "healthy"

    @pytest.mark.asyncio
    async def test_get_health_status_all(self):
        """Querying without an id returns a dict covering every registered config."""
        health_monitor = ConfigHealthMonitor()

        # Register three configs under distinct ids.
        for i in range(3):
            health_monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com",
                config_id=f"config_{i}",
            )

        statuses = health_monitor.get_health_status()

        assert isinstance(statuses, dict)
        assert len(statuses) == 3
        for state in statuses.values():
            assert isinstance(state, ConfigHealthState)

    @pytest.mark.asyncio
    async def test_get_health_status_nonexistent(self):
        """Querying an unregistered config id raises ``ValueError``."""
        health_monitor = ConfigHealthMonitor()
        with pytest.raises(ValueError, match="not registered"):
            health_monitor.get_health_status("nonexistent")

    @pytest.mark.asyncio
    async def test_get_metrics_empty(self):
        """With no configs registered, all counters are zero and ``configs`` is empty."""
        snapshot = ConfigHealthMonitor().get_metrics()

        assert snapshot["total_checks"] == 0
        assert snapshot["successful_checks"] == 0
        assert snapshot["failed_checks"] == 0
        assert snapshot["success_rate"] == 0.0
        assert snapshot["configs"] == {}

    @pytest.mark.asyncio
    async def test_get_metrics_with_checks(self):
        """After one manual check, the per-config metrics reflect it."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False),
            enable_metrics=True,
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com",
                config_id="metrics_test",
            )

            # Run a single manual health check before reading metrics.
            await monitor.check_health(config_id)
            snapshot = monitor.get_metrics()

            # NOTE(review): ">= 0" can never fail for a counter; it may have
            # been meant as ">= 1" -- confirm how the global total is updated
            # before tightening.
            assert snapshot["total_checks"] >= 0
            assert "configs" in snapshot
            assert config_id in snapshot["configs"]

            per_config = snapshot["configs"][config_id]
            assert per_config["status"] == "healthy"
            assert per_config["total_checks"] >= 1
            assert "avg_response_time" in per_config
class TestConfigHealthMonitorProperties:
    """Tests for the monitor's read-only properties.

    Tests that start the monitor stop it in a ``finally`` clause so that a
    failing assertion does not leave a started monitor behind.
    """

    @pytest.mark.asyncio
    async def test_is_running_property(self):
        """``is_running`` is False before start, True while started, False after stop."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        assert monitor.is_running is False

        await monitor.start()
        try:
            assert monitor.is_running is True
        finally:
            # Always stop, even if the assertion above fails.
            await monitor.stop()

        assert monitor.is_running is False

    @pytest.mark.asyncio
    async def test_registered_count_property(self):
        """``registered_count`` follows register and unregister calls."""
        monitor = ConfigHealthMonitor()
        assert monitor.registered_count == 0

        for i in range(5):
            monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://httpbin.org/html",
                config_id=f"count_test_{i}",
            )
        assert monitor.registered_count == 5

        monitor.unregister_config("count_test_0")
        assert monitor.registered_count == 4

    @pytest.mark.asyncio
    async def test_uptime_property(self):
        """``uptime`` is None before start and at least the slept time afterwards."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        assert monitor.uptime is None

        await monitor.start()
        try:
            # Let a measurable amount of time pass before sampling uptime.
            await asyncio.sleep(0.1)
            uptime = monitor.uptime
            assert uptime is not None
            assert uptime >= 0.1
        finally:
            # Always stop, even if an assertion above fails.
            await monitor.stop()
if __name__ == "__main__":
    # Propagate pytest's exit status so that running this file directly exits
    # non-zero when tests fail, instead of always exiting 0.
    raise SystemExit(pytest.main([__file__, "-v", "-s"]))