Major refactoring to eliminate memory leaks and enable high-scale crawling: - **Smart 3-Tier Browser Pool**: - Permanent browser (always-ready default config) - Hot pool (configs used 3+ times, longer TTL) - Cold pool (new/rare configs, short TTL) - Auto-promotion: cold → hot after 3 uses - 100% pool reuse achieved in tests - **Container-Aware Memory Detection**: - Read cgroup v1/v2 memory limits (not host metrics) - Accurate memory pressure detection in Docker - Memory-based browser creation blocking - **Adaptive Janitor**: - Dynamic cleanup intervals (10s/30s/60s based on memory) - Tiered TTLs: cold 30-300s, hot 120-600s - Aggressive cleanup at high memory pressure - **Unified Pool Usage**: - All endpoints now use pool (/html, /screenshot, /pdf, /execute_js, /md, /llm) - Fixed config signature mismatch (permanent browser matches endpoints) - get_default_browser_config() helper for consistency - **Configuration**: - Reduced idle_ttl: 1800s → 300s (30min → 5min) - Fixed port: 11234 → 11235 (match Gunicorn) **Performance Results** (from stress tests): - Memory: 10x reduction (500-700MB × N → 270MB permanent) - Latency: 30-50x faster (<100ms pool hits vs 3-5s startup) - Reuse: 100% for default config, 60%+ for variants - Capacity: 100+ concurrent requests (vs ~20 before) - Leak: 0 MB/cycle (stable across tests) **Test Infrastructure**: - 7-phase sequential test suite (tests/) - Docker stats integration + log analysis - Pool promotion verification - Memory leak detection - Full endpoint coverage Fixes memory issues reported in production deployments.
206 lines
7.0 KiB
Python
206 lines
7.0 KiB
Python
import dns.resolver
|
|
import logging
|
|
import yaml
|
|
import os
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from fastapi import Request
|
|
from typing import Dict, Optional
|
|
|
|
class TaskStatus(str, Enum):
    """Processing state of a task.

    Members mix in ``str``, so each one compares equal to its raw value.
    """

    PROCESSING = "processing"
    FAILED = "failed"
    COMPLETED = "completed"
|
|
|
|
class FilterType(str, Enum):
    """Names of the available filter modes.

    Members mix in ``str``, so each one compares equal to its raw value.
    """

    RAW = "raw"
    FIT = "fit"
    BM25 = "bm25"
    LLM = "llm"
|
|
|
|
def load_config() -> Dict:
    """Load the application configuration and apply environment overrides.

    Reads ``config.yml`` from the directory that contains this module, then:

    * ``LLM_PROVIDER`` -- when set and non-empty, replaces
      ``config["llm"]["provider"]``.
    * ``LLM_API_KEY`` -- when set and non-empty, is stored as
      ``config["llm"]["api_key"]`` unless the file already defines that key.

    Returns:
        The parsed configuration dictionary with overrides applied. An empty
        YAML file yields an empty dictionary (plus any overrides).

    Errors raised while opening or parsing the file propagate to the caller.
    """
    config_path = Path(__file__).parent / "config.yml"

    # Explicit encoding so parsing does not depend on the platform locale.
    with open(config_path, "r", encoding="utf-8") as config_file:
        # safe_load returns None for an empty document; normalise to a dict.
        config = yaml.safe_load(config_file) or {}

    llm_provider = os.environ.get("LLM_PROVIDER")
    llm_api_key = os.environ.get("LLM_API_KEY")

    if llm_provider or llm_api_key:
        # Create the section on demand so an override never fails just because
        # the file omits ``llm`` (or leaves it empty, which parses as None).
        llm_section = config.get("llm")
        if llm_section is None:
            llm_section = config["llm"] = {}

        # Override LLM provider from environment if set
        if llm_provider:
            llm_section["provider"] = llm_provider
            logging.info("LLM provider overridden from environment: %s", llm_provider)

        # The environment key is only a fallback: a key in the file wins.
        if llm_api_key and "api_key" not in llm_section:
            llm_section["api_key"] = llm_api_key
            logging.info("LLM API key loaded from LLM_API_KEY environment variable")

    return config
|
|
|
|
def setup_logging(config: Dict) -> None:
    """Initialise logging from the ``logging`` section of *config*.

    The section's ``level`` and ``format`` entries are handed to
    ``logging.basicConfig``.
    """
    log_level = config["logging"]["level"]
    log_format = config["logging"]["format"]
    logging.basicConfig(level=log_level, format=log_format)
|
|
|
|
def get_base_url(request: Request) -> str:
    """Return the ``scheme://netloc`` prefix of the request's URL."""
    scheme = request.url.scheme
    netloc = request.url.netloc
    return "{}://{}".format(scheme, netloc)
|
|
|
|
def is_task_id(value: str) -> bool:
    """Return True if *value* starts with the ``llm_`` task-ID prefix.

    The previous extra ``"_" in value`` test was always true once the prefix
    matched, so the prefix check alone expresses the same rule.
    """
    return value.startswith("llm_")
|
|
|
|
def datetime_handler(obj: object) -> str:
    """Serialise date/time-like objects for JSON encoding.

    Args:
        obj: Any object; anything exposing an ``isoformat`` method is accepted.

    Returns:
        The result of ``obj.isoformat()``.

    Raises:
        TypeError: If *obj* has no ``isoformat`` attribute.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
|
|
|
|
def should_cleanup_task(created_at: str, ttl_seconds: int = 3600) -> bool:
    """Check whether a task has outlived its time-to-live.

    Args:
        created_at: Creation time in a format accepted by
            ``datetime.fromisoformat``; may be naive or carry a UTC offset.
        ttl_seconds: Maximum allowed age in seconds.

    Returns:
        True if more than *ttl_seconds* have elapsed since *created_at*.

    Raises:
        ValueError: If *created_at* is not a valid ISO-format string.
    """
    created = datetime.fromisoformat(created_at)
    # Take "now" in the timestamp's own timezone. With tzinfo None this is the
    # naive local time used before; with an offset it avoids the TypeError
    # raised when subtracting naive and aware datetimes.
    now = datetime.now(tz=created.tzinfo)
    return (now - created).total_seconds() > ttl_seconds
|
|
|
|
def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
    """Return a copy of *hash_data* with every key and value decoded as UTF-8."""
    decoded: Dict[str, str] = {}
    for raw_key, raw_value in hash_data.items():
        decoded[raw_key.decode('utf-8')] = raw_value.decode('utf-8')
    return decoded
|
|
|
|
|
|
|
|
def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[str]:
    """Look up a directly configured LLM API key.

    Args:
        config: The application configuration dictionary
        provider: Optional provider override (e.g., "openai/gpt-4"); not
            consulted by this function

    Returns:
        ``config["llm"]["api_key"]`` when that entry exists, otherwise None so
        that litellm can locate the matching environment variable itself
    """
    llm_settings = config["llm"]
    # A direct key is honoured for backward compatibility; absence means
    # "let litellm resolve it".
    return llm_settings["api_key"] if "api_key" in llm_settings else None
|
|
|
|
|
|
def validate_llm_provider(config: Dict, provider: Optional[str] = None) -> tuple[bool, str]:
    """Report whether the LLM provider can be used.

    Args:
        config: The application configuration dictionary
        provider: Optional provider override (e.g., "openai/gpt-4"); not
            consulted by this function

    Returns:
        Tuple of (is_valid, error_message); currently always ``(True, "")``
    """
    accepted = (True, "")

    has_direct_key = "api_key" in config["llm"]
    if has_direct_key:
        # A directly configured key is sufficient on its own.
        return accepted

    # Without a direct key we rely on litellm finding the right environment
    # variable; checking that here would mean reimplementing litellm's logic.
    return accepted
|
|
|
|
|
|
def get_llm_temperature(config: Dict, provider: Optional[str] = None) -> Optional[float]:
    """Resolve the sampling temperature from environment variables.

    Lookup order:
    1. ``<PROVIDER>_TEMPERATURE``, where ``<PROVIDER>`` is the upper-cased part
       of *provider* before the first ``/`` (e.g., OPENAI_TEMPERATURE)
    2. ``LLM_TEMPERATURE``
    3. None (to use litellm/provider defaults)

    A value that is empty or cannot be parsed as a float is skipped (the
    latter with a warning) and the next source is tried.

    Args:
        config: The application configuration dictionary; not consulted here
        provider: Optional provider override (e.g., "openai/gpt-4")

    Returns:
        The temperature setting if configured, otherwise None
    """
    # (environment variable name, warning prefix), in priority order.
    sources = []
    if provider:
        provider_name = provider.split('/')[0].upper()
        sources.append((
            f"{provider_name}_TEMPERATURE",
            f"Invalid temperature value for {provider_name}: ",
        ))
    sources.append(("LLM_TEMPERATURE", "Invalid global temperature value: "))

    for env_name, warning_prefix in sources:
        raw_value = os.environ.get(env_name)
        if not raw_value:
            continue
        try:
            return float(raw_value)
        except ValueError:
            logging.warning(f"{warning_prefix}{raw_value}")

    return None
|
|
|
|
|
|
def get_llm_base_url(config: Dict, provider: Optional[str] = None) -> Optional[str]:
    """Resolve the LLM endpoint base URL from environment variables.

    Lookup order:
    1. ``<PROVIDER>_BASE_URL``, where ``<PROVIDER>`` is the upper-cased part of
       *provider* before the first ``/`` (e.g., OPENAI_BASE_URL)
    2. ``LLM_BASE_URL``
    3. None (to use default endpoints)

    Args:
        config: The application configuration dictionary; not consulted here
        provider: Optional provider override (e.g., "openai/gpt-4")

    Returns:
        The base URL if configured, otherwise None
    """
    if not provider:
        return os.environ.get("LLM_BASE_URL")

    provider_prefix = provider.split('/')[0].upper()
    # An unset or empty provider-specific value falls through to the global one.
    return os.environ.get(f"{provider_prefix}_BASE_URL") or os.environ.get("LLM_BASE_URL")
|
|
|
|
|
|
def verify_email_domain(email: str) -> bool:
    """Check whether the domain of *email* publishes MX records.

    This is a best-effort check: any failure (malformed address, DNS error,
    no answer) yields False rather than an exception.

    Args:
        email: Address to check; the domain is the text after the last ``@``.

    Returns:
        True if the MX lookup returns at least one record, otherwise False.
    """
    try:
        # Split on the LAST "@": taking index 1 of a plain split picks the
        # wrong segment when the address contains more than one "@".
        _, separator, domain = email.rpartition('@')
        if not separator or not domain:
            # No "@" or nothing after it: not worth a DNS round-trip.
            return False

        # Try to resolve MX records for the domain.
        records = dns.resolver.resolve(domain, 'MX')
        return bool(records)
    except Exception as exc:
        # Deliberately broad: verification must never crash the caller.
        logging.debug("Email domain verification failed: %s", exc)
        return False
|
|
|
|
def get_container_memory_percent() -> float:
    """Get actual container memory usage vs limit (cgroup v1/v2 aware).

    Reads usage and limit from the cgroup v2 files under ``/sys/fs/cgroup``,
    falling back to the cgroup v1 files when the v2 usage file is absent. If
    the cgroup reports no limit, the host's total memory (via ``psutil``) is
    used as the limit.

    Returns:
        Memory usage as a percentage of the limit. If the cgroup files cannot
        be read or parsed, the host-wide usage percentage from ``psutil`` is
        returned instead.
    """
    try:
        # Try cgroup v2 first
        usage_path = Path("/sys/fs/cgroup/memory.current")
        limit_path = Path("/sys/fs/cgroup/memory.max")
        if not usage_path.exists():
            # Fall back to cgroup v1
            usage_path = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
            limit_path = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

        usage = int(usage_path.read_text())
        limit_text = limit_path.read_text().strip()

        # Unlimited is the literal "max" on v2 and a huge sentinel (> 1e18) on
        # v1. "max" must be checked before int(): parsing it raises ValueError,
        # which would skip this branch and report host usage percent instead.
        if limit_text == "max" or int(limit_text) > 1e18:
            import psutil
            limit = psutil.virtual_memory().total
        else:
            limit = int(limit_text)

        return (usage / limit) * 100
    except Exception:
        # Non-container or unsupported: fallback to host. ``Exception`` rather
        # than a bare except so KeyboardInterrupt/SystemExit still propagate.
        import psutil
        return psutil.virtual_memory().percent