Add comprehensive Docker cluster orchestration with horizontal scaling support.

CLI Commands:
- crwl server start/stop/restart/status/scale/logs
- Auto-detection: Single (N=1) → Swarm (N>1) → Compose (N>1 fallback)
- Support for 1-100 container replicas with zero-downtime scaling

Infrastructure:
- Nginx load balancing (round-robin API, sticky sessions monitoring)
- Redis-based container discovery via heartbeats (30s interval)
- Real-time monitoring dashboard with cluster-wide visibility
- WebSocket aggregation from all containers

Security & Stability Fixes (12 critical issues):
- Add timeout protection to browser pool locks (prevent deadlocks)
- Implement Redis retry logic with exponential backoff
- Add container ID validation (prevent Redis key injection)
- Add CLI input sanitization (prevent shell injection)
- Add file locking for state management (prevent corruption)
- Fix WebSocket resource leaks and connection cleanup
- Add graceful degradation and circuit breakers

Configuration:
- RedisTTLConfig dataclass with environment variable support
- Template-based docker-compose.yml and nginx.conf generation
- Comprehensive error handling with actionable messages

Documentation:
- AGENT.md: Complete DevOps context for AI assistants
- MULTI_CONTAINER_ARCHITECTURE.md: Technical architecture guide
- Reorganized docs into deploy/docker/docs/
253 lines
8.7 KiB
Python
253 lines
8.7 KiB
Python
import logging
import os
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional

import dns.resolver
import yaml
from fastapi import Request
|
|
|
|
class TaskStatus(str, Enum):
    """Lifecycle states for an asynchronous task (str-valued for JSON/Redis use)."""

    PROCESSING = "processing"  # task accepted and currently running
    FAILED = "failed"          # task terminated with an error
    COMPLETED = "completed"    # task finished successfully
|
|
|
|
class FilterType(str, Enum):
    """Content-filtering strategies selectable by API callers (str-valued)."""

    RAW = "raw"    # no filtering, raw extracted content
    FIT = "fit"    # fit/pruned markdown filtering
    BM25 = "bm25"  # BM25 relevance-based filtering
    LLM = "llm"    # LLM-driven filtering
|
|
|
|
def load_config() -> Dict:
    """Load and return application configuration with environment variable overrides.

    Reads ``config.yml`` located next to this module, then applies two
    environment overrides:

    * ``LLM_PROVIDER`` replaces ``config["llm"]["provider"]``.
    * ``LLM_API_KEY`` sets ``config["llm"]["api_key"]`` only when no key is
      already present in the file.

    Returns:
        The configuration dictionary (``llm`` section guaranteed to exist).

    Raises:
        FileNotFoundError: If ``config.yml`` is missing.
        yaml.YAMLError: If the file is not valid YAML.
    """
    config_path = Path(__file__).parent / "config.yml"
    with open(config_path, "r") as config_file:
        # safe_load returns None for an empty file; normalize to a dict.
        config = yaml.safe_load(config_file) or {}

    # Guard against a config file that omits the "llm" section entirely
    # (indexing it directly would raise KeyError below).
    config.setdefault("llm", {})

    # Override LLM provider from environment if set
    llm_provider = os.environ.get("LLM_PROVIDER")
    if llm_provider:
        config["llm"]["provider"] = llm_provider
        logging.info(f"LLM provider overridden from environment: {llm_provider}")

    # Also support direct API key from environment if the provider-specific key isn't set
    llm_api_key = os.environ.get("LLM_API_KEY")
    if llm_api_key and "api_key" not in config["llm"]:
        config["llm"]["api_key"] = llm_api_key
        logging.info("LLM API key loaded from LLM_API_KEY environment variable")

    return config
|
|
|
|
def setup_logging(config: Dict) -> None:
    """Initialise the root logger from the ``logging`` section of *config*.

    Expects ``config["logging"]["level"]`` and ``config["logging"]["format"]``.
    """
    log_settings = config["logging"]
    logging.basicConfig(
        level=log_settings["level"],
        format=log_settings["format"],
    )
|
|
|
|
def get_base_url(request: Request) -> str:
    """Build the request's origin string, e.g. ``https://host:8080``."""
    scheme = request.url.scheme
    authority = request.url.netloc
    return f"{scheme}://{authority}"
|
|
|
|
def is_task_id(value: str) -> bool:
    """Check if the value matches the task ID pattern (``llm_`` prefix).

    The original also tested ``"_" in value``, which is implied by the
    ``llm_`` prefix, so the check reduces to a single ``startswith``.
    """
    return value.startswith("llm_")
|
|
|
|
def datetime_handler(obj: Any) -> Optional[str]:
    """JSON ``default=`` hook that serializes datetime-like objects.

    Any object exposing ``isoformat()`` (datetime, date, time) is rendered
    as its ISO-8601 string. Anything else raises ``TypeError``, as the
    ``json.dumps`` protocol requires.

    Note: the annotation was the builtin ``any`` (a function); fixed to
    ``typing.Any``.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
|
|
|
|
def should_cleanup_task(created_at: str, ttl_seconds: int = 3600) -> bool:
    """Check if task should be cleaned up based on creation time.

    Args:
        created_at: ISO-8601 timestamp of task creation.
        ttl_seconds: Maximum task age in seconds (default: 1 hour).

    Returns:
        True when the task is older than *ttl_seconds*.
    """
    created = datetime.fromisoformat(created_at)
    # fromisoformat may yield a tz-aware datetime (e.g. "...+00:00");
    # subtracting naive from aware raises TypeError, so compare with a
    # "now" of matching awareness.
    now = datetime.now(created.tzinfo) if created.tzinfo else datetime.now()
    return (now - created).total_seconds() > ttl_seconds
|
|
|
|
def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
    """Convert a raw Redis hash (bytes keys and values) into a plain str dict."""
    decoded = {}
    for raw_key, raw_value in hash_data.items():
        decoded[raw_key.decode('utf-8')] = raw_value.decode('utf-8')
    return decoded
|
|
|
|
|
|
|
|
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[str]:
    """Get the appropriate API key based on the LLM provider.

    Args:
        config: The application configuration dictionary.
        provider: Optional provider override (e.g. "openai/gpt-4");
            currently unused — key resolution is delegated to litellm.

    Returns:
        The directly-configured key (backward-compatible path), otherwise
        None so litellm resolves the provider's own environment variable.
    """
    llm_section = config["llm"]
    if "api_key" not in llm_section:
        # No direct key: litellm will locate the right env var itself.
        return None
    return llm_section["api_key"]
|
|
|
|
|
|
def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple[bool, str]:
    """Validate that the LLM provider has an associated API key.

    Args:
        config: The application configuration dictionary.
        provider: Optional provider override (e.g. "openai/gpt-4").

    Returns:
        Tuple of (is_valid, error_message). Currently always ``(True, "")``:
        a directly-configured key is trivially valid, and otherwise we trust
        litellm to locate the provider's environment variable — replicating
        its lookup logic here is not practical.
    """
    llm_section = config["llm"]
    if "api_key" in llm_section:
        return True, ""
    return True, ""
|
|
|
|
|
|
def get_llm_temperature(config: Dict, provider: Optional[str] = None) -> Optional[float]:
    """Get temperature setting based on the LLM provider.

    Priority order:
        1. Provider-specific environment variable (e.g. OPENAI_TEMPERATURE)
        2. Global LLM_TEMPERATURE environment variable
        3. None (to use litellm/provider defaults)

    Args:
        config: The application configuration dictionary.
        provider: Optional provider override (e.g. "openai/gpt-4").

    Returns:
        The temperature setting if configured, otherwise None.
    """
    # 1. Provider-specific env var, e.g. "openai/gpt-4" -> OPENAI_TEMPERATURE.
    if provider:
        prefix = provider.split('/')[0].upper()
        if raw := os.environ.get(f"{prefix}_TEMPERATURE"):
            try:
                return float(raw)
            except ValueError:
                # Warn but fall through to the global setting.
                logging.warning(f"Invalid temperature value for {prefix}: {raw}")

    # 2. Global LLM_TEMPERATURE.
    if raw := os.environ.get("LLM_TEMPERATURE"):
        try:
            return float(raw)
        except ValueError:
            logging.warning(f"Invalid global temperature value: {raw}")

    # 3. Defer to litellm/provider defaults.
    return None
|
|
|
|
|
|
def get_llm_base_url(config: Dict, provider: Optional[str] = None) -> Optional[str]:
    """Get base URL setting based on the LLM provider.

    Priority order:
        1. Provider-specific environment variable (e.g. OPENAI_BASE_URL)
        2. Global LLM_BASE_URL environment variable
        3. None (to use default endpoints)

    Args:
        config: The application configuration dictionary.
        provider: Optional provider override (e.g. "openai/gpt-4").

    Returns:
        The base URL if configured, otherwise None.
    """
    # Provider-specific override wins, e.g. "openai/gpt-4" -> OPENAI_BASE_URL.
    if provider:
        prefix = provider.split('/')[0].upper()
        if url := os.environ.get(f"{prefix}_BASE_URL"):
            return url

    # Fall back to the global setting (None when unset).
    return os.environ.get("LLM_BASE_URL")
|
|
|
|
|
|
def verify_email_domain(email: str) -> bool:
    """Best-effort check that *email*'s domain publishes MX records.

    Args:
        email: Address of the form ``local@domain``.

    Returns:
        True when the domain resolves at least one MX record; False on any
        failure (malformed address, NXDOMAIN, no answer, DNS timeout) —
        callers treat this as a soft signal, so we never raise.
    """
    try:
        domain = email.split('@')[1]
        # Try to resolve MX records for the domain.
        records = dns.resolver.resolve(domain, 'MX')
        return bool(records)
    except Exception:
        # dnspython raises a family of exceptions (NXDOMAIN, NoAnswer,
        # Timeout, ...) and a missing '@' raises IndexError; all of them
        # mean "cannot verify", so collapse to False.
        return False
|
|
|
|
def get_container_memory_percent() -> float:
    """Get actual container memory usage vs limit (cgroup v1/v2 aware).

    Reads the cgroup memory accounting files and returns usage as a
    percentage of the limit. When the cgroup is unlimited (v2 reports the
    literal string "max"; v1 reports a sentinel near 2**63), the host's
    total memory is used as the denominator. Outside a container, or on
    any read/parse failure, falls back to the host-wide usage percent.
    """
    try:
        # Try cgroup v2 first
        usage_path = Path("/sys/fs/cgroup/memory.current")
        limit_path = Path("/sys/fs/cgroup/memory.max")
        if not usage_path.exists():
            # Fall back to cgroup v1
            usage_path = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
            limit_path = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

        usage = int(usage_path.read_text())
        limit_text = limit_path.read_text().strip()

        # BUG FIX: cgroup v2 reports an unlimited cgroup as the literal
        # string "max" — int("max") raised ValueError before the old
        # `limit > 1e18` check could run, wrongly triggering the host
        # fallback. Handle the string form explicitly.
        if limit_text == "max":
            import psutil
            limit = psutil.virtual_memory().total
        else:
            limit = int(limit_text)
            # cgroup v1 unlimited sentinel is close to 2**63.
            if limit > 1e18:
                import psutil
                limit = psutil.virtual_memory().total

        return (usage / limit) * 100
    except (OSError, ValueError, ZeroDivisionError):
        # Non-container or unsupported: fallback to host. Narrowed from a
        # bare `except:` so KeyboardInterrupt/SystemExit still propagate.
        import psutil
        return psutil.virtual_memory().percent
|
|
|
|
|
|
def get_container_id() -> str:
    """Return this container's ID (Docker sets the hostname to the container ID)."""
    from socket import gethostname
    return gethostname()
|
|
|
|
|
|
def detect_deployment_mode() -> tuple[str, list[dict]]:
    """Detect if running in single/swarm/compose mode and get container list.

    Returns:
        (mode, containers) where mode is "single"|"swarm"|"compose" and
        containers is a list of {id, hostname, healthy} dicts.

    Detection heuristics (best-effort):
      * Compose: the "crawl4ai" service name resolves to multiple IPs
        (one per replica behind the Compose DNS).
      * Swarm: hostname looks like ``service.slot.task_id`` (>= 3
        dot-separated parts). TODO: replace with real Swarm API detection.
      * Otherwise: single container.
    """
    import socket

    my_hostname = socket.gethostname()

    # Compose check. socket.gaierror (an OSError subclass) means the
    # service name does not resolve, i.e. we are not behind Compose DNS.
    # Narrowed from the original bare `except:` clauses so that
    # KeyboardInterrupt/SystemExit still propagate.
    try:
        addrs = socket.getaddrinfo("crawl4ai", None)
    except OSError:
        addrs = []
    unique_ips = {addr[4][0] for addr in addrs}
    if len(unique_ips) > 1:
        # Multiple IPs = Compose with replicas.
        containers = [
            {"id": f"container-{i+1}", "hostname": f"crawl4ai-{i+1}", "healthy": True}
            for i in range(len(unique_ips))
        ]
        return "compose", containers

    # Swarm heuristic: task hostnames follow service.slot.task_id.
    if "." in my_hostname and len(my_hostname.split(".")) > 2:
        return "swarm", [{"id": my_hostname, "hostname": my_hostname, "healthy": True}]

    # Default: single container
    return "single", [{"id": my_hostname, "hostname": my_hostname, "healthy": True}]