Replace crawler_manager.py with simpler crawler_pool.py implementation: - Add global page semaphore for hard concurrency cap - Implement browser pool with idle cleanup - Add playground UI for testing and stress testing - Update API handlers to use pooled crawlers - Enhance logging levels and symbols BREAKING CHANGE: Removes CrawlerManager class in favor of simpler pool-based approach
204 lines
7.9 KiB
Python
204 lines
7.9 KiB
Python
"""Lite Crawl4AI API stress‑tester.
|
||
|
||
✔ batch or stream mode (single unified path)
|
||
✔ global stats + JSON summary
|
||
✔ rich table progress
|
||
✔ Typer CLI with presets (quick / soak)
|
||
|
||
Usage examples:
|
||
python api_stress_test.py # uses quick preset
|
||
python api_stress_test.py soak # 5 K URLs stress run
|
||
python api_stress_test.py --urls 200 --concurrent 10 --chunk 20
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio, json, time, uuid, pathlib, statistics
|
||
from typing import List, Dict, Optional
|
||
|
||
import httpx, typer
|
||
from rich.console import Console
|
||
from rich.table import Table
|
||
|
||
# ───────────────────────── defaults / presets ──────────────────────────
|
||
PRESETS = {
|
||
"quick": dict(urls=1, concurrent=1, chunk=1, stream=False),
|
||
"debug": dict(urls=10, concurrent=2, chunk=5, stream=False),
|
||
"soak": dict(urls=5000, concurrent=20, chunk=50, stream=True),
|
||
}
|
||
|
||
API_HEALTH_ENDPOINT = "/health"
|
||
REQUEST_TIMEOUT = 180.0
|
||
|
||
console = Console()
|
||
app = typer.Typer(add_completion=False, rich_markup_mode="rich")
|
||
|
||
# ───────────────────────── helpers ─────────────────────────────────────
|
||
async def _check_health(client: httpx.AsyncClient) -> None:
|
||
resp = await client.get(API_HEALTH_ENDPOINT, timeout=10)
|
||
resp.raise_for_status()
|
||
console.print(f"[green]Server healthy — version {resp.json().get('version','?')}[/]")
|
||
|
||
async def _iter_results(resp: httpx.Response, stream: bool):
|
||
"""Yield result dicts from batch JSON or ND‑JSON stream."""
|
||
if stream:
|
||
async for line in resp.aiter_lines():
|
||
if not line:
|
||
continue
|
||
rec = json.loads(line)
|
||
if rec.get("status") == "completed":
|
||
break
|
||
yield rec
|
||
else:
|
||
data = resp.json()
|
||
for rec in data.get("results", []):
|
||
yield rec, data # rec + whole payload for memory delta/peak
|
||
|
||
async def _consume_stream(resp: httpx.Response) -> Dict:
|
||
stats = {"success_urls": 0, "failed_urls": 0, "mem_metric": 0.0}
|
||
async for line in resp.aiter_lines():
|
||
if not line:
|
||
continue
|
||
rec = json.loads(line)
|
||
if rec.get("status") == "completed":
|
||
break
|
||
if rec.get("success"):
|
||
stats["success_urls"] += 1
|
||
else:
|
||
stats["failed_urls"] += 1
|
||
mem = rec.get("server_memory_mb")
|
||
if mem is not None:
|
||
stats["mem_metric"] = max(stats["mem_metric"], float(mem))
|
||
return stats
|
||
|
||
def _consume_batch(body: Dict) -> Dict:
|
||
stats = {"success_urls": 0, "failed_urls": 0}
|
||
for rec in body.get("results", []):
|
||
if rec.get("success"):
|
||
stats["success_urls"] += 1
|
||
else:
|
||
stats["failed_urls"] += 1
|
||
stats["mem_metric"] = body.get("server_memory_delta_mb")
|
||
stats["peak"] = body.get("server_peak_memory_mb")
|
||
return stats
|
||
|
||
async def _fetch_chunk(
|
||
client: httpx.AsyncClient,
|
||
urls: List[str],
|
||
stream: bool,
|
||
semaphore: asyncio.Semaphore,
|
||
) -> Dict:
|
||
endpoint = "/crawl/stream" if stream else "/crawl"
|
||
payload = {
|
||
"urls": urls,
|
||
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
|
||
"crawler_config": {"type": "CrawlerRunConfig",
|
||
"params": {"cache_mode": "BYPASS", "stream": stream}},
|
||
}
|
||
|
||
async with semaphore:
|
||
start = time.perf_counter()
|
||
|
||
if stream:
|
||
# ---- streaming request ----
|
||
async with client.stream("POST", endpoint, json=payload) as resp:
|
||
resp.raise_for_status()
|
||
stats = await _consume_stream(resp)
|
||
else:
|
||
# ---- batch request ----
|
||
resp = await client.post(endpoint, json=payload)
|
||
resp.raise_for_status()
|
||
stats = _consume_batch(resp.json())
|
||
|
||
stats["elapsed"] = time.perf_counter() - start
|
||
return stats
|
||
|
||
|
||
# ───────────────────────── core runner ─────────────────────────────────
|
||
async def _run(api: str, urls: int, concurrent: int, chunk: int, stream: bool, report: pathlib.Path):
|
||
client = httpx.AsyncClient(base_url=api, timeout=REQUEST_TIMEOUT, limits=httpx.Limits(max_connections=concurrent+5))
|
||
await _check_health(client)
|
||
|
||
url_list = [f"https://httpbin.org/anything/{uuid.uuid4()}" for _ in range(urls)]
|
||
chunks = [url_list[i:i+chunk] for i in range(0, len(url_list), chunk)]
|
||
sem = asyncio.Semaphore(concurrent)
|
||
|
||
table = Table(show_header=True, header_style="bold magenta")
|
||
table.add_column("Batch", style="dim", width=6)
|
||
table.add_column("Success/Fail", width=12)
|
||
table.add_column("Mem", width=14)
|
||
table.add_column("Time (s)")
|
||
|
||
agg_success = agg_fail = 0
|
||
deltas, peaks = [], []
|
||
|
||
start = time.perf_counter()
|
||
tasks = [asyncio.create_task(_fetch_chunk(client, c, stream, sem)) for c in chunks]
|
||
for idx, coro in enumerate(asyncio.as_completed(tasks), 1):
|
||
res = await coro
|
||
agg_success += res["success_urls"]
|
||
agg_fail += res["failed_urls"]
|
||
if res["mem_metric"] is not None:
|
||
deltas.append(res["mem_metric"])
|
||
if res["peak"] is not None:
|
||
peaks.append(res["peak"])
|
||
|
||
mem_txt = f"{res['mem_metric']:.1f}" if res["mem_metric"] is not None else "‑"
|
||
if res["peak"] is not None:
|
||
mem_txt = f"{res['peak']:.1f}/{mem_txt}"
|
||
|
||
table.add_row(str(idx), f"{res['success_urls']}/{res['failed_urls']}", mem_txt, f"{res['elapsed']:.2f}")
|
||
|
||
console.print(table)
|
||
total_time = time.perf_counter() - start
|
||
|
||
summary = {
|
||
"urls": urls,
|
||
"concurrent": concurrent,
|
||
"chunk": chunk,
|
||
"stream": stream,
|
||
"success_urls": agg_success,
|
||
"failed_urls": agg_fail,
|
||
"elapsed_sec": round(total_time, 2),
|
||
"avg_mem": round(statistics.mean(deltas), 2) if deltas else None,
|
||
"max_mem": max(deltas) if deltas else None,
|
||
"avg_peak": round(statistics.mean(peaks), 2) if peaks else None,
|
||
"max_peak": max(peaks) if peaks else None,
|
||
}
|
||
console.print("\n[bold green]Done:[/]" , summary)
|
||
|
||
report.mkdir(parents=True, exist_ok=True)
|
||
path = report / f"api_test_{int(time.time())}.json"
|
||
path.write_text(json.dumps(summary, indent=2))
|
||
console.print(f"[green]Summary → {path}")
|
||
|
||
await client.aclose()
|
||
|
||
# ───────────────────────── Typer CLI ──────────────────────────────────
|
||
@app.command()
|
||
def main(
|
||
preset: str = typer.Argument("quick", help="quick / debug / soak or custom"),
|
||
api_url: str = typer.Option("http://localhost:8020", show_default=True),
|
||
urls: int = typer.Option(None, help="Total URLs to crawl"),
|
||
concurrent: int = typer.Option(None, help="Concurrent API requests"),
|
||
chunk: int = typer.Option(None, help="URLs per request"),
|
||
stream: bool = typer.Option(None, help="Use /crawl/stream"),
|
||
report: pathlib.Path = typer.Option("reports_api", help="Where to save JSON summary"),
|
||
):
|
||
"""Run a stress test against a running Crawl4AI API server."""
|
||
if preset not in PRESETS and any(v is None for v in (urls, concurrent, chunk, stream)):
|
||
console.print(f"[red]Unknown preset '{preset}' and custom params missing[/]")
|
||
raise typer.Exit(1)
|
||
|
||
cfg = PRESETS.get(preset, {})
|
||
urls = urls or cfg.get("urls")
|
||
concurrent = concurrent or cfg.get("concurrent")
|
||
chunk = chunk or cfg.get("chunk")
|
||
stream = stream if stream is not None else cfg.get("stream", False)
|
||
|
||
console.print(f"[cyan]API:[/] {api_url} | URLs: {urls} | Concurrency: {concurrent} | Chunk: {chunk} | Stream: {stream}")
|
||
asyncio.run(_run(api_url, urls, concurrent, chunk, stream, report))
|
||
|
||
if __name__ == "__main__":
|
||
app()
|