73 lines
2.8 KiB
Python
73 lines
2.8 KiB
Python
# crawler_pool.py (new file)
|
||
import asyncio, json, hashlib, time, psutil
|
||
from contextlib import suppress
|
||
from typing import Dict
|
||
from crawl4ai import AsyncWebCrawler, BrowserConfig, BrowserAdapter
|
||
from typing import Dict ,Optional
|
||
from utils import load_config
|
||
|
||
# Server configuration, loaded once when this module is imported.
CONFIG = load_config()
_CRAWLER_CFG = CONFIG.get("crawler", {})

# Started crawlers keyed by the digest produced by _sig().
POOL: Dict[str, AsyncWebCrawler] = {}
# time.time() stamp of the most recent hand-out, keyed like POOL.
LAST_USED: Dict[str, float] = {}
# Held by get_crawler, close_all and janitor while they work on the pool.
LOCK = asyncio.Lock()

# % RAM in use – no new browser is started once usage reaches this value.
MEM_LIMIT = _CRAWLER_CFG.get("memory_threshold_percent", 95.0)
# Seconds a crawler may sit unused before the janitor closes it (default 30 min).
IDLE_TTL = _CRAWLER_CFG.get("pool", {}).get("idle_ttl_sec", 1800)
|
||
|
||
|
||
def _sig(cfg: BrowserConfig, adapter: Optional[BrowserAdapter] = None) -> str:
    """Return a SHA-1 hex digest identifying a browser config / adapter pair.

    The digest covers the compact, key-sorted JSON form of ``cfg.to_dict()``
    followed by ``:`` and the adapter's class name. When ``adapter`` is
    falsy the name ``"PlaywrightAdapter"`` is used instead.

    Args:
        cfg: Object exposing ``to_dict()``; its result is what gets hashed.
        adapter: Optional adapter whose class name is mixed into the digest.

    Returns:
        The 40-character hexadecimal SHA-1 digest.
    """
    try:
        serialized = json.dumps(
            cfg.to_dict(), separators=(",", ":"), sort_keys=True
        )
    except (TypeError, ValueError):
        # The dict is not JSON-serializable; hash its string form instead.
        serialized = str(cfg.to_dict())

    if adapter:
        label = adapter.__class__.__name__
    else:
        label = "PlaywrightAdapter"

    digest = hashlib.sha1()
    digest.update(":".join((serialized, label)).encode())
    return digest.hexdigest()
|
||
|
||
|
||
async def get_crawler(
    cfg: BrowserConfig, adapter: Optional[BrowserAdapter] = None
) -> AsyncWebCrawler:
    """Return a started crawler for ``cfg``/``adapter``, reusing a pooled one.

    The pool key is ``_sig(cfg, adapter)``. If a crawler with that key is
    already in ``POOL`` it is returned and its ``LAST_USED`` stamp refreshed.
    Otherwise a new ``AsyncWebCrawler`` is created and started while ``LOCK``
    is held, stored in the pool and returned.

    Args:
        cfg: Browser configuration used to build the crawler and its key.
        adapter: Optional browser adapter; contributes to the pool key.

    Returns:
        A crawler that is registered in ``POOL``.

    Raises:
        MemoryError: RAM usage is at or above ``MEM_LIMIT`` and a new browser
            would have to be started (or a ``MemoryError`` surfaced while
            starting one). Re-raised unchanged.
        RuntimeError: Any other failure; the original exception is attached
            as ``__cause__``.
    """
    try:
        # Include the adapter in the key so that requests made with different
        # adapters are not handed the same pooled crawler.
        sig = _sig(cfg, adapter)
        async with LOCK:
            if sig in POOL:
                LAST_USED[sig] = time.time()
                return POOL[sig]

            if psutil.virtual_memory().percent >= MEM_LIMIT:
                raise MemoryError("RAM pressure – new browser denied")

            # NOTE(review): the adapter is only used for the pool key; it is
            # not handed to AsyncWebCrawler here – confirm whether it should be.
            crawler = AsyncWebCrawler(config=cfg, thread_safe=False)
            try:
                await crawler.start()
            except Exception:
                # Best-effort cleanup so a half-started browser is not leaked;
                # the start() failure is what gets reported to the caller.
                with suppress(Exception):
                    await crawler.close()
                raise

            # Only a successfully started crawler is ever registered, so no
            # rollback of POOL / LAST_USED is needed on the failure paths.
            POOL[sig] = crawler
            LAST_USED[sig] = time.time()
            return crawler
    except MemoryError:
        # Propagate as-is: re-wrapping only duplicated the message text.
        raise
    except Exception as e:
        raise RuntimeError(f"Failed to start browser: {e}") from e
|
||
async def close_all():
    """Close every pooled crawler and empty the pool.

    All ``close()`` calls are awaited together while ``LOCK`` is held.
    Exceptions raised by individual crawlers are collected by ``gather``
    rather than propagated, so every entry is dropped from ``POOL`` and
    ``LAST_USED`` afterwards.
    """
    async with LOCK:
        shutdowns = [crawler.close() for crawler in POOL.values()]
        await asyncio.gather(*shutdowns, return_exceptions=True)
        LAST_USED.clear()
        POOL.clear()
|
||
|
||
async def janitor():
    """Evict idle crawlers from the pool, forever.

    Wakes up every 60 seconds and, under ``LOCK``, closes and removes each
    crawler whose ``LAST_USED`` stamp is more than ``IDLE_TTL`` seconds old.
    Errors raised while closing a crawler are suppressed; the entry is
    removed either way. This coroutine never returns on its own.
    """
    while True:
        await asyncio.sleep(60)
        now = time.time()
        async with LOCK:
            # Iterate over a snapshot because entries are removed in the loop.
            for key, idle_crawler in tuple(POOL.items()):
                expired = now - LAST_USED[key] > IDLE_TTL
                if not expired:
                    continue
                with suppress(Exception):
                    await idle_crawler.close()
                POOL.pop(key, None)
                LAST_USED.pop(key, None)
|