feat(pipeline): add high-level Crawler utility class for simplified web crawling

Add a new Crawler class that provides a simplified interface for both single and batch URL crawling operations. Key features (illustrated in the usage sketch below) include:
- Simple single URL crawling with configurable options
- Parallel batch crawling with concurrency control
- Shared browser hub support for resource efficiency
- Progress tracking and custom retry strategies
- Comprehensive error handling and retry logic

Remove demo and extended test files in favor of new focused test suite.
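
For orientation, here is a minimal usage sketch of the new API, assembled only from the calls the test suites below exercise (Crawler.crawl, Crawler.parallel_crawl, and the crawler_config, concurrency, max_retries, and retry_delay keywords); anything beyond those calls should be treated as an assumption rather than a documented contract:

    import asyncio
    from crawl4ai import Crawler, CrawlerRunConfig
    from crawl4ai.cache_context import CacheMode

    async def main():
        # Single URL with default configuration -> one result object
        result = await Crawler.crawl("https://example.com")
        print(result.success, result.url)

        # List of URLs sharing one config -> dict of results keyed by URL
        config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded")
        results = await Crawler.crawl(
            ["https://example.com", "https://iana.org"],
            crawler_config=config,
            max_retries=2,    # re-attempt failed URLs
            retry_delay=1.0,  # seconds between attempts
        )

        # Parallel crawling with a concurrency cap
        results = await Crawler.parallel_crawl(
            ["https://example.com", "https://www.python.org"],
            crawler_config=config,
            concurrency=2,
        )
        print(sum(1 for r in results.values() if r.success), "of", len(results), "succeeded")

    asyncio.run(main())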
Author: UncleCode
Date: 2025-04-07 22:50:44 +08:00
Commit: 72d8e679ad (parent: 67a790b4a6)
13 changed files with 1039 additions and 1117 deletions


@@ -1,405 +1,163 @@

--- removed: the old unittest/mock-based suite for batch_crawl

import asyncio
import unittest
from unittest.mock import Mock, patch, AsyncMock
from typing import List, Dict, Any, Optional, Tuple
from crawl4ai.pipeline import Pipeline, create_pipeline, batch_crawl
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResult, CrawlResultContainer
from crawl4ai.browser_hub_manager import BrowserHubManager

# Utility function for tests
async def create_mock_result(url, success=True, status_code=200, html="<html></html>"):
    """Create a mock crawl result for testing"""
    result = CrawlResult(
        url=url,
        html=html,
        success=success,
        status_code=status_code,
        error_message="" if success else f"Error crawling {url}"
    )
    return CrawlResultContainer(result)

class TestBatchCrawl(unittest.IsolatedAsyncioTestCase):
    """Test cases for the batch_crawl function"""

    async def asyncSetUp(self):
        """Set up test environment"""
        self.logger = AsyncLogger(verbose=False)
        self.browser_config = BrowserConfig(headless=True)
        self.crawler_config = CrawlerRunConfig()
        # URLs for testing
        self.test_urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3",
            "https://example.com/4",
            "https://example.com/5"
        ]
        # Mock pipeline to avoid actual crawling
        self.mock_pipeline = AsyncMock()
        self.mock_pipeline.crawl = AsyncMock()
        # Set up pipeline to return success for most URLs, but failure for one
        async def mock_crawl(url, config=None):
            if url == "https://example.com/3":
                return await create_mock_result(url, success=False, status_code=404)
            return await create_mock_result(url, success=True)
        self.mock_pipeline.crawl.side_effect = mock_crawl
        # Patch the create_pipeline function
        self.create_pipeline_patch = patch(
            'crawl4ai.pipeline.create_pipeline',
            return_value=self.mock_pipeline
        )
        self.mock_create_pipeline = self.create_pipeline_patch.start()

    async def asyncTearDown(self):
        """Clean up after tests"""
        self.create_pipeline_patch.stop()
        await BrowserHubManager.shutdown_all()

    # === Basic Functionality Tests ===

    async def test_simple_batch_with_single_config(self):
        """Test basic batch crawling with one configuration for all URLs"""
        # Call the batch_crawl function with a list of URLs and single config
        results = await batch_crawl(
            urls=self.test_urls,
            browser_config=self.browser_config,
            crawler_config=self.crawler_config
        )
        # Verify we got results for all URLs
        self.assertEqual(len(results), len(self.test_urls))
        # Check that pipeline.crawl was called for each URL
        self.assertEqual(self.mock_pipeline.crawl.call_count, len(self.test_urls))
        # Check success/failure as expected
        success_count = sum(1 for r in results if r.success)
        self.assertEqual(success_count, len(self.test_urls) - 1)  # All except URL 3
        # Verify URLs in results match input URLs
        result_urls = sorted([r.url for r in results])
        self.assertEqual(result_urls, sorted(self.test_urls))

    async def test_batch_with_crawl_specs(self):
        """Test batch crawling with different configurations per URL"""
        # Create different configs for each URL
        crawl_specs = [
            {"url": url, "crawler_config": CrawlerRunConfig(screenshot=i % 2 == 0)}
            for i, url in enumerate(self.test_urls)
        ]
        # Call batch_crawl with crawl specs
        results = await batch_crawl(
            crawl_specs=crawl_specs,
            browser_config=self.browser_config
        )
        # Verify results
        self.assertEqual(len(results), len(crawl_specs))
        # Verify each URL was crawled with its specific config
        for i, spec in enumerate(crawl_specs):
            call_args = self.mock_pipeline.crawl.call_args_list[i]
            self.assertEqual(call_args[1]['url'], spec['url'])
            self.assertEqual(
                call_args[1]['config'].screenshot,
                spec['crawler_config'].screenshot
            )

    # === Advanced Configuration Tests ===

    async def test_with_multiple_browser_configs(self):
        """Test using different browser configurations for different URLs"""
        # Create different browser configs
        browser_config1 = BrowserConfig(headless=True, browser_type="chromium")
        browser_config2 = BrowserConfig(headless=True, browser_type="firefox")
        # Create crawl specs with different browser configs
        crawl_specs = [
            {
                "url": self.test_urls[0],
                "browser_config": browser_config1,
                "crawler_config": self.crawler_config
            },
            {
                "url": self.test_urls[1],
                "browser_config": browser_config2,
                "crawler_config": self.crawler_config
            }
        ]
        # Call batch_crawl with mixed browser configs
        results = await batch_crawl(crawl_specs=crawl_specs)
        # Verify results
        self.assertEqual(len(results), len(crawl_specs))
        # Verify create_pipeline was called with different browser configs
        self.assertEqual(self.mock_create_pipeline.call_count, 2)
        # Check call arguments for create_pipeline
        call_args_list = self.mock_create_pipeline.call_args_list
        self.assertEqual(call_args_list[0][1]['browser_config'], browser_config1)
        self.assertEqual(call_args_list[1][1]['browser_config'], browser_config2)

    async def test_with_existing_browser_hub(self):
        """Test using a pre-initialized browser hub"""
        # Create a mock browser hub
        mock_hub = AsyncMock()
        # Call batch_crawl with browser hub
        results = await batch_crawl(
            urls=self.test_urls,
            browser_hub=mock_hub,
            crawler_config=self.crawler_config
        )
        # Verify create_pipeline was called with the browser hub
        self.mock_create_pipeline.assert_called_with(
            browser_hub=mock_hub,
            logger=self.logger
        )
        # Verify results
        self.assertEqual(len(results), len(self.test_urls))

    # === Error Handling and Retry Tests ===

    async def test_retry_on_failure(self):
        """Test retrying failed URLs up to max_tries"""
        # Modify mock to fail first 2 times for URL 3, then succeed
        attempt_counts = {url: 0 for url in self.test_urls}
        async def mock_crawl_with_retries(url, config=None):
            attempt_counts[url] += 1
            if url == "https://example.com/3" and attempt_counts[url] <= 2:
                return await create_mock_result(url, success=False, status_code=500)
            return await create_mock_result(url, success=True)
        self.mock_pipeline.crawl.side_effect = mock_crawl_with_retries
        # Call batch_crawl with retry configuration
        results = await batch_crawl(
            urls=self.test_urls,
            browser_config=self.browser_config,
            crawler_config=self.crawler_config,
            max_tries=3
        )
        # Verify all URLs succeeded after retries
        self.assertTrue(all(r.success for r in results))
        # Check retry count for URL 3
        self.assertEqual(attempt_counts["https://example.com/3"], 3)
        # Check other URLs were only tried once
        for url in self.test_urls:
            if url != "https://example.com/3":
                self.assertEqual(attempt_counts[url], 1)

    async def test_give_up_after_max_tries(self):
        """Test that crawling gives up after max_tries"""
        # Modify mock to always fail for URL 3
        async def mock_crawl_always_fail(url, config=None):
            if url == "https://example.com/3":
                return await create_mock_result(url, success=False, status_code=500)
            return await create_mock_result(url, success=True)
        self.mock_pipeline.crawl.side_effect = mock_crawl_always_fail
        # Call batch_crawl with retry configuration
        results = await batch_crawl(
            urls=self.test_urls,
            browser_config=self.browser_config,
            crawler_config=self.crawler_config,
            max_tries=3
        )
        # Find result for URL 3
        url3_result = next(r for r in results if r.url == "https://example.com/3")
        # Verify URL 3 still failed after max retries
        self.assertFalse(url3_result.success)
        # Verify retry metadata is present (assuming we add this to the result)
        self.assertEqual(url3_result.attempt_count, 3)
        self.assertTrue(hasattr(url3_result, 'retry_error_messages'))

    async def test_exception_during_crawl(self):
        """Test handling exceptions during crawling"""
        # Modify mock to raise exception for URL 4
        async def mock_crawl_with_exception(url, config=None):
            if url == "https://example.com/4":
                raise RuntimeError("Simulated crawler exception")
            return await create_mock_result(url, success=True)
        self.mock_pipeline.crawl.side_effect = mock_crawl_with_exception
        # Call batch_crawl
        results = await batch_crawl(
            urls=self.test_urls,
            browser_config=self.browser_config,
            crawler_config=self.crawler_config
        )
        # Verify we still get results for all URLs
        self.assertEqual(len(results), len(self.test_urls))
        # Find result for URL 4
        url4_result = next(r for r in results if r.url == "https://example.com/4")
        # Verify URL 4 is marked as failed
        self.assertFalse(url4_result.success)
        # Verify exception info is captured
        self.assertIn("Simulated crawler exception", url4_result.error_message)

    # === Performance and Control Tests ===

    async def test_concurrency_limit(self):
        """Test limiting concurrent crawls"""
        # Create a slow mock crawl function to test concurrency
        crawl_started = {url: asyncio.Event() for url in self.test_urls}
        crawl_proceed = {url: asyncio.Event() for url in self.test_urls}
        async def slow_mock_crawl(url, config=None):
            crawl_started[url].set()
            await crawl_proceed[url].wait()
            return await create_mock_result(url)
        self.mock_pipeline.crawl.side_effect = slow_mock_crawl
        # Start batch_crawl with concurrency limit of 2
        task = asyncio.create_task(
            batch_crawl(
                urls=self.test_urls,
                browser_config=self.browser_config,
                crawler_config=self.crawler_config,
                concurrency=2
            )
        )
        # Wait for first 2 crawls to start
        await asyncio.wait(
            [crawl_started[self.test_urls[0]].wait(),
             crawl_started[self.test_urls[1]].wait()],
            timeout=1
        )
        # Verify only 2 crawls started
        started_count = sum(1 for url in self.test_urls if crawl_started[url].is_set())
        self.assertEqual(started_count, 2)
        # Allow first crawl to complete
        crawl_proceed[self.test_urls[0]].set()
        # Wait for next crawl to start
        await asyncio.wait([crawl_started[self.test_urls[2]].wait()], timeout=1)
        # Now 3 total should have started (2 running, 1 completed)
        started_count = sum(1 for url in self.test_urls if crawl_started[url].is_set())
        self.assertEqual(started_count, 3)
        # Allow all remaining crawls to complete
        for url in self.test_urls:
            crawl_proceed[url].set()
        # Wait for batch_crawl to complete
        results = await task
        # Verify all URLs were crawled
        self.assertEqual(len(results), len(self.test_urls))

    async def test_cancel_batch_crawl(self):
        """Test cancelling a batch crawl operation"""
        # Create a crawl function that won't complete unless signaled
        crawl_started = {url: asyncio.Event() for url in self.test_urls}
        async def endless_mock_crawl(url, config=None):
            crawl_started[url].set()
            # This will wait forever unless cancelled
            await asyncio.Future()
        self.mock_pipeline.crawl.side_effect = endless_mock_crawl
        # Start batch_crawl
        task = asyncio.create_task(
            batch_crawl(
                urls=self.test_urls,
                browser_config=self.browser_config,
                crawler_config=self.crawler_config
            )
        )
        # Wait for at least one crawl to start
        await asyncio.wait(
            [crawl_started[self.test_urls[0]].wait()],
            timeout=1
        )
        # Cancel the task
        task.cancel()
        # Verify task was cancelled
        with self.assertRaises(asyncio.CancelledError):
            await task

    # === Edge Cases Tests ===

    async def test_empty_url_list(self):
        """Test behavior with empty URL list"""
        results = await batch_crawl(
            urls=[],
            browser_config=self.browser_config,
            crawler_config=self.crawler_config
        )
        # Should return empty list
        self.assertEqual(results, [])
        # Verify crawl wasn't called
        self.mock_pipeline.crawl.assert_not_called()

    async def test_mix_of_valid_and_invalid_urls(self):
        """Test with a mix of valid and invalid URLs"""
        # Include some invalid URLs
        mixed_urls = [
            "https://example.com/valid",
            "invalid-url",
            "http:/missing-slash",
            "https://example.com/valid2"
        ]
        # Call batch_crawl
        results = await batch_crawl(
            urls=mixed_urls,
            browser_config=self.browser_config,
            crawler_config=self.crawler_config
        )
        # Should have results for all URLs
        self.assertEqual(len(results), len(mixed_urls))
        # Check invalid URLs were marked as failed
        for result in results:
            if result.url in ["invalid-url", "http:/missing-slash"]:
                self.assertFalse(result.success)
                self.assertIn("Invalid URL", result.error_message)
            else:
                self.assertTrue(result.success)

if __name__ == "__main__":
    unittest.main()

+++ added: the new pytest-based suite for the Crawler class

# test_batch_crawl.py
"""Test the Crawler class for batch crawling capabilities."""
import asyncio
import pytest

from crawl4ai import Crawler
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.browser import BrowserHub
from crawl4ai.cache_context import CacheMode

# Test URLs for crawling
SAFE_URLS = [
    "https://example.com",
    "https://httpbin.org/html",
    "https://httpbin.org/headers",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpstat.us/200",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://jsonplaceholder.typicode.com/comments/1",
    "https://iana.org",
    "https://www.python.org"
]

# Simple test for batch crawling
@pytest.mark.asyncio
async def test_batch_crawl_simple():
    """Test simple batch crawling with multiple URLs."""
    # Use a few test URLs
    urls = SAFE_URLS[:3]
    # Custom crawler config
    crawler_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        wait_until="domcontentloaded"
    )
    # Crawl multiple URLs using batch crawl
    results = await Crawler.crawl(
        urls,
        crawler_config=crawler_config
    )
    # Verify the results
    assert isinstance(results, dict)
    assert len(results) == len(urls)
    for url in urls:
        assert url in results
        assert results[url].success
        assert results[url].html is not None

# Test parallel batch crawling
@pytest.mark.asyncio
async def test_parallel_batch_crawl():
    """Test parallel batch crawling with multiple URLs."""
    # Use several URLs for parallel crawling
    urls = SAFE_URLS[:5]
    # Basic crawler config
    crawler_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        wait_until="domcontentloaded"
    )
    # Crawl in parallel
    start_time = asyncio.get_event_loop().time()
    results = await Crawler.parallel_crawl(
        urls,
        crawler_config=crawler_config
    )
    end_time = asyncio.get_event_loop().time()
    # Verify results
    assert len(results) == len(urls)
    successful = sum(1 for r in results.values() if r.success)
    print(f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s")
    print(f"Success rate: {successful}/{len(urls)}")
    # At least 80% should succeed
    assert successful / len(urls) >= 0.8

# Test batch crawling with different configurations
@pytest.mark.asyncio
async def test_batch_crawl_mixed_configs():
    """Test batch crawling with different configurations for different URLs."""
    # Create URL batches with different configurations
    batch1 = (SAFE_URLS[:2], CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False))
    batch2 = (SAFE_URLS[2:4], CrawlerRunConfig(wait_until="networkidle", screenshot=True))
    # Crawl with mixed configurations
    start_time = asyncio.get_event_loop().time()
    results = await Crawler.parallel_crawl([batch1, batch2])
    end_time = asyncio.get_event_loop().time()
    # Extract all URLs
    all_urls = batch1[0] + batch2[0]
    # Verify results
    assert len(results) == len(all_urls)
    # Check that screenshots are present only for batch2
    for url in batch1[0]:
        assert results[url].screenshot is None
    for url in batch2[0]:
        assert results[url].screenshot is not None
    print(f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s")

# Test shared browser hub
@pytest.mark.asyncio
async def test_batch_crawl_shared_hub():
    """Test batch crawling with a shared browser hub."""
    # Create and initialize a browser hub
    browser_config = BrowserConfig(
        browser_type="chromium",
        headless=True
    )
    browser_hub = await BrowserHub.get_browser_manager(
        config=browser_config,
        max_browsers_per_config=3,
        max_pages_per_browser=4,
        initial_pool_size=1
    )
    try:
        # Use the hub for parallel crawling
        urls = SAFE_URLS[:3]
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(
            urls,
            browser_hub=browser_hub,
            crawler_config=CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS,
                wait_until="domcontentloaded"
            )
        )
        end_time = asyncio.get_event_loop().time()
        # Verify results
        assert len(results) == len(urls)
        successful = sum(1 for r in results.values() if r.success)
        print(f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s")
        print(f"Success rate: {successful}/{len(urls)}")
        # Get browser hub statistics
        hub_stats = await browser_hub.get_pool_status()
        print(f"Browser hub stats: {hub_stats}")
        # At least 80% should succeed
        assert successful / len(urls) >= 0.8
    finally:
        # Clean up the browser hub
        await browser_hub.close()


@@ -0,0 +1,447 @@
# test_crawler.py
import asyncio
import warnings
from typing import Optional, Tuple

import pytest
import pytest_asyncio

from crawl4ai import Crawler
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.browser import BrowserHub
from crawl4ai.cache_context import CacheMode

# Define test fixtures
@pytest_asyncio.fixture
async def clean_browser_hub():
    """Fixture to ensure clean browser hub state between tests."""
    # Yield control to the test
    yield
    # After the test, clean up all browser hubs
    try:
        await BrowserHub.shutdown_all()
    except Exception as e:
        print(f"Error during browser cleanup: {e}")

# Test URLs for crawling
SAFE_URLS = [
    "https://example.com",
    "https://httpbin.org/html",
    "https://httpbin.org/headers",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpstat.us/200",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://jsonplaceholder.typicode.com/comments/1",
    "https://iana.org",
    "https://www.python.org",
]

class TestCrawlerBasic:
    """Basic tests for the Crawler utility class"""

    @pytest.mark.asyncio
    async def test_simple_crawl_single_url(self, clean_browser_hub):
        """Test crawling a single URL with default configuration"""
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=Warning)
            # Basic logger
            logger = AsyncLogger(verbose=True)
            # Basic single URL crawl with default configuration
            url = "https://example.com"
            result = await Crawler.crawl(url)
            # Verify the result
            assert isinstance(result, CrawlResultContainer)
            assert result.success
            assert result.url == url
            assert result.html is not None
            assert len(result.html) > 0

    @pytest.mark.asyncio
    async def test_crawl_with_custom_config(self, clean_browser_hub):
        """Test crawling with custom browser and crawler configuration"""
        # Custom browser config
        browser_config = BrowserConfig(
            browser_type="chromium",
            headless=True,
            viewport_width=1280,
            viewport_height=800,
        )
        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="networkidle", screenshot=True
        )
        # Crawl with custom configuration
        url = "https://httpbin.org/html"
        result = await Crawler.crawl(
            url, browser_config=browser_config, crawler_config=crawler_config
        )
        # Verify the result
        assert result.success
        assert result.url == url
        assert result.screenshot is not None

    @pytest.mark.asyncio
    async def test_crawl_multiple_urls_sequential(self, clean_browser_hub):
        """Test crawling multiple URLs sequentially"""
        # Use a few test URLs
        urls = SAFE_URLS[:3]
        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )
        # Crawl multiple URLs sequentially
        results = await Crawler.crawl(urls, crawler_config=crawler_config)
        # Verify the results
        assert isinstance(results, dict)
        assert len(results) == len(urls)
        for url in urls:
            assert url in results
            assert results[url].success
            assert results[url].html is not None

    @pytest.mark.asyncio
    async def test_crawl_with_error_handling(self, clean_browser_hub):
        """Test error handling during crawling"""
        # Include a valid URL and a non-existent URL
        urls = ["https://example.com", "https://non-existent-domain-123456789.com"]
        # Crawl with retries
        results = await Crawler.crawl(urls, max_retries=2, retry_delay=1.0)
        # Verify results for both URLs
        assert len(results) == 2
        # Valid URL should succeed
        assert results[urls[0]].success
        # Invalid URL should fail but be in results
        assert urls[1] in results
        assert not results[urls[1]].success
        assert results[urls[1]].error_message is not None

class TestCrawlerParallel:
    """Tests for the parallel crawling capabilities of Crawler"""

    @pytest.mark.asyncio
    async def test_parallel_crawl_simple(self, clean_browser_hub):
        """Test basic parallel crawling with same configuration"""
        # Use several URLs for parallel crawling
        urls = SAFE_URLS[:5]
        # Basic crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )
        # Crawl in parallel with default concurrency
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(urls, crawler_config=crawler_config)
        end_time = asyncio.get_event_loop().time()
        # Verify results
        assert len(results) == len(urls)
        successful = sum(1 for r in results.values() if r.success)
        print(
            f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(urls)}")
        # At least 80% should succeed
        assert successful / len(urls) >= 0.8

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_concurrency_limit(self, clean_browser_hub):
        """Test parallel crawling with concurrency limit"""
        # Use more URLs to test concurrency control
        urls = SAFE_URLS[:8]
        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )
        # Limited concurrency
        concurrency = 2
        # Time the crawl
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(
            urls, crawler_config=crawler_config, concurrency=concurrency
        )
        end_time = asyncio.get_event_loop().time()
        # Verify results
        assert len(results) == len(urls)
        successful = sum(1 for r in results.values() if r.success)
        print(
            f"Parallel crawl with concurrency={concurrency} of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(urls)}")
        # At least 80% should succeed
        assert successful / len(urls) >= 0.8

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_different_configs(self, clean_browser_hub):
        """Test parallel crawling with different configurations for different URLs"""
        # Create URL batches with different configurations
        batch1 = (
            SAFE_URLS[:2],
            CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
        )
        batch2 = (
            SAFE_URLS[2:4],
            CrawlerRunConfig(wait_until="networkidle", screenshot=True),
        )
        batch3 = (
            SAFE_URLS[4:6],
            CrawlerRunConfig(wait_until="load", scan_full_page=True),
        )
        # Crawl with mixed configurations
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl([batch1, batch2, batch3])
        end_time = asyncio.get_event_loop().time()
        # Extract all URLs
        all_urls = batch1[0] + batch2[0] + batch3[0]
        # Verify results
        assert len(results) == len(all_urls)
        # Check that screenshots are present only for batch2
        for url in batch1[0]:
            assert not results[url].screenshot
        for url in batch2[0]:
            assert results[url].screenshot
        print(
            f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s"
        )

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_shared_browser_hub(self, clean_browser_hub):
        """Test parallel crawling with a shared browser hub"""
        # Create and initialize a browser hub
        browser_config = BrowserConfig(browser_type="chromium", headless=True)
        browser_hub = await BrowserHub.get_browser_manager(
            config=browser_config,
            max_browsers_per_config=3,
            max_pages_per_browser=4,
            initial_pool_size=1,
        )
        try:
            # Use the hub for parallel crawling
            urls = SAFE_URLS[:6]
            start_time = asyncio.get_event_loop().time()
            results = await Crawler.parallel_crawl(
                urls,
                browser_hub=browser_hub,
                crawler_config=CrawlerRunConfig(
                    cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
                ),
            )
            end_time = asyncio.get_event_loop().time()
            # Verify results
            assert len(results) == len(urls)
            successful = sum(1 for r in results.values() if r.success)
            print(
                f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
            )
            print(f"Success rate: {successful}/{len(urls)}")
            # Get browser hub statistics
            hub_stats = await browser_hub.get_pool_status()
            print(f"Browser hub stats: {hub_stats}")
            # At least 80% should succeed
            assert successful / len(urls) >= 0.8
        finally:
            # Clean up the browser hub
            await browser_hub.close()

class TestCrawlerAdvanced:
    """Advanced tests for the Crawler utility class"""

    @pytest.mark.asyncio
    async def test_crawl_with_customized_batch_config(self, clean_browser_hub):
        """Test crawling with fully customized batch configuration"""
        # Create URL batches with different browser and crawler configurations
        browser_config1 = BrowserConfig(browser_type="chromium", headless=True)
        browser_config2 = BrowserConfig(
            browser_type="chromium", headless=False, viewport_width=1920
        )
        crawler_config1 = CrawlerRunConfig(wait_until="domcontentloaded")
        crawler_config2 = CrawlerRunConfig(wait_until="networkidle", screenshot=True)
        batch1 = (SAFE_URLS[:2], browser_config1, crawler_config1)
        batch2 = (SAFE_URLS[2:4], browser_config2, crawler_config2)
        # Crawl with mixed configurations
        results = await Crawler.parallel_crawl([batch1, batch2])
        # Extract all URLs
        all_urls = batch1[0] + batch2[0]
        # Verify results
        assert len(results) == len(all_urls)
        # Verify batch-specific processing
        for url in batch1[0]:
            assert results[url].screenshot is None  # No screenshots for batch1
        for url in batch2[0]:
            assert results[url].screenshot is not None  # Screenshots for batch2

    @pytest.mark.asyncio
    async def test_crawl_with_progress_callback(self, clean_browser_hub):
        """Test crawling with progress callback"""
        # Use several URLs
        urls = SAFE_URLS[:5]
        # Track progress
        progress_data = {"started": 0, "completed": 0, "failed": 0, "updates": []}

        # Progress callback
        async def on_progress(
            status: str, url: str, result: Optional[CrawlResultContainer] = None
        ):
            if status == "started":
                progress_data["started"] += 1
            elif status == "completed":
                progress_data["completed"] += 1
                if not result.success:
                    progress_data["failed"] += 1
            progress_data["updates"].append((status, url))
            print(f"Progress: {status} - {url}")

        # Crawl with progress tracking
        results = await Crawler.parallel_crawl(
            urls,
            crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
            progress_callback=on_progress,
        )
        # Verify progress tracking
        assert progress_data["started"] == len(urls)
        assert progress_data["completed"] == len(urls)
        assert len(progress_data["updates"]) == len(urls) * 2  # start + complete events

    @pytest.mark.asyncio
    async def test_crawl_with_dynamic_retry_strategy(self, clean_browser_hub):
        """Test crawling with a dynamic retry strategy"""
        # Include URLs that might fail
        urls = [
            "https://example.com",
            "https://httpstat.us/500",
            "https://httpstat.us/404",
        ]

        # Custom retry strategy
        async def retry_strategy(
            url: str, attempt: int, error: Exception
        ) -> Tuple[bool, float]:
            # Only retry 500 errors, not 404s
            if "500" in url:
                return True, 1.0  # Retry with 1 second delay
            return False, 0.0  # Don't retry other errors

        # Crawl with custom retry strategy
        results = await Crawler.parallel_crawl(
            urls,
            crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
            retry_strategy=retry_strategy,
            max_retries=3,
        )
        # Verify results
        assert len(results) == len(urls)
        # example.com should succeed
        assert results[urls[0]].success
        # httpstat.us pages return content even for error status codes,
        # so the crawler marks them as successful since it got HTML content.
        # Verify that we got the expected status codes instead.
        assert results[urls[1]].status_code == 500
        # 404 should have the correct status code
        assert results[urls[2]].status_code == 404

    @pytest.mark.asyncio
    async def test_crawl_with_very_large_batch(self, clean_browser_hub):
        """Test crawling with a very large batch of URLs"""
        # Create a batch by repeating our safe URLs, then de-duplicating
        # Note: In a real test we'd use more URLs; dict.fromkeys leaves 5 unique URLs here
        large_batch = list(dict.fromkeys(SAFE_URLS[:5] * 2))  # 5 unique URLs
        # Set a reasonable concurrency limit
        concurrency = 10
        # Time the crawl
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(
            large_batch,
            crawler_config=CrawlerRunConfig(
                wait_until="domcontentloaded",
                page_timeout=10000,  # Shorter timeout for large batch
            ),
            concurrency=concurrency,
        )
        end_time = asyncio.get_event_loop().time()
        # Verify results
        assert len(results) == len(large_batch)
        successful = sum(1 for r in results.values() if r.success)
        print(
            f"Large batch crawl of {len(large_batch)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(large_batch)}")
        print(
            f"Average time per URL: {(end_time - start_time) / len(large_batch):.2f}s"
        )
        # At least 80% should succeed (from our unique URLs)
        assert successful / len(results) >= 0.8

if __name__ == "__main__":
    # Use pytest for async tests
    pytest.main(["-xvs", __file__])