Backend Improvements (11 fixes applied)

Critical Fixes:
- Add lock protection for browser pool access in monitor stats
- Ensure track_janitor_event is awaited across all call sites
- Improve error handling in monitor request tracking (already in place)

Important Fixes:
- Replace fire-and-forget Redis writes with a background persistence worker
- Add time-based expiry for completed requests/errors (5-minute cleanup)
- Implement input validation for monitor route parameters
- Add a 4s timeout to the timeline updater to prevent hangs
- Warn when killing browsers that still have active requests
- Implement monitor cleanup on shutdown with final persistence
- Document memory estimates with a TODO for actual tracking

Frontend Enhancements:

WebSocket Real-time Updates:
- Add WebSocket endpoint at /monitor/ws for live monitoring (sketched below)
- Implement auto-reconnect with exponential backoff (max 5 attempts)
- Fall back gracefully to HTTP polling on WebSocket failure
- Send comprehensive updates every 2 seconds (health, requests, browsers, timeline, events)

UI/UX Improvements:
- Add a live connection status indicator with pulsing animation:
  - Green "Live" = WebSocket connected
  - Yellow "Connecting..." = attempting connection
  - Blue "Polling" = fallback to HTTP polling
  - Red "Disconnected" = connection failed
- Restore the original styling for all sections
- Improve request table layout with flex-grow for the URL column
- Add browser-type text labels alongside emojis
- Add flex layout to the browser section header

Testing:
- Add test-websocket.py for WebSocket validation
- All 7 integration tests passing

Summary: 563 additions across 6 files
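A minimal sketch of the live-monitoring endpoint described above, assuming the server is a FastAPI app; the payload keys mirror the update categories in this changeset, while the handler body uses static placeholders instead of the real monitor accessors:

import asyncio
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/monitor/ws")
async def monitor_ws(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            # Push a comprehensive snapshot every 2 seconds.
            # In the real route these fields come from the monitor module;
            # placeholders keep the sketch self-contained.
            await ws.send_json({
                "ts": time.time(),
                "health": {"status": "ok"},
                "requests": [],
                "browsers": [],
                "timeline": [],
                "events": [],
            })
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        # The client auto-reconnects with exponential backoff (max 5 attempts)
        # and falls back to HTTP polling if the socket cannot be re-established.
        pass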
# crawler_pool.py - Smart browser pool with tiered management
import asyncio, json, hashlib, time
from contextlib import suppress
from typing import Dict, Optional

from crawl4ai import AsyncWebCrawler, BrowserConfig
from utils import load_config, get_container_memory_percent
import logging

logger = logging.getLogger(__name__)
CONFIG = load_config()

# Pool tiers
PERMANENT: Optional[AsyncWebCrawler] = None  # Always-ready default browser
HOT_POOL: Dict[str, AsyncWebCrawler] = {}    # Frequent configs
COLD_POOL: Dict[str, AsyncWebCrawler] = {}   # Rare configs
LAST_USED: Dict[str, float] = {}
USAGE_COUNT: Dict[str, int] = {}
LOCK = asyncio.Lock()

# Config
MEM_LIMIT = CONFIG.get("crawler", {}).get("memory_threshold_percent", 95.0)
BASE_IDLE_TTL = CONFIG.get("crawler", {}).get("pool", {}).get("idle_ttl_sec", 300)
DEFAULT_CONFIG_SIG = None  # Cached sig for default config


def _sig(cfg: BrowserConfig) -> str:
    """Generate a stable signature for a browser config."""
    payload = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(payload.encode()).hexdigest()
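
# e.g. _sig(BrowserConfig()) == _sig(BrowserConfig())  ->  the same 40-char hex digest,
# so structurally identical configs share one pooled browser (sort_keys keeps it stable).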


def _is_default_config(sig: str) -> bool:
    """Check if a signature matches the default config."""
    return sig == DEFAULT_CONFIG_SIG


async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
    """Get a crawler from the pool using the tiered strategy."""
    sig = _sig(cfg)

    async with LOCK:
        # Permanent browser serves the default config.
        if PERMANENT and _is_default_config(sig):
            LAST_USED[sig] = time.time()
            USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
            logger.info("🔥 Using permanent browser")
            return PERMANENT

        # Hot pool: frequently used configs.
        if sig in HOT_POOL:
            LAST_USED[sig] = time.time()
            USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
            logger.info(f"♨️ Using hot pool browser (sig={sig[:8]})")
            return HOT_POOL[sig]

        # Cold pool: promote to hot after 3+ uses.
        if sig in COLD_POOL:
            LAST_USED[sig] = time.time()
            USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1

            if USAGE_COUNT[sig] >= 3:
                logger.info(f"⬆️ Promoting to hot pool (sig={sig[:8]}, count={USAGE_COUNT[sig]})")
                HOT_POOL[sig] = COLD_POOL.pop(sig)

                # Track promotion in monitor; never let monitoring break crawling.
                try:
                    from monitor import get_monitor
                    await get_monitor().track_janitor_event("promote", sig, {"count": USAGE_COUNT[sig]})
                except Exception:
                    pass

                return HOT_POOL[sig]

            logger.info(f"❄️ Using cold pool browser (sig={sig[:8]})")
            return COLD_POOL[sig]

        # Memory check before creating a new browser.
        mem_pct = get_container_memory_percent()
        if mem_pct >= MEM_LIMIT:
            logger.error(f"💥 Memory pressure: {mem_pct:.1f}% >= {MEM_LIMIT}%")
            raise MemoryError(f"Memory at {mem_pct:.1f}%, refusing new browser")

        # Create a new browser in the cold pool.
        logger.info(f"🆕 Creating new browser in cold pool (sig={sig[:8]}, mem={mem_pct:.1f}%)")
        crawler = AsyncWebCrawler(config=cfg, thread_safe=False)
        await crawler.start()
        COLD_POOL[sig] = crawler
        LAST_USED[sig] = time.time()
        USAGE_COUNT[sig] = 1
        return crawler
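
# Illustrative call pattern (hypothetical config): the third use of the same
# non-default config promotes its browser from the cold tier to the hot tier.
#
#   cfg = BrowserConfig(headless=True)
#   crawler = await get_crawler(cfg)  # 1st call: created in COLD_POOL
#   crawler = await get_crawler(cfg)  # 2nd call: cold-pool hit
#   crawler = await get_crawler(cfg)  # 3rd call: promoted to HOT_POOL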


async def init_permanent(cfg: BrowserConfig):
    """Initialize the permanent default browser."""
    global PERMANENT, DEFAULT_CONFIG_SIG
    async with LOCK:
        if PERMANENT:
            return
        DEFAULT_CONFIG_SIG = _sig(cfg)
        logger.info("🔥 Creating permanent default browser")
        PERMANENT = AsyncWebCrawler(config=cfg, thread_safe=False)
        await PERMANENT.start()
        LAST_USED[DEFAULT_CONFIG_SIG] = time.time()
        USAGE_COUNT[DEFAULT_CONFIG_SIG] = 0


async def close_all():
    """Close all browsers, including the permanent one."""
    global PERMANENT
    async with LOCK:
        tasks = []
        if PERMANENT:
            tasks.append(PERMANENT.close())
        tasks.extend([c.close() for c in HOT_POOL.values()])
        tasks.extend([c.close() for c in COLD_POOL.values()])
        await asyncio.gather(*tasks, return_exceptions=True)
        PERMANENT = None  # drop the reference so a closed browser is never reused
        HOT_POOL.clear()
        COLD_POOL.clear()
        LAST_USED.clear()
        USAGE_COUNT.clear()


async def janitor():
    """Adaptive cleanup loop; tightens TTLs under memory pressure."""
    while True:
        mem_pct = get_container_memory_percent()

        # Adaptive intervals and TTLs (seconds).
        if mem_pct > 80:
            interval, cold_ttl, hot_ttl = 10, 30, 120
        elif mem_pct > 60:
            interval, cold_ttl, hot_ttl = 30, 60, 300
        else:
            interval, cold_ttl, hot_ttl = 60, BASE_IDLE_TTL, BASE_IDLE_TTL * 2

        await asyncio.sleep(interval)

        now = time.time()
        async with LOCK:
            # Clean cold pool.
            for sig in list(COLD_POOL.keys()):
                if now - LAST_USED.get(sig, now) > cold_ttl:
                    idle_time = now - LAST_USED[sig]
                    logger.info(f"🧹 Closing cold browser (sig={sig[:8]}, idle={idle_time:.0f}s)")
                    with suppress(Exception):
                        await COLD_POOL[sig].close()
                    COLD_POOL.pop(sig, None)
                    LAST_USED.pop(sig, None)
                    USAGE_COUNT.pop(sig, None)

                    # Track in monitor; never let monitoring break cleanup.
                    try:
                        from monitor import get_monitor
                        await get_monitor().track_janitor_event("close_cold", sig, {"idle_seconds": int(idle_time), "ttl": cold_ttl})
                    except Exception:
                        pass

            # Clean hot pool (more conservative TTL).
            for sig in list(HOT_POOL.keys()):
                if now - LAST_USED.get(sig, now) > hot_ttl:
                    idle_time = now - LAST_USED[sig]
                    logger.info(f"🧹 Closing hot browser (sig={sig[:8]}, idle={idle_time:.0f}s)")
                    with suppress(Exception):
                        await HOT_POOL[sig].close()
                    HOT_POOL.pop(sig, None)
                    LAST_USED.pop(sig, None)
                    USAGE_COUNT.pop(sig, None)

                    # Track in monitor.
                    try:
                        from monitor import get_monitor
                        await get_monitor().track_janitor_event("close_hot", sig, {"idle_seconds": int(idle_time), "ttl": hot_ttl})
                    except Exception:
                        pass

        # Log pool stats under pressure.
        if mem_pct > 60:
            logger.info(f"📊 Pool: hot={len(HOT_POOL)}, cold={len(COLD_POOL)}, mem={mem_pct:.1f}%")