Implements a comprehensive monitoring and visualization system for tracking web crawler operations in real time. The system includes:

- Terminal-based dashboard with a rich UI for displaying task statuses
- Memory pressure monitoring and adaptive dispatch control
- Queue statistics and performance metrics tracking
- Detailed task progress visualization
- A stress-testing framework for memory management

This addition helps operators track crawler performance and manage memory usage more effectively.
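For reference, the script below is driven entirely by positional command-line arguments (URL count, target memory percent, stream mode, aggressive mode), matching the argument parsing in the __main__ block at the bottom. A typical invocation, assuming the file is saved as memory_stress_test.py (the filename here is illustrative), might look like:

    python memory_stress_test.py 100 85 stream aggressive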
import asyncio
import time
import psutil
import logging
import random
from typing import Dict, List, Optional
import uuid
import sys
import os

# Import your crawler components
from crawl4ai.models import DisplayMode, CrawlStatus, CrawlResult
from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig, CacheMode
from crawl4ai import AsyncWebCrawler
from crawl4ai import MemoryAdaptiveDispatcher, CrawlerMonitor

# Global configuration
STREAM = False  # Toggle between streaming and non-streaming modes

# Configure logging to file only (to avoid breaking the rich display)
os.makedirs("logs", exist_ok=True)
file_handler = logging.FileHandler("logs/memory_stress_test.log")
file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))

# Root logger - file only, not console
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)

# Our test logger also writes to file only
logger = logging.getLogger("memory_stress_test")
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.propagate = False  # Don't propagate to the root logger


# Memory restrictor to simulate a limited-memory environment
class MemorySimulator:
    """Simulates memory pressure by allocating memory until a target percentage is reached."""

    def __init__(self, target_percent: float = 85.0, aggressive: bool = False):
        self.target_percent = target_percent
        self.memory_blocks: List[bytearray] = []
        self.aggressive = aggressive

    def apply_pressure(self, additional_percent: float = 0.0):
        """Fill memory until we reach the target percentage."""
        current_percent = psutil.virtual_memory().percent
        target = self.target_percent + additional_percent

        if current_percent >= target:
            return  # Already at target

        logger.info(f"Current memory: {current_percent}%, target: {target}%")

        # Calculate how much memory we need to allocate
        total_memory = psutil.virtual_memory().total
        target_usage = (target / 100.0) * total_memory
        current_usage = (current_percent / 100.0) * total_memory
        bytes_to_allocate = int(target_usage - current_usage)

        if bytes_to_allocate <= 0:
            return

        # Allocate in chunks to avoid overshooting the target
        if self.aggressive:
            # Use larger chunks for faster allocation in aggressive mode
            chunk_size = min(bytes_to_allocate, 200 * 1024 * 1024)  # 200 MB chunks
        else:
            chunk_size = min(bytes_to_allocate, 50 * 1024 * 1024)  # 50 MB chunks

        try:
            logger.info(f"Allocating {chunk_size / (1024 * 1024):.1f}MB to reach target memory usage")
            self.memory_blocks.append(bytearray(chunk_size))
            time.sleep(0.5)  # Give the system time to register the allocation
        except MemoryError:
            logger.warning("Unable to allocate more memory")

    def release_pressure(self, percent: Optional[float] = None):
        """
        Release allocated memory.

        If percent is specified, release that percentage of blocks;
        otherwise release everything.
        """
        if not self.memory_blocks:
            return

        if percent is None:
            # Release all blocks
            logger.info(f"Releasing all {len(self.memory_blocks)} memory blocks")
            self.memory_blocks.clear()
        else:
            # Release the specified percentage of blocks
            blocks_to_release = int(len(self.memory_blocks) * (percent / 100.0))
            if blocks_to_release > 0:
                logger.info(f"Releasing {blocks_to_release} of {len(self.memory_blocks)} memory blocks ({percent}%)")
                self.memory_blocks = self.memory_blocks[blocks_to_release:]

    def spike_pressure(self, duration: float = 5.0):
        """
        Create a temporary spike in memory pressure, then release it.

        Useful for forcing requeues. Must be called while an event loop is
        running, since the release is scheduled with asyncio.create_task.
        """
        logger.info(f"Creating memory pressure spike for {duration} seconds")
        # Remember how many blocks existed before the spike
        initial_blocks = len(self.memory_blocks)

        # Create the spike with an extra 5%
        self.apply_pressure(additional_percent=5.0)

        # Schedule the release after the given duration
        asyncio.create_task(self._delayed_release(duration, initial_blocks))

    async def _delayed_release(self, delay: float, target_blocks: int):
        """Helper for spike_pressure - releases the extra blocks after the delay."""
        await asyncio.sleep(delay)

        # Remove any blocks added since the spike started
        if len(self.memory_blocks) > target_blocks:
            logger.info(f"Releasing memory spike ({len(self.memory_blocks) - target_blocks} blocks)")
            self.memory_blocks = self.memory_blocks[:target_blocks]


# Test statistics collector
class TestResults:
    def __init__(self):
        self.start_time = time.time()
        self.completed_urls: List[str] = []
        self.failed_urls: List[str] = []
        self.requeued_count = 0
        self.memory_warnings = 0
        self.max_memory_usage = 0.0
        self.max_queue_size = 0
        self.max_wait_time = 0.0
        self.url_to_attempt: Dict[str, int] = {}  # Track retries per URL

    def log_summary(self):
        duration = time.time() - self.start_time
        logger.info("===== TEST SUMMARY =====")
        logger.info(f"Stream mode: {'ON' if STREAM else 'OFF'}")
        logger.info(f"Total duration: {duration:.1f} seconds")
        logger.info(f"Completed URLs: {len(self.completed_urls)}")
        logger.info(f"Failed URLs: {len(self.failed_urls)}")
        logger.info(f"Requeue events: {self.requeued_count}")
        logger.info(f"Memory warnings: {self.memory_warnings}")
        logger.info(f"Max memory usage: {self.max_memory_usage:.1f}%")
        logger.info(f"Max queue size: {self.max_queue_size}")
        logger.info(f"Max wait time: {self.max_wait_time:.1f} seconds")

        # Log URLs with multiple attempts
        retried_urls = {url: count for url, count in self.url_to_attempt.items() if count > 1}
        if retried_urls:
            logger.info(f"URLs with retries: {len(retried_urls)}")
            # Log the top 5 most-retried URLs
            top_retries = sorted(retried_urls.items(), key=lambda x: x[1], reverse=True)[:5]
            for url, count in top_retries:
                logger.info(f"  URL {url[-30:]} had {count} attempts")

        # Write the summary to a separate human-readable file
        with open("logs/test_summary.txt", "w") as f:
            f.write(f"Stream mode: {'ON' if STREAM else 'OFF'}\n")
            f.write(f"Total duration: {duration:.1f} seconds\n")
            f.write(f"Completed URLs: {len(self.completed_urls)}\n")
            f.write(f"Failed URLs: {len(self.failed_urls)}\n")
            f.write(f"Requeue events: {self.requeued_count}\n")
            f.write(f"Memory warnings: {self.memory_warnings}\n")
            f.write(f"Max memory usage: {self.max_memory_usage:.1f}%\n")
            f.write(f"Max queue size: {self.max_queue_size}\n")
            f.write(f"Max wait time: {self.max_wait_time:.1f} seconds\n")


# Custom monitor that extends CrawlerMonitor with test-specific stats tracking
class StressTestMonitor(CrawlerMonitor):
    def __init__(self, test_results: TestResults, **kwargs):
        # Initialize the parent CrawlerMonitor
        super().__init__(**kwargs)
        self.test_results = test_results

    def update_memory_status(self, status: str):
        if status != self.memory_status:
            logger.info(f"Memory status changed: {self.memory_status} -> {status}")
            if "CRITICAL" in status or "PRESSURE" in status:
                self.test_results.memory_warnings += 1

        # Track peak memory usage in the test results
        current_memory = psutil.virtual_memory().percent
        self.test_results.max_memory_usage = max(self.test_results.max_memory_usage, current_memory)

        # Call the parent method to update the dashboard
        super().update_memory_status(status)

    def update_queue_statistics(self, total_queued: int, highest_wait_time: float, avg_wait_time: float):
        # Track queue metrics in the test results
        self.test_results.max_queue_size = max(self.test_results.max_queue_size, total_queued)
        self.test_results.max_wait_time = max(self.test_results.max_wait_time, highest_wait_time)

        # Call the parent method to update the dashboard
        super().update_queue_statistics(total_queued, highest_wait_time, avg_wait_time)

    def update_task(self, task_id: str, **kwargs):
        # Track URL status changes for the test results
        if task_id in self.stats:
            old_status = self.stats[task_id].status

            # Count requeue events (requeued due to memory pressure) at most
            # once per task; error_message may be absent or None, so guard it
            error_message = kwargs.get('error_message')
            if error_message and 'requeued' in error_message:
                if not getattr(self.stats[task_id], 'counted_requeue', False):
                    self.test_results.requeued_count += 1
                    self.stats[task_id].counted_requeue = True

            # Track completion status for the test results
            if 'status' in kwargs:
                new_status = kwargs['status']
                if old_status != new_status:
                    if new_status == CrawlStatus.COMPLETED:
                        if task_id not in self.test_results.completed_urls:
                            self.test_results.completed_urls.append(task_id)
                    elif new_status == CrawlStatus.FAILED:
                        if task_id not in self.test_results.failed_urls:
                            self.test_results.failed_urls.append(task_id)

        # Call the parent method to update the dashboard,
        # then force an immediate refresh of the live table
        super().update_task(task_id, **kwargs)
        self.live.update(self._create_table())


# Generate test URLs - use example.com with unique paths to avoid browser caching
def generate_test_urls(count: int) -> List[str]:
    urls = []
    for i in range(count):
        # Add a random path and query parameters to create unique URLs
        path = f"/path/{uuid.uuid4()}"
        query = f"?test={i}&random={random.randint(1, 100000)}"
        urls.append(f"https://example.com{path}{query}")
    return urls


# Process-result callback
async def process_result(result, test_results: TestResults):
    # Track attempt counts
    if result.url not in test_results.url_to_attempt:
        test_results.url_to_attempt[result.url] = 1
    else:
        test_results.url_to_attempt[result.url] += 1

    # error_message may be None, so guard before checking for requeues
    if result.error_message and "requeued" in result.error_message:
        test_results.requeued_count += 1
        logger.debug(f"Requeued due to memory pressure: {result.url}")
    elif result.success:
        test_results.completed_urls.append(result.url)
        logger.debug(f"Successfully processed: {result.url}")
    else:
        test_results.failed_urls.append(result.url)
        logger.warning(f"Failed to process: {result.url} - {result.error_message}")


# Process multiple results (used in non-streaming mode)
async def process_results(results, test_results: TestResults):
    for result in results:
        await process_result(result, test_results)


# Main test function for extreme memory pressure simulation
async def run_memory_stress_test(
    url_count: int = 100,
    target_memory_percent: float = 92.0,  # Push to dangerous levels
    chunk_size: int = 20,  # Larger chunks for more chaos
    aggressive: bool = False,
    spikes: bool = True
):
    test_results = TestResults()
    memory_simulator = MemorySimulator(target_percent=target_memory_percent, aggressive=aggressive)

    logger.info(f"Starting stress test with {url_count} URLs in {'STREAM' if STREAM else 'NON-STREAM'} mode")
    logger.info(f"Target memory usage: {target_memory_percent}%")

    # First, elevate memory usage to create pressure
    logger.info("Creating initial memory pressure...")
    memory_simulator.apply_pressure()

    # Create test URLs in chunks to simulate real-world crawling, where URLs are discovered over time
    all_urls = generate_test_urls(url_count)
    url_chunks = [all_urls[i:i + chunk_size] for i in range(0, len(all_urls), chunk_size)]

    # Set up the crawler components - low memory thresholds to force more requeues
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        verbose=False,
        stream=STREAM  # Use the global STREAM variable to set the mode
    )

    # Create the monitor with a reference to the test results
    monitor = StressTestMonitor(
        test_results=test_results,
        display_mode=DisplayMode.DETAILED,
        max_visible_rows=20,
        total_urls=url_count  # Pass the total URL count
    )

    # Create the dispatcher with EXTREME settings - pure survival mode.
    # These settings are designed to create a memory battleground.
    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=63.0,  # Start throttling at just 63% memory
        critical_threshold_percent=70.0,  # Start requeuing at 70% - incredibly aggressive
        recovery_threshold_percent=55.0,  # Only resume normal ops when plenty of memory is available
        check_interval=0.1,  # Check extremely frequently (100 ms)
        max_session_permit=20 if aggressive else 10,  # Double the concurrent sessions - pure chaos
        fairness_timeout=10.0,  # Extremely low timeout - rapid priority changes
        monitor=monitor
    )
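
    # Per the settings above, the three thresholds are ordered
    # recovery (55%) < throttle (63%) < critical (70%): the dispatcher
    # throttles before it starts requeuing, and only resumes normal
    # operation once usage falls back below the recovery threshold.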

    # Set up the spike schedule if enabled (guard against a single-chunk
    # run, where randint(1, 0) would raise a ValueError)
    spike_intervals: List[int] = []
    if spikes and len(url_chunks) > 1:
        # Create 3-5 spikes at random chunk indices (never the first chunk)
        num_spikes = random.randint(3, 5)
        for _ in range(num_spikes):
            spike_intervals.append(random.randint(1, len(url_chunks) - 1))
        logger.info(f"Scheduled memory spikes at chunks: {spike_intervals}")

    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            # Process URLs in chunks to simulate discovering URLs over time
            for chunk_index, url_chunk in enumerate(url_chunks):
                logger.info(f"Processing chunk {chunk_index + 1}/{len(url_chunks)} ({len(url_chunk)} URLs)")

                # Regular pressure increases
                if chunk_index % 2 == 0:
                    logger.info("Increasing memory pressure...")
                    memory_simulator.apply_pressure()

                # Memory spike if one is scheduled for this chunk
                if spikes and chunk_index in spike_intervals:
                    logger.info(f"⚠️ CREATING MASSIVE MEMORY SPIKE at chunk {chunk_index + 1} ⚠️")
                    # Create a nightmare scenario - multiple overlapping spikes
                    memory_simulator.spike_pressure(duration=10.0)  # 10-second spike

                    # 50% chance of a double spike (pure evil)
                    if random.random() < 0.5:
                        await asyncio.sleep(2.0)  # Wait 2 seconds
                        logger.info("💀 DOUBLE SPIKE - EXTREME MEMORY PRESSURE 💀")
                        memory_simulator.spike_pressure(duration=8.0)  # 8-second overlapping spike

                if STREAM:
                    # Stream mode - process results as they arrive
                    async for result in dispatcher.run_urls_stream(
                        urls=url_chunk,
                        crawler=crawler,
                        config=run_config
                    ):
                        await process_result(result, test_results)
                else:
                    # Non-stream mode - get all results at once
                    results = await dispatcher.run_urls(
                        urls=url_chunk,
                        crawler=crawler,
                        config=run_config
                    )
                    await process_results(results, test_results)

                # Simulate discovering more URLs while others are still processing
                await asyncio.sleep(1)

                # RARELY release pressure - make the system fight for resources
                if chunk_index % 5 == 4:  # Infrequent releases
                    release_percent = random.choice([10, 15, 20])  # Small, inconsistent releases
                    logger.info(f"Releasing {release_percent}% of memory blocks - a brief respite")
                    memory_simulator.release_pressure(percent=release_percent)

    except Exception as e:
        logger.error(f"Test error: {str(e)}")
        raise
    finally:
        # Release the memory pressure
        memory_simulator.release_pressure()
        # Log the final results
        test_results.log_summary()

    # Check the success criteria
    if len(test_results.completed_urls) + len(test_results.failed_urls) < url_count:
        missing = url_count - len(test_results.completed_urls) - len(test_results.failed_urls)
        logger.error(f"TEST FAILED: Not all URLs were processed; {missing} URLs missing.")
        return False

    logger.info("TEST PASSED: All URLs were processed without crashing.")
    return True


# Command-line entry point
if __name__ == "__main__":
    # Parse command-line arguments
    url_count = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    target_memory = float(sys.argv[2]) if len(sys.argv) > 2 else 85.0

    # Check whether stream mode is specified
    if len(sys.argv) > 3:
        STREAM = sys.argv[3].lower() in ('true', 'yes', '1', 'stream')

    # Check whether aggressive mode is specified
    aggressive = False
    if len(sys.argv) > 4:
        aggressive = sys.argv[4].lower() in ('true', 'yes', '1', 'aggressive')

    print(f"Starting test with {url_count} URLs, {target_memory}% memory target")
    print(f"Stream mode: {STREAM}, Aggressive: {aggressive}")
    print("Logs will be written to the logs directory")
    print("Live display starting now...")

    # Run the test
    result = asyncio.run(run_memory_stress_test(
        url_count=url_count,
        target_memory_percent=target_memory,
        aggressive=aggressive
    ))

    # Exit with a status code reflecting success or failure
    sys.exit(0 if result else 1)