# crawler_pool.py - Smart browser pool with tiered management
"""Tiered pool of crawl4ai browsers.

Tiers:
  * PERMANENT - one always-ready browser for the default config
    (started by ``init_permanent``; never evicted by the janitor).
  * HOT_POOL  - configs requested often; evicted after a longer idle TTL.
  * COLD_POOL - newly created / rarely used configs; evicted sooner.

Browsers are keyed by a signature (``_sig``) of their ``BrowserConfig``.
"""
import asyncio
import hashlib
import json
import logging
import time
from contextlib import suppress
from typing import Dict, Optional, Tuple

from crawl4ai import AsyncWebCrawler, BrowserConfig

from utils import load_config, get_container_memory_percent

logger = logging.getLogger(__name__)

CONFIG = load_config()

# Pool tiers
PERMANENT: Optional[AsyncWebCrawler] = None  # Always-ready default browser
HOT_POOL: Dict[str, AsyncWebCrawler] = {}  # Frequent configs
COLD_POOL: Dict[str, AsyncWebCrawler] = {}  # Rare configs
LAST_USED: Dict[str, float] = {}  # sig -> time.time() of the last hand-out
USAGE_COUNT: Dict[str, int] = {}  # sig -> number of hand-outs
# Guards all pool state above.
# NOTE(review): on Python < 3.10 an asyncio.Lock created at import time binds
# to the event loop current at import; confirm the runtime version/import order.
LOCK = asyncio.Lock()

# Config
MEM_LIMIT = CONFIG.get("crawler", {}).get("memory_threshold_percent", 95.0)
BASE_IDLE_TTL = CONFIG.get("crawler", {}).get("pool", {}).get("idle_ttl_sec", 300)
DEFAULT_CONFIG_SIG: Optional[str] = None  # Cached sig for default config

# A cold-pool browser moves to the hot pool once its config has been
# requested at least this many times.
HOT_PROMOTION_THRESHOLD = 3

# Seconds the janitor waits before retrying after a cycle raised unexpectedly.
_JANITOR_ERROR_BACKOFF_SEC = 60


def _sig(cfg: BrowserConfig) -> str:
    """Return a SHA-1 hex digest identifying *cfg*.

    The config dict is serialized with sorted keys and compact separators so
    that equal configs always yield the same signature.
    """
    payload = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(payload.encode()).hexdigest()


def _is_default_config(sig: str) -> bool:
    """Return True if *sig* is the signature recorded by ``init_permanent``."""
    return sig == DEFAULT_CONFIG_SIG


def _track_event(event: str, sig: str, data: dict) -> None:
    """Report a pool event to the monitor, best-effort.

    The monitor is imported lazily (presumably to avoid an import cycle;
    confirm). Any failure is logged at debug level and swallowed so that
    monitoring can never break pool management.
    """
    try:
        from monitor import get_monitor
        get_monitor().track_janitor_event(event, sig, data)
    except Exception:
        logger.debug("Failed to track pool event %r (sig=%s)", event, sig[:8], exc_info=True)


def _touch(sig: str) -> None:
    """Record a hand-out of *sig*: refresh its last-used time and bump its count."""
    LAST_USED[sig] = time.time()
    USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1


async def _start_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
    """Create and start a crawler for *cfg*.

    If ``start()`` raises (including cancellation), the crawler is closed on
    a best-effort basis before re-raising, so a failed start does not leave a
    partially launched browser behind.
    """
    crawler = AsyncWebCrawler(config=cfg, thread_safe=False)
    try:
        await crawler.start()
    except BaseException:
        with suppress(Exception):
            await crawler.close()
        raise
    return crawler


async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
    """Return a started crawler for *cfg*, reusing pooled browsers when possible.

    Lookup order:
      1. the permanent browser, when *cfg* matches the default config;
      2. the hot pool;
      3. the cold pool; the entry is promoted to the hot pool once its usage
         count reaches ``HOT_PROMOTION_THRESHOLD``;
      4. otherwise a new browser is started and stored in the cold pool.

    Every hand-out updates ``LAST_USED`` / ``USAGE_COUNT`` for the signature.

    NOTE(review): ``LAST_USED`` is set when a browser is handed out, not when
    the caller is done with it. The janitor can therefore close a browser that
    is still in use if a crawl outlives the idle TTL, unless callers refresh
    ``LAST_USED`` elsewhere. Confirm.

    Raises:
        MemoryError: a new browser is needed but container memory usage is at
            or above ``MEM_LIMIT`` percent.
    """
    sig = _sig(cfg)
    async with LOCK:
        if PERMANENT is not None and _is_default_config(sig):
            _touch(sig)
            logger.info("🔥 Using permanent browser")
            return PERMANENT

        crawler = HOT_POOL.get(sig)
        if crawler is not None:
            _touch(sig)
            logger.info("♨️ Using hot pool browser (sig=%s)", sig[:8])
            return crawler

        crawler = COLD_POOL.get(sig)
        if crawler is not None:
            _touch(sig)
            if USAGE_COUNT[sig] >= HOT_PROMOTION_THRESHOLD:
                logger.info("⬆️ Promoting to hot pool (sig=%s, count=%s)", sig[:8], USAGE_COUNT[sig])
                HOT_POOL[sig] = COLD_POOL.pop(sig)
                _track_event("promote", sig, {"count": USAGE_COUNT[sig]})
            else:
                logger.info("❄️ Using cold pool browser (sig=%s)", sig[:8])
            return crawler

        # Nothing pooled for this config: refuse to launch under memory pressure.
        mem_pct = get_container_memory_percent()
        if mem_pct >= MEM_LIMIT:
            logger.error("💥 Memory pressure: %.1f%% >= %s%%", mem_pct, MEM_LIMIT)
            raise MemoryError(f"Memory at {mem_pct:.1f}%, refusing new browser")

        logger.info("🆕 Creating new browser in cold pool (sig=%s, mem=%.1f%%)", sig[:8], mem_pct)
        # The lock stays held while the browser starts. This serializes pool
        # access, but concurrent callers then find this browser instead of
        # launching a duplicate for the same signature.
        crawler = await _start_crawler(cfg)
        COLD_POOL[sig] = crawler
        LAST_USED[sig] = time.time()
        USAGE_COUNT[sig] = 1
        return crawler


async def init_permanent(cfg: BrowserConfig) -> None:
    """Start the permanent default browser for *cfg*; no-op if one is running.

    ``PERMANENT`` and ``DEFAULT_CONFIG_SIG`` are published only after the
    browser has started. A failed start therefore leaves the pool
    uninitialized, so a later call can retry instead of silently keeping an
    unstarted browser.
    """
    global PERMANENT, DEFAULT_CONFIG_SIG
    async with LOCK:
        if PERMANENT is not None:
            return
        sig = _sig(cfg)
        logger.info("🔥 Creating permanent default browser")
        PERMANENT = await _start_crawler(cfg)
        DEFAULT_CONFIG_SIG = sig
        LAST_USED[sig] = time.time()
        USAGE_COUNT[sig] = 0


async def close_all() -> None:
    """Close every browser (permanent, hot and cold) and reset all pool state.

    Browsers are closed concurrently. A failure to close one browser is
    logged and does not stop the others. ``PERMANENT`` is reset to None so
    that ``get_crawler`` never hands out the closed browser and
    ``init_permanent`` can start a fresh one.
    """
    global PERMANENT
    async with LOCK:
        crawlers = [*HOT_POOL.values(), *COLD_POOL.values()]
        if PERMANENT is not None:
            crawlers.insert(0, PERMANENT)
        results = await asyncio.gather(*(c.close() for c in crawlers), return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                logger.warning("Error while closing browser: %r", result)
        PERMANENT = None
        HOT_POOL.clear()
        COLD_POOL.clear()
        LAST_USED.clear()
        USAGE_COUNT.clear()


def _janitor_settings(mem_pct: float) -> Tuple[float, float, float]:
    """Return ``(interval, cold_ttl, hot_ttl)`` in seconds for a memory level.

    Higher memory usage gives more frequent sweeps and shorter idle TTLs.
    """
    if mem_pct > 80:
        return 10, 30, 120
    if mem_pct > 60:
        return 30, 60, 300
    return 60, BASE_IDLE_TTL, BASE_IDLE_TTL * 2


async def _evict_idle(pool: Dict[str, AsyncWebCrawler], tier: str, ttl: float, now: float) -> None:
    """Close and remove browsers in *pool* that have been idle longer than *ttl*.

    Must be called with ``LOCK`` held. *tier* ("cold" / "hot") is used in the
    log message and in the monitor event name (``close_<tier>``). Close errors
    are suppressed. Evicted signatures are also dropped from ``LAST_USED`` and
    ``USAGE_COUNT``.
    """
    for sig in list(pool):
        idle = now - LAST_USED.get(sig, now)
        if idle <= ttl:
            continue
        logger.info("🧹 Closing %s browser (sig=%s, idle=%.0fs)", tier, sig[:8], idle)
        with suppress(Exception):
            await pool[sig].close()
        pool.pop(sig, None)
        LAST_USED.pop(sig, None)
        USAGE_COUNT.pop(sig, None)
        _track_event(f"close_{tier}", sig, {"idle_seconds": int(idle), "ttl": ttl})


async def _janitor_cycle() -> None:
    """Run one janitor pass: choose TTLs from current memory, sleep, then evict.

    TTLs come from the memory reading taken before the sleep.
    """
    mem_pct = get_container_memory_percent()
    interval, cold_ttl, hot_ttl = _janitor_settings(mem_pct)

    await asyncio.sleep(interval)

    now = time.time()
    async with LOCK:
        await _evict_idle(COLD_POOL, "cold", cold_ttl, now)
        # The hot pool gets the longer TTL, so it is evicted more conservatively.
        await _evict_idle(HOT_POOL, "hot", hot_ttl, now)

        if mem_pct > 60:
            logger.info("📊 Pool: hot=%s, cold=%s, mem=%.1f%%", len(HOT_POOL), len(COLD_POOL), mem_pct)


async def janitor() -> None:
    """Background loop that closes idle hot/cold browsers, adapting to memory pressure.

    The permanent browser is never evicted. If a cycle raises unexpectedly,
    the error is logged and the loop retries after
    ``_JANITOR_ERROR_BACKOFF_SEC``, so a single failure does not stop cleanup
    for good. Task cancellation still propagates.
    """
    while True:
        try:
            await _janitor_cycle()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Pool janitor cycle failed; retrying in %ss", _JANITOR_ERROR_BACKOFF_SEC)
            await asyncio.sleep(_JANITOR_ERROR_BACKOFF_SEC)