Compare commits

..

1 Commit

Author SHA1 Message Date
AHMET YILMAZ
eca04b0368 Refactor Pydantic model configuration to use ConfigDict for arbitrary types 2025-11-18 15:40:17 +08:00
8 changed files with 15 additions and 2006 deletions

View File

@@ -167,11 +167,6 @@ RUN mkdir -p /home/appuser/.cache/ms-playwright \
RUN crawl4ai-doctor
# Ensure all cache directories belong to appuser
# This fixes permission issues with .cache/url_seeder and other runtime cache dirs
RUN mkdir -p /home/appuser/.cache \
&& chown -R appuser:appuser /home/appuser/.cache
# Copy application code
COPY deploy/docker/* ${APP_HOME}/

View File

@@ -728,18 +728,18 @@ class EmbeddingStrategy(CrawlStrategy):
provider = llm_config_dict.get('provider', 'openai/gpt-4o-mini') if llm_config_dict else 'openai/gpt-4o-mini'
api_token = llm_config_dict.get('api_token') if llm_config_dict else None
response = perform_completion_with_backoff(
provider=provider,
prompt_with_variables=prompt,
api_token=api_token,
json_response=True
)
# response = perform_completion_with_backoff(
# provider=provider,
# prompt_with_variables=prompt,
# api_token=api_token,
# json_response=True
# )
variations = json.loads(response.choices[0].message.content)
# variations = json.loads(response.choices[0].message.content)
# # Mock data with more variations for split
# variations ={'queries': ['what are the best vegetables to use in fried rice?', 'how do I make vegetable fried rice from scratch?', 'can you provide a quick recipe for vegetable fried rice?', 'what cooking techniques are essential for perfect fried rice with vegetables?', 'how to add flavor to vegetable fried rice?', 'are there any tips for making healthy fried rice with vegetables?']}
variations ={'queries': ['what are the best vegetables to use in fried rice?', 'how do I make vegetable fried rice from scratch?', 'can you provide a quick recipe for vegetable fried rice?', 'what cooking techniques are essential for perfect fried rice with vegetables?', 'how to add flavor to vegetable fried rice?', 'are there any tips for making healthy fried rice with vegetables?']}
# variations = {'queries': [

View File

@@ -617,17 +617,17 @@ class AsyncWebCrawler:
else config.chunking_strategy
)
sections = chunking.chunk(content)
# extracted_content = config.extraction_strategy.run(_url, sections)
# extracted_content = config.extraction_strategy.run(url, sections)
# Use async version if available for better parallelism
if hasattr(config.extraction_strategy, 'arun'):
extracted_content = await config.extraction_strategy.arun(_url, sections)
extracted_content = await config.extraction_strategy.arun(url, sections)
else:
# Fallback to sync version run in thread pool to avoid blocking
extracted_content = await asyncio.to_thread(
config.extraction_strategy.run, url, sections
)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)

File diff suppressed because it is too large. Load Diff

View File

@@ -1,4 +1,4 @@
from pydantic import BaseModel, HttpUrl, PrivateAttr, Field
from pydantic import BaseModel, HttpUrl, PrivateAttr, Field, ConfigDict
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from typing import AsyncGenerator
from typing import Generic, TypeVar
@@ -153,8 +153,7 @@ class CrawlResult(BaseModel):
console_messages: Optional[List[Dict[str, Any]]] = None
tables: List[Dict] = Field(default_factory=list) # NEW [{headers,rows,caption,summary}]
class Config:
arbitrary_types_allowed = True
model_config = ConfigDict(arbitrary_types_allowed=True)
# NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters,
# and model_dump override all exist to support a smooth transition from markdown as a string
@@ -332,8 +331,7 @@ class AsyncCrawlResponse(BaseModel):
network_requests: Optional[List[Dict[str, Any]]] = None
console_messages: Optional[List[Dict[str, Any]]] = None
class Config:
arbitrary_types_allowed = True
model_config = ConfigDict(arbitrary_types_allowed=True)
###############################
# Scraping Models

View File

@@ -1,378 +0,0 @@
"""
Example: Using ConfigHealthMonitor for Crawler Configuration Health Monitoring
This example demonstrates how to:
1. Initialize a ConfigHealthMonitor
2. Register multiple crawler configurations
3. Set up custom resolution strategies
4. Monitor health status and metrics
5. Handle configuration failures automatically
"""
import asyncio
import copy
from crawl4ai.config_health_monitor import ConfigHealthMonitor, ResolutionResult, ConfigHealthState
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_configs import CacheMode
# ============================================================================
# Custom Resolution Strategies
# ============================================================================
async def incremental_backoff_strategy(
    state: ConfigHealthState,
    monitor: ConfigHealthMonitor
) -> ResolutionResult:
    """
    Increase timeouts progressively when health checks fail.

    Doubles the page timeout and, when set, pads the HTML-return delay by
    two seconds. Returns a successful ResolutionResult carrying the patched
    config plus the old/new timeout values in its metadata.
    """
    print(f" Applying incremental backoff for '{state.config_id}'...")
    # Work on a deep copy so the registered config is only swapped out
    # atomically via the ResolutionResult, never mutated in place.
    patched = copy.deepcopy(state.config)
    old_timeout = state.config.page_timeout
    patched.page_timeout = int(old_timeout * 2)  # +100%
    if state.config.delay_before_return_html:
        patched.delay_before_return_html = state.config.delay_before_return_html + 2.0
    print(f" -> Increased page_timeout to {patched.page_timeout}ms")
    return ResolutionResult(
        success=True,
        action="timeout_increased",
        modified_config=patched,
        metadata={
            "old_timeout": old_timeout,
            "new_timeout": patched.page_timeout,
        },
    )
async def toggle_magic_mode_strategy(
    state: ConfigHealthState,
    monitor: ConfigHealthMonitor
) -> ResolutionResult:
    """
    Enable/disable magic mode for anti-bot handling.

    Flips the `magic` flag on a deep copy of the current config and reports
    the new value in the result's action label.
    """
    print(f" Toggling magic mode for '{state.config_id}'...")
    patched = copy.deepcopy(state.config)
    patched.magic = not state.config.magic
    # Action label mirrors the resulting state of the flag.
    action = "magic_enabled" if patched.magic else "magic_disabled"
    print(f" -> Magic mode now: {patched.magic}")
    return ResolutionResult(
        success=True,
        action=action,
        modified_config=patched,
    )
async def log_and_alert_strategy(
    state: ConfigHealthState,
    monitor: ConfigHealthMonitor
) -> ResolutionResult:
    """
    Log failure and send alert (in production, this would send to monitoring system).

    Always returns success=False: this strategy only reports, it does not
    repair the configuration.
    """
    for line in (
        f" ALERT: Config '{state.config_id}' has failed!",
        f" → Error: {state.last_error}",
        f" → Consecutive failures: {state.consecutive_failures}",
        f" → Resolution attempts: {state.resolution_attempts}",
    ):
        print(line)
    # In production, send to Slack, email, PagerDuty, etc.
    # await send_slack_alert(state)
    # await send_email_alert(state)
    return ResolutionResult(
        success=False,
        action="alerted",
        metadata={"alert_sent": True},
    )
def create_resolution_chain(strategies):
    """
    Create a resolution chain that tries strategies sequentially.
    After each successful strategy we immediately run a health check. If the
    check still fails, we continue to the next strategy until one succeeds or
    we exhaust the chain.
    """
    async def chained_strategy(
        state: ConfigHealthState,
        monitor: ConfigHealthMonitor
    ) -> ResolutionResult:
        # Guard: an empty chain resolves nothing.
        if not strategies:
            return ResolutionResult(success=False, action="no_strategies_configured")
        print(f"\nStarting resolution chain for '{state.config_id}'")
        # Audit trail of every attempted step, returned in the result metadata.
        steps_metadata = []
        for i, strategy in enumerate(strategies, 1):
            print(f"\n Step {i}/{len(strategies)}: {strategy.__name__}")
            result = await strategy(state, monitor)
            steps_metadata.append({
                "step": i,
                "strategy": strategy.__name__,
                "success": result.success,
                "action": result.action,
                "metadata": result.metadata
            })
            if result.success:
                action_label = result.action or strategy.__name__
                print(f" Resolution applied: {action_label}")
                # Apply the strategy's modified config before validating, so the
                # health check exercises the repaired configuration.
                if result.modified_config:
                    state.config = result.modified_config
                print(" Running validation health check...")
                try:
                    # NOTE(review): relies on the monitor's private
                    # _perform_health_check — confirm this stays available.
                    validation_passed = await monitor._perform_health_check(state)
                except Exception as exc:
                    print(f" Validation error: {exc}")
                    validation_passed = False
                steps_metadata[-1]["validation_passed"] = validation_passed
                if validation_passed:
                    print(" Validation succeeded. Resolution chain complete.")
                    return ResolutionResult(
                        success=True,
                        action=action_label,
                        modified_config=state.config,
                        metadata={"steps": steps_metadata}
                    )
                # Validation failed even though the strategy "succeeded";
                # fall through to the next strategy in the chain.
                print(" Validation failed. Trying next strategy...")
            else:
                print(f" Resolution failed: {result.action}")
        print(f"\n All resolution strategies failed")
        return ResolutionResult(
            success=False,
            action="all_strategies_failed",
            metadata={"steps": steps_metadata}
        )
    return chained_strategy
# ============================================================================
# Main Example
# ============================================================================
async def main():
    """Run the full ConfigHealthMonitor demo: register three configs (one
    intentionally broken), watch the background loop for ~60s, then print a
    metrics report and stop the monitor."""
    print("=" * 70)
    print("ConfigHealthMonitor Example")
    print("=" * 70)
    # Initialize monitor
    print("\nInitializing ConfigHealthMonitor...")
    monitor = ConfigHealthMonitor(
        browser_config=BrowserConfig(
            headless=True,
            verbose=False
        ),
        check_interval=15.0,  # Check every 15 seconds
        failure_threshold=2,  # Trigger resolution after 2 failures
        resolution_retry_limit=2,  # Try resolution twice max
        enable_metrics=True
    )
    await monitor.start()
    print(f" Monitor started (check_interval={monitor.check_interval}s)")
    # ========================================================================
    # Register Configurations
    # ========================================================================
    print("\nRegistering configurations...")
    # Config 1: Reliable website (should stay healthy)
    config_1_id = monitor.register_config(
        config=CrawlerRunConfig(
            page_timeout=30000,
            cache_mode=CacheMode.BYPASS,
            magic=True,
        ),
        test_url="https://www.olly.com/",
        config_id="olly_scraper",
        resolution_strategy=create_resolution_chain([
            toggle_magic_mode_strategy,
        ])
    )
    print(f" Registered: {config_1_id} with resolution chain")
    # Config 2: Another reliable website
    config_2_id = monitor.register_config(
        config=CrawlerRunConfig(
            page_timeout=20000,
            magic=True,
        ),
        test_url="https://example.com",
        config_id="example_scraper"
    )
    print(f" Registered: {config_2_id}")
    # Config 3: Intentionally problematic (very short timeout)
    # This will trigger resolution strategies
    config_3_id = monitor.register_config(
        config=CrawlerRunConfig(
            page_timeout=100,  # 100ms - will likely timeout
            cache_mode=CacheMode.BYPASS,
        ),
        test_url="https://httpbin.org/delay/5",  # Delays response by 5 seconds
        config_id="impossible_scraper",
        resolution_strategy=create_resolution_chain([
            incremental_backoff_strategy,
            toggle_magic_mode_strategy,
            log_and_alert_strategy
        ])
    )
    print(f" Registered: {config_3_id} (with resolution chain)")
    print(f"\n Total configs registered: {monitor.registered_count}")
    # ========================================================================
    # Perform Manual Health Checks
    # ========================================================================
    print("\nPerforming initial health checks...")
    for config_id in [config_1_id, config_2_id, config_3_id]:
        is_healthy = await monitor.check_health(config_id)
        status = monitor.get_health_status(config_id)
        status_label = "healthy" if is_healthy else "unhealthy"
        print(f" {config_id}: {status.status} ({status_label})")
        if not is_healthy:
            print(f" Error: {status.last_error}")
    # ========================================================================
    # Monitor for a Period
    # ========================================================================
    print("\nMonitoring for 60 seconds (background loop running)...")
    print(" The monitor will automatically check all configs every 15s")
    print(" and apply resolution strategies when failures are detected.\n")
    # Check status every 20 seconds (3 x 20s = the advertised 60s window)
    for i in range(3):
        await asyncio.sleep(20)
        print(f"\nStatus Check #{i+1}")
        print("-" * 70)
        all_statuses = monitor.get_health_status()
        for config_id, state in all_statuses.items():
            # Status emoji
            print(f"\n{config_id}")
            print(f" Status: {state.status}")
            print(f" Consecutive failures: {state.consecutive_failures}")
            print(f" Consecutive successes: {state.consecutive_successes}")
            print(f" Resolution attempts: {state.resolution_attempts}")
            if state.last_check_time:
                print(f" Last checked: {state.last_check_time.strftime('%H:%M:%S')}")
            if state.last_success_time:
                print(f" Last success: {state.last_success_time.strftime('%H:%M:%S')}")
            if state.last_error:
                print(f" Last error: {state.last_error[:100]}...")
    # ========================================================================
    # Final Metrics Report
    # ========================================================================
    print("\n" + "=" * 70)
    print("Final Metrics Report")
    print("=" * 70)
    metrics = monitor.get_metrics()
    # Global metrics
    print("\nGlobal Metrics:")
    print(f" Total checks: {metrics['total_checks']}")
    print(f" Successful checks: {metrics['successful_checks']}")
    print(f" Failed checks: {metrics['failed_checks']}")
    print(f" Success rate: {metrics['success_rate']:.1%}")
    print(f" Total resolutions: {metrics['total_resolutions']}")
    print(f" Successful resolutions: {metrics['successful_resolutions']}")
    # Resolution success rate is undefined when no resolutions ran.
    if metrics['total_resolutions'] > 0:
        print(f" Resolution success rate: {metrics['resolution_success_rate']:.1%}")
    print(f" Uptime: {metrics['uptime_seconds']:.1f}s")
    # Per-config metrics
    print("\nPer-Config Metrics:")
    for config_id, config_metrics in metrics['configs'].items():
        print(f"\n {config_id}:")
        print(f" Status: {config_metrics['status']}")
        print(f" Uptime: {config_metrics['uptime_percent']:.1f}%")
        print(f" Avg response time: {config_metrics['avg_response_time']:.3f}s")
        print(f" Total checks: {config_metrics['total_checks']}")
        print(f" Successful: {config_metrics['successful_checks']}")
        print(f" Failed: {config_metrics['failed_checks']}")
        print(f" Resolution attempts: {config_metrics['resolution_attempts']}")
    # ========================================================================
    # Cleanup
    # ========================================================================
    print("\nStopping monitor...")
    await monitor.stop()
    print(" Monitor stopped successfully")
    print("\n" + "=" * 70)
    print("Example completed!")
    print("=" * 70)
# ============================================================================
# Alternative: Using Context Manager
# ============================================================================
async def example_with_context_manager():
    """
    Simplified example using context manager for automatic cleanup.
    """
    print("\nExample: Using Context Manager\n")
    # `async with` starts the monitor on entry and stops it on exit.
    async with ConfigHealthMonitor(
        browser_config=BrowserConfig(headless=True, verbose=False),
        check_interval=30.0,
        failure_threshold=3
    ) as monitor:
        # Register configs
        monitor.register_config(
            config=CrawlerRunConfig(page_timeout=30000),
            test_url="https://httpbin.org/html",
            config_id="example"
        )
        # Monitor automatically runs in background
        print("Monitor running...")
        await asyncio.sleep(10)
        # Get status
        status = monitor.get_health_status("example")
        print(f"Status: {status.status}")
    # Context manager automatically stops on exit
    print("Monitor automatically stopped")
if __name__ == "__main__":
    # Run main example
    asyncio.run(main())
    # Uncomment to run context manager example
    # asyncio.run(example_with_context_manager())

View File

@@ -9,21 +9,6 @@ from crawl4ai import (
RateLimiter,
CacheMode
)
from crawl4ai.extraction_strategy import ExtractionStrategy
class MockExtractionStrategy(ExtractionStrategy):
"""Mock extraction strategy for testing URL parameter handling"""
def __init__(self):
super().__init__()
self.run_calls = []
def extract(self, url: str, html: str, *args, **kwargs):
return [{"test": "data"}]
def run(self, url: str, sections: List[str], *args, **kwargs):
self.run_calls.append(url)
return super().run(url, sections, *args, **kwargs)
@pytest.mark.asyncio
@pytest.mark.parametrize("viewport", [
@@ -157,72 +142,8 @@ async def test_error_handling(error_url):
assert not result.success
assert result.error_message is not None
@pytest.mark.asyncio
async def test_extraction_strategy_run_with_regular_url():
"""
Regression test for extraction_strategy.run URL parameter handling with regular URLs.
This test verifies that when is_raw_html=False (regular URL),
extraction_strategy.run is called with the actual URL.
"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
mock_strategy = MockExtractionStrategy()
# Test regular URL (is_raw_html=False)
regular_url = "https://example.com"
result = await crawler.arun(
url=regular_url,
config=CrawlerRunConfig(
page_timeout=30000,
extraction_strategy=mock_strategy,
cache_mode=CacheMode.BYPASS
)
)
assert result.success
assert len(mock_strategy.run_calls) == 1
assert mock_strategy.run_calls[0] == regular_url, f"Expected '{regular_url}', got '{mock_strategy.run_calls[0]}'"
@pytest.mark.asyncio
async def test_extraction_strategy_run_with_raw_html():
"""
Regression test for extraction_strategy.run URL parameter handling with raw HTML.
This test verifies that when is_raw_html=True (URL starts with "raw:"),
extraction_strategy.run is called with "Raw HTML" instead of the actual URL.
"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
mock_strategy = MockExtractionStrategy()
# Test raw HTML URL (is_raw_html=True automatically set)
raw_html_url = "raw:<html><body><h1>Test HTML</h1><p>This is a test.</p></body></html>"
result = await crawler.arun(
url=raw_html_url,
config=CrawlerRunConfig(
page_timeout=30000,
extraction_strategy=mock_strategy,
cache_mode=CacheMode.BYPASS
)
)
assert result.success
assert len(mock_strategy.run_calls) == 1
assert mock_strategy.run_calls[0] == "Raw HTML", f"Expected 'Raw HTML', got '{mock_strategy.run_calls[0]}'"
if __name__ == "__main__":
asyncio.run(test_viewport_config((1024, 768)))
asyncio.run(test_memory_management())
asyncio.run(test_rate_limiting())
asyncio.run(test_javascript_execution())
asyncio.run(test_extraction_strategy_run_with_regular_url())
asyncio.run(test_extraction_strategy_run_with_raw_html())
asyncio.run(test_javascript_execution())

View File

@@ -1,455 +0,0 @@
"""
Tests for ConfigHealthMonitor class.
This test suite validates the health monitoring functionality for crawler configurations.
"""
import pytest
import asyncio
from crawl4ai.config_health_monitor import ConfigHealthMonitor, ResolutionResult, ConfigHealthState
from crawl4ai import BrowserConfig, CrawlerRunConfig
class TestConfigHealthMonitorBasic:
    """Basic functionality tests for ConfigHealthMonitor."""

    @pytest.mark.asyncio
    async def test_initialization(self):
        """Test monitor initialization with default settings."""
        monitor = ConfigHealthMonitor()
        # Only lower bounds are asserted: the monitor enforces minimums
        # rather than guaranteeing specific default values.
        assert monitor.check_interval >= 10.0  # Minimum enforced
        assert monitor.failure_threshold >= 1
        assert monitor.resolution_retry_limit >= 0
        assert monitor.registered_count == 0
        assert not monitor.is_running
        assert monitor.uptime is None

    @pytest.mark.asyncio
    async def test_initialization_with_config(self):
        """Test monitor initialization with custom configuration."""
        browser_config = BrowserConfig(headless=True, verbose=False)
        monitor = ConfigHealthMonitor(
            browser_config=browser_config,
            check_interval=30.0,
            failure_threshold=2,
            resolution_retry_limit=3,
            enable_metrics=True
        )
        # Values above the enforced minimums are kept verbatim.
        assert monitor.check_interval == 30.0
        assert monitor.failure_threshold == 2
        assert monitor.resolution_retry_limit == 3
        assert monitor.enable_metrics is True

    @pytest.mark.asyncio
    async def test_register_config(self):
        """Test registering a configuration."""
        monitor = ConfigHealthMonitor()
        config = CrawlerRunConfig(page_timeout=30000)
        config_id = monitor.register_config(
            config=config,
            test_url="https://example.com",
            config_id="test_config"
        )
        # Explicit IDs are returned unchanged.
        assert config_id == "test_config"
        assert monitor.registered_count == 1

    @pytest.mark.asyncio
    async def test_register_config_auto_id(self):
        """Test registering a configuration with auto-generated ID."""
        monitor = ConfigHealthMonitor()
        config = CrawlerRunConfig(page_timeout=30000)
        config_id = monitor.register_config(
            config=config,
            test_url="https://example.com"
        )
        # Auto-generated IDs use the "config_" prefix.
        assert config_id.startswith("config_")
        assert monitor.registered_count == 1

    @pytest.mark.asyncio
    async def test_register_duplicate_config_id(self):
        """Test that duplicate config IDs raise an error."""
        monitor = ConfigHealthMonitor()
        config = CrawlerRunConfig(page_timeout=30000)
        monitor.register_config(
            config=config,
            test_url="https://example.com",
            config_id="duplicate"
        )
        with pytest.raises(ValueError, match="already registered"):
            monitor.register_config(
                config=config,
                test_url="https://example.com",
                config_id="duplicate"
            )

    @pytest.mark.asyncio
    async def test_register_empty_url(self):
        """Test that empty test URLs raise an error."""
        monitor = ConfigHealthMonitor()
        config = CrawlerRunConfig()
        with pytest.raises(ValueError, match="cannot be empty"):
            monitor.register_config(
                config=config,
                test_url=""
            )

    @pytest.mark.asyncio
    async def test_unregister_config(self):
        """Test unregistering a configuration."""
        monitor = ConfigHealthMonitor()
        config = CrawlerRunConfig()
        config_id = monitor.register_config(
            config=config,
            test_url="https://example.com",
            config_id="to_remove"
        )
        assert monitor.registered_count == 1
        result = monitor.unregister_config(config_id)
        # unregister_config reports success via its boolean return value.
        assert result is True
        assert monitor.registered_count == 0

    @pytest.mark.asyncio
    async def test_unregister_nonexistent_config(self):
        """Test unregistering a non-existent configuration."""
        monitor = ConfigHealthMonitor()
        # Unknown IDs return False instead of raising.
        result = monitor.unregister_config("nonexistent")
        assert result is False
class TestConfigHealthMonitorLifecycle:
    """Lifecycle management tests."""

    @pytest.mark.asyncio
    async def test_start_stop(self):
        """Test monitor start and stop."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        assert not monitor.is_running
        await monitor.start()
        assert monitor.is_running
        # Uptime becomes available once the monitor is started.
        assert monitor.uptime is not None
        await monitor.stop()
        assert not monitor.is_running

    @pytest.mark.asyncio
    async def test_context_manager(self):
        """Test monitor as async context manager."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            # Entering the context starts the background loop.
            assert monitor.is_running
            # Register a config
            config_id = monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com"
            )
            assert monitor.registered_count == 1
        # After context exit, should be stopped
        assert not monitor.is_running

    @pytest.mark.asyncio
    async def test_double_start(self):
        """Test that double start is handled gracefully."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        await monitor.start()
        await monitor.start()  # Should log warning but not fail
        assert monitor.is_running
        await monitor.stop()

    @pytest.mark.asyncio
    async def test_stop_without_start(self):
        """Test that stop without start is handled gracefully."""
        monitor = ConfigHealthMonitor()
        await monitor.stop()  # Should log warning but not fail
class TestConfigHealthMonitorHealthChecks:
    """Health checking tests.

    NOTE(review): these tests perform real network crawls against live
    hosts, so they depend on network availability.
    """

    @pytest.mark.asyncio
    async def test_manual_health_check_success(self):
        """Test manual health check on a working URL."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(page_timeout=30000),
                test_url="https://example.com",
                config_id="example_test"
            )
            # Perform health check
            is_healthy = await monitor.check_health(config_id)
            assert is_healthy is True
            # Check state
            status = monitor.get_health_status(config_id)
            assert status.status == "healthy"
            assert status.consecutive_failures == 0
            assert status.consecutive_successes == 1
            assert status.last_check_time is not None
            assert status.last_success_time is not None

    @pytest.mark.asyncio
    async def test_manual_health_check_failure(self):
        """Test manual health check on a non-existent URL."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(page_timeout=10000),
                test_url="https://this-domain-definitely-does-not-exist-12345.com",
                config_id="failing_test"
            )
            # Perform health check
            is_healthy = await monitor.check_health(config_id)
            assert is_healthy is False
            # Check state
            status = monitor.get_health_status(config_id)
            assert status.consecutive_failures == 1
            assert status.last_error is not None

    @pytest.mark.asyncio
    async def test_health_check_nonexistent_config(self):
        """Test health check on non-existent config raises error."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        ) as monitor:
            with pytest.raises(ValueError, match="not registered"):
                await monitor.check_health("nonexistent")
class TestConfigHealthMonitorResolution:
    """Resolution strategy tests."""

    @pytest.mark.asyncio
    async def test_set_default_resolution_strategy(self):
        """Test setting a default resolution strategy."""
        monitor = ConfigHealthMonitor()
        async def dummy_strategy(state, monitor):
            return ResolutionResult(success=True, action="dummy")
        # Omitting config_id installs the strategy as the default.
        monitor.set_resolution_strategy(dummy_strategy)
        assert monitor._default_resolution_strategy == dummy_strategy

    @pytest.mark.asyncio
    async def test_set_config_specific_resolution_strategy(self):
        """Test setting a config-specific resolution strategy."""
        monitor = ConfigHealthMonitor()
        config_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="with_strategy"
        )
        async def custom_strategy(state, monitor):
            return ResolutionResult(success=True, action="custom")
        monitor.set_resolution_strategy(custom_strategy, config_id)
        assert monitor._resolution_strategies[config_id] == custom_strategy

    @pytest.mark.asyncio
    async def test_set_strategy_for_nonexistent_config(self):
        """Test setting strategy for non-existent config raises error."""
        monitor = ConfigHealthMonitor()
        async def dummy_strategy(state, monitor):
            return ResolutionResult(success=True, action="dummy")
        with pytest.raises(ValueError, match="not registered"):
            monitor.set_resolution_strategy(dummy_strategy, "nonexistent")

    @pytest.mark.asyncio
    async def test_register_with_resolution_strategy(self):
        """Test registering a config with a resolution strategy."""
        monitor = ConfigHealthMonitor()
        async def custom_strategy(state, monitor):
            return ResolutionResult(success=True, action="custom")
        # The strategy can be supplied directly at registration time.
        config_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            resolution_strategy=custom_strategy
        )
        assert monitor._resolution_strategies[config_id] == custom_strategy
class TestConfigHealthMonitorMetrics:
    """Metrics and status query tests."""

    @pytest.mark.asyncio
    async def test_get_health_status_single(self):
        """Test getting status for a single config."""
        monitor = ConfigHealthMonitor()
        config_id = monitor.register_config(
            config=CrawlerRunConfig(),
            test_url="https://example.com",
            config_id="status_test"
        )
        status = monitor.get_health_status(config_id)
        assert isinstance(status, ConfigHealthState)
        assert status.config_id == config_id
        # Freshly registered configs start out "healthy".
        assert status.status == "healthy"

    @pytest.mark.asyncio
    async def test_get_health_status_all(self):
        """Test getting status for all configs."""
        monitor = ConfigHealthMonitor()
        # Register multiple configs
        for i in range(3):
            monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com",
                config_id=f"config_{i}"
            )
        # Calling without an ID returns a dict of all states.
        all_statuses = monitor.get_health_status()
        assert isinstance(all_statuses, dict)
        assert len(all_statuses) == 3
        assert all(isinstance(s, ConfigHealthState) for s in all_statuses.values())

    @pytest.mark.asyncio
    async def test_get_health_status_nonexistent(self):
        """Test getting status for non-existent config raises error."""
        monitor = ConfigHealthMonitor()
        with pytest.raises(ValueError, match="not registered"):
            monitor.get_health_status("nonexistent")

    @pytest.mark.asyncio
    async def test_get_metrics_empty(self):
        """Test getting metrics with no configs."""
        monitor = ConfigHealthMonitor()
        metrics = monitor.get_metrics()
        assert metrics["total_checks"] == 0
        assert metrics["successful_checks"] == 0
        assert metrics["failed_checks"] == 0
        assert metrics["success_rate"] == 0.0
        assert metrics["configs"] == {}

    @pytest.mark.asyncio
    async def test_get_metrics_with_checks(self):
        """Test metrics after performing health checks."""
        async with ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False),
            enable_metrics=True
        ) as monitor:
            config_id = monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://example.com",
                config_id="metrics_test"
            )
            # Perform a health check
            await monitor.check_health(config_id)
            metrics = monitor.get_metrics()
            assert metrics["total_checks"] >= 0
            assert "configs" in metrics
            assert config_id in metrics["configs"]
            config_metrics = metrics["configs"][config_id]
            assert config_metrics["status"] == "healthy"
            assert config_metrics["total_checks"] >= 1
            assert "avg_response_time" in config_metrics
class TestConfigHealthMonitorProperties:
    """Property tests."""

    @pytest.mark.asyncio
    async def test_is_running_property(self):
        """Test is_running property."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        assert monitor.is_running is False
        await monitor.start()
        assert monitor.is_running is True
        await monitor.stop()
        assert monitor.is_running is False

    @pytest.mark.asyncio
    async def test_registered_count_property(self):
        """Test registered_count property."""
        monitor = ConfigHealthMonitor()
        assert monitor.registered_count == 0
        for i in range(5):
            monitor.register_config(
                config=CrawlerRunConfig(),
                test_url="https://httpbin.org/html",
                config_id=f"count_test_{i}"
            )
        assert monitor.registered_count == 5
        # Count tracks unregistration as well as registration.
        monitor.unregister_config("count_test_0")
        assert monitor.registered_count == 4

    @pytest.mark.asyncio
    async def test_uptime_property(self):
        """Test uptime property."""
        monitor = ConfigHealthMonitor(
            browser_config=BrowserConfig(headless=True, verbose=False)
        )
        # Uptime is None until the monitor is started.
        assert monitor.uptime is None
        await monitor.start()
        await asyncio.sleep(0.1)
        uptime = monitor.uptime
        assert uptime is not None
        assert uptime >= 0.1
        await monitor.stop()
if __name__ == "__main__":
    # Allow running this suite directly without invoking pytest from the CLI.
    pytest.main([__file__, "-v", "-s"])