Files
crawl4ai/tests/test_cdp_changes.py
2026-01-31 11:07:26 +00:00

507 lines
18 KiB
Python

"""
Tests for CDP connection caching and configurable close delay.
Two test suites:
1. Regression — default behavior unchanged, basic CDP still works
2. Stress — race conditions, parallel crawlers, locking, cache correctness
All tests are real (no mocks). Requires a running Chrome on port 9222:
chrome --headless=new --no-sandbox --remote-debugging-port=9222
"""
import asyncio
import time
import pytest
from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.browser_manager import _CDPConnectionCache, BrowserManager
CDP_URL = "http://localhost:9222"
TEST_URL = "https://example.com"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _quick_crawl(browser_cfg: BrowserConfig, url: str = TEST_URL) -> bool:
"""Run a single crawl, return True if the page loaded successfully."""
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
result = await crawler.arun(url=url, config=run_cfg)
return result.success and result.status_code == 200
# ===========================================================================
# SUITE 1 — REGRESSION (default behaviour, no new flags)
# ===========================================================================
class TestRegression:
"""Verify nothing is broken when the new parameters keep their defaults."""
@pytest.mark.asyncio
async def test_default_cdp_crawl(self):
"""Basic CDP crawl with default settings still works."""
cfg = BrowserConfig(cdp_url=CDP_URL, headless=True)
assert await _quick_crawl(cfg)
@pytest.mark.asyncio
async def test_default_params_values(self):
"""BrowserConfig defaults are backward-compatible."""
cfg = BrowserConfig()
assert cfg.cdp_close_delay == 1.0
assert cfg.cache_cdp_connection is False
@pytest.mark.asyncio
async def test_cdp_cleanup_on_close_still_works(self):
"""cdp_cleanup_on_close=True path (existing feature) still works."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cdp_cleanup_on_close=True,
)
assert await _quick_crawl(cfg)
@pytest.mark.asyncio
async def test_sequential_crawls_default(self):
"""Two sequential crawls with default settings both succeed."""
cfg = BrowserConfig(cdp_url=CDP_URL, headless=True)
assert await _quick_crawl(cfg)
assert await _quick_crawl(cfg)
@pytest.mark.asyncio
async def test_cdp_close_delay_default_timing(self):
"""Default close delay is ~1s (not 0, not much more)."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cdp_cleanup_on_close=True,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
t0 = time.monotonic()
async with AsyncWebCrawler(config=cfg) as crawler:
await crawler.arun(url=TEST_URL, config=run_cfg)
elapsed = time.monotonic() - t0
# The 1s sleep must be present (at least ~0.9s of close overhead)
assert elapsed >= 0.9, f"Close was too fast ({elapsed:.2f}s), sleep may be missing"
# ===========================================================================
# SUITE 2 — STRESS (cache, parallelism, race conditions, locking)
# ===========================================================================
class TestStress:
"""Hammer the CDP cache and configurable delay under pressure."""
# -- cdp_close_delay -------------------------------------------------------
@pytest.mark.asyncio
async def test_close_delay_zero_skips_sleep(self):
"""cdp_close_delay=0 should make close noticeably faster."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cdp_cleanup_on_close=True,
cdp_close_delay=0,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
t0 = time.monotonic()
async with AsyncWebCrawler(config=cfg) as crawler:
await crawler.arun(url=TEST_URL, config=run_cfg)
elapsed = time.monotonic() - t0
# Without the 1s sleep the whole thing should be well under 1s of close overhead
assert elapsed < 5.0, f"Close too slow with delay=0 ({elapsed:.2f}s)"
@pytest.mark.asyncio
async def test_close_delay_custom_value(self):
"""A custom delay value is respected."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cdp_cleanup_on_close=True,
cdp_close_delay=0.2,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
t0 = time.monotonic()
async with AsyncWebCrawler(config=cfg) as crawler:
await crawler.arun(url=TEST_URL, config=run_cfg)
elapsed = time.monotonic() - t0
# Should include at least the 0.2s delay
assert elapsed >= 0.2
# -- cache_cdp_connection: basic -----------------------------------------
@pytest.mark.asyncio
async def test_cache_basic_crawl(self):
"""A single crawl with caching enabled works."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
assert await _quick_crawl(cfg)
# Clean up cache after test
await _CDPConnectionCache.close_all()
@pytest.mark.asyncio
async def test_cache_sequential_reuse(self):
"""Two sequential crawlers reuse the same cached connection."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
# First crawl — creates the cache entry
async with AsyncWebCrawler(config=cfg) as crawler:
r1 = await crawler.arun(
url=TEST_URL,
config=CrawlerRunConfig(wait_until="domcontentloaded", page_timeout=15000, verbose=False),
)
assert r1.success
# Cache should still have an entry (ref went 1 -> 0, so it closed).
# But if we acquire again immediately it should create a fresh one.
# The key thing: this must not crash or hang.
async with AsyncWebCrawler(config=cfg) as crawler:
r2 = await crawler.arun(
url=TEST_URL,
config=CrawlerRunConfig(wait_until="domcontentloaded", page_timeout=15000, verbose=False),
)
assert r2.success
await _CDPConnectionCache.close_all()
@pytest.mark.asyncio
async def test_cache_overlapping_instances(self):
"""Two crawlers alive at the same time share the cache (ref_count=2)."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
crawler1 = AsyncWebCrawler(config=cfg)
await crawler1.start()
crawler2 = AsyncWebCrawler(config=cfg)
await crawler2.start()
# Both should work concurrently
r1, r2 = await asyncio.gather(
crawler1.arun(url=TEST_URL, config=run_cfg),
crawler2.arun(url=TEST_URL, config=run_cfg),
)
assert r1.success
assert r2.success
# Close one — connection must survive for the other
await crawler1.close()
# Second crawler should still work after first closed
r3 = await crawler2.arun(url=TEST_URL, config=run_cfg)
assert r3.success
await crawler2.close()
await _CDPConnectionCache.close_all()
# -- cache: ref counting -------------------------------------------------
@pytest.mark.asyncio
async def test_cache_ref_count_lifecycle(self):
"""Verify ref count goes up and comes back down correctly."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
c1 = AsyncWebCrawler(config=cfg)
await c1.start()
# Cache should have ref_count=1
assert CDP_URL in _CDPConnectionCache._cache
_, _, count = _CDPConnectionCache._cache[CDP_URL]
assert count == 1
c2 = AsyncWebCrawler(config=cfg)
await c2.start()
_, _, count = _CDPConnectionCache._cache[CDP_URL]
assert count == 2
await c1.close()
_, _, count = _CDPConnectionCache._cache[CDP_URL]
assert count == 1
await c2.close()
# Last reference released — entry should be gone
assert CDP_URL not in _CDPConnectionCache._cache
# -- cache: speed benefit ------------------------------------------------
@pytest.mark.asyncio
async def test_cache_faster_than_uncached(self):
"""Cached sequential crawls should be faster than uncached."""
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
# Uncached: two sequential crawls (each does full playwright start/stop)
cfg_no_cache = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cdp_cleanup_on_close=True,
cdp_close_delay=0.5,
)
t0 = time.monotonic()
for _ in range(2):
async with AsyncWebCrawler(config=cfg_no_cache) as crawler:
await crawler.arun(url=TEST_URL, config=run_cfg)
uncached_time = time.monotonic() - t0
# Cached: two sequential crawls (share playwright/CDP)
cfg_cache = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
t0 = time.monotonic()
for _ in range(2):
async with AsyncWebCrawler(config=cfg_cache) as crawler:
await crawler.arun(url=TEST_URL, config=run_cfg)
cached_time = time.monotonic() - t0
await _CDPConnectionCache.close_all()
# Cached should be faster (the uncached has 2x 0.5s delay alone)
assert cached_time < uncached_time, (
f"Cached ({cached_time:.2f}s) was not faster than uncached ({uncached_time:.2f}s)"
)
# -- parallel stress -----------------------------------------------------
@pytest.mark.asyncio
async def test_parallel_cached_crawlers(self):
"""Launch 5 crawlers in parallel, all sharing the cache."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
create_isolated_context=True,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=20000,
verbose=False,
)
async def crawl_one(idx: int):
async with AsyncWebCrawler(config=cfg) as crawler:
result = await crawler.arun(url=TEST_URL, config=run_cfg)
return idx, result.success
results = await asyncio.gather(*[crawl_one(i) for i in range(5)])
for idx, success in results:
assert success, f"Crawler {idx} failed"
await _CDPConnectionCache.close_all()
@pytest.mark.asyncio
async def test_parallel_mixed_cached_uncached(self):
"""Mix of cached and uncached crawlers running in parallel.
Uses create_isolated_context=True because parallel crawlers sharing
a single default context will cause navigation conflicts — this is
the expected pattern for concurrent CDP access.
"""
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=20000,
verbose=False,
)
async def crawl_cached():
cfg = BrowserConfig(
cdp_url=CDP_URL, headless=True,
cache_cdp_connection=True, create_isolated_context=True,
)
async with AsyncWebCrawler(config=cfg) as crawler:
r = await crawler.arun(url=TEST_URL, config=run_cfg)
return "cached", r.success
async def crawl_uncached():
cfg = BrowserConfig(
cdp_url=CDP_URL, headless=True,
create_isolated_context=True,
)
async with AsyncWebCrawler(config=cfg) as crawler:
r = await crawler.arun(url=TEST_URL, config=run_cfg)
return "uncached", r.success
tasks = [crawl_cached(), crawl_uncached(), crawl_cached(), crawl_uncached()]
results = await asyncio.gather(*tasks)
for label, success in results:
assert success, f"{label} crawler failed"
await _CDPConnectionCache.close_all()
@pytest.mark.asyncio
async def test_rapid_open_close_cache(self):
"""Rapidly open and close 10 crawlers sequentially — no leaks/hangs."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
for i in range(10):
async with AsyncWebCrawler(config=cfg) as crawler:
result = await crawler.arun(url=TEST_URL, config=run_cfg)
assert result.success, f"Iteration {i} failed"
await _CDPConnectionCache.close_all()
@pytest.mark.asyncio
async def test_cache_close_all_idempotent(self):
"""Calling close_all() multiple times doesn't crash."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
async with AsyncWebCrawler(config=cfg) as crawler:
r = await crawler.arun(
url=TEST_URL,
config=CrawlerRunConfig(wait_until="domcontentloaded", page_timeout=15000, verbose=False),
)
assert r.success
await _CDPConnectionCache.close_all()
await _CDPConnectionCache.close_all() # second call must not raise
await _CDPConnectionCache.close_all() # third call must not raise
@pytest.mark.asyncio
async def test_stale_connection_recovery(self):
"""If the cached browser disconnects, next acquire recovers."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
run_cfg = CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=15000,
verbose=False,
)
# Build up a cache entry
c1 = AsyncWebCrawler(config=cfg)
await c1.start()
# Forcibly disconnect the cached browser to simulate staleness
if CDP_URL in _CDPConnectionCache._cache:
_, browser, _ = _CDPConnectionCache._cache[CDP_URL]
try:
await browser.close()
except Exception:
pass
await c1.close()
# Next crawler should detect stale and create a fresh connection
async with AsyncWebCrawler(config=cfg) as crawler:
result = await crawler.arun(url=TEST_URL, config=run_cfg)
assert result.success
await _CDPConnectionCache.close_all()
@pytest.mark.asyncio
async def test_parallel_start_race(self):
"""Multiple crawlers calling start() simultaneously — lock prevents races."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
crawlers = [AsyncWebCrawler(config=cfg) for _ in range(5)]
# Start all at once — this hammers _CDPConnectionCache.acquire() concurrently
await asyncio.gather(*[c.start() for c in crawlers])
# All should have the same browser reference
browsers = set()
for c in crawlers:
bm = c.crawler_strategy.browser_manager
browsers.add(id(bm.browser))
# With caching, they should all share the same browser object
assert len(browsers) == 1, f"Expected 1 shared browser, got {len(browsers)}"
# Ref count should be 5
_, _, count = _CDPConnectionCache._cache[CDP_URL]
assert count == 5
# Close all
await asyncio.gather(*[c.close() for c in crawlers])
# Cache should be empty
assert CDP_URL not in _CDPConnectionCache._cache
@pytest.mark.asyncio
async def test_parallel_close_race(self):
"""Multiple crawlers closing simultaneously — no double-free."""
cfg = BrowserConfig(
cdp_url=CDP_URL,
headless=True,
cache_cdp_connection=True,
)
crawlers = [AsyncWebCrawler(config=cfg) for _ in range(5)]
await asyncio.gather(*[c.start() for c in crawlers])
# Close all at once — hammers _CDPConnectionCache.release() concurrently
await asyncio.gather(*[c.close() for c in crawlers])
# Cache must be clean
assert CDP_URL not in _CDPConnectionCache._cache
# Must still work after everything is closed
async with AsyncWebCrawler(config=cfg) as crawler:
r = await crawler.arun(
url=TEST_URL,
config=CrawlerRunConfig(wait_until="domcontentloaded", page_timeout=15000, verbose=False),
)
assert r.success
await _CDPConnectionCache.close_all()