from __future__ import annotations

import asyncio
import copy
import time
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
    Union,
)

from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_logger import AsyncLogger, LogLevel
from .async_webcrawler import AsyncWebCrawler
from .cache_context import CacheMode


# ============================================================================
# ConfigHealthMonitor Supporting Types
# ============================================================================

@dataclass
class ResolutionResult:
    """Result of a resolution strategy execution."""
    success: bool
    action: str  # Human-readable description of action taken
    # Replacement config to adopt when the strategy changed settings.
    modified_config: Optional[CrawlerRunConfig] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ConfigHealthState:
    """Health state for a monitored configuration."""
    config_id: str
    config: CrawlerRunConfig
    test_url: str
    status: Literal["healthy", "degraded", "failed", "resolving"] = "healthy"
    consecutive_failures: int = 0
    consecutive_successes: int = 0
    last_check_time: Optional[datetime] = None
    last_success_time: Optional[datetime] = None
    last_error: Optional[str] = None
    resolution_attempts: int = 0
    metrics: Dict[str, Any] = field(default_factory=dict)

    def copy(self) -> ConfigHealthState:
        """Create a copy of this state.

        NOTE: ``config`` is shared by reference; only ``metrics`` is
        shallow-copied. Callers receiving copies must not mutate ``config``.
        """
        return ConfigHealthState(
            config_id=self.config_id,
            config=self.config,
            test_url=self.test_url,
            status=self.status,
            consecutive_failures=self.consecutive_failures,
            consecutive_successes=self.consecutive_successes,
            last_check_time=self.last_check_time,
            last_success_time=self.last_success_time,
            last_error=self.last_error,
            resolution_attempts=self.resolution_attempts,
            metrics=self.metrics.copy()
        )


# Type alias for resolution strategies: (failing state, monitor) -> result.
ResolutionStrategy = Callable[
    [ConfigHealthState, "ConfigHealthMonitor"],
    Awaitable[ResolutionResult]
]


class _ErrorTrackingLogger:
    """Lightweight proxy that records error logs for health checks.

    Wraps a base logger and mirrors every error-level call into an internal
    bounded deque so health checks can later poll for errors emitted during
    a crawl window.
    """

    __slots__ = ("_base_logger", "_error_events")

    def __init__(self, base_logger):
        # object.__setattr__ bypasses our own __setattr__ override below,
        # which would otherwise forward these to the base logger.
        object.__setattr__(self, "_base_logger", base_logger)
        object.__setattr__(self, "_error_events", deque(maxlen=500))

    def _record_error(self, message: str, tag: str, params: Optional[Dict[str, Any]]):
        # Best-effort formatting; fall back to the raw template on failure.
        formatted_message = message
        if params:
            try:
                formatted_message = message.format(**params)
            except Exception:
                pass
        self._error_events.append({
            "timestamp": time.time(),
            "tag": tag,
            "message": formatted_message,
        })

    def poll_errors(self, since_ts: float) -> List[Dict[str, Any]]:
        """Return error logs emitted since the provided timestamp."""
        if since_ts is None:
            return list(self._error_events)

        recent = [event for event in self._error_events if event["timestamp"] >= since_ts]

        # Keep deque trimmed to the last few minutes to avoid unbounded memory use
        cutoff = max(since_ts - 300.0, 0.0)
        while self._error_events and self._error_events[0]["timestamp"] < cutoff:
            self._error_events.popleft()

        return recent

    # --- Pass-through methods (no error recording) ---

    def debug(self, message: str, tag: str = "DEBUG", **kwargs):
        return self._base_logger.debug(message, tag=tag, **kwargs)

    def info(self, message: str, tag: str = "INFO", **kwargs):
        return self._base_logger.info(message, tag=tag, **kwargs)

    def success(self, message: str, tag: str = "SUCCESS", **kwargs):
        return self._base_logger.success(message, tag=tag, **kwargs)

    def warning(self, message: str, tag: str = "WARNING", **kwargs):
        return self._base_logger.warning(message, tag=tag, **kwargs)

    # --- Error-level methods (recorded before delegation) ---

    def error(self, message: str, tag: str = "ERROR", **kwargs):
        self._record_error(message, tag, kwargs.get("params"))
        return self._base_logger.error(message, tag=tag, **kwargs)

    def critical(self, message: str, tag: str = "CRITICAL", **kwargs):
        self._record_error(message, tag, kwargs.get("params"))
        return self._base_logger.critical(message, tag=tag, **kwargs)

    def exception(self, message: str, tag: str = "EXCEPTION", **kwargs):
        self._record_error(message, tag, kwargs.get("params"))
        return self._base_logger.exception(message, tag=tag, **kwargs)

    def fatal(self, message: str, tag: str = "FATAL", **kwargs):
        self._record_error(message, tag, kwargs.get("params"))
        return self._base_logger.fatal(message, tag=tag, **kwargs)

    def alert(self, message: str, tag: str = "ALERT", **kwargs):
        self._record_error(message, tag, kwargs.get("params"))
        return self._base_logger.alert(message, tag=tag, **kwargs)

    def error_status(self, url: str, error: str, tag: str = "ERROR", url_length: int = 100):
        self._record_error(error, tag, None)
        return self._base_logger.error_status(url, error, tag=tag, url_length=url_length)

    def url_status(self, url: str, success: bool, timing: float, tag: str = "FETCH", url_length: int = 100):
        return self._base_logger.url_status(url, success, timing, tag=tag, url_length=url_length)

    def __getattr__(self, item):
        # Delegate anything we do not explicitly wrap to the base logger.
        return getattr(self._base_logger, item)

    def __setattr__(self, key, value):
        # Our own slots stay local; everything else is set on the base logger.
        if key in self.__slots__:
            object.__setattr__(self, key, value)
        else:
            setattr(self._base_logger, key, value)
# ============================================================================
# ConfigHealthMonitor Class
# ============================================================================

class ConfigHealthMonitor:
    """
    Monitors health of crawler configurations and applies resolution strategies.

    Features:
    - Periodic health checks for multiple configs
    - Configurable failure thresholds
    - Built-in and custom resolution strategies
    - Metrics collection and reporting
    - Graceful shutdown and cleanup
    """

    def __init__(
        self,
        browser_config: Optional[BrowserConfig] = None,
        check_interval: float = 60.0,
        failure_threshold: int = 3,
        resolution_retry_limit: int = 2,
        enable_metrics: bool = True,
        logger: Optional[AsyncLogger] = None
    ):
        """
        Args:
            browser_config: Shared browser configuration for all health checks
            check_interval: Seconds between health checks (default: 60s)
            failure_threshold: Consecutive failures before triggering resolution
            resolution_retry_limit: Max resolution attempts before marking as failed
            enable_metrics: Collect and report performance metrics
            logger: Custom logger instance
        """
        # Configuration
        self.browser_config = browser_config
        self.check_interval = max(10.0, check_interval)  # Minimum 10s
        self.failure_threshold = max(1, failure_threshold)
        self.resolution_retry_limit = max(0, resolution_retry_limit)
        self.enable_metrics = enable_metrics

        # Logger
        if logger is None:
            self.logger = AsyncLogger(
                log_level=LogLevel.INFO,
                verbose=False
            )
        else:
            self.logger = logger

        # Wrap so error-level log events can be polled during health checks.
        if not isinstance(self.logger, _ErrorTrackingLogger):
            self.logger = _ErrorTrackingLogger(self.logger)

        # State management
        self._health_states: Dict[str, ConfigHealthState] = {}
        self._resolution_strategies: Dict[str, ResolutionStrategy] = {}
        self._default_resolution_strategy: Optional[ResolutionStrategy] = None
        self._state_lock = asyncio.Lock()

        # Crawler instance (shared across all health checks)
        self._crawler: Optional[AsyncWebCrawler] = None

        # Monitoring control
        self._running = False
        self._monitor_task: Optional[asyncio.Task] = None

        # Metrics
        self._metrics = {
            "total_checks": 0,
            "successful_checks": 0,
            "failed_checks": 0,
            "total_resolutions": 0,
            "successful_resolutions": 0,
            "start_time": None,
        }
        self._config_metrics: Dict[str, Dict[str, Any]] = {}

        # Performance tracking
        self._last_check_times: Dict[str, float] = {}

        self.logger.info(
            "ConfigHealthMonitor initialized",
            tag="HEALTH_MONITOR",
            params={
                "check_interval": self.check_interval,
                "failure_threshold": self.failure_threshold,
                "resolution_retry_limit": self.resolution_retry_limit,
                "enable_metrics": self.enable_metrics,
            }
        )

    # ========================================================================
    # Properties
    # ========================================================================

    @property
    def is_running(self) -> bool:
        """Check if monitoring is active."""
        return self._running

    @property
    def registered_count(self) -> int:
        """Number of registered configurations."""
        return len(self._health_states)

    @property
    def uptime(self) -> Optional[float]:
        """Monitor uptime in seconds (None until start() has been called)."""
        if self._metrics["start_time"] is None:
            return None
        return time.time() - self._metrics["start_time"]

    # ========================================================================
    # Lifecycle Methods
    # ========================================================================

    async def start(self) -> None:
        """Initialize crawler and start monitoring loop."""
        if self._running:
            self.logger.warning(
                "ConfigHealthMonitor already running",
                tag="HEALTH_MONITOR"
            )
            return

        try:
            # Initialize crawler
            self._crawler = AsyncWebCrawler(
                config=self.browser_config,
                logger=self.logger
            )
            await self._crawler.__aenter__()

            # Start monitoring
            self._running = True
            self._metrics["start_time"] = time.time()
            self._monitor_task = asyncio.create_task(self._monitoring_loop())

            self.logger.success(
                "ConfigHealthMonitor started",
                tag="HEALTH_MONITOR",
                params={"registered_configs": self.registered_count}
            )

        except Exception as e:
            self.logger.error(
                f"Failed to start ConfigHealthMonitor: {e}",
                tag="HEALTH_MONITOR"
            )
            await self._cleanup()
            raise
    async def stop(self) -> None:
        """Gracefully stop monitoring and cleanup resources."""
        if not self._running:
            self.logger.warning(
                "ConfigHealthMonitor not running",
                tag="HEALTH_MONITOR"
            )
            return

        self.logger.info(
            "Stopping ConfigHealthMonitor...",
            tag="HEALTH_MONITOR"
        )

        self._running = False

        # Cancel monitoring task and wait for it to unwind.
        if self._monitor_task and not self._monitor_task.done():
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass

        # Cleanup resources
        await self._cleanup()

        self.logger.success(
            "ConfigHealthMonitor stopped",
            tag="HEALTH_MONITOR",
            params={
                "total_checks": self._metrics["total_checks"],
                "uptime_seconds": self.uptime,
            }
        )

    async def _cleanup(self) -> None:
        """Cleanup internal resources (closes the shared crawler)."""
        if self._crawler:
            try:
                await self._crawler.__aexit__(None, None, None)
            except Exception as e:
                self.logger.error(
                    f"Error during crawler cleanup: {e}",
                    tag="HEALTH_MONITOR"
                )
            finally:
                # Always drop the reference even if teardown raised.
                self._crawler = None

    async def __aenter__(self):
        """Context manager entry."""
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        await self.stop()
        return False

    # ========================================================================
    # Private Helper Methods
    # ========================================================================

    def _generate_config_id(self) -> str:
        """Generate a unique config ID."""
        return f"config_{uuid.uuid4().hex[:8]}"

    def _update_metrics(
        self,
        state: ConfigHealthState,
        status_override: Optional[str] = None
    ) -> None:
        """Update per-config metrics based on state.

        Args:
            state: The health state whose check outcome is being recorded.
            status_override: Status label to record instead of ``state.status``.
        """
        if not self.enable_metrics:
            return

        config_id = state.config_id

        # Initialize config metrics if needed
        if config_id not in self._config_metrics:
            self._config_metrics[config_id] = {
                "total_checks": 0,
                "successful_checks": 0,
                "failed_checks": 0,
                "total_response_time": 0.0,
                "min_response_time": float('inf'),
                "max_response_time": 0.0,
                "resolution_attempts": 0,
                "last_status": "unknown",
            }

        # Update config-specific metrics
        metrics = self._config_metrics[config_id]
        metrics["total_checks"] += 1
        status = status_override or state.status
        metrics["last_status"] = status

        if status == "healthy":
            metrics["successful_checks"] += 1
        else:
            metrics["failed_checks"] += 1

        # Update response time metrics if available
        if "response_time" in state.metrics:
            rt = state.metrics["response_time"]
            metrics["total_response_time"] += rt
            metrics["min_response_time"] = min(metrics["min_response_time"], rt)
            metrics["max_response_time"] = max(metrics["max_response_time"], rt)

        metrics["resolution_attempts"] = state.resolution_attempts
    def _record_check_result(
        self,
        state: ConfigHealthState,
        status_label: str,
        is_healthy: bool
    ) -> None:
        """Record the outcome of a single health check.

        Updates both the global counters and per-config metrics.
        """
        if not self.enable_metrics:
            return

        self._metrics["total_checks"] += 1
        if is_healthy:
            self._metrics["successful_checks"] += 1
        else:
            self._metrics["failed_checks"] += 1

        self._update_metrics(state, status_override=status_label)

    # ========================================================================
    # Configuration Management
    # ========================================================================

    def register_config(
        self,
        config: CrawlerRunConfig,
        test_url: str,
        config_id: Optional[str] = None,
        resolution_strategy: Optional[ResolutionStrategy] = None
    ) -> str:
        """
        Register a configuration to monitor.

        Args:
            config: The CrawlerRunConfig to monitor
            test_url: URL to use for health checks
            config_id: Unique identifier (auto-generated if None)
            resolution_strategy: Custom resolution function for this config

        Returns:
            config_id: The assigned configuration ID

        Raises:
            ValueError: If config_id already exists or test_url is empty
        """
        if config_id is None:
            config_id = self._generate_config_id()

        if config_id in self._health_states:
            raise ValueError(f"Config ID '{config_id}' already registered")

        # Validate test_url
        if not test_url or not test_url.strip():
            raise ValueError("test_url cannot be empty")

        # Create health state; new configs start out assumed healthy.
        state = ConfigHealthState(
            config_id=config_id,
            config=config,
            test_url=test_url.strip(),
            status="healthy",
        )

        self._health_states[config_id] = state

        # Register resolution strategy if provided
        if resolution_strategy:
            self._resolution_strategies[config_id] = resolution_strategy

        self.logger.info(
            f"Registered config '{config_id}'",
            tag="HEALTH_MONITOR",
            params={
                "config_id": config_id,
                "test_url": test_url,
                "has_custom_strategy": resolution_strategy is not None,
                "total_configs": self.registered_count,
            }
        )

        return config_id
+ + Args: + config: The CrawlerRunConfig to monitor + test_url: URL to use for health checks + config_id: Unique identifier (auto-generated if None) + resolution_strategy: Custom resolution function for this config + + Returns: + config_id: The assigned configuration ID + + Raises: + ValueError: If config_id already exists + """ + if config_id is None: + config_id = self._generate_config_id() + + if config_id in self._health_states: + raise ValueError(f"Config ID '{config_id}' already registered") + + # Validate test_url + if not test_url or not test_url.strip(): + raise ValueError("test_url cannot be empty") + + # Create health state + state = ConfigHealthState( + config_id=config_id, + config=config, + test_url=test_url.strip(), + status="healthy", + ) + + self._health_states[config_id] = state + + # Register resolution strategy if provided + if resolution_strategy: + self._resolution_strategies[config_id] = resolution_strategy + + self.logger.info( + f"Registered config '{config_id}'", + tag="HEALTH_MONITOR", + params={ + "config_id": config_id, + "test_url": test_url, + "has_custom_strategy": resolution_strategy is not None, + "total_configs": self.registered_count, + } + ) + + return config_id + + def unregister_config(self, config_id: str) -> bool: + """ + Remove a configuration from monitoring. 
+ + Args: + config_id: The configuration ID to remove + + Returns: + True if removed, False if not found + """ + if config_id not in self._health_states: + self.logger.warning( + f"Config '{config_id}' not found for unregistration", + tag="HEALTH_MONITOR" + ) + return False + + # Remove state and strategy + del self._health_states[config_id] + self._resolution_strategies.pop(config_id, None) + self._config_metrics.pop(config_id, None) + self._last_check_times.pop(config_id, None) + + self.logger.info( + f"Unregistered config '{config_id}'", + tag="HEALTH_MONITOR", + params={"remaining_configs": self.registered_count} + ) + + return True + + def set_resolution_strategy( + self, + strategy: ResolutionStrategy, + config_id: Optional[str] = None + ) -> None: + """ + Set resolution strategy for specific config or as global default. + + Args: + strategy: Resolution callable + config_id: Target config (None = set as global default) + + Raises: + ValueError: If config_id specified but not found + """ + if config_id is None: + # Set as default strategy + self._default_resolution_strategy = strategy + self.logger.info( + "Set default resolution strategy", + tag="HEALTH_MONITOR" + ) + else: + # Set for specific config + if config_id not in self._health_states: + raise ValueError(f"Config ID '{config_id}' not registered") + + self._resolution_strategies[config_id] = strategy + self.logger.info( + f"Set resolution strategy for config '{config_id}'", + tag="HEALTH_MONITOR" + ) + + # ======================================================================== + # Health Checking + # ======================================================================== + + async def check_health(self, config_id: str) -> bool: + """ + Perform a single health check for a configuration. 
+ + Args: + config_id: The configuration to check + + Returns: + True if healthy, False otherwise + + Raises: + ValueError: If config_id not found + """ + if config_id not in self._health_states: + raise ValueError(f"Config ID '{config_id}' not registered") + + async with self._state_lock: + state = self._health_states[config_id] + is_healthy = await self._perform_health_check(state) + + # Update state + if is_healthy: + state.status = "healthy" + state.consecutive_failures = 0 + state.consecutive_successes += 1 + state.last_success_time = datetime.now() + state.resolution_attempts = 0 + else: + state.consecutive_failures += 1 + state.consecutive_successes = 0 + state.status = ( + "degraded" + if state.consecutive_failures < self.failure_threshold + else "failed" + ) + + state.last_check_time = datetime.now() + + # Update metrics + if self.enable_metrics: + self._record_check_result(state, state.status, is_healthy) + + return is_healthy + + async def _perform_health_check(self, state: ConfigHealthState) -> bool: + """ + Execute health check crawl. 
    async def _perform_health_check(self, state: ConfigHealthState) -> bool:
        """
        Execute health check crawl.

        A check passes only when the crawl succeeds with an acceptable status
        code, returns non-trivial HTML, AND no error-level log events were
        emitted during the crawl window.

        Args:
            state: The configuration state to check

        Returns:
            True if healthy, False otherwise
        """
        if not self._crawler:
            state.last_error = "Crawler not initialized"
            return False

        start_time = time.time()
        # Errors logged at or after this timestamp count against the check.
        error_window_start = start_time

        try:
            # Create lightweight config for health check
            health_config = copy.deepcopy(state.config)
            health_config.cache_mode = CacheMode.BYPASS  # Always fresh for health checks

            # Perform crawl
            result = await self._crawler.arun(
                url=state.test_url,
                config=health_config,
                session_id=f"health_check_{state.config_id}"
            )

            # Record metrics
            response_time = time.time() - start_time
            state.metrics["response_time"] = response_time
            state.metrics["status_code"] = result.status_code
            state.metrics["html_length"] = len(result.html) if result.html else 0
            state.metrics["success"] = result.success

            # Poll the tracking logger for errors emitted during the crawl.
            recent_errors: List[Dict[str, Any]] = []
            if hasattr(self.logger, "poll_errors"):
                recent_errors = self.logger.poll_errors(error_window_start)

            had_logger_errors = bool(recent_errors)
            if had_logger_errors:
                state.metrics["logger_errors"] = [event["message"] for event in recent_errors]
            else:
                state.metrics.pop("logger_errors", None)

            # Evaluate health criteria
            is_healthy = (
                result.success
                and result.status_code in [200, 201, 202, 203, 206]
                and len(result.html or "") > 100  # Minimum content threshold
            ) and not had_logger_errors

            if had_logger_errors:
                self.logger.warning(
                    f"Logger reported {len(recent_errors)} errors during health check for '{state.config_id}'",
                    tag="HEALTH_CHECK",
                    params={
                        "config_id": state.config_id,
                        "last_error": recent_errors[-1]["message"],
                    }
                )

            if not is_healthy:
                # Prefer the most recent logger error as the failure reason.
                log_error_msg = recent_errors[-1]["message"] if had_logger_errors else None
                state.last_error = log_error_msg or result.error_message or "Health check failed"
                self.logger.warning(
                    f"Health check failed for '{state.config_id}'",
                    tag="HEALTH_CHECK",
                    params={
                        "config_id": state.config_id,
                        "status_code": result.status_code,
                        "error": state.last_error,
                        "consecutive_failures": state.consecutive_failures + 1,
                    }
                )
            else:
                self.logger.debug(
                    f"Health check passed for '{state.config_id}'",
                    tag="HEALTH_CHECK",
                    params={
                        "config_id": state.config_id,
                        "response_time": round(response_time, 2),
                        "status_code": result.status_code,
                    }
                )

            return is_healthy

        except Exception as e:
            response_time = time.time() - start_time
            state.last_error = str(e)
            state.metrics["response_time"] = response_time
            state.metrics["exception"] = type(e).__name__

            self.logger.error(
                f"Health check exception for '{state.config_id}': {e}",
                tag="HEALTH_CHECK",
                params={
                    "config_id": state.config_id,
                    "exception_type": type(e).__name__,
                }
            )

            return False

    # ========================================================================
    # Resolution Management
    # ========================================================================

    async def _apply_resolution(self, config_id: str) -> bool:
        """
        Apply resolution strategy for a failing config.

        Args:
            config_id: The configuration to resolve

        Returns:
            True if resolution succeeded, False otherwise
        """
        state = self._health_states[config_id]
        state.status = "resolving"
        state.resolution_attempts += 1

        # Get resolution strategy (config-specific or default)
        strategy = self._resolution_strategies.get(
            config_id,
            self._default_resolution_strategy
        )

        if strategy is None:
            self.logger.warning(
                f"No resolution strategy for '{config_id}'",
                tag="RESOLUTION",
                params={"config_id": config_id}
            )
            return False

        try:
            self.logger.info(
                f"Applying resolution for '{config_id}'",
                tag="RESOLUTION",
                params={
                    "config_id": config_id,
                    "attempt": state.resolution_attempts,
                    "consecutive_failures": state.consecutive_failures,
                }
            )

            # Execute resolution strategy
            result = await strategy(state, self)

            # Update metrics
            if self.enable_metrics:
                self._metrics["total_resolutions"] += 1
                if result.success:
                    self._metrics["successful_resolutions"] += 1

            if result.success:
                self.logger.success(
                    f"Resolution succeeded for '{config_id}': {result.action}",
                    tag="RESOLUTION",
                    params={
                        "config_id": config_id,
                        "action": result.action,
                        "metadata": result.metadata,
                    }
                )

                # Apply modified config if provided
                if result.modified_config:
                    state.config = result.modified_config
                    self.logger.info(
                        f"Applied modified config for '{config_id}'",
                        tag="RESOLUTION"
                    )

                # Reset failure counters on successful resolution
                state.consecutive_failures = 0
                state.resolution_attempts = 0

                return True
            else:
                self.logger.warning(
                    f"Resolution failed for '{config_id}': {result.action}",
                    tag="RESOLUTION",
                    params={
                        "config_id": config_id,
                        "action": result.action,
                    }
                )
                return False

        except Exception as e:
            self.logger.error(
                f"Resolution exception for '{config_id}': {e}",
                tag="RESOLUTION",
                params={
                    "config_id": config_id,
                    "exception_type": type(e).__name__,
                }
            )
            return False
def _handle_permanent_failure(self, config_id: str) -> None: + """ + Handle config that has exceeded resolution retry limit. + + Args: + config_id: The permanently failed configuration + """ + state = self._health_states[config_id] + + self.logger.error( + f"Config '{config_id}' marked as permanently failed", + tag="HEALTH_MONITOR", + params={ + "config_id": config_id, + "consecutive_failures": state.consecutive_failures, + "resolution_attempts": state.resolution_attempts, + "last_error": state.last_error, + } + ) + + # Emit alert or notification here + # Could integrate with external alerting systems + + # ======================================================================== + # Monitoring Loop + # ======================================================================== + + async def _monitoring_loop(self) -> None: + """Main monitoring loop (runs in background task).""" + self.logger.info( + "Monitoring loop started", + tag="HEALTH_MONITOR", + params={"check_interval": self.check_interval} + ) + + while self._running: + try: + # Check all registered configs + config_ids = list(self._health_states.keys()) + + if not config_ids: + # No configs to monitor, wait and continue + await asyncio.sleep(self.check_interval) + continue + + for config_id in config_ids: + if not self._running: + break + + try: + await self._check_and_resolve(config_id) + except Exception as e: + self.logger.error( + f"Error checking config '{config_id}': {e}", + tag="HEALTH_MONITOR" + ) + + # Wait for next interval + await asyncio.sleep(self.check_interval) + + except asyncio.CancelledError: + self.logger.info( + "Monitoring loop cancelled", + tag="HEALTH_MONITOR" + ) + break + except Exception as e: + self.logger.error( + f"Monitoring loop error: {e}", + tag="HEALTH_MONITOR" + ) + # Brief pause before retry + await asyncio.sleep(5) + + self.logger.info( + "Monitoring loop stopped", + tag="HEALTH_MONITOR" + ) + + async def _check_and_resolve(self, config_id: str) -> None: + """ + Check 
health and apply resolution if needed. + + Args: + config_id: The configuration to check and potentially resolve + """ + async with self._state_lock: + state = self._health_states.get(config_id) + if not state: + return + + # Perform health check + is_healthy = await self._perform_health_check(state) + + if is_healthy: + if state.status != "healthy": + self.logger.success( + f"Config '{config_id}' recovered", + tag="HEALTH_MONITOR" + ) + state.status = "healthy" + state.consecutive_failures = 0 + state.consecutive_successes += 1 + state.last_success_time = datetime.now() + state.resolution_attempts = 0 + self._record_check_result(state, "healthy", True) + else: + state.consecutive_failures += 1 + state.consecutive_successes = 0 + status_label = ( + "degraded" + if state.consecutive_failures < self.failure_threshold + else "failed" + ) + state.status = status_label + self._record_check_result(state, status_label, False) + + if status_label == "failed": + if state.resolution_attempts < self.resolution_retry_limit: + resolution_success = await self._apply_resolution(config_id) + + if resolution_success: + is_healthy_after = await self._perform_health_check(state) + + if is_healthy_after: + state.status = "healthy" + state.consecutive_failures = 0 + state.consecutive_successes = 1 + state.last_success_time = datetime.now() + state.resolution_attempts = 0 + self._record_check_result(state, "healthy", True) + else: + state.consecutive_failures += 1 + state.consecutive_successes = 0 + status_label = ( + "degraded" + if state.consecutive_failures < self.failure_threshold + else "failed" + ) + state.status = status_label + self._record_check_result(state, status_label, False) + else: + await self._handle_permanent_failure(config_id) + + state.last_check_time = datetime.now() + + # ======================================================================== + # Status and Metrics Queries + # ======================================================================== + + def 
    # ========================================================================
    # Status and Metrics Queries
    # ========================================================================

    def get_health_status(
        self,
        config_id: Optional[str] = None
    ) -> Union[ConfigHealthState, Dict[str, ConfigHealthState]]:
        """
        Get health status for one or all configs.

        Args:
            config_id: Specific config ID, or None for all configs

        Returns:
            Single ConfigHealthState or dict of all states

        Raises:
            ValueError: If config_id specified but not found
        """
        if config_id is None:
            # Return all states (as copies for thread safety)
            return {
                cid: state.copy()
                for cid, state in self._health_states.items()
            }
        else:
            if config_id not in self._health_states:
                raise ValueError(f"Config ID '{config_id}' not registered")

            return self._health_states[config_id].copy()

    def get_metrics(self) -> Dict[str, Any]:
        """
        Get aggregated metrics across all configs.

        Returns:
            Dictionary containing comprehensive metrics
        """
        metrics = self._metrics.copy()

        # Calculate derived metrics
        total_checks = metrics["total_checks"]
        if total_checks > 0:
            metrics["success_rate"] = metrics["successful_checks"] / total_checks
        else:
            metrics["success_rate"] = 0.0

        total_resolutions = metrics["total_resolutions"]
        if total_resolutions > 0:
            metrics["resolution_success_rate"] = (
                metrics["successful_resolutions"] / total_resolutions
            )
        else:
            metrics["resolution_success_rate"] = 0.0

        # Add uptime
        metrics["uptime_seconds"] = self.uptime

        # Add per-config metrics
        config_metrics = {}
        for config_id, state in self._health_states.items():
            config_stats = self._config_metrics.get(config_id, {})

            # Calculate per-config derived metrics
            total = config_stats.get("total_checks", 0)
            if total > 0:
                uptime_percent = (
                    config_stats.get("successful_checks", 0) / total * 100
                )
                avg_response_time = (
                    config_stats.get("total_response_time", 0.0) / total
                )
            else:
                uptime_percent = 0.0
                avg_response_time = 0.0

            config_metrics[config_id] = {
                "status": state.status,
                "uptime_percent": round(uptime_percent, 2),
                "avg_response_time": round(avg_response_time, 3),
                "min_response_time": config_stats.get("min_response_time", 0.0),
                "max_response_time": config_stats.get("max_response_time", 0.0),
                "total_checks": total,
                "successful_checks": config_stats.get("successful_checks", 0),
                "failed_checks": config_stats.get("failed_checks", 0),
                "consecutive_failures": state.consecutive_failures,
                "resolution_attempts": state.resolution_attempts,
                "last_check": (
                    state.last_check_time.isoformat()
                    if state.last_check_time else None
                ),
                "last_success": (
                    state.last_success_time.isoformat()
                    if state.last_success_time else None
                ),
                "last_error": state.last_error,
            }

        metrics["configs"] = config_metrics

        return metrics


# ============================================================================
# File: docs/examples/config_health_monitor_example.py
# ============================================================================

"""
Example: Using ConfigHealthMonitor for Crawler Configuration Health Monitoring

This example demonstrates how to:
1. Initialize a ConfigHealthMonitor
2. Register multiple crawler configurations
3. Set up custom resolution strategies
4. Monitor health status and metrics
5. Handle configuration failures automatically
"""

import asyncio
import copy
from crawl4ai.config_health_monitor import ConfigHealthMonitor, ResolutionResult, ConfigHealthState
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_configs import CacheMode
+ """ + print(f" Applying incremental backoff for '{state.config_id}'...") + + new_config = copy.deepcopy(state.config) + + # Increase timeouts by 100% + new_config.page_timeout = int(state.config.page_timeout * 2) + if state.config.delay_before_return_html: + new_config.delay_before_return_html = state.config.delay_before_return_html + 2.0 + + print(f" -> Increased page_timeout to {new_config.page_timeout}ms") + + return ResolutionResult( + success=True, + action="timeout_increased", + modified_config=new_config, + metadata={ + "old_timeout": state.config.page_timeout, + "new_timeout": new_config.page_timeout + } + ) + + +async def toggle_magic_mode_strategy( + state: ConfigHealthState, + monitor: ConfigHealthMonitor +) -> ResolutionResult: + """ + Enable/disable magic mode for anti-bot handling. + """ + print(f" Toggling magic mode for '{state.config_id}'...") + + new_config = copy.deepcopy(state.config) + new_config.magic = not state.config.magic + + action = f"magic_{'enabled' if new_config.magic else 'disabled'}" + print(f" -> Magic mode now: {new_config.magic}") + + return ResolutionResult( + success=True, + action=action, + modified_config=new_config + ) + + +async def log_and_alert_strategy( + state: ConfigHealthState, + monitor: ConfigHealthMonitor +) -> ResolutionResult: + """ + Log failure and send alert (in production, this would send to monitoring system). + """ + print(f" ALERT: Config '{state.config_id}' has failed!") + print(f" → Error: {state.last_error}") + print(f" → Consecutive failures: {state.consecutive_failures}") + print(f" → Resolution attempts: {state.resolution_attempts}") + + # In production, send to Slack, email, PagerDuty, etc. + # await send_slack_alert(state) + # await send_email_alert(state) + + return ResolutionResult( + success=False, + action="alerted", + metadata={"alert_sent": True} + ) + + +def create_resolution_chain(strategies): + """ + Create a resolution chain that tries strategies sequentially. 
+ + After each successful strategy we immediately run a health check. If the + check still fails, we continue to the next strategy until one succeeds or + we exhaust the chain. + """ + async def chained_strategy( + state: ConfigHealthState, + monitor: ConfigHealthMonitor + ) -> ResolutionResult: + if not strategies: + return ResolutionResult(success=False, action="no_strategies_configured") + + print(f"\nStarting resolution chain for '{state.config_id}'") + + steps_metadata = [] + + for i, strategy in enumerate(strategies, 1): + print(f"\n Step {i}/{len(strategies)}: {strategy.__name__}") + result = await strategy(state, monitor) + steps_metadata.append({ + "step": i, + "strategy": strategy.__name__, + "success": result.success, + "action": result.action, + "metadata": result.metadata + }) + + if result.success: + action_label = result.action or strategy.__name__ + print(f" Resolution applied: {action_label}") + + if result.modified_config: + state.config = result.modified_config + + print(" Running validation health check...") + try: + validation_passed = await monitor._perform_health_check(state) + except Exception as exc: + print(f" Validation error: {exc}") + validation_passed = False + + steps_metadata[-1]["validation_passed"] = validation_passed + + if validation_passed: + print(" Validation succeeded. Resolution chain complete.") + return ResolutionResult( + success=True, + action=action_label, + modified_config=state.config, + metadata={"steps": steps_metadata} + ) + + print(" Validation failed. 
Trying next strategy...") + else: + print(f" Resolution failed: {result.action}") + + print(f"\n All resolution strategies failed") + return ResolutionResult( + success=False, + action="all_strategies_failed", + metadata={"steps": steps_metadata} + ) + + return chained_strategy + + +# ============================================================================ +# Main Example +# ============================================================================ + +async def main(): + print("=" * 70) + print("ConfigHealthMonitor Example") + print("=" * 70) + + # Initialize monitor + print("\nInitializing ConfigHealthMonitor...") + monitor = ConfigHealthMonitor( + browser_config=BrowserConfig( + headless=True, + verbose=False + ), + check_interval=15.0, # Check every 15 seconds + failure_threshold=2, # Trigger resolution after 2 failures + resolution_retry_limit=2, # Try resolution twice max + enable_metrics=True + ) + + await monitor.start() + print(f" Monitor started (check_interval={monitor.check_interval}s)") + + # ======================================================================== + # Register Configurations + # ======================================================================== + + print("\nRegistering configurations...") + + # Config 1: Reliable website (should stay healthy) + config_1_id = monitor.register_config( + config=CrawlerRunConfig( + page_timeout=30000, + cache_mode=CacheMode.BYPASS, + magic=True, + ), + test_url="https://www.olly.com/", + config_id="olly_scraper", + resolution_strategy=create_resolution_chain([ + toggle_magic_mode_strategy, + ]) + ) + print(f" Registered: {config_1_id} with resolution chain") + + # Config 2: Another reliable website + config_2_id = monitor.register_config( + config=CrawlerRunConfig( + page_timeout=20000, + magic=True, + ), + test_url="https://example.com", + config_id="example_scraper" + ) + print(f" Registered: {config_2_id}") + + # Config 3: Intentionally problematic (very short timeout) + # This will trigger 
resolution strategies + config_3_id = monitor.register_config( + config=CrawlerRunConfig( + page_timeout=100, # 100ms - will likely timeout + cache_mode=CacheMode.BYPASS, + ), + test_url="https://httpbin.org/delay/5", # Delays response by 5 seconds + config_id="impossible_scraper", + resolution_strategy=create_resolution_chain([ + incremental_backoff_strategy, + toggle_magic_mode_strategy, + log_and_alert_strategy + ]) + ) + print(f" Registered: {config_3_id} (with resolution chain)") + + print(f"\n Total configs registered: {monitor.registered_count}") + + # ======================================================================== + # Perform Manual Health Checks + # ======================================================================== + + print("\nPerforming initial health checks...") + + for config_id in [config_1_id, config_2_id, config_3_id]: + is_healthy = await monitor.check_health(config_id) + status = monitor.get_health_status(config_id) + + status_label = "healthy" if is_healthy else "unhealthy" + print(f" {config_id}: {status.status} ({status_label})") + if not is_healthy: + print(f" Error: {status.last_error}") + + # ======================================================================== + # Monitor for a Period + # ======================================================================== + + print("\nMonitoring for 60 seconds (background loop running)...") + print(" The monitor will automatically check all configs every 15s") + print(" and apply resolution strategies when failures are detected.\n") + + # Check status every 20 seconds + for i in range(3): + await asyncio.sleep(20) + + print(f"\nStatus Check #{i+1}") + print("-" * 70) + + all_statuses = monitor.get_health_status() + + for config_id, state in all_statuses.items(): + # Status emoji + print(f"\n{config_id}") + print(f" Status: {state.status}") + print(f" Consecutive failures: {state.consecutive_failures}") + print(f" Consecutive successes: {state.consecutive_successes}") + print(f" 
Resolution attempts: {state.resolution_attempts}") + + if state.last_check_time: + print(f" Last checked: {state.last_check_time.strftime('%H:%M:%S')}") + if state.last_success_time: + print(f" Last success: {state.last_success_time.strftime('%H:%M:%S')}") + if state.last_error: + print(f" Last error: {state.last_error[:100]}...") + + # ======================================================================== + # Final Metrics Report + # ======================================================================== + + print("\n" + "=" * 70) + print("Final Metrics Report") + print("=" * 70) + + metrics = monitor.get_metrics() + + # Global metrics + print("\nGlobal Metrics:") + print(f" Total checks: {metrics['total_checks']}") + print(f" Successful checks: {metrics['successful_checks']}") + print(f" Failed checks: {metrics['failed_checks']}") + print(f" Success rate: {metrics['success_rate']:.1%}") + print(f" Total resolutions: {metrics['total_resolutions']}") + print(f" Successful resolutions: {metrics['successful_resolutions']}") + if metrics['total_resolutions'] > 0: + print(f" Resolution success rate: {metrics['resolution_success_rate']:.1%}") + print(f" Uptime: {metrics['uptime_seconds']:.1f}s") + + # Per-config metrics + print("\nPer-Config Metrics:") + for config_id, config_metrics in metrics['configs'].items(): + print(f"\n {config_id}:") + print(f" Status: {config_metrics['status']}") + print(f" Uptime: {config_metrics['uptime_percent']:.1f}%") + print(f" Avg response time: {config_metrics['avg_response_time']:.3f}s") + print(f" Total checks: {config_metrics['total_checks']}") + print(f" Successful: {config_metrics['successful_checks']}") + print(f" Failed: {config_metrics['failed_checks']}") + print(f" Resolution attempts: {config_metrics['resolution_attempts']}") + + # ======================================================================== + # Cleanup + # ======================================================================== + + print("\nStopping 
monitor...") + await monitor.stop() + print(" Monitor stopped successfully") + + print("\n" + "=" * 70) + print("Example completed!") + print("=" * 70) + + +# ============================================================================ +# Alternative: Using Context Manager +# ============================================================================ + +async def example_with_context_manager(): + """ + Simplified example using context manager for automatic cleanup. + """ + print("\nExample: Using Context Manager\n") + + async with ConfigHealthMonitor( + browser_config=BrowserConfig(headless=True, verbose=False), + check_interval=30.0, + failure_threshold=3 + ) as monitor: + + # Register configs + monitor.register_config( + config=CrawlerRunConfig(page_timeout=30000), + test_url="https://httpbin.org/html", + config_id="example" + ) + + # Monitor automatically runs in background + print("Monitor running...") + await asyncio.sleep(10) + + # Get status + status = monitor.get_health_status("example") + print(f"Status: {status.status}") + + # Context manager automatically stops on exit + + print("Monitor automatically stopped") + + +if __name__ == "__main__": + # Run main example + asyncio.run(main()) + + # Uncomment to run context manager example + # asyncio.run(example_with_context_manager()) + diff --git a/tests/general/test_config_health_monitor.py b/tests/general/test_config_health_monitor.py new file mode 100644 index 00000000..3d71fc46 --- /dev/null +++ b/tests/general/test_config_health_monitor.py @@ -0,0 +1,455 @@ +""" +Tests for ConfigHealthMonitor class. + +This test suite validates the health monitoring functionality for crawler configurations. 
"""
Tests for ConfigHealthMonitor class.

This test suite validates the health monitoring functionality for crawler configurations.
"""

import asyncio

import pytest

from crawl4ai.config_health_monitor import ConfigHealthMonitor, ResolutionResult, ConfigHealthState
from crawl4ai import BrowserConfig, CrawlerRunConfig


class TestConfigHealthMonitorBasic:
    """Basic functionality tests for ConfigHealthMonitor."""

    @pytest.mark.asyncio
    async def test_initialization(self):
        """Test monitor initialization with default settings."""
        monitor = ConfigHealthMonitor()

        assert monitor.check_interval >= 10.0  # Minimum enforced
        assert monitor.failure_threshold >= 1
        assert monitor.resolution_retry_limit >= 0
        assert monitor.registered_count == 0
        assert not monitor.is_running
        assert monitor.uptime is None

    @pytest.mark.asyncio
    async def test_initialization_with_config(self):
        """Test monitor initialization with custom configuration."""
        browser_config = BrowserConfig(headless=True, verbose=False)

        monitor = ConfigHealthMonitor(
            browser_config=browser_config,
            check_interval=30.0,
            failure_threshold=2,
            resolution_retry_limit=3,
            enable_metrics=True
        )

        assert monitor.check_interval == 30.0
        assert monitor.failure_threshold == 2
        assert monitor.resolution_retry_limit == 3
        assert monitor.enable_metrics is True

    @pytest.mark.asyncio
    async def test_register_config(self):
        """Test registering a configuration."""
        monitor = ConfigHealthMonitor()

        config = CrawlerRunConfig(page_timeout=30000)
        config_id = monitor.register_config(
            config=config,
            test_url="https://example.com",
            config_id="test_config"
        )

        assert config_id == "test_config"
        assert monitor.registered_count == 1

    @pytest.mark.asyncio
    async def test_register_config_auto_id(self):
        """Test registering a configuration with auto-generated ID."""
        monitor = ConfigHealthMonitor()

        config = CrawlerRunConfig(page_timeout=30000)
        config_id = monitor.register_config(
            config=config,
            test_url="https://example.com"
        )

        # Auto-generated IDs use the "config_" prefix.
        assert config_id.startswith("config_")
        assert monitor.registered_count == 1

    @pytest.mark.asyncio
    async def test_register_duplicate_config_id(self):
        """Test that duplicate config IDs raise an error."""
        monitor = ConfigHealthMonitor()

        config = CrawlerRunConfig(page_timeout=30000)
        monitor.register_config(
            config=config,
            test_url="https://example.com",
            config_id="duplicate"
        )

        with pytest.raises(ValueError, match="already registered"):
            monitor.register_config(
                config=config,
                test_url="https://example.com",
                config_id="duplicate"
            )

    @pytest.mark.asyncio
    async def test_register_empty_url(self):
        """Test that empty test URLs raise an error."""
        monitor = ConfigHealthMonitor()
        config = CrawlerRunConfig()

        with pytest.raises(ValueError, match="cannot be empty"):
            monitor.register_config(
                config=config,
                test_url=""
            )

    @pytest.mark.asyncio
    async def test_unregister_config(self):
        """Test unregistering a configuration."""
        monitor = ConfigHealthMonitor()

        config = CrawlerRunConfig()
        config_id = monitor.register_config(
            config=config,
            test_url="https://example.com",
            config_id="to_remove"
        )

        assert monitor.registered_count == 1

        result = monitor.unregister_config(config_id)
        assert result is True
        assert monitor.registered_count == 0

    @pytest.mark.asyncio
    async def test_unregister_nonexistent_config(self):
        """Test unregistering a non-existent configuration."""
        monitor = ConfigHealthMonitor()

        result = monitor.unregister_config("nonexistent")
        assert result is False


class TestConfigHealthMonitorLifecycle:
    """Lifecycle management tests."""

    @pytest.mark.asyncio
    async def test_start_stop(self):
        """Test monitor start and stop."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )

        assert not monitor.is_running

        await monitor.start()
        assert monitor.is_running
        assert monitor.uptime is not None

        await monitor.stop()
        assert not monitor.is_running

    @pytest.mark.asyncio
    async def test_context_manager(self):
        """Test monitor as async context manager."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            assert monitor.is_running

            # Register a config
            config_id = monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com"
            )
            # FIX: assert on the returned ID instead of leaving it unused.
            assert config_id
            assert monitor.registered_count == 1

        # After context exit, should be stopped
        assert not monitor.is_running

    @pytest.mark.asyncio
    async def test_double_start(self):
        """Test that double start is handled gracefully."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )

        await monitor.start()
        await monitor.start()  # Should log warning but not fail

        assert monitor.is_running
        await monitor.stop()

    @pytest.mark.asyncio
    async def test_stop_without_start(self):
        """Test that stop without start is handled gracefully."""
        monitor = ConfigHealthMonitor()
        await monitor.stop()  # Should log warning but not fail


class TestConfigHealthMonitorHealthChecks:
    """Health checking tests."""

    @pytest.mark.asyncio
    async def test_manual_health_check_success(self):
        """Test manual health check on a working URL."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(page_timeout=30000),
                test_url="https://example.com",
                config_id="example_test"
            )

            # Perform health check
            is_healthy = await monitor.check_health(config_id)

            assert is_healthy is True

            # Check state
            status = monitor.get_health_status(config_id)
            assert status.status == "healthy"
            assert status.consecutive_failures == 0
            assert status.consecutive_successes == 1
            assert status.last_check_time is not None
            assert status.last_success_time is not None

    @pytest.mark.asyncio
    async def test_manual_health_check_failure(self):
        """Test manual health check on a non-existent URL."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(page_timeout=10000),
                test_url="https://this-domain-definitely-does-not-exist-12345.com",
                config_id="failing_test"
            )

            # Perform health check
            is_healthy = await monitor.check_health(config_id)

            assert is_healthy is False

            # Check state
            status = monitor.get_health_status(config_id)
            assert status.consecutive_failures == 1
            assert status.last_error is not None

    @pytest.mark.asyncio
    async def test_health_check_nonexistent_config(self):
        """Test health check on non-existent config raises error."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            with pytest.raises(ValueError, match="not registered"):
                await monitor.check_health("nonexistent")


class TestConfigHealthMonitorResolution:
    """Resolution strategy tests."""

    @pytest.mark.asyncio
    async def test_set_default_resolution_strategy(self):
        """Test setting a default resolution strategy."""
        monitor = ConfigHealthMonitor()

        async def dummy_strategy(state, monitor):
            return ResolutionResult(success=True, action="dummy")

        monitor.set_resolution_strategy(dummy_strategy)
        assert monitor._default_resolution_strategy == dummy_strategy

    @pytest.mark.asyncio
    async def test_set_config_specific_resolution_strategy(self):
        """Test setting a config-specific resolution strategy."""
        monitor = ConfigHealthMonitor()

        config_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="with_strategy"
        )

        async def custom_strategy(state, monitor):
            return ResolutionResult(success=True, action="custom")

        monitor.set_resolution_strategy(custom_strategy, config_id)
        assert monitor._resolution_strategies[config_id] == custom_strategy

    @pytest.mark.asyncio
    async def test_set_strategy_for_nonexistent_config(self):
        """Test setting strategy for non-existent config raises error."""
        monitor = ConfigHealthMonitor()

        async def dummy_strategy(state, monitor):
            return ResolutionResult(success=True, action="dummy")

        with pytest.raises(ValueError, match="not registered"):
            monitor.set_resolution_strategy(dummy_strategy, "nonexistent")

    @pytest.mark.asyncio
    async def test_register_with_resolution_strategy(self):
        """Test registering a config with a resolution strategy."""
        monitor = ConfigHealthMonitor()

        async def custom_strategy(state, monitor):
            return ResolutionResult(success=True, action="custom")

        config_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            resolution_strategy=custom_strategy
        )

        assert monitor._resolution_strategies[config_id] == custom_strategy


class TestConfigHealthMonitorMetrics:
    """Metrics and status query tests."""

    @pytest.mark.asyncio
    async def test_get_health_status_single(self):
        """Test getting status for a single config."""
        monitor = ConfigHealthMonitor()

        config_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="status_test"
        )

        status = monitor.get_health_status(config_id)

        assert isinstance(status, ConfigHealthState)
        assert status.config_id == config_id
        assert status.status == "healthy"

    @pytest.mark.asyncio
    async def test_get_health_status_all(self):
        """Test getting status for all configs."""
        monitor = ConfigHealthMonitor()

        # Register multiple configs
        for i in range(3):
            monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com",
                config_id=f"config_{i}"
            )

        all_statuses = monitor.get_health_status()

        assert isinstance(all_statuses, dict)
        assert len(all_statuses) == 3
        assert all(isinstance(s, ConfigHealthState) for s in all_statuses.values())

    @pytest.mark.asyncio
    async def test_get_health_status_nonexistent(self):
        """Test getting status for non-existent config raises error."""
        monitor = ConfigHealthMonitor()

        with pytest.raises(ValueError, match="not registered"):
            monitor.get_health_status("nonexistent")

    @pytest.mark.asyncio
    async def test_get_metrics_empty(self):
        """Test getting metrics with no configs."""
        monitor = ConfigHealthMonitor()

        metrics = monitor.get_metrics()

        assert metrics["total_checks"] == 0
        assert metrics["successful_checks"] == 0
        assert metrics["failed_checks"] == 0
        assert metrics["success_rate"] == 0.0
        assert metrics["configs"] == {}

    @pytest.mark.asyncio
    async def test_get_metrics_with_checks(self):
        """Test metrics after performing health checks."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False),
            enable_metrics=True
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com",
                config_id="metrics_test"
            )

            # Perform a health check
            await monitor.check_health(config_id)

            metrics = monitor.get_metrics()

            # FIX: ">= 0" was vacuously true; after one explicit check the
            # global counter must have recorded at least one check.
            assert metrics["total_checks"] >= 1
            assert "configs" in metrics
            assert config_id in metrics["configs"]

            config_metrics = metrics["configs"][config_id]
            assert config_metrics["status"] == "healthy"
            assert config_metrics["total_checks"] >= 1
            assert "avg_response_time" in config_metrics


class TestConfigHealthMonitorProperties:
    """Property tests."""

    @pytest.mark.asyncio
    async def test_is_running_property(self):
        """Test is_running property."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )

        assert monitor.is_running is False

        await monitor.start()
        assert monitor.is_running is True

        await monitor.stop()
        assert monitor.is_running is False

    @pytest.mark.asyncio
    async def test_registered_count_property(self):
        """Test registered_count property."""
        monitor = ConfigHealthMonitor()

        assert monitor.registered_count == 0

        for i in range(5):
            monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://httpbin.org/html",
                config_id=f"count_test_{i}"
            )

        assert monitor.registered_count == 5

        monitor.unregister_config("count_test_0")
        assert monitor.registered_count == 4

    @pytest.mark.asyncio
    async def test_uptime_property(self):
        """Test uptime property."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )

        assert monitor.uptime is None

        await monitor.start()
        # asyncio.sleep guarantees at least the requested delay, so the
        # uptime lower bound below is not flaky.
        await asyncio.sleep(0.1)

        uptime = monitor.uptime
        assert uptime is not None
        assert uptime >= 0.1

        await monitor.stop()


if __name__ == "__main__":
    pytest.main([__file__, "-v", "-s"])