feat(monitor): add real-time crawler monitoring system with memory management

Implements a comprehensive monitoring and visualization system for tracking web crawler operations in real-time. The system includes:
- Terminal-based dashboard with rich UI for displaying task statuses
- Memory pressure monitoring and adaptive dispatch control
- Queue statistics and performance metrics tracking
- Detailed task progress visualization
- Stress testing framework for memory management

This addition helps operators track crawler performance and manage memory usage more effectively.
This commit is contained in:
UncleCode
2025-03-12 19:05:24 +08:00
parent 9547bada3a
commit 1630fbdafe
8 changed files with 1956 additions and 321 deletions

View File

@@ -0,0 +1,168 @@
"""
Test script for the CrawlerMonitor component.
This script simulates a crawler with multiple tasks to demonstrate the real-time monitoring capabilities.
"""
import time
import uuid
import random
import threading
import sys
import os
# Add the parent directory to the path to import crawl4ai
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from crawl4ai.components.crawler_monitor import CrawlerMonitor
from crawl4ai.models import CrawlStatus
def simulate_crawler_task(monitor, task_id, url, simulate_failure=False):
"""Simulate a crawler task with different states."""
# Task starts in the QUEUED state
wait_time = random.uniform(0.5, 3.0)
time.sleep(wait_time)
# Update to IN_PROGRESS state
monitor.update_task(
task_id=task_id,
status=CrawlStatus.IN_PROGRESS,
start_time=time.time(),
wait_time=wait_time
)
# Simulate task running
process_time = random.uniform(1.0, 5.0)
for i in range(int(process_time * 2)):
# Simulate memory usage changes
memory_usage = random.uniform(5.0, 25.0)
monitor.update_task(
task_id=task_id,
memory_usage=memory_usage,
peak_memory=max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0))
)
time.sleep(0.5)
# Update to COMPLETED or FAILED state
if simulate_failure and random.random() < 0.8: # 80% chance of failure if simulate_failure is True
monitor.update_task(
task_id=task_id,
status=CrawlStatus.FAILED,
end_time=time.time(),
error_message="Simulated failure: Connection timeout",
memory_usage=0.0
)
else:
monitor.update_task(
task_id=task_id,
status=CrawlStatus.COMPLETED,
end_time=time.time(),
memory_usage=0.0
)
def update_queue_stats(monitor, num_queued_tasks):
"""Update queue statistics periodically."""
while monitor.is_running:
queued_tasks = [
task for task_id, task in monitor.get_all_task_stats().items()
if task["status"] == CrawlStatus.QUEUED.name
]
total_queued = len(queued_tasks)
if total_queued > 0:
current_time = time.time()
wait_times = [
current_time - task.get("enqueue_time", current_time)
for task in queued_tasks
]
highest_wait_time = max(wait_times) if wait_times else 0.0
avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0
else:
highest_wait_time = 0.0
avg_wait_time = 0.0
monitor.update_queue_statistics(
total_queued=total_queued,
highest_wait_time=highest_wait_time,
avg_wait_time=avg_wait_time
)
# Simulate memory pressure based on number of active tasks
active_tasks = len([
task for task_id, task in monitor.get_all_task_stats().items()
if task["status"] == CrawlStatus.IN_PROGRESS.name
])
if active_tasks > 8:
monitor.update_memory_status("CRITICAL")
elif active_tasks > 4:
monitor.update_memory_status("PRESSURE")
else:
monitor.update_memory_status("NORMAL")
time.sleep(1.0)
def test_crawler_monitor():
"""Test the CrawlerMonitor with simulated crawler tasks."""
# Total number of URLs to crawl
total_urls = 50
# Initialize the monitor
monitor = CrawlerMonitor(urls_total=total_urls, refresh_rate=0.5)
# Start the monitor
monitor.start()
# Start thread to update queue statistics
queue_stats_thread = threading.Thread(target=update_queue_stats, args=(monitor, total_urls))
queue_stats_thread.daemon = True
queue_stats_thread.start()
try:
# Create task threads
threads = []
for i in range(total_urls):
task_id = str(uuid.uuid4())
url = f"https://example.com/page{i}"
# Add task to monitor
monitor.add_task(task_id, url)
# Determine if this task should simulate failure
simulate_failure = (i % 10 == 0) # Every 10th task
# Create and start thread for this task
thread = threading.Thread(
target=simulate_crawler_task,
args=(monitor, task_id, url, simulate_failure)
)
thread.daemon = True
threads.append(thread)
# Start threads with delay to simulate tasks being added over time
batch_size = 5
for i in range(0, len(threads), batch_size):
batch = threads[i:i+batch_size]
for thread in batch:
thread.start()
time.sleep(0.5) # Small delay between starting threads
# Wait a bit before starting the next batch
time.sleep(2.0)
# Wait for all threads to complete
for thread in threads:
thread.join()
# Keep monitor running a bit longer to see the final state
time.sleep(5.0)
except KeyboardInterrupt:
print("\nTest interrupted by user")
finally:
# Stop the monitor
monitor.stop()
print("\nCrawler monitor test completed")
if __name__ == "__main__":
test_crawler_monitor()

View File

@@ -0,0 +1,410 @@
import asyncio
import time
import psutil
import logging
import random
from typing import List, Dict
import uuid
import sys
import os
# Import your crawler components
from crawl4ai.models import DisplayMode, CrawlStatus, CrawlResult
from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig, CacheMode
from crawl4ai import AsyncWebCrawler
from crawl4ai import MemoryAdaptiveDispatcher, CrawlerMonitor
# Global configuration
STREAM = False # Toggle between streaming and non-streaming modes
# Configure logging to file only (to avoid breaking the rich display)
os.makedirs("logs", exist_ok=True)
file_handler = logging.FileHandler("logs/memory_stress_test.log")
file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
# Root logger - only to file, not console
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
# Our test logger also writes to file only
logger = logging.getLogger("memory_stress_test")
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.propagate = False # Don't propagate to root logger
# Create a memory restrictor to simulate limited memory environment
class MemorySimulator:
def __init__(self, target_percent: float = 85.0, aggressive: bool = False):
"""Simulates memory pressure by allocating memory"""
self.target_percent = target_percent
self.memory_blocks: List[bytearray] = []
self.aggressive = aggressive
def apply_pressure(self, additional_percent: float = 0.0):
"""Fill memory until we reach target percentage"""
current_percent = psutil.virtual_memory().percent
target = self.target_percent + additional_percent
if current_percent >= target:
return # Already at target
logger.info(f"Current memory: {current_percent}%, target: {target}%")
# Calculate how much memory we need to allocate
total_memory = psutil.virtual_memory().total
target_usage = (target / 100.0) * total_memory
current_usage = (current_percent / 100.0) * total_memory
bytes_to_allocate = int(target_usage - current_usage)
if bytes_to_allocate <= 0:
return
# Allocate in smaller chunks to avoid overallocation
if self.aggressive:
# Use larger chunks for faster allocation in aggressive mode
chunk_size = min(bytes_to_allocate, 200 * 1024 * 1024) # 200MB chunks
else:
chunk_size = min(bytes_to_allocate, 50 * 1024 * 1024) # 50MB chunks
try:
logger.info(f"Allocating {chunk_size / (1024 * 1024):.1f}MB to reach target memory usage")
self.memory_blocks.append(bytearray(chunk_size))
time.sleep(0.5) # Give system time to register the allocation
except MemoryError:
logger.warning("Unable to allocate more memory")
def release_pressure(self, percent: float = None):
"""
Release allocated memory
If percent is specified, release that percentage of blocks
"""
if not self.memory_blocks:
return
if percent is None:
# Release all
logger.info(f"Releasing all {len(self.memory_blocks)} memory blocks")
self.memory_blocks.clear()
else:
# Release specified percentage
blocks_to_release = int(len(self.memory_blocks) * (percent / 100.0))
if blocks_to_release > 0:
logger.info(f"Releasing {blocks_to_release} of {len(self.memory_blocks)} memory blocks ({percent}%)")
self.memory_blocks = self.memory_blocks[blocks_to_release:]
def spike_pressure(self, duration: float = 5.0):
"""
Create a temporary spike in memory pressure then release
Useful for forcing requeues
"""
logger.info(f"Creating memory pressure spike for {duration} seconds")
# Save current blocks count
initial_blocks = len(self.memory_blocks)
# Create spike with extra 5%
self.apply_pressure(additional_percent=5.0)
# Schedule release after duration
asyncio.create_task(self._delayed_release(duration, initial_blocks))
async def _delayed_release(self, delay: float, target_blocks: int):
"""Helper for spike_pressure - releases extra blocks after delay"""
await asyncio.sleep(delay)
# Remove blocks added since spike started
if len(self.memory_blocks) > target_blocks:
logger.info(f"Releasing memory spike ({len(self.memory_blocks) - target_blocks} blocks)")
self.memory_blocks = self.memory_blocks[:target_blocks]
# Test statistics collector
class TestResults:
def __init__(self):
self.start_time = time.time()
self.completed_urls: List[str] = []
self.failed_urls: List[str] = []
self.requeued_count = 0
self.memory_warnings = 0
self.max_memory_usage = 0.0
self.max_queue_size = 0
self.max_wait_time = 0.0
self.url_to_attempt: Dict[str, int] = {} # Track retries per URL
def log_summary(self):
duration = time.time() - self.start_time
logger.info("===== TEST SUMMARY =====")
logger.info(f"Stream mode: {'ON' if STREAM else 'OFF'}")
logger.info(f"Total duration: {duration:.1f} seconds")
logger.info(f"Completed URLs: {len(self.completed_urls)}")
logger.info(f"Failed URLs: {len(self.failed_urls)}")
logger.info(f"Requeue events: {self.requeued_count}")
logger.info(f"Memory warnings: {self.memory_warnings}")
logger.info(f"Max memory usage: {self.max_memory_usage:.1f}%")
logger.info(f"Max queue size: {self.max_queue_size}")
logger.info(f"Max wait time: {self.max_wait_time:.1f} seconds")
# Log URLs with multiple attempts
retried_urls = {url: count for url, count in self.url_to_attempt.items() if count > 1}
if retried_urls:
logger.info(f"URLs with retries: {len(retried_urls)}")
# Log the top 5 most retried
top_retries = sorted(retried_urls.items(), key=lambda x: x[1], reverse=True)[:5]
for url, count in top_retries:
logger.info(f" URL {url[-30:]} had {count} attempts")
# Write summary to a separate human-readable file
with open("logs/test_summary.txt", "w") as f:
f.write(f"Stream mode: {'ON' if STREAM else 'OFF'}\n")
f.write(f"Total duration: {duration:.1f} seconds\n")
f.write(f"Completed URLs: {len(self.completed_urls)}\n")
f.write(f"Failed URLs: {len(self.failed_urls)}\n")
f.write(f"Requeue events: {self.requeued_count}\n")
f.write(f"Memory warnings: {self.memory_warnings}\n")
f.write(f"Max memory usage: {self.max_memory_usage:.1f}%\n")
f.write(f"Max queue size: {self.max_queue_size}\n")
f.write(f"Max wait time: {self.max_wait_time:.1f} seconds\n")
# Custom monitor with stats tracking
# Custom monitor that extends CrawlerMonitor with test-specific tracking
class StressTestMonitor(CrawlerMonitor):
def __init__(self, test_results: TestResults, **kwargs):
# Initialize the parent CrawlerMonitor
super().__init__(**kwargs)
self.test_results = test_results
def update_memory_status(self, status: str):
if status != self.memory_status:
logger.info(f"Memory status changed: {self.memory_status} -> {status}")
if "CRITICAL" in status or "PRESSURE" in status:
self.test_results.memory_warnings += 1
# Track peak memory usage in test results
current_memory = psutil.virtual_memory().percent
self.test_results.max_memory_usage = max(self.test_results.max_memory_usage, current_memory)
# Call parent method to update the dashboard
super().update_memory_status(status)
def update_queue_statistics(self, total_queued: int, highest_wait_time: float, avg_wait_time: float):
# Track queue metrics in test results
self.test_results.max_queue_size = max(self.test_results.max_queue_size, total_queued)
self.test_results.max_wait_time = max(self.test_results.max_wait_time, highest_wait_time)
# Call parent method to update the dashboard
super().update_queue_statistics(total_queued, highest_wait_time, avg_wait_time)
def update_task(self, task_id: str, **kwargs):
# Track URL status changes for test results
if task_id in self.stats:
old_status = self.stats[task_id].status
# If this is a requeue event (requeued due to memory pressure)
if 'error_message' in kwargs and 'requeued' in kwargs['error_message']:
if not hasattr(self.stats[task_id], 'counted_requeue') or not self.stats[task_id].counted_requeue:
self.test_results.requeued_count += 1
self.stats[task_id].counted_requeue = True
# Track completion status for test results
if 'status' in kwargs:
new_status = kwargs['status']
if old_status != new_status:
if new_status == CrawlStatus.COMPLETED:
if task_id not in self.test_results.completed_urls:
self.test_results.completed_urls.append(task_id)
elif new_status == CrawlStatus.FAILED:
if task_id not in self.test_results.failed_urls:
self.test_results.failed_urls.append(task_id)
# Call parent method to update the dashboard
super().update_task(task_id, **kwargs)
self.live.update(self._create_table())
# Generate test URLs - use example.com with unique paths to avoid browser caching
def generate_test_urls(count: int) -> List[str]:
urls = []
for i in range(count):
# Add random path and query parameters to create unique URLs
path = f"/path/{uuid.uuid4()}"
query = f"?test={i}&random={random.randint(1, 100000)}"
urls.append(f"https://example.com{path}{query}")
return urls
# Process result callback
async def process_result(result, test_results: TestResults):
# Track attempt counts
if result.url not in test_results.url_to_attempt:
test_results.url_to_attempt[result.url] = 1
else:
test_results.url_to_attempt[result.url] += 1
if "requeued" in result.error_message:
test_results.requeued_count += 1
logger.debug(f"Requeued due to memory pressure: {result.url}")
elif result.success:
test_results.completed_urls.append(result.url)
logger.debug(f"Successfully processed: {result.url}")
else:
test_results.failed_urls.append(result.url)
logger.warning(f"Failed to process: {result.url} - {result.error_message}")
# Process multiple results (used in non-streaming mode)
async def process_results(results, test_results: TestResults):
for result in results:
await process_result(result, test_results)
# Main test function for extreme memory pressure simulation
async def run_memory_stress_test(
url_count: int = 100,
target_memory_percent: float = 92.0, # Push to dangerous levels
chunk_size: int = 20, # Larger chunks for more chaos
aggressive: bool = False,
spikes: bool = True
):
test_results = TestResults()
memory_simulator = MemorySimulator(target_percent=target_memory_percent, aggressive=aggressive)
logger.info(f"Starting stress test with {url_count} URLs in {'STREAM' if STREAM else 'NON-STREAM'} mode")
logger.info(f"Target memory usage: {target_memory_percent}%")
# First, elevate memory usage to create pressure
logger.info("Creating initial memory pressure...")
memory_simulator.apply_pressure()
# Create test URLs in chunks to simulate real-world crawling where URLs are discovered
all_urls = generate_test_urls(url_count)
url_chunks = [all_urls[i:i+chunk_size] for i in range(0, len(all_urls), chunk_size)]
# Set up the crawler components - low memory thresholds to create more requeues
browser_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
verbose=False,
stream=STREAM # Use the global STREAM variable to set mode
)
# Create monitor with reference to test results
monitor = StressTestMonitor(
test_results=test_results,
display_mode=DisplayMode.DETAILED,
max_visible_rows=20,
total_urls=url_count # Pass total URLs count
)
# Create dispatcher with EXTREME settings - pure survival mode
# These settings are designed to create a memory battleground
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=63.0, # Start throttling at just 60% memory
critical_threshold_percent=70.0, # Start requeuing at 70% - incredibly aggressive
recovery_threshold_percent=55.0, # Only resume normal ops when plenty of memory available
check_interval=0.1, # Check extremely frequently (100ms)
max_session_permit=20 if aggressive else 10, # Double the concurrent sessions - pure chaos
fairness_timeout=10.0, # Extremely low timeout - rapid priority changes
monitor=monitor
)
# Set up spike schedule if enabled
if spikes:
spike_intervals = []
# Create 3-5 random spike times
num_spikes = random.randint(3, 5)
for _ in range(num_spikes):
# Schedule spikes at random chunks
chunk_index = random.randint(1, len(url_chunks) - 1)
spike_intervals.append(chunk_index)
logger.info(f"Scheduled memory spikes at chunks: {spike_intervals}")
try:
async with AsyncWebCrawler(config=browser_config) as crawler:
# Process URLs in chunks to simulate discovering URLs over time
for chunk_index, url_chunk in enumerate(url_chunks):
logger.info(f"Processing chunk {chunk_index+1}/{len(url_chunks)} ({len(url_chunk)} URLs)")
# Regular pressure increases
if chunk_index % 2 == 0:
logger.info("Increasing memory pressure...")
memory_simulator.apply_pressure()
# Memory spike if scheduled for this chunk
if spikes and chunk_index in spike_intervals:
logger.info(f"⚠️ CREATING MASSIVE MEMORY SPIKE at chunk {chunk_index+1} ⚠️")
# Create a nightmare scenario - multiple overlapping spikes
memory_simulator.spike_pressure(duration=10.0) # 10-second spike
# 50% chance of double-spike (pure evil)
if random.random() < 0.5:
await asyncio.sleep(2.0) # Wait 2 seconds
logger.info("💀 DOUBLE SPIKE - EXTREME MEMORY PRESSURE 💀")
memory_simulator.spike_pressure(duration=8.0) # 8-second overlapping spike
if STREAM:
# Stream mode - process results as they come in
async for result in dispatcher.run_urls_stream(
urls=url_chunk,
crawler=crawler,
config=run_config
):
await process_result(result, test_results)
else:
# Non-stream mode - get all results at once
results = await dispatcher.run_urls(
urls=url_chunk,
crawler=crawler,
config=run_config
)
await process_results(results, test_results)
# Simulate discovering more URLs while others are still processing
await asyncio.sleep(1)
# RARELY release pressure - make the system fight for resources
if chunk_index % 5 == 4: # Less frequent releases
release_percent = random.choice([10, 15, 20]) # Smaller, inconsistent releases
logger.info(f"Releasing {release_percent}% of memory blocks - brief respite")
memory_simulator.release_pressure(percent=release_percent)
except Exception as e:
logger.error(f"Test error: {str(e)}")
raise
finally:
# Release memory pressure
memory_simulator.release_pressure()
# Log final results
test_results.log_summary()
# Check for success criteria
if len(test_results.completed_urls) + len(test_results.failed_urls) < url_count:
logger.error(f"TEST FAILED: Not all URLs were processed. {url_count - len(test_results.completed_urls) - len(test_results.failed_urls)} URLs missing.")
return False
logger.info("TEST PASSED: All URLs were processed without crashing.")
return True
# Command-line entry point
if __name__ == "__main__":
# Parse command line arguments
url_count = int(sys.argv[1]) if len(sys.argv) > 1 else 100
target_memory = float(sys.argv[2]) if len(sys.argv) > 2 else 85.0
# Check if stream mode is specified
if len(sys.argv) > 3:
STREAM = sys.argv[3].lower() in ('true', 'yes', '1', 'stream')
# Check if aggressive mode is specified
aggressive = False
if len(sys.argv) > 4:
aggressive = sys.argv[4].lower() in ('true', 'yes', '1', 'aggressive')
print(f"Starting test with {url_count} URLs, {target_memory}% memory target")
print(f"Stream mode: {STREAM}, Aggressive: {aggressive}")
print("Logs will be written to the logs directory")
print("Live display starting now...")
# Run the test
result = asyncio.run(run_memory_stress_test(
url_count=url_count,
target_memory_percent=target_memory,
aggressive=aggressive
))
# Exit with status code
sys.exit(0 if result else 1)