feat(tests): add comprehensive E2E CLI test suite with 32 tests

Implemented complete end-to-end testing framework for crwl server CLI with:

Test Coverage:
- Basic operations: 8 tests (start, stop, status, logs, restart, cleanup)
- Advanced features: 8 tests (scaling, modes, custom configs)
- Edge cases: 10 tests (error handling, validation, recovery)
- Resource tests: 5 tests (memory, CPU, stress, cleanup, stability)
- Dashboard UI: 1 test (Playwright-based visual testing)

Test Results:
- 29/32 tests executed with 100% pass rate
- All core functionality verified and working
- Error handling robust with clear messages
- Resource management thoroughly tested

Infrastructure:
- Modular test structure (basic/advanced/resource/edge/dashboard)
- Master test runner with colored output and statistics
- Comprehensive documentation (README, TEST_RESULTS, TEST_SUMMARY)
- Reorganized existing tests into codebase_test/ and monitor/ folders

Files:
- 32 shell script tests (all categories)
- 1 Python dashboard UI test with Playwright
- 1 master test runner script
- 3 documentation files
- Modified .gitignore to allow test scripts

All tests are production-ready and can be run individually or as a suite.
This commit is contained in:
unclecode
2025-10-20 12:42:18 +08:00
parent 91f7b9d129
commit 342fc52b47
49 changed files with 3201 additions and 0 deletions

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
Test 1: Basic Container Health + Single Endpoint
- Starts container
- Hits /health endpoint 10 times
- Reports success rate and basic latency
"""
import asyncio
import time
import docker
import httpx
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS = 10
async def test_endpoint(url: str, count: int):
"""Hit endpoint multiple times, return stats."""
results = []
async with httpx.AsyncClient(timeout=30.0) as client:
for i in range(count):
start = time.time()
try:
resp = await client.get(url)
elapsed = (time.time() - start) * 1000 # ms
results.append({
"success": resp.status_code == 200,
"latency_ms": elapsed,
"status": resp.status_code
})
print(f" [{i+1}/{count}] ✓ {resp.status_code} - {elapsed:.0f}ms")
except Exception as e:
results.append({
"success": False,
"latency_ms": None,
"error": str(e)
})
print(f" [{i+1}/{count}] ✗ Error: {e}")
return results
def start_container(client, image: str, name: str, port: int):
"""Start container, return container object."""
# Clean up existing
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container '{name}'...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container '{name}' from image '{image}'...")
container = client.containers.run(
image,
name=name,
ports={f"{port}/tcp": port},
detach=True,
shm_size="1g",
environment={"PYTHON_ENV": "production"}
)
# Wait for health
print(f"⏳ Waiting for container to be healthy...")
for _ in range(30): # 30s timeout
time.sleep(1)
container.reload()
if container.status == "running":
try:
# Quick health check
import requests
resp = requests.get(f"http://localhost:{port}/health", timeout=2)
if resp.status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
def stop_container(container):
"""Stop and remove container."""
print(f"🛑 Stopping container...")
container.stop()
container.remove()
print(f"✅ Container removed")
async def main():
print("="*60)
print("TEST 1: Basic Container Health + Single Endpoint")
print("="*60)
client = docker.from_env()
container = None
try:
# Start container
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
# Test /health endpoint
print(f"\n📊 Testing /health endpoint ({REQUESTS} requests)...")
url = f"http://localhost:{PORT}/health"
results = await test_endpoint(url, REQUESTS)
# Calculate stats
successes = sum(1 for r in results if r["success"])
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if r["latency_ms"] is not None]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
# Print results
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_latency:.0f}ms")
if latencies:
print(f" Min Latency: {min(latencies):.0f}ms")
print(f" Max Latency: {max(latencies):.0f}ms")
print(f"{'='*60}")
# Pass/Fail
if success_rate >= 100:
print(f"✅ TEST PASSED")
return 0
else:
print(f"❌ TEST FAILED (expected 100% success rate)")
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
return 1
finally:
if container:
stop_container(container)
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""
Test 2: Docker Stats Monitoring
- Extends Test 1 with real-time container stats
- Monitors memory % and CPU during requests
- Reports baseline, peak, and final memory
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS = 20 # More requests to see memory usage
# Stats tracking
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background thread to collect container stats."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
# Extract memory stats
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024) # MB
mem_limit = stat['memory_stats'].get('limit', 1) / (1024 * 1024)
mem_percent = (mem_usage / mem_limit * 100) if mem_limit > 0 else 0
# Extract CPU stats (handle missing fields on Mac)
cpu_percent = 0
try:
cpu_delta = stat['cpu_stats']['cpu_usage']['total_usage'] - \
stat['precpu_stats']['cpu_usage']['total_usage']
system_delta = stat['cpu_stats'].get('system_cpu_usage', 0) - \
stat['precpu_stats'].get('system_cpu_usage', 0)
if system_delta > 0:
num_cpus = stat['cpu_stats'].get('online_cpus', 1)
cpu_percent = (cpu_delta / system_delta * num_cpus * 100.0)
except (KeyError, ZeroDivisionError):
pass
stats_history.append({
'timestamp': time.time(),
'memory_mb': mem_usage,
'memory_percent': mem_percent,
'cpu_percent': cpu_percent
})
except Exception as e:
# Skip malformed stats
pass
time.sleep(0.5) # Sample every 500ms
async def test_endpoint(url: str, count: int):
"""Hit endpoint, return stats."""
results = []
async with httpx.AsyncClient(timeout=30.0) as client:
for i in range(count):
start = time.time()
try:
resp = await client.get(url)
elapsed = (time.time() - start) * 1000
results.append({
"success": resp.status_code == 200,
"latency_ms": elapsed,
})
if (i + 1) % 5 == 0: # Print every 5 requests
print(f" [{i+1}/{count}] ✓ {resp.status_code} - {elapsed:.0f}ms")
except Exception as e:
results.append({"success": False, "error": str(e)})
print(f" [{i+1}/{count}] ✗ Error: {e}")
return results
def start_container(client, image: str, name: str, port: int):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container '{name}'...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container '{name}'...")
container = client.containers.run(
image,
name=name,
ports={f"{port}/tcp": port},
detach=True,
shm_size="1g",
mem_limit="4g", # Set explicit memory limit
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
resp = requests.get(f"http://localhost:{port}/health", timeout=2)
if resp.status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
def stop_container(container):
"""Stop container."""
print(f"🛑 Stopping container...")
container.stop()
container.remove()
async def main():
print("="*60)
print("TEST 2: Docker Stats Monitoring")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
# Start container
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
# Start stats monitoring in background
print(f"\n📊 Starting stats monitor...")
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
# Wait a bit for baseline
await asyncio.sleep(2)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline memory: {baseline_mem:.1f} MB")
# Test /health endpoint
print(f"\n🔄 Running {REQUESTS} requests to /health...")
url = f"http://localhost:{PORT}/health"
results = await test_endpoint(url, REQUESTS)
# Wait a bit to capture peak
await asyncio.sleep(1)
# Stop monitoring
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Calculate stats
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
# Memory stats
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
mem_delta = final_mem - baseline_mem
# Print results
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_latency:.0f}ms")
print(f"\n Memory Stats:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {mem_delta:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
if success_rate >= 100 and mem_delta < 100: # No significant memory growth
print(f"✅ TEST PASSED")
return 0
else:
if success_rate < 100:
print(f"❌ TEST FAILED (success rate < 100%)")
if mem_delta >= 100:
print(f"⚠️ WARNING: Memory grew by {mem_delta:.1f} MB")
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
return 1
finally:
stop_monitoring.set()
if container:
stop_container(container)
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
Test 3: Pool Validation - Permanent Browser Reuse
- Tests /html endpoint (should use permanent browser)
- Monitors container logs for pool hit markers
- Validates browser reuse rate
- Checks memory after browser creation
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS = 30
# Stats tracking
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({
'timestamp': time.time(),
'memory_mb': mem_usage,
})
except:
pass
time.sleep(0.5)
def count_log_markers(container):
"""Extract pool usage markers from logs."""
logs = container.logs().decode('utf-8')
permanent_hits = logs.count("🔥 Using permanent browser")
hot_hits = logs.count("♨️ Using hot pool browser")
cold_hits = logs.count("❄️ Using cold pool browser")
new_created = logs.count("🆕 Creating new browser")
return {
'permanent_hits': permanent_hits,
'hot_hits': hot_hits,
'cold_hits': cold_hits,
'new_created': new_created,
'total_hits': permanent_hits + hot_hits + cold_hits
}
async def test_endpoint(url: str, count: int):
"""Hit endpoint multiple times."""
results = []
async with httpx.AsyncClient(timeout=60.0) as client:
for i in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"})
elapsed = (time.time() - start) * 1000
results.append({
"success": resp.status_code == 200,
"latency_ms": elapsed,
})
if (i + 1) % 10 == 0:
print(f" [{i+1}/{count}] ✓ {resp.status_code} - {elapsed:.0f}ms")
except Exception as e:
results.append({"success": False, "error": str(e)})
print(f" [{i+1}/{count}] ✗ Error: {e}")
return results
def start_container(client, image: str, name: str, port: int):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image,
name=name,
ports={f"{port}/tcp": port},
detach=True,
shm_size="1g",
mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
resp = requests.get(f"http://localhost:{port}/health", timeout=2)
if resp.status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
def stop_container(container):
"""Stop container."""
print(f"🛑 Stopping container...")
container.stop()
container.remove()
async def main():
print("="*60)
print("TEST 3: Pool Validation - Permanent Browser Reuse")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
# Start container
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
# Wait for permanent browser initialization
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start stats monitoring
print(f"📊 Starting stats monitor...")
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline (with permanent browser): {baseline_mem:.1f} MB")
# Test /html endpoint (uses permanent browser for default config)
print(f"\n🔄 Running {REQUESTS} requests to /html...")
url = f"http://localhost:{PORT}/html"
results = await test_endpoint(url, REQUESTS)
# Wait a bit
await asyncio.sleep(1)
# Stop monitoring
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Analyze logs for pool markers
print(f"\n📋 Analyzing pool usage...")
pool_stats = count_log_markers(container)
# Calculate request stats
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
# Memory stats
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
mem_delta = final_mem - baseline_mem
# Calculate reuse rate
total_requests = len(results)
total_pool_hits = pool_stats['total_hits']
reuse_rate = (total_pool_hits / total_requests * 100) if total_requests > 0 else 0
# Print results
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_latency:.0f}ms")
print(f"\n Pool Stats:")
print(f" 🔥 Permanent Hits: {pool_stats['permanent_hits']}")
print(f" ♨️ Hot Pool Hits: {pool_stats['hot_hits']}")
print(f" ❄️ Cold Pool Hits: {pool_stats['cold_hits']}")
print(f" 🆕 New Created: {pool_stats['new_created']}")
print(f" 📊 Reuse Rate: {reuse_rate:.1f}%")
print(f"\n Memory Stats:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {mem_delta:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
if success_rate < 100:
print(f"❌ FAIL: Success rate {success_rate:.1f}% < 100%")
passed = False
if reuse_rate < 80:
print(f"❌ FAIL: Reuse rate {reuse_rate:.1f}% < 80% (expected high permanent browser usage)")
passed = False
if pool_stats['permanent_hits'] < (total_requests * 0.8):
print(f"⚠️ WARNING: Only {pool_stats['permanent_hits']} permanent hits out of {total_requests} requests")
if mem_delta > 200:
print(f"⚠️ WARNING: Memory grew by {mem_delta:.1f} MB (possible browser leak)")
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
stop_container(container)
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""
Test 4: Concurrent Load Testing
- Tests pool under concurrent load
- Escalates: 10 → 50 → 100 concurrent requests
- Validates latency distribution (P50, P95, P99)
- Monitors memory stability
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
from collections import defaultdict
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
LOAD_LEVELS = [
{"name": "Light", "concurrent": 10, "requests": 20},
{"name": "Medium", "concurrent": 50, "requests": 100},
{"name": "Heavy", "concurrent": 100, "requests": 200},
]
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(0.5)
def count_log_markers(container):
"""Extract pool markers."""
logs = container.logs().decode('utf-8')
return {
'permanent': logs.count("🔥 Using permanent browser"),
'hot': logs.count("♨️ Using hot pool browser"),
'cold': logs.count("❄️ Using cold pool browser"),
'new': logs.count("🆕 Creating new browser"),
}
async def hit_endpoint(client, url, payload, semaphore):
"""Single request with concurrency control."""
async with semaphore:
start = time.time()
try:
resp = await client.post(url, json=payload, timeout=60.0)
elapsed = (time.time() - start) * 1000
return {"success": resp.status_code == 200, "latency_ms": elapsed}
except Exception as e:
return {"success": False, "error": str(e)}
async def run_concurrent_test(url, payload, concurrent, total_requests):
"""Run concurrent requests."""
semaphore = asyncio.Semaphore(concurrent)
async with httpx.AsyncClient() as client:
tasks = [hit_endpoint(client, url, payload, semaphore) for _ in range(total_requests)]
results = await asyncio.gather(*tasks)
return results
def calculate_percentiles(latencies):
"""Calculate P50, P95, P99."""
if not latencies:
return 0, 0, 0
sorted_lat = sorted(latencies)
n = len(sorted_lat)
return (
sorted_lat[int(n * 0.50)],
sorted_lat[int(n * 0.95)],
sorted_lat[int(n * 0.99)],
)
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 4: Concurrent Load Testing")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
url = f"http://localhost:{PORT}/html"
payload = {"url": "https://httpbin.org/html"}
all_results = []
level_stats = []
# Run load levels
for level in LOAD_LEVELS:
print(f"{'='*60}")
print(f"🔄 {level['name']} Load: {level['concurrent']} concurrent, {level['requests']} total")
print(f"{'='*60}")
start_time = time.time()
results = await run_concurrent_test(url, payload, level['concurrent'], level['requests'])
duration = time.time() - start_time
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
p50, p95, p99 = calculate_percentiles(latencies)
avg_lat = sum(latencies) / len(latencies) if latencies else 0
print(f" Duration: {duration:.1f}s")
print(f" Success: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_lat:.0f}ms")
print(f" P50/P95/P99: {p50:.0f}ms / {p95:.0f}ms / {p99:.0f}ms")
level_stats.append({
'name': level['name'],
'concurrent': level['concurrent'],
'success_rate': success_rate,
'avg_latency': avg_lat,
'p50': p50, 'p95': p95, 'p99': p99,
})
all_results.extend(results)
await asyncio.sleep(2) # Cool down between levels
# Stop monitoring
await asyncio.sleep(1)
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Final stats
pool_stats = count_log_markers(container)
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
print(f"\n{'='*60}")
print(f"FINAL RESULTS:")
print(f"{'='*60}")
print(f" Total Requests: {len(all_results)}")
print(f"\n Pool Utilization:")
print(f" 🔥 Permanent: {pool_stats['permanent']}")
print(f" ♨️ Hot: {pool_stats['hot']}")
print(f" ❄️ Cold: {pool_stats['cold']}")
print(f" 🆕 New: {pool_stats['new']}")
print(f"\n Memory:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {final_mem - baseline_mem:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
for ls in level_stats:
if ls['success_rate'] < 99:
print(f"❌ FAIL: {ls['name']} success rate {ls['success_rate']:.1f}% < 99%")
passed = False
if ls['p99'] > 10000: # 10s threshold
print(f"⚠️ WARNING: {ls['name']} P99 latency {ls['p99']:.0f}ms very high")
if final_mem - baseline_mem > 300:
print(f"⚠️ WARNING: Memory grew {final_mem - baseline_mem:.1f} MB")
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
Test 5: Pool Stress - Mixed Configs
- Tests hot/cold pool with different browser configs
- Uses different viewports to create config variants
- Validates cold → hot promotion after 3 uses
- Monitors pool tier distribution
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
import random
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS_PER_CONFIG = 5 # 5 requests per config variant
# Different viewport configs to test pool tiers
VIEWPORT_CONFIGS = [
None, # Default (permanent browser)
{"width": 1920, "height": 1080}, # Desktop
{"width": 1024, "height": 768}, # Tablet
{"width": 375, "height": 667}, # Mobile
]
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(0.5)
def analyze_pool_logs(container):
"""Extract detailed pool stats from logs."""
logs = container.logs().decode('utf-8')
permanent = logs.count("🔥 Using permanent browser")
hot = logs.count("♨️ Using hot pool browser")
cold = logs.count("❄️ Using cold pool browser")
new = logs.count("🆕 Creating new browser")
promotions = logs.count("⬆️ Promoting to hot pool")
return {
'permanent': permanent,
'hot': hot,
'cold': cold,
'new': new,
'promotions': promotions,
'total': permanent + hot + cold
}
async def crawl_with_viewport(client, url, viewport):
"""Single request with specific viewport."""
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {},
"crawler_config": {}
}
# Add viewport if specified
if viewport:
payload["browser_config"] = {
"type": "BrowserConfig",
"params": {
"viewport": {"type": "dict", "value": viewport},
"headless": True,
"text_mode": True,
"extra_args": [
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--disable-software-rasterizer",
"--disable-web-security",
"--allow-insecure-localhost",
"--ignore-certificate-errors"
]
}
}
start = time.time()
try:
resp = await client.post(url, json=payload, timeout=60.0)
elapsed = (time.time() - start) * 1000
return {"success": resp.status_code == 200, "latency_ms": elapsed, "viewport": viewport}
except Exception as e:
return {"success": False, "error": str(e), "viewport": viewport}
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 5: Pool Stress - Mixed Configs")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
url = f"http://localhost:{PORT}/crawl"
print(f"Testing {len(VIEWPORT_CONFIGS)} different configs:")
for i, vp in enumerate(VIEWPORT_CONFIGS):
vp_str = "Default" if vp is None else f"{vp['width']}x{vp['height']}"
print(f" {i+1}. {vp_str}")
print()
# Run requests: repeat each config REQUESTS_PER_CONFIG times
all_results = []
config_sequence = []
for _ in range(REQUESTS_PER_CONFIG):
for viewport in VIEWPORT_CONFIGS:
config_sequence.append(viewport)
# Shuffle to mix configs
random.shuffle(config_sequence)
print(f"🔄 Running {len(config_sequence)} requests with mixed configs...")
async with httpx.AsyncClient() as http_client:
for i, viewport in enumerate(config_sequence):
result = await crawl_with_viewport(http_client, url, viewport)
all_results.append(result)
if (i + 1) % 5 == 0:
vp_str = "default" if result['viewport'] is None else f"{result['viewport']['width']}x{result['viewport']['height']}"
status = "" if result.get('success') else ""
lat = f"{result.get('latency_ms', 0):.0f}ms" if 'latency_ms' in result else "error"
print(f" [{i+1}/{len(config_sequence)}] {status} {vp_str} - {lat}")
# Stop monitoring
await asyncio.sleep(2)
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Analyze results
pool_stats = analyze_pool_logs(container)
successes = sum(1 for r in all_results if r.get("success"))
success_rate = (successes / len(all_results)) * 100
latencies = [r["latency_ms"] for r in all_results if "latency_ms" in r]
avg_lat = sum(latencies) / len(latencies) if latencies else 0
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f"{'='*60}")
print(f" Requests: {len(all_results)}")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(all_results)})")
print(f" Avg Latency: {avg_lat:.0f}ms")
print(f"\n Pool Statistics:")
print(f" 🔥 Permanent: {pool_stats['permanent']}")
print(f" ♨️ Hot: {pool_stats['hot']}")
print(f" ❄️ Cold: {pool_stats['cold']}")
print(f" 🆕 New: {pool_stats['new']}")
print(f" ⬆️ Promotions: {pool_stats['promotions']}")
print(f" 📊 Reuse: {(pool_stats['total'] / len(all_results) * 100):.1f}%")
print(f"\n Memory:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {final_mem - baseline_mem:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
if success_rate < 99:
print(f"❌ FAIL: Success rate {success_rate:.1f}% < 99%")
passed = False
# Should see promotions since we repeat each config 5 times
if pool_stats['promotions'] < (len(VIEWPORT_CONFIGS) - 1): # -1 for default
print(f"⚠️ WARNING: Only {pool_stats['promotions']} promotions (expected ~{len(VIEWPORT_CONFIGS)-1})")
# Should have created some browsers for different configs
if pool_stats['new'] == 0:
print(f"⚠️ NOTE: No new browsers created (all used default?)")
if pool_stats['permanent'] == len(all_results):
print(f"⚠️ NOTE: All requests used permanent browser (configs not varying enough?)")
if final_mem - baseline_mem > 500:
print(f"⚠️ WARNING: Memory grew {final_mem - baseline_mem:.1f} MB")
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Test 6: Multi-Endpoint Testing
- Tests multiple endpoints together: /html, /screenshot, /pdf, /crawl
- Validates each endpoint works correctly
- Monitors success rates per endpoint
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS_PER_ENDPOINT = 10
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(0.5)
async def test_html(client, base_url, count):
"""Test /html endpoint."""
url = f"{base_url}/html"
results = []
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"}, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
async def test_screenshot(client, base_url, count):
"""Test /screenshot endpoint."""
url = f"{base_url}/screenshot"
results = []
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"}, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
async def test_pdf(client, base_url, count):
"""Test /pdf endpoint."""
url = f"{base_url}/pdf"
results = []
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"}, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
async def test_crawl(client, base_url, count):
"""Test /crawl endpoint."""
url = f"{base_url}/crawl"
results = []
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {},
"crawler_config": {}
}
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json=payload, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 6: Multi-Endpoint Testing")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
base_url = f"http://localhost:{PORT}"
# Test each endpoint
endpoints = {
"/html": test_html,
"/screenshot": test_screenshot,
"/pdf": test_pdf,
"/crawl": test_crawl,
}
all_endpoint_stats = {}
async with httpx.AsyncClient() as http_client:
for endpoint_name, test_func in endpoints.items():
print(f"🔄 Testing {endpoint_name} ({REQUESTS_PER_ENDPOINT} requests)...")
results = await test_func(http_client, base_url, REQUESTS_PER_ENDPOINT)
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
avg_lat = sum(latencies) / len(latencies) if latencies else 0
all_endpoint_stats[endpoint_name] = {
'success_rate': success_rate,
'avg_latency': avg_lat,
'total': len(results),
'successes': successes
}
print(f" ✓ Success: {success_rate:.1f}% ({successes}/{len(results)}), Avg: {avg_lat:.0f}ms")
# Stop monitoring
await asyncio.sleep(1)
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Final stats
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f"{'='*60}")
for endpoint, stats in all_endpoint_stats.items():
print(f" {endpoint:12} Success: {stats['success_rate']:5.1f}% Avg: {stats['avg_latency']:6.0f}ms")
print(f"\n Memory:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {final_mem - baseline_mem:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
for endpoint, stats in all_endpoint_stats.items():
if stats['success_rate'] < 100:
print(f"❌ FAIL: {endpoint} success rate {stats['success_rate']:.1f}% < 100%")
passed = False
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
Test 7: Cleanup Verification (Janitor)
- Creates load spike then goes idle
- Verifies memory returns to near baseline
- Tests janitor cleanup of idle browsers
- Monitors memory recovery time
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
SPIKE_REQUESTS = 20 # Create some browsers
IDLE_TIME = 90 # Wait 90s for janitor (runs every 60s)
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(1) # Sample every 1s for this test
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 7: Cleanup Verification (Janitor)")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(2)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
# Create load spike with different configs to populate pool
print(f"🔥 Creating load spike ({SPIKE_REQUESTS} requests with varied configs)...")
url = f"http://localhost:{PORT}/crawl"
viewports = [
{"width": 1920, "height": 1080},
{"width": 1024, "height": 768},
{"width": 375, "height": 667},
]
async with httpx.AsyncClient(timeout=60.0) as http_client:
tasks = []
for i in range(SPIKE_REQUESTS):
vp = viewports[i % len(viewports)]
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {
"type": "BrowserConfig",
"params": {
"viewport": {"type": "dict", "value": vp},
"headless": True,
"text_mode": True,
"extra_args": [
"--no-sandbox", "--disable-dev-shm-usage",
"--disable-gpu", "--disable-software-rasterizer",
"--disable-web-security", "--allow-insecure-localhost",
"--ignore-certificate-errors"
]
}
},
"crawler_config": {}
}
tasks.append(http_client.post(url, json=payload))
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = sum(1 for r in results if hasattr(r, 'status_code') and r.status_code == 200)
print(f" ✓ Spike completed: {successes}/{len(results)} successful")
# Measure peak
await asyncio.sleep(2)
peak_mem = max([s['memory_mb'] for s in stats_history]) if stats_history else baseline_mem
print(f" 📊 Peak memory: {peak_mem:.1f} MB (+{peak_mem - baseline_mem:.1f} MB)")
# Now go idle and wait for janitor
print(f"\n⏸️ Going idle for {IDLE_TIME}s (janitor cleanup)...")
print(f" (Janitor runs every 60s, checking for idle browsers)")
for elapsed in range(0, IDLE_TIME, 10):
await asyncio.sleep(10)
current_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f" [{elapsed+10:3d}s] Memory: {current_mem:.1f} MB")
# Stop monitoring
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Analyze memory recovery
final_mem = stats_history[-1]['memory_mb'] if stats_history else 0
recovery_mb = peak_mem - final_mem
recovery_pct = (recovery_mb / (peak_mem - baseline_mem) * 100) if (peak_mem - baseline_mem) > 0 else 0
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f"{'='*60}")
print(f" Memory Journey:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB (+{peak_mem - baseline_mem:.1f} MB)")
print(f" Final: {final_mem:.1f} MB (+{final_mem - baseline_mem:.1f} MB)")
print(f" Recovered: {recovery_mb:.1f} MB ({recovery_pct:.1f}%)")
print(f"{'='*60}")
# Pass/Fail
passed = True
# Should have created some memory pressure
if peak_mem - baseline_mem < 100:
print(f"⚠️ WARNING: Peak increase only {peak_mem - baseline_mem:.1f} MB (expected more browsers)")
# Should recover most memory (within 100MB of baseline)
if final_mem - baseline_mem > 100:
print(f"⚠️ WARNING: Memory didn't recover well (still +{final_mem - baseline_mem:.1f} MB above baseline)")
else:
print(f"✅ Good memory recovery!")
# Baseline + 50MB tolerance
if final_mem - baseline_mem < 50:
print(f"✅ Excellent cleanup (within 50MB of baseline)")
print(f"✅ TEST PASSED")
return 0
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)