Backend Improvements (11 fixes applied): Critical Fixes: - Add lock protection for browser pool access in monitor stats - Ensure async track_janitor_event across all call sites - Improve error handling in monitor request tracking (already in place) Important Fixes: - Replace fire-and-forget Redis with background persistence worker - Add time-based expiry for completed requests/errors (5min cleanup) - Implement input validation for monitor route parameters - Add 4s timeout to timeline updater to prevent hangs - Add warning when killing browsers with active requests - Implement monitor cleanup on shutdown with final persistence - Document memory estimates with TODO for actual tracking Frontend Enhancements: WebSocket Real-time Updates: - Add WebSocket endpoint at /monitor/ws for live monitoring - Implement auto-reconnect with exponential backoff (max 5 attempts) - Add graceful fallback to HTTP polling on WebSocket failure - Send comprehensive updates every 2 seconds (health, requests, browsers, timeline, events) UI/UX Improvements: - Add live connection status indicator with pulsing animation - Green "Live" = WebSocket connected - Yellow "Connecting..." = Attempting connection - Blue "Polling" = Fallback to HTTP polling - Red "Disconnected" = Connection failed - Restore original beautiful styling for all sections - Improve request table layout with flex-grow for URL column - Add browser type text labels alongside emojis - Add flex layout to browser section header Testing: - Add test-websocket.py for WebSocket validation - All 7 integration tests passing successfully Summary: 563 additions across 6 files
383 lines
15 KiB
Python
383 lines
15 KiB
Python
# monitor.py - Real-time monitoring stats with Redis persistence
|
|
import time
|
|
import json
|
|
import asyncio
|
|
from typing import Dict, List, Optional
|
|
from datetime import datetime, timezone
|
|
from collections import deque
|
|
from redis import asyncio as aioredis
|
|
from utils import get_container_memory_percent
|
|
import psutil
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MonitorStats:
|
|
"""Tracks real-time server stats with Redis persistence."""
|
|
|
|
def __init__(self, redis: aioredis.Redis):
|
|
self.redis = redis
|
|
self.start_time = time.time()
|
|
|
|
# In-memory queues (fast reads, Redis backup)
|
|
self.active_requests: Dict[str, Dict] = {} # id -> request info
|
|
self.completed_requests: deque = deque(maxlen=100) # Last 100
|
|
self.janitor_events: deque = deque(maxlen=100)
|
|
self.errors: deque = deque(maxlen=100)
|
|
|
|
# Endpoint stats (persisted in Redis)
|
|
self.endpoint_stats: Dict[str, Dict] = {} # endpoint -> {count, total_time, errors, ...}
|
|
|
|
# Background persistence queue (max 10 pending persist requests)
|
|
self._persist_queue: asyncio.Queue = asyncio.Queue(maxsize=10)
|
|
self._persist_worker_task: Optional[asyncio.Task] = None
|
|
|
|
# Timeline data (5min window, 5s resolution = 60 points)
|
|
self.memory_timeline: deque = deque(maxlen=60)
|
|
self.requests_timeline: deque = deque(maxlen=60)
|
|
self.browser_timeline: deque = deque(maxlen=60)
|
|
|
|
async def track_request_start(self, request_id: str, endpoint: str, url: str, config: Dict = None):
|
|
"""Track new request start."""
|
|
req_info = {
|
|
"id": request_id,
|
|
"endpoint": endpoint,
|
|
"url": url[:100], # Truncate long URLs
|
|
"start_time": time.time(),
|
|
"config_sig": config.get("sig", "default") if config else "default",
|
|
"mem_start": psutil.Process().memory_info().rss / (1024 * 1024)
|
|
}
|
|
self.active_requests[request_id] = req_info
|
|
|
|
# Increment endpoint counter
|
|
if endpoint not in self.endpoint_stats:
|
|
self.endpoint_stats[endpoint] = {
|
|
"count": 0, "total_time": 0, "errors": 0,
|
|
"pool_hits": 0, "success": 0
|
|
}
|
|
self.endpoint_stats[endpoint]["count"] += 1
|
|
|
|
# Queue persistence (handled by background worker)
|
|
try:
|
|
self._persist_queue.put_nowait(True)
|
|
except asyncio.QueueFull:
|
|
logger.warning("Persistence queue full, skipping")
|
|
|
|
async def track_request_end(self, request_id: str, success: bool, error: str = None,
|
|
pool_hit: bool = True, status_code: int = 200):
|
|
"""Track request completion."""
|
|
if request_id not in self.active_requests:
|
|
return
|
|
|
|
req_info = self.active_requests.pop(request_id)
|
|
end_time = time.time()
|
|
elapsed = end_time - req_info["start_time"]
|
|
mem_end = psutil.Process().memory_info().rss / (1024 * 1024)
|
|
mem_delta = mem_end - req_info["mem_start"]
|
|
|
|
# Update stats
|
|
endpoint = req_info["endpoint"]
|
|
if endpoint in self.endpoint_stats:
|
|
self.endpoint_stats[endpoint]["total_time"] += elapsed
|
|
if success:
|
|
self.endpoint_stats[endpoint]["success"] += 1
|
|
else:
|
|
self.endpoint_stats[endpoint]["errors"] += 1
|
|
if pool_hit:
|
|
self.endpoint_stats[endpoint]["pool_hits"] += 1
|
|
|
|
# Add to completed queue
|
|
completed = {
|
|
**req_info,
|
|
"end_time": end_time,
|
|
"elapsed": round(elapsed, 2),
|
|
"mem_delta": round(mem_delta, 1),
|
|
"success": success,
|
|
"error": error,
|
|
"status_code": status_code,
|
|
"pool_hit": pool_hit
|
|
}
|
|
self.completed_requests.append(completed)
|
|
|
|
# Track errors
|
|
if not success and error:
|
|
self.errors.append({
|
|
"timestamp": end_time,
|
|
"endpoint": endpoint,
|
|
"url": req_info["url"],
|
|
"error": error,
|
|
"request_id": request_id
|
|
})
|
|
|
|
await self._persist_endpoint_stats()
|
|
|
|
async def track_janitor_event(self, event_type: str, sig: str, details: Dict):
|
|
"""Track janitor cleanup events."""
|
|
self.janitor_events.append({
|
|
"timestamp": time.time(),
|
|
"type": event_type, # "close_cold", "close_hot", "promote"
|
|
"sig": sig[:8],
|
|
"details": details
|
|
})
|
|
|
|
def _cleanup_old_entries(self, max_age_seconds: int = 300):
|
|
"""Remove entries older than max_age_seconds (default 5min)."""
|
|
now = time.time()
|
|
cutoff = now - max_age_seconds
|
|
|
|
# Clean completed requests
|
|
while self.completed_requests and self.completed_requests[0].get("end_time", 0) < cutoff:
|
|
self.completed_requests.popleft()
|
|
|
|
# Clean janitor events
|
|
while self.janitor_events and self.janitor_events[0].get("timestamp", 0) < cutoff:
|
|
self.janitor_events.popleft()
|
|
|
|
# Clean errors
|
|
while self.errors and self.errors[0].get("timestamp", 0) < cutoff:
|
|
self.errors.popleft()
|
|
|
|
async def update_timeline(self):
|
|
"""Update timeline data points (called every 5s)."""
|
|
now = time.time()
|
|
mem_pct = get_container_memory_percent()
|
|
|
|
# Clean old entries (keep last 5 minutes)
|
|
self._cleanup_old_entries(max_age_seconds=300)
|
|
|
|
# Count requests in last 5s
|
|
recent_reqs = sum(1 for req in self.completed_requests
|
|
if now - req.get("end_time", 0) < 5)
|
|
|
|
# Browser counts (acquire lock to prevent race conditions)
|
|
from crawler_pool import PERMANENT, HOT_POOL, COLD_POOL, LOCK
|
|
async with LOCK:
|
|
browser_count = {
|
|
"permanent": 1 if PERMANENT else 0,
|
|
"hot": len(HOT_POOL),
|
|
"cold": len(COLD_POOL)
|
|
}
|
|
|
|
self.memory_timeline.append({"time": now, "value": mem_pct})
|
|
self.requests_timeline.append({"time": now, "value": recent_reqs})
|
|
self.browser_timeline.append({"time": now, "browsers": browser_count})
|
|
|
|
async def _persist_endpoint_stats(self):
|
|
"""Persist endpoint stats to Redis."""
|
|
try:
|
|
await self.redis.set(
|
|
"monitor:endpoint_stats",
|
|
json.dumps(self.endpoint_stats),
|
|
ex=86400 # 24h TTL
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to persist endpoint stats: {e}")
|
|
|
|
async def _persistence_worker(self):
|
|
"""Background worker to persist stats to Redis."""
|
|
while True:
|
|
try:
|
|
await self._persist_queue.get()
|
|
await self._persist_endpoint_stats()
|
|
self._persist_queue.task_done()
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Persistence worker error: {e}")
|
|
|
|
def start_persistence_worker(self):
|
|
"""Start the background persistence worker."""
|
|
if not self._persist_worker_task:
|
|
self._persist_worker_task = asyncio.create_task(self._persistence_worker())
|
|
logger.info("Started persistence worker")
|
|
|
|
async def stop_persistence_worker(self):
|
|
"""Stop the background persistence worker."""
|
|
if self._persist_worker_task:
|
|
self._persist_worker_task.cancel()
|
|
try:
|
|
await self._persist_worker_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
self._persist_worker_task = None
|
|
logger.info("Stopped persistence worker")
|
|
|
|
async def cleanup(self):
|
|
"""Cleanup on shutdown - persist final stats and stop workers."""
|
|
logger.info("Monitor cleanup starting...")
|
|
try:
|
|
# Persist final stats before shutdown
|
|
await self._persist_endpoint_stats()
|
|
# Stop background worker
|
|
await self.stop_persistence_worker()
|
|
logger.info("Monitor cleanup completed")
|
|
except Exception as e:
|
|
logger.error(f"Monitor cleanup error: {e}")
|
|
|
|
async def load_from_redis(self):
|
|
"""Load persisted stats from Redis."""
|
|
try:
|
|
data = await self.redis.get("monitor:endpoint_stats")
|
|
if data:
|
|
self.endpoint_stats = json.loads(data)
|
|
logger.info("Loaded endpoint stats from Redis")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load from Redis: {e}")
|
|
|
|
async def get_health_summary(self) -> Dict:
|
|
"""Get current system health snapshot."""
|
|
mem_pct = get_container_memory_percent()
|
|
cpu_pct = psutil.cpu_percent(interval=0.1)
|
|
|
|
# Network I/O (delta since last call)
|
|
net = psutil.net_io_counters()
|
|
|
|
# Pool status (acquire lock to prevent race conditions)
|
|
from crawler_pool import PERMANENT, HOT_POOL, COLD_POOL, LOCK
|
|
async with LOCK:
|
|
# TODO: Track actual browser process memory instead of estimates
|
|
# These are conservative estimates based on typical Chromium usage
|
|
permanent_mem = 270 if PERMANENT else 0 # Estimate: ~270MB for permanent browser
|
|
hot_mem = len(HOT_POOL) * 180 # Estimate: ~180MB per hot pool browser
|
|
cold_mem = len(COLD_POOL) * 180 # Estimate: ~180MB per cold pool browser
|
|
permanent_active = PERMANENT is not None
|
|
hot_count = len(HOT_POOL)
|
|
cold_count = len(COLD_POOL)
|
|
|
|
return {
|
|
"container": {
|
|
"memory_percent": round(mem_pct, 1),
|
|
"cpu_percent": round(cpu_pct, 1),
|
|
"network_sent_mb": round(net.bytes_sent / (1024**2), 2),
|
|
"network_recv_mb": round(net.bytes_recv / (1024**2), 2),
|
|
"uptime_seconds": int(time.time() - self.start_time)
|
|
},
|
|
"pool": {
|
|
"permanent": {"active": permanent_active, "memory_mb": permanent_mem},
|
|
"hot": {"count": hot_count, "memory_mb": hot_mem},
|
|
"cold": {"count": cold_count, "memory_mb": cold_mem},
|
|
"total_memory_mb": permanent_mem + hot_mem + cold_mem
|
|
},
|
|
"janitor": {
|
|
"next_cleanup_estimate": "adaptive", # Would need janitor state
|
|
"memory_pressure": "LOW" if mem_pct < 60 else "MEDIUM" if mem_pct < 80 else "HIGH"
|
|
}
|
|
}
|
|
|
|
def get_active_requests(self) -> List[Dict]:
|
|
"""Get list of currently active requests."""
|
|
now = time.time()
|
|
return [
|
|
{
|
|
**req,
|
|
"elapsed": round(now - req["start_time"], 1),
|
|
"status": "running"
|
|
}
|
|
for req in self.active_requests.values()
|
|
]
|
|
|
|
def get_completed_requests(self, limit: int = 50, filter_status: str = "all") -> List[Dict]:
|
|
"""Get recent completed requests."""
|
|
requests = list(self.completed_requests)[-limit:]
|
|
if filter_status == "success":
|
|
requests = [r for r in requests if r.get("success")]
|
|
elif filter_status == "error":
|
|
requests = [r for r in requests if not r.get("success")]
|
|
return requests
|
|
|
|
async def get_browser_list(self) -> List[Dict]:
|
|
"""Get detailed browser pool information."""
|
|
from crawler_pool import PERMANENT, HOT_POOL, COLD_POOL, LAST_USED, USAGE_COUNT, DEFAULT_CONFIG_SIG, LOCK
|
|
|
|
browsers = []
|
|
now = time.time()
|
|
|
|
# Acquire lock to prevent race conditions during iteration
|
|
async with LOCK:
|
|
if PERMANENT:
|
|
browsers.append({
|
|
"type": "permanent",
|
|
"sig": DEFAULT_CONFIG_SIG[:8] if DEFAULT_CONFIG_SIG else "unknown",
|
|
"age_seconds": int(now - self.start_time),
|
|
"last_used_seconds": int(now - LAST_USED.get(DEFAULT_CONFIG_SIG, now)),
|
|
"memory_mb": 270,
|
|
"hits": USAGE_COUNT.get(DEFAULT_CONFIG_SIG, 0),
|
|
"killable": False
|
|
})
|
|
|
|
for sig, crawler in HOT_POOL.items():
|
|
browsers.append({
|
|
"type": "hot",
|
|
"sig": sig[:8],
|
|
"age_seconds": int(now - self.start_time), # Approximation
|
|
"last_used_seconds": int(now - LAST_USED.get(sig, now)),
|
|
"memory_mb": 180, # Estimate
|
|
"hits": USAGE_COUNT.get(sig, 0),
|
|
"killable": True
|
|
})
|
|
|
|
for sig, crawler in COLD_POOL.items():
|
|
browsers.append({
|
|
"type": "cold",
|
|
"sig": sig[:8],
|
|
"age_seconds": int(now - self.start_time),
|
|
"last_used_seconds": int(now - LAST_USED.get(sig, now)),
|
|
"memory_mb": 180,
|
|
"hits": USAGE_COUNT.get(sig, 0),
|
|
"killable": True
|
|
})
|
|
|
|
return browsers
|
|
|
|
def get_endpoint_stats_summary(self) -> Dict[str, Dict]:
|
|
"""Get aggregated endpoint statistics."""
|
|
summary = {}
|
|
for endpoint, stats in self.endpoint_stats.items():
|
|
count = stats["count"]
|
|
avg_time = (stats["total_time"] / count) if count > 0 else 0
|
|
success_rate = (stats["success"] / count * 100) if count > 0 else 0
|
|
pool_hit_rate = (stats["pool_hits"] / count * 100) if count > 0 else 0
|
|
|
|
summary[endpoint] = {
|
|
"count": count,
|
|
"avg_latency_ms": round(avg_time * 1000, 1),
|
|
"success_rate_percent": round(success_rate, 1),
|
|
"pool_hit_rate_percent": round(pool_hit_rate, 1),
|
|
"errors": stats["errors"]
|
|
}
|
|
return summary
|
|
|
|
def get_timeline_data(self, metric: str, window: str = "5m") -> Dict:
|
|
"""Get timeline data for charts."""
|
|
# For now, only 5m window supported
|
|
if metric == "memory":
|
|
data = list(self.memory_timeline)
|
|
elif metric == "requests":
|
|
data = list(self.requests_timeline)
|
|
elif metric == "browsers":
|
|
data = list(self.browser_timeline)
|
|
else:
|
|
return {"timestamps": [], "values": []}
|
|
|
|
return {
|
|
"timestamps": [int(d["time"]) for d in data],
|
|
"values": [d.get("value", d.get("browsers")) for d in data]
|
|
}
|
|
|
|
def get_janitor_log(self, limit: int = 100) -> List[Dict]:
|
|
"""Get recent janitor events."""
|
|
return list(self.janitor_events)[-limit:]
|
|
|
|
def get_errors_log(self, limit: int = 100) -> List[Dict]:
|
|
"""Get recent errors."""
|
|
return list(self.errors)[-limit:]
|
|
|
|
# Global instance (initialized in server.py)
|
|
monitor_stats: Optional[MonitorStats] = None
|
|
|
|
def get_monitor() -> MonitorStats:
|
|
"""Get global monitor instance."""
|
|
if monitor_stats is None:
|
|
raise RuntimeError("Monitor not initialized")
|
|
return monitor_stats
|