Compare commits

...

3 Commits

Author SHA1 Message Date
UncleCode
72d8e679ad feat(pipeline): add high-level Crawler utility class for simplified web crawling
Add new Crawler class that provides a simplified interface for both single and batch URL crawling operations. Key features include:
- Simple single URL crawling with configurable options
- Parallel batch crawling with concurrency control
- Shared browser hub support for resource efficiency
- Progress tracking and custom retry strategies
- Comprehensive error handling and retry logic

Remove demo and extended test files in favor of new focused test suite.
2025-04-07 22:50:44 +08:00
UncleCode
67a790b4a6 Add test file for Pipeline batch crawl. 2025-04-06 19:38:31 +08:00
UncleCode
d95b2dc9f2 Some refactoring, movie pipelin submodule folder into the main. 2025-04-06 18:28:28 +08:00
16 changed files with 2991 additions and 9 deletions

View File

@@ -4,6 +4,12 @@ import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig
from .pipeline.pipeline import (
Pipeline,
create_pipeline,
)
from .pipeline.crawler import Crawler
from .content_scraping_strategy import (
ContentScrapingStrategy,
WebScrapingStrategy,
@@ -65,7 +71,14 @@ from .deep_crawling import (
DeepCrawlDecorator,
)
from .async_crawler_strategy import AsyncPlaywrightCrawlerStrategy, AsyncHTTPCrawlerStrategy
__all__ = [
"Pipeline",
"AsyncPlaywrightCrawlerStrategy",
"AsyncHTTPCrawlerStrategy",
"create_pipeline",
"Crawler",
"AsyncLoggerBase",
"AsyncLogger",
"AsyncWebCrawler",

View File

@@ -270,7 +270,7 @@ class BrowserConfig:
host: str = "localhost",
):
self.browser_type = browser_type
self.headless = headless or True
self.headless = headless
self.browser_mode = browser_mode
self.use_managed_browser = use_managed_browser
self.cdp_url = cdp_url

View File

@@ -625,7 +625,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
except Error:
visibility_info = await self.check_visibility(page)
if self.config.verbose:
if config.verbose:
self.logger.debug(
message="Body visibility info: {info}",
tag="DEBUG",

View File

@@ -9,6 +9,7 @@ from .profiles import BrowserProfileManager
from .models import DockerConfig
from .docker_registry import DockerRegistry
from .docker_utils import DockerUtils
from .browser_hub import BrowserHub
from .strategies import (
BaseBrowserStrategy,
PlaywrightBrowserStrategy,
@@ -19,4 +20,4 @@ from .strategies import (
__all__ = ['BrowserManager', 'BrowserProfileManager', 'DockerConfig', 'DockerRegistry', 'DockerUtils', 'BaseBrowserStrategy',
'PlaywrightBrowserStrategy', 'CDPBrowserStrategy', 'BuiltinBrowserStrategy',
'DockerBrowserStrategy']
'DockerBrowserStrategy', 'BrowserHub']

View File

@@ -672,7 +672,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
block["error"] = False
except Exception:
parsed, unparsed = split_and_parse_json_objects(
response.choices[0].message.content
response
)
blocks = parsed
if unparsed:

View File

@@ -1,4 +1,4 @@
from pydantic import BaseModel, HttpUrl, PrivateAttr
from pydantic import BaseModel, HttpUrl, PrivateAttr, ConfigDict
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from typing import AsyncGenerator
from typing import Generic, TypeVar
@@ -146,8 +146,9 @@ class CrawlResult(BaseModel):
dispatch_result: Optional[DispatchResult] = None
redirected_url: Optional[str] = None
class Config:
arbitrary_types_allowed = True
model_config = ConfigDict(arbitrary_types_allowed=True)
# class Config:
# arbitrary_types_allowed = True
# NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters,
# and model_dump override all exist to support a smooth transition from markdown as a string
@@ -312,8 +313,9 @@ class AsyncCrawlResponse(BaseModel):
ssl_certificate: Optional[SSLCertificate] = None
redirected_url: Optional[str] = None
class Config:
arbitrary_types_allowed = True
model_config = ConfigDict(arbitrary_types_allowed=True)
# class Config:
# arbitrary_types_allowed = True
###############################
# Scraping Models

View File

@@ -0,0 +1,6 @@
"""Pipeline module providing high-level crawling functionality."""
from .pipeline import Pipeline, create_pipeline
from .crawler import Crawler
__all__ = ["Pipeline", "create_pipeline", "Crawler"]

View File

@@ -0,0 +1,406 @@
"""Crawler utility class for simplified crawling operations.
This module provides a high-level utility class for crawling web pages
with support for both single and multiple URL processing.
"""
import asyncio
from typing import Dict, List, Optional, Tuple, Union, Callable
from crawl4ai.models import CrawlResultContainer, CrawlResult
from crawl4ai.pipeline.pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.browser.browser_hub import BrowserHub
# Type definitions
UrlList = List[str]
UrlBatch = Tuple[List[str], CrawlerRunConfig]
UrlFullBatch = Tuple[List[str], BrowserConfig, CrawlerRunConfig]
BatchType = Union[UrlList, UrlBatch, UrlFullBatch]
ProgressCallback = Callable[[str, str, Optional[CrawlResultContainer]], None]
RetryStrategy = Callable[[str, int, Exception], Tuple[bool, float]]
class Crawler:
"""High-level utility class for crawling web pages.
This class provides simplified methods for crawling both single URLs
and batches of URLs, with parallel processing capabilities.
"""
@classmethod
async def crawl(
cls,
urls: Union[str, List[str]],
browser_config: Optional[BrowserConfig] = None,
crawler_config: Optional[CrawlerRunConfig] = None,
browser_hub: Optional[BrowserHub] = None,
logger: Optional[AsyncLogger] = None,
max_retries: int = 0,
retry_delay: float = 1.0,
use_new_loop: bool = True # By default use a new loop for safety
) -> Union[CrawlResultContainer, Dict[str, CrawlResultContainer]]:
"""Crawl one or more URLs with the specified configurations.
Args:
urls: Single URL or list of URLs to crawl
browser_config: Optional browser configuration
crawler_config: Optional crawler run configuration
browser_hub: Optional shared browser hub
logger: Optional logger instance
max_retries: Maximum number of retries for failed requests
retry_delay: Delay between retries in seconds
Returns:
For a single URL: CrawlResultContainer with crawl results
For multiple URLs: Dict mapping URLs to their CrawlResultContainer results
"""
# Handle single URL case
if isinstance(urls, str):
return await cls._crawl_single_url(
urls,
browser_config,
crawler_config,
browser_hub,
logger,
max_retries,
retry_delay,
use_new_loop
)
# Handle multiple URLs case (sequential processing)
results = {}
for url in urls:
results[url] = await cls._crawl_single_url(
url,
browser_config,
crawler_config,
browser_hub,
logger,
max_retries,
retry_delay,
use_new_loop
)
return results
@classmethod
async def _crawl_single_url(
cls,
url: str,
browser_config: Optional[BrowserConfig] = None,
crawler_config: Optional[CrawlerRunConfig] = None,
browser_hub: Optional[BrowserHub] = None,
logger: Optional[AsyncLogger] = None,
max_retries: int = 0,
retry_delay: float = 1.0,
use_new_loop: bool = False
) -> CrawlResultContainer:
"""Internal method to crawl a single URL with retry logic."""
# Create a logger if none provided
if logger is None:
logger = AsyncLogger(verbose=True)
# Create or use the provided crawler config
if crawler_config is None:
crawler_config = CrawlerRunConfig()
attempts = 0
last_error = None
# For testing purposes, each crawler gets a new event loop to avoid conflicts
# This is especially important in test suites where multiple tests run in sequence
if use_new_loop:
old_loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while attempts <= max_retries:
try:
# Create a pipeline
pipeline_args = {}
if browser_config:
pipeline_args["browser_config"] = browser_config
if browser_hub:
pipeline_args["browser_hub"] = browser_hub
if logger:
pipeline_args["logger"] = logger
pipeline = await create_pipeline(**pipeline_args)
# Perform the crawl
result = await pipeline.crawl(url=url, config=crawler_config)
# Close the pipeline if we created it (not using a shared hub)
if not browser_hub:
await pipeline.close()
# Restore the original event loop if we created a new one
if use_new_loop:
asyncio.set_event_loop(old_loop)
loop.close()
return result
except Exception as e:
last_error = e
attempts += 1
if attempts <= max_retries:
logger.warning(
message="Crawl attempt {attempt} failed for {url}: {error}. Retrying in {delay}s...",
tag="RETRY",
params={
"attempt": attempts,
"url": url,
"error": str(e),
"delay": retry_delay
}
)
await asyncio.sleep(retry_delay)
else:
logger.error(
message="All {attempts} crawl attempts failed for {url}: {error}",
tag="FAILED",
params={
"attempts": attempts,
"url": url,
"error": str(e)
}
)
# If we get here, all attempts failed
result = CrawlResultContainer(
CrawlResult(
url=url,
html="",
success=False,
error_message=f"All {attempts} crawl attempts failed: {str(last_error)}"
)
)
# Restore the original event loop if we created a new one
if use_new_loop:
asyncio.set_event_loop(old_loop)
loop.close()
return result
@classmethod
async def parallel_crawl(
cls,
url_batches: Union[List[str], List[Union[UrlBatch, UrlFullBatch]]],
browser_config: Optional[BrowserConfig] = None,
crawler_config: Optional[CrawlerRunConfig] = None,
browser_hub: Optional[BrowserHub] = None,
logger: Optional[AsyncLogger] = None,
concurrency: int = 5,
max_retries: int = 0,
retry_delay: float = 1.0,
retry_strategy: Optional[RetryStrategy] = None,
progress_callback: Optional[ProgressCallback] = None,
use_new_loop: bool = True # By default use a new loop for safety
) -> Dict[str, CrawlResultContainer]:
"""Crawl multiple URLs in parallel with concurrency control.
Args:
url_batches: List of URLs or list of URL batches with configurations
browser_config: Default browser configuration (used if not in batch)
crawler_config: Default crawler configuration (used if not in batch)
browser_hub: Optional shared browser hub for resource efficiency
logger: Optional logger instance
concurrency: Maximum number of concurrent crawls
max_retries: Maximum number of retries for failed requests
retry_delay: Delay between retries in seconds
retry_strategy: Optional custom retry strategy function
progress_callback: Optional callback for progress reporting
Returns:
Dict mapping URLs to their CrawlResultContainer results
"""
# Create a logger if none provided
if logger is None:
logger = AsyncLogger(verbose=True)
# For testing purposes, each crawler gets a new event loop to avoid conflicts
# This is especially important in test suites where multiple tests run in sequence
if use_new_loop:
old_loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Process batches to consistent format
processed_batches = cls._process_url_batches(
url_batches, browser_config, crawler_config
)
# Initialize results dictionary
results = {}
# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(concurrency)
# Create shared browser hub if not provided
shared_hub = browser_hub
if not shared_hub:
shared_hub = await BrowserHub.get_browser_manager(
config=browser_config or BrowserConfig(),
logger=logger,
max_browsers_per_config=concurrency,
max_pages_per_browser=1,
initial_pool_size=min(concurrency, 3) # Start with a reasonable number
)
try:
# Create worker function for each URL
async def process_url(url, b_config, c_config):
async with semaphore:
# Report start if callback provided
if progress_callback:
await progress_callback("started", url)
attempts = 0
last_error = None
while attempts <= max_retries:
try:
# Create a pipeline using the shared hub
pipeline = await create_pipeline(
browser_config=b_config,
browser_hub=shared_hub,
logger=logger
)
# Perform the crawl
result = await pipeline.crawl(url=url, config=c_config)
# Report completion if callback provided
if progress_callback:
await progress_callback("completed", url, result)
return url, result
except Exception as e:
last_error = e
attempts += 1
# Determine if we should retry and with what delay
should_retry = attempts <= max_retries
delay = retry_delay
# Use custom retry strategy if provided
if retry_strategy and should_retry:
try:
should_retry, delay = await retry_strategy(url, attempts, e)
except Exception as strategy_error:
logger.error(
message="Error in retry strategy: {error}",
tag="RETRY",
params={"error": str(strategy_error)}
)
if should_retry:
logger.warning(
message="Crawl attempt {attempt} failed for {url}: {error}. Retrying in {delay}s...",
tag="RETRY",
params={
"attempt": attempts,
"url": url,
"error": str(e),
"delay": delay
}
)
await asyncio.sleep(delay)
else:
logger.error(
message="All {attempts} crawl attempts failed for {url}: {error}",
tag="FAILED",
params={
"attempts": attempts,
"url": url,
"error": str(e)
}
)
break
# If we get here, all attempts failed
error_result = CrawlResultContainer(
CrawlResult(
url=url,
html="",
success=False,
error_message=f"All {attempts} crawl attempts failed: {str(last_error)}"
)
)
# Report completion with error if callback provided
if progress_callback:
await progress_callback("completed", url, error_result)
return url, error_result
# Create tasks for all URLs
tasks = []
for urls, b_config, c_config in processed_batches:
for url in urls:
tasks.append(process_url(url, b_config, c_config))
# Run all tasks and collect results
for completed_task in asyncio.as_completed(tasks):
url, result = await completed_task
results[url] = result
return results
finally:
# Clean up the hub only if we created it
if not browser_hub and shared_hub:
await shared_hub.close()
# Restore the original event loop if we created a new one
if use_new_loop:
asyncio.set_event_loop(old_loop)
loop.close()
@classmethod
def _process_url_batches(
cls,
url_batches: Union[List[str], List[Union[UrlBatch, UrlFullBatch]]],
default_browser_config: Optional[BrowserConfig],
default_crawler_config: Optional[CrawlerRunConfig]
) -> List[Tuple[List[str], BrowserConfig, CrawlerRunConfig]]:
"""Process URL batches into a consistent format.
Converts various input formats into a consistent list of
(urls, browser_config, crawler_config) tuples.
"""
processed_batches = []
# Handle case where input is just a list of URLs
if all(isinstance(item, str) for item in url_batches):
urls = url_batches
browser_config = default_browser_config or BrowserConfig()
crawler_config = default_crawler_config or CrawlerRunConfig()
processed_batches.append((urls, browser_config, crawler_config))
return processed_batches
# Process each batch
for batch in url_batches:
# Handle case: (urls, crawler_config)
if len(batch) == 2 and isinstance(batch[1], CrawlerRunConfig):
urls, c_config = batch
b_config = default_browser_config or BrowserConfig()
processed_batches.append((urls, b_config, c_config))
# Handle case: (urls, browser_config, crawler_config)
elif len(batch) == 3 and isinstance(batch[1], BrowserConfig) and isinstance(batch[2], CrawlerRunConfig):
processed_batches.append(batch)
# Fallback for unknown formats - assume it's just a list of URLs
else:
urls = batch
browser_config = default_browser_config or BrowserConfig()
crawler_config = default_crawler_config or CrawlerRunConfig()
processed_batches.append((urls, browser_config, crawler_config))
return processed_batches

View File

@@ -0,0 +1,702 @@
import time
import sys
from typing import Dict, Any, List
import json
from crawl4ai.models import (
CrawlResult,
MarkdownGenerationResult,
ScrapingResult,
CrawlResultContainer,
)
from crawl4ai.async_database import async_db_manager
from crawl4ai.cache_context import CacheMode, CacheContext
from crawl4ai.utils import (
sanitize_input_encode,
InvalidCSSSelectorError,
fast_format_html,
create_box_message,
get_error_context,
)
async def initialize_context_middleware(context: Dict[str, Any]) -> int:
"""Initialize the context with basic configuration and validation"""
url = context.get("url")
config = context.get("config")
if not isinstance(url, str) or not url:
context["error_message"] = "Invalid URL, make sure the URL is a non-empty string"
return 0
# Default to ENABLED if no cache mode specified
if config.cache_mode is None:
config.cache_mode = CacheMode.ENABLED
# Create cache context
context["cache_context"] = CacheContext(url, config.cache_mode, False)
context["start_time"] = time.perf_counter()
return 1
# middlewares.py additions
async def browser_hub_middleware(context: Dict[str, Any]) -> int:
"""
Initialize or connect to a Browser-Hub and add it to the pipeline context.
This middleware handles browser hub initialization for all three scenarios:
1. Default configuration when nothing is specified
2. Custom configuration when browser_config is provided
3. Connection to existing hub when browser_hub_connection is provided
Args:
context: The pipeline context dictionary
Returns:
int: 1 for success, 0 for failure
"""
from crawl4ai.browser.browser_hub import BrowserHub
try:
# Get configuration from context
browser_config = context.get("browser_config")
browser_hub_id = context.get("browser_hub_id")
browser_hub_connection = context.get("browser_hub_connection")
logger = context.get("logger")
# If we already have a browser hub in context, use it
if context.get("browser_hub"):
return 1
# Get or create Browser-Hub
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id=browser_hub_id,
connection_info=browser_hub_connection,
logger=logger
)
# Add to context
context["browser_hub"] = browser_hub
return 1
except Exception as e:
context["error_message"] = f"Failed to initialize browser hub: {str(e)}"
return 0
async def fetch_content_middleware(context: Dict[str, Any]) -> int:
"""
Fetch content from the web using the browser hub.
This middleware uses the browser hub to get pages for crawling,
and properly releases them back to the pool when done.
Args:
context: The pipeline context dictionary
Returns:
int: 1 for success, 0 for failure
"""
url = context.get("url")
config = context.get("config")
browser_hub = context.get("browser_hub")
logger = context.get("logger")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
try:
# Create crawler strategy without initializing its browser manager
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_hub.config if browser_hub else None,
logger=logger
)
# Replace the browser manager with our shared instance
crawler_strategy.browser_manager = browser_hub
# Perform crawl without trying to initialize the browser
# The crawler will use the provided browser_manager to get pages
async_response = await crawler_strategy.crawl(url, config=config)
# Store results in context
context["html"] = async_response.html
context["screenshot_data"] = async_response.screenshot
context["pdf_data"] = async_response.pdf_data
context["js_execution_result"] = async_response.js_execution_result
context["async_response"] = async_response
return 1
except Exception as e:
context["error_message"] = f"Error fetching content: {str(e)}"
return 0
async def check_cache_middleware(context: Dict[str, Any]) -> int:
"""Check if there's a cached result and load it if available"""
url = context.get("url")
config = context.get("config")
cache_context = context.get("cache_context")
logger = context.get("logger")
# Initialize variables
context["cached_result"] = None
context["html"] = None
context["extracted_content"] = None
context["screenshot_data"] = None
context["pdf_data"] = None
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
context["cached_result"] = cached_result
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
extracted_content = None if not extracted_content or extracted_content == "[]" else extracted_content
# If screenshot is requested but its not in cache, then set cache_result to None
screenshot_data = cached_result.screenshot
pdf_data = cached_result.pdf
if config.screenshot and not screenshot_data:
context["cached_result"] = None
if config.pdf and not pdf_data:
context["cached_result"] = None
context["html"] = html
context["extracted_content"] = extracted_content
context["screenshot_data"] = screenshot_data
context["pdf_data"] = pdf_data
logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - context["start_time"],
tag="FETCH",
)
return 1
async def configure_proxy_middleware(context: Dict[str, Any]) -> int:
"""Configure proxy if a proxy rotation strategy is available"""
config = context.get("config")
logger = context.get("logger")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
# Update proxy configuration from rotation strategy if available
if config and config.proxy_rotation_strategy:
next_proxy = await config.proxy_rotation_strategy.get_next_proxy()
if next_proxy:
logger.info(
message="Switch proxy: {proxy}",
tag="PROXY",
params={"proxy": next_proxy.server},
)
config.proxy_config = next_proxy
return 1
async def check_robots_txt_middleware(context: Dict[str, Any]) -> int:
"""Check if the URL is allowed by robots.txt if enabled"""
url = context.get("url")
config = context.get("config")
browser_config = context.get("browser_config")
robots_parser = context.get("robots_parser")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await robots_parser.can_fetch(url, browser_config.user_agent):
context["crawl_result"] = CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
)
return 0
return 1
async def fetch_content_middleware_(context: Dict[str, Any]) -> int:
"""Fetch content from the web using the crawler strategy"""
url = context.get("url")
config = context.get("config")
crawler_strategy = context.get("crawler_strategy")
logger = context.get("logger")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
try:
t1 = time.perf_counter()
if config.user_agent:
crawler_strategy.update_user_agent(config.user_agent)
# Call CrawlerStrategy.crawl
async_response = await crawler_strategy.crawl(url, config=config)
html = sanitize_input_encode(async_response.html)
screenshot_data = async_response.screenshot
pdf_data = async_response.pdf_data
js_execution_result = async_response.js_execution_result
t2 = time.perf_counter()
logger.url_status(
url=context["cache_context"].display_url,
success=bool(html),
timing=t2 - t1,
tag="FETCH",
)
context["html"] = html
context["screenshot_data"] = screenshot_data
context["pdf_data"] = pdf_data
context["js_execution_result"] = js_execution_result
context["async_response"] = async_response
return 1
except Exception as e:
context["error_message"] = f"Error fetching content: {str(e)}"
return 0
async def scrape_content_middleware(context: Dict[str, Any]) -> int:
"""Apply scraping strategy to extract content"""
url = context.get("url")
html = context.get("html")
config = context.get("config")
extracted_content = context.get("extracted_content")
logger = context.get("logger")
# Skip if already have a crawl result
if context.get("crawl_result"):
return 1
try:
_url = url if not context.get("is_raw_html", False) else "Raw HTML"
t1 = time.perf_counter()
# Get scraping strategy and ensure it has a logger
scraping_strategy = config.scraping_strategy
if not scraping_strategy.logger:
scraping_strategy.logger = logger
# Process HTML content
params = config.__dict__.copy()
params.pop("url", None)
# Add keys from kwargs to params that don't exist in params
kwargs = context.get("kwargs", {})
params.update({k: v for k, v in kwargs.items() if k not in params.keys()})
# Scraping Strategy Execution
result: ScrapingResult = scraping_strategy.scrap(url, html, **params)
if result is None:
raise ValueError(f"Process HTML, Failed to extract content from the website: {url}")
# Extract results - handle both dict and ScrapingResult
if isinstance(result, dict):
cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
media = result.get("media", {})
links = result.get("links", {})
metadata = result.get("metadata", {})
else:
cleaned_html = sanitize_input_encode(result.cleaned_html)
media = result.media.model_dump()
links = result.links.model_dump()
metadata = result.metadata
context["cleaned_html"] = cleaned_html
context["media"] = media
context["links"] = links
context["metadata"] = metadata
# Log processing completion
logger.info(
message="{url:.50}... | Time: {timing}s",
tag="SCRAPE",
params={
"url": _url,
"timing": int((time.perf_counter() - t1) * 1000) / 1000,
},
)
return 1
except InvalidCSSSelectorError as e:
context["error_message"] = str(e)
return 0
except Exception as e:
context["error_message"] = f"Process HTML, Failed to extract content from the website: {url}, error: {str(e)}"
return 0
async def generate_markdown_middleware(context: Dict[str, Any]) -> int:
"""Generate markdown from cleaned HTML"""
url = context.get("url")
cleaned_html = context.get("cleaned_html")
config = context.get("config")
# Skip if already have a crawl result
if context.get("crawl_result"):
return 1
# Generate Markdown
markdown_generator = config.markdown_generator
markdown_result: MarkdownGenerationResult = markdown_generator.generate_markdown(
cleaned_html=cleaned_html,
base_url=url,
)
context["markdown_result"] = markdown_result
return 1
async def extract_structured_content_middleware(context: Dict[str, Any]) -> int:
"""Extract structured content using extraction strategy"""
url = context.get("url")
extracted_content = context.get("extracted_content")
config = context.get("config")
markdown_result = context.get("markdown_result")
cleaned_html = context.get("cleaned_html")
logger = context.get("logger")
# Skip if already have a crawl result or extracted content
if context.get("crawl_result") or bool(extracted_content):
return 1
from crawl4ai.chunking_strategy import IdentityChunking
from crawl4ai.extraction_strategy import NoExtractionStrategy
if config.extraction_strategy and not isinstance(config.extraction_strategy, NoExtractionStrategy):
t1 = time.perf_counter()
_url = url if not context.get("is_raw_html", False) else "Raw HTML"
# Choose content based on input_format
content_format = config.extraction_strategy.input_format
if content_format == "fit_markdown" and not markdown_result.fit_markdown:
logger.warning(
message="Fit markdown requested but not available. Falling back to raw markdown.",
tag="EXTRACT",
params={"url": _url},
)
content_format = "markdown"
content = {
"markdown": markdown_result.raw_markdown,
"html": context.get("html"),
"cleaned_html": cleaned_html,
"fit_markdown": markdown_result.fit_markdown,
}.get(content_format, markdown_result.raw_markdown)
# Use IdentityChunking for HTML input, otherwise use provided chunking strategy
chunking = (
IdentityChunking()
if content_format in ["html", "cleaned_html"]
else config.chunking_strategy
)
sections = chunking.chunk(content)
extracted_content = config.extraction_strategy.run(url, sections)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)
context["extracted_content"] = extracted_content
# Log extraction completion
logger.info(
message="Completed for {url:.50}... | Time: {timing}s",
tag="EXTRACT",
params={"url": _url, "timing": time.perf_counter() - t1},
)
return 1
async def format_html_middleware(context: Dict[str, Any]) -> int:
"""Format HTML if prettify is enabled"""
config = context.get("config")
cleaned_html = context.get("cleaned_html")
# Skip if already have a crawl result
if context.get("crawl_result"):
return 1
# Apply HTML formatting if requested
if config.prettiify and cleaned_html:
context["cleaned_html"] = fast_format_html(cleaned_html)
return 1
async def write_cache_middleware(context: Dict[str, Any]) -> int:
"""Write result to cache if appropriate"""
cache_context = context.get("cache_context")
cached_result = context.get("cached_result")
# Skip if already have a crawl result or not using cache
if context.get("crawl_result") or not cache_context.should_write() or bool(cached_result):
return 1
# We'll create the CrawlResult in build_result_middleware and cache it there
# to avoid creating it twice
return 1
async def build_result_middleware(context: Dict[str, Any]) -> int:
"""Build the final CrawlResult object"""
url = context.get("url")
html = context.get("html", "")
cache_context = context.get("cache_context")
cached_result = context.get("cached_result")
config = context.get("config")
logger = context.get("logger")
# If we already have a crawl result (from an earlier middleware like robots.txt check)
if context.get("crawl_result"):
result = context["crawl_result"]
context["final_result"] = CrawlResultContainer(result)
return 1
# If we have a cached result
if cached_result and html:
logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": True,
"timing": f"{time.perf_counter() - context['start_time']:.2f}s",
},
colors={"status": "green", "timing": "yellow"},
)
cached_result.success = bool(html)
cached_result.session_id = getattr(config, "session_id", None)
cached_result.redirected_url = cached_result.redirected_url or url
context["final_result"] = CrawlResultContainer(cached_result)
return 1
# Build a new result
try:
# Get all necessary components from context
cleaned_html = context.get("cleaned_html", "")
markdown_result = context.get("markdown_result")
media = context.get("media", {})
links = context.get("links", {})
metadata = context.get("metadata", {})
screenshot_data = context.get("screenshot_data")
pdf_data = context.get("pdf_data")
extracted_content = context.get("extracted_content")
async_response = context.get("async_response")
# Create the CrawlResult
crawl_result = CrawlResult(
url=url,
html=html,
cleaned_html=cleaned_html,
markdown=markdown_result,
media=media,
links=links,
metadata=metadata,
screenshot=screenshot_data,
pdf=pdf_data,
extracted_content=extracted_content,
success=bool(html),
error_message="",
)
# Add response details if available
if async_response:
crawl_result.status_code = async_response.status_code
crawl_result.redirected_url = async_response.redirected_url or url
crawl_result.response_headers = async_response.response_headers
crawl_result.downloaded_files = async_response.downloaded_files
crawl_result.js_execution_result = context.get("js_execution_result")
crawl_result.ssl_certificate = async_response.ssl_certificate
crawl_result.session_id = getattr(config, "session_id", None)
# Log completion
logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": crawl_result.success,
"timing": f"{time.perf_counter() - context['start_time']:.2f}s",
},
colors={
"status": "green" if crawl_result.success else "red",
"timing": "yellow",
},
)
# Update cache if appropriate
if cache_context.should_write() and not bool(cached_result):
await async_db_manager.acache_url(crawl_result)
context["final_result"] = CrawlResultContainer(crawl_result)
return 1
except Exception as e:
error_context = get_error_context(sys.exc_info())
error_message = (
f"Unexpected error in build_result at line {error_context['line_no']} "
f"in {error_context['function']} ({error_context['filename']}):\n"
f"Error: {str(e)}\n\n"
f"Code context:\n{error_context['code_context']}"
)
logger.error_status(
url=url,
error=create_box_message(error_message, type="error"),
tag="ERROR",
)
context["final_result"] = CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
)
return 1
async def handle_error_middleware(context: Dict[str, Any]) -> Dict[str, Any]:
"""Error handler middleware"""
url = context.get("url", "")
error_message = context.get("error_message", "Unknown error")
logger = context.get("logger")
# Log the error
if logger:
logger.error_status(
url=url,
error=create_box_message(error_message, type="error"),
tag="ERROR",
)
# Create a failure result
context["final_result"] = CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
)
return context
# Custom middlewares as requested
async def sentiment_analysis_middleware(context: Dict[str, Any]) -> int:
"""Analyze sentiment of generated markdown using TextBlob"""
from textblob import TextBlob
markdown_result = context.get("markdown_result")
# Skip if no markdown or already failed
if not markdown_result or not context.get("success", True):
return 1
try:
# Get raw markdown text
raw_markdown = markdown_result.raw_markdown
# Analyze sentiment
blob = TextBlob(raw_markdown)
sentiment = blob.sentiment
# Add sentiment to context
context["sentiment_analysis"] = {
"polarity": sentiment.polarity, # -1.0 to 1.0 (negative to positive)
"subjectivity": sentiment.subjectivity, # 0.0 to 1.0 (objective to subjective)
"classification": "positive" if sentiment.polarity > 0.1 else
"negative" if sentiment.polarity < -0.1 else "neutral"
}
return 1
except Exception as e:
# Don't fail the pipeline on sentiment analysis failure
context["sentiment_analysis_error"] = str(e)
return 1
async def log_timing_middleware(context: Dict[str, Any], name: str) -> int:
"""Log timing information for a specific point in the pipeline"""
context[f"_timing_mark_{name}"] = time.perf_counter()
# Calculate duration if we have a start time
start_key = f"_timing_start_{name}"
if start_key in context:
duration = context[f"_timing_mark_{name}"] - context[start_key]
context[f"_timing_duration_{name}"] = duration
# Log the timing if we have a logger
logger = context.get("logger")
if logger:
logger.info(
message="{name} completed in {duration:.2f}s",
tag="TIMING",
params={"name": name, "duration": duration},
)
return 1
async def validate_url_middleware(context: Dict[str, Any], patterns: List[str]) -> int:
"""Validate URL against glob patterns"""
import fnmatch
url = context.get("url", "")
# If no patterns provided, allow all
if not patterns:
return 1
# Check if URL matches any of the allowed patterns
for pattern in patterns:
if fnmatch.fnmatch(url, pattern):
return 1
# If we get here, URL didn't match any patterns
context["error_message"] = f"URL '{url}' does not match any allowed patterns"
return 0
# Update the default middleware list function
def create_default_middleware_list():
"""Return the default list of middleware functions for the pipeline."""
return [
initialize_context_middleware,
check_cache_middleware,
browser_hub_middleware, # Add browser hub middleware before fetch_content
configure_proxy_middleware,
check_robots_txt_middleware,
fetch_content_middleware,
scrape_content_middleware,
generate_markdown_middleware,
extract_structured_content_middleware,
format_html_middleware,
build_result_middleware
]

View File

@@ -0,0 +1,297 @@
import time
import asyncio
from typing import Callable, Dict, List, Any, Optional, Awaitable, Union, TypedDict, Tuple, Coroutine
from .middlewares import create_default_middleware_list, handle_error_middleware
from crawl4ai.models import CrawlResultContainer, CrawlResult
from crawl4ai.async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
class CrawlSpec(TypedDict, total=False):
"""Specification for a single crawl operation in batch_crawl."""
url: str
config: Optional[CrawlerRunConfig]
browser_config: Optional[BrowserConfig]
class BatchStatus(TypedDict, total=False):
"""Status information for batch crawl operations."""
total: int
processed: int
succeeded: int
failed: int
in_progress: int
duration: float
class Pipeline:
"""
A pipeline processor that executes a series of async middleware functions.
Each middleware function receives a context dictionary, updates it,
and returns 1 for success or 0 for failure.
"""
def __init__(
self,
middleware: List[Callable[[Dict[str, Any]], Awaitable[int]]] = None,
error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None,
after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None,
crawler_strategy: Optional[AsyncCrawlerStrategy] = None,
browser_config: Optional[BrowserConfig] = None,
logger: Optional[AsyncLogger] = None,
_initial_context: Optional[Dict[str, Any]] = None
):
self.middleware = middleware or create_default_middleware_list()
self.error_handler = error_handler or handle_error_middleware
self.after_middleware_callback = after_middleware_callback
self.browser_config = browser_config or BrowserConfig()
self.logger = logger or AsyncLogger(verbose=self.browser_config.verbose)
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
browser_config=self.browser_config,
logger=self.logger
)
self._initial_context = _initial_context
self._strategy_initialized = False
async def _initialize_strategy__(self):
"""Initialize the crawler strategy if not already initialized"""
if not self.crawler_strategy:
self.crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=self.browser_config,
logger=self.logger
)
if not self._strategy_initialized:
await self.crawler_strategy.__aenter__()
self._strategy_initialized = True
async def _initialize_strategy(self):
"""Initialize the crawler strategy if not already initialized"""
# With our new approach, we don't need to create the crawler strategy here
# as it will be created on-demand in fetch_content_middleware
# Just ensure browser hub is available if needed
if hasattr(self, "_initial_context") and "browser_hub" not in self._initial_context:
# If a browser_config was provided but no browser_hub yet,
# we'll let the browser_hub_middleware handle creating it
pass
# Mark as initialized to prevent repeated initialization attempts
self._strategy_initialized = True
async def start(self):
"""Start the crawler strategy and prepare it for use"""
if not self._strategy_initialized:
await self._initialize_strategy()
self._strategy_initialized = True
if self.crawler_strategy:
await self.crawler_strategy.__aenter__()
self._strategy_initialized = True
else:
raise ValueError("Crawler strategy is not initialized.")
async def close(self):
"""Close the crawler strategy and clean up resources"""
await self.stop()
async def stop(self):
"""Close the crawler strategy and clean up resources"""
if self._strategy_initialized and self.crawler_strategy:
await self.crawler_strategy.__aexit__(None, None, None)
self._strategy_initialized = False
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def crawl(self, url: str, config: Optional[CrawlerRunConfig] = None, **kwargs) -> CrawlResultContainer:
"""
Crawl a URL and process it through the pipeline.
Args:
url: The URL to crawl
config: Optional configuration for the crawl
**kwargs: Additional arguments to pass to the middleware
Returns:
CrawlResultContainer: The result of the crawl
"""
# Initialize strategy if needed
await self._initialize_strategy()
# Create the initial context
context = {
"url": url,
"config": config or CrawlerRunConfig(),
"browser_config": self.browser_config,
"logger": self.logger,
"crawler_strategy": self.crawler_strategy,
"kwargs": kwargs
}
# Process the pipeline
result_context = await self.process(context)
# Return the final result
return result_context.get("final_result")
async def process(self, initial_context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Process all middleware functions with the given context.
Args:
initial_context: Initial context dictionary, defaults to empty dict
Returns:
Updated context dictionary after all middleware have been processed
"""
context = {**self._initial_context}
if initial_context:
context.update(initial_context)
# Record pipeline start time
context["_pipeline_start_time"] = time.perf_counter()
for middleware_fn in self.middleware:
# Get middleware name for logging
middleware_name = getattr(middleware_fn, '__name__', str(middleware_fn))
# Record start time for this middleware
start_time = time.perf_counter()
context[f"_timing_start_{middleware_name}"] = start_time
try:
# Execute middleware (all middleware functions are async)
result = await middleware_fn(context)
# Record completion time
end_time = time.perf_counter()
context[f"_timing_end_{middleware_name}"] = end_time
context[f"_timing_duration_{middleware_name}"] = end_time - start_time
# Execute after-middleware callback if provided
if self.after_middleware_callback:
await self.after_middleware_callback(middleware_name, context)
# Convert boolean returns to int (True->1, False->0)
if isinstance(result, bool):
result = 1 if result else 0
# Handle failure
if result == 0:
if self.error_handler:
context["_error_in"] = middleware_name
context["_error_at"] = time.perf_counter()
return await self._handle_error(context)
else:
context["success"] = False
context["error_message"] = f"Pipeline failed at {middleware_name}"
break
except Exception as e:
# Record error information
context["_error_in"] = middleware_name
context["_error_at"] = time.perf_counter()
context["_exception"] = e
context["success"] = False
context["error_message"] = f"Exception in {middleware_name}: {str(e)}"
# Call error handler if available
if self.error_handler:
return await self._handle_error(context)
break
# Record pipeline completion time
pipeline_end_time = time.perf_counter()
context["_pipeline_end_time"] = pipeline_end_time
context["_pipeline_duration"] = pipeline_end_time - context["_pipeline_start_time"]
# Set success to True if not already set (no failures)
if "success" not in context:
context["success"] = True
return context
async def _handle_error(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle errors by calling the error handler"""
try:
return await self.error_handler(context)
except Exception as e:
# If error handler fails, update context with this new error
context["_error_handler_exception"] = e
context["error_message"] = f"Error handler failed: {str(e)}"
return context
async def create_pipeline(
middleware_list=None,
error_handler=None,
after_middleware_callback=None,
browser_config=None,
browser_hub_id=None,
browser_hub_connection=None,
browser_hub=None,
logger=None
) -> Pipeline:
"""
Factory function to create a pipeline with Browser-Hub integration.
Args:
middleware_list: List of middleware functions
error_handler: Error handler middleware
after_middleware_callback: Callback after middleware execution
browser_config: Configuration for the browser
browser_hub_id: ID for browser hub instance
browser_hub_connection: Connection string for existing browser hub
browser_hub: Existing browser hub instance to use
logger: Logger instance
Returns:
Pipeline: Configured pipeline instance
"""
# Use default middleware list if none provided
middleware = middleware_list or create_default_middleware_list()
# Create the pipeline
pipeline = Pipeline(
middleware=middleware,
error_handler=error_handler,
after_middleware_callback=after_middleware_callback,
logger=logger
)
# Set browser-related attributes in the initial context
pipeline._initial_context = {
"browser_config": browser_config,
"browser_hub_id": browser_hub_id,
"browser_hub_connection": browser_hub_connection,
"browser_hub": browser_hub,
"logger": logger
}
return pipeline
# async def create_pipeline(
# middleware_list: Optional[List[Callable[[Dict[str, Any]], Awaitable[int]]]] = None,
# error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None,
# after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None,
# crawler_strategy = None,
# browser_config = None,
# logger = None
# ) -> Pipeline:
# """Factory function to create a pipeline with the given middleware"""
# return Pipeline(
# middleware=middleware_list,
# error_handler=error_handler,
# after_middleware_callback=after_middleware_callback,
# crawler_strategy=crawler_strategy,
# browser_config=browser_config,
# logger=logger
# )

View File

@@ -0,0 +1,109 @@
import asyncio
from crawl4ai import (
BrowserConfig,
CrawlerRunConfig,
CacheMode,
DefaultMarkdownGenerator,
PruningContentFilter
)
from pipeline import Pipeline
async def main():
# Create configuration objects
browser_config = BrowserConfig(headless=True, verbose=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
)
# Create and use pipeline with context manager
async with Pipeline(browser_config=browser_config) as pipeline:
result = await pipeline.crawl(
url="https://www.example.com",
config=crawler_config
)
# Print the result
print(f"URL: {result.url}")
print(f"Success: {result.success}")
if result.success:
print("\nMarkdown excerpt:")
print(result.markdown.raw_markdown[:500] + "...")
else:
print(f"Error: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
class CrawlTarget:
def __init__(self, urls, config=None):
self.urls = urls
self.config = config
def __repr__(self):
return f"CrawlTarget(urls={self.urls}, config={self.config})"
# async def main():
# # Create configuration objects
# browser_config = BrowserConfig(headless=True, verbose=True)
# # Define different configurations
# config1 = CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
# markdown_generator=DefaultMarkdownGenerator(
# content_filter=PruningContentFilter(threshold=0.48)
# ),
# )
# config2 = CrawlerRunConfig(
# cache_mode=CacheMode.ENABLED,
# screenshot=True,
# pdf=True
# )
# # Create crawl targets
# targets = [
# CrawlTarget(
# urls=["https://www.example.com", "https://www.wikipedia.org"],
# config=config1
# ),
# CrawlTarget(
# urls="https://news.ycombinator.com",
# config=config2
# ),
# CrawlTarget(
# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
# config=None
# )
# ]
# # Create and use pipeline with context manager
# async with Pipeline(browser_config=browser_config) as pipeline:
# all_results = await pipeline.crawl_batch(targets)
# for target_key, results in all_results.items():
# print(f"\n===== Results for {target_key} =====")
# print(f"Number of URLs crawled: {len(results)}")
# for i, result in enumerate(results):
# print(f"\nURL {i+1}: {result.url}")
# print(f"Success: {result.success}")
# if result.success:
# print(f"Content length: {len(result.markdown.raw_markdown)} chars")
# else:
# print(f"Error: {result.error_message}")
# if __name__ == "__main__":
# asyncio.run(main())

View File

@@ -0,0 +1,222 @@
# demo_browser_hub.py
import asyncio
from typing import List
from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.cache_context import CacheMode
from crawl4ai import DefaultMarkdownGenerator
from crawl4ai import PruningContentFilter
async def create_prewarmed_browser_hub(urls_to_crawl: List[str]):
"""Create a pre-warmed browser hub with 10 browsers and 5 pages each."""
# Set up logging
logger = AsyncLogger(verbose=True)
logger.info("Setting up pre-warmed browser hub", tag="DEMO")
# Create browser configuration
browser_config = BrowserConfig(
browser_type="chromium",
headless=True, # Set to False to see the browsers in action
viewport_width=1280,
viewport_height=800,
light_mode=True, # Optimize for performance
java_script_enabled=True
)
# Create crawler configurations for pre-warming with different user agents
# This allows pages to be ready for different scenarios
crawler_configs = [
CrawlerRunConfig(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
wait_until="networkidle"
),
# CrawlerRunConfig(
# user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15",
# wait_until="networkidle"
# ),
# CrawlerRunConfig(
# user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
# wait_until="networkidle"
# )
]
# Number of browsers and pages per browser
num_browsers = 1
pages_per_browser = 1
# Distribute pages across configurations
# We'll create a total of 50 pages (10 browsers × 5 pages)
page_configs = []
total_pages = num_browsers * pages_per_browser
pages_per_config = total_pages // len(crawler_configs)
for i, config in enumerate(crawler_configs):
# For the last config, add any remaining pages
if i == len(crawler_configs) - 1:
remaining = total_pages - (pages_per_config * (len(crawler_configs) - 1))
page_configs.append((browser_config, config, remaining))
else:
page_configs.append((browser_config, config, pages_per_config))
# Create browser hub with pre-warmed pages
start_time = asyncio.get_event_loop().time()
logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO")
hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id="demo_hub",
logger=logger,
max_browsers_per_config=num_browsers,
max_pages_per_browser=pages_per_browser,
initial_pool_size=num_browsers,
page_configs=page_configs
)
end_time = asyncio.get_event_loop().time()
logger.success(
message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds",
tag="DEMO",
params={
"total_pages": total_pages,
"duration": end_time - start_time
}
)
# Get and display pool status
status = await hub.get_pool_status()
logger.info(
message="Browser pool status: {status}",
tag="DEMO",
params={"status": status}
)
return hub
async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]:
"""Crawl a list of URLs using a pre-warmed browser hub."""
logger = AsyncLogger(verbose=True)
# Create crawler configuration
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
wait_until="networkidle",
screenshot=True
)
# Create pipeline with the browser hub
pipeline = await create_pipeline(
browser_hub=hub,
logger=logger
)
results = []
# Crawl all URLs in parallel
async def crawl_url(url):
logger.info(f"Crawling {url}...", tag="CRAWL")
result = await pipeline.crawl(url=url, config=crawler_config)
logger.success(f"Completed crawl of {url}", tag="CRAWL")
return result
# Create tasks for all URLs
tasks = [crawl_url(url) for url in urls]
# Execute all tasks in parallel and collect results
results = await asyncio.gather(*tasks)
return results
async def main():
"""Main demo function."""
# List of URLs to crawl
urls_to_crawl = [
"https://example.com",
# "https://www.python.org",
# "https://httpbin.org/html",
# "https://news.ycombinator.com",
# "https://github.com",
# "https://pypi.org",
# "https://docs.python.org/3/",
# "https://opensource.org",
# "https://whatismyipaddress.com",
# "https://en.wikipedia.org/wiki/Web_scraping"
]
# Set up logging
logger = AsyncLogger(verbose=True)
logger.info("Starting browser hub demo", tag="DEMO")
try:
# Create pre-warmed browser hub
hub = await create_prewarmed_browser_hub(urls_to_crawl)
# Use hub to crawl URLs
logger.info("Crawling URLs in parallel...", tag="DEMO")
start_time = asyncio.get_event_loop().time()
results = await crawl_urls_with_hub(hub, urls_to_crawl)
end_time = asyncio.get_event_loop().time()
# Display results
logger.success(
message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)",
tag="DEMO",
params={
"count": len(results),
"duration": end_time - start_time,
"avg": (end_time - start_time) / len(results)
}
)
# Print summary of results
logger.info("Crawl results summary:", tag="DEMO")
for i, result in enumerate(results):
logger.info(
message="{idx}. {url}: Success={success}, Content length={length}",
tag="RESULT",
params={
"idx": i+1,
"url": result.url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
if result.success and result.markdown and result.markdown.raw_markdown:
# Print a snippet of the markdown
markdown_snippet = result.markdown.raw_markdown[:150] + "..."
logger.info(
message=" Markdown: {snippet}",
tag="RESULT",
params={"snippet": markdown_snippet}
)
# Display final browser pool status
status = await hub.get_pool_status()
logger.info(
message="Final browser pool status: {status}",
tag="DEMO",
params={"status": status}
)
finally:
# Clean up
logger.info("Shutting down browser hub...", tag="DEMO")
await BrowserHub.shutdown_all()
logger.success("Demo completed", tag="DEMO")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,505 @@
# extended_browser_hub_tests.py
import asyncio
from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.cache_context import CacheMode
# Common test URLs
TEST_URLS = [
"https://example.com",
"https://example.com/page1",
"https://httpbin.org/html",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
"https://httpstat.us/200"
]
class TestResults:
"""Simple container for test results"""
def __init__(self, name: str):
self.name = name
self.results = []
self.start_time = None
self.end_time = None
self.errors = []
@property
def duration(self) -> float:
if self.start_time and self.end_time:
return self.end_time - self.start_time
return 0
@property
def success_rate(self) -> float:
if not self.results:
return 0
return sum(1 for r in self.results if r.success) / len(self.results) * 100
def log_summary(self, logger: AsyncLogger):
logger.info(f"=== Test: {self.name} ===", tag="SUMMARY")
logger.info(
message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: {count}",
tag="SUMMARY",
params={
"duration": self.duration,
"success_rate": self.success_rate,
"count": len(self.results)
}
)
if self.errors:
logger.error(
message="Errors ({count}): {errors}",
tag="SUMMARY",
params={
"count": len(self.errors),
"errors": "; ".join(str(e) for e in self.errors)
}
)
# ======== TEST SCENARIO 1: Simple default configuration ========
async def test_default_configuration():
"""
Test Scenario 1: Simple default configuration
This tests the basic case where the user does not provide any specific
browser configuration, relying on default auto-setup.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Default Configuration")
try:
# Create pipeline with no browser config
pipeline = await create_pipeline(logger=logger)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Create basic crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded"
)
# Process each URL sequentially
for url in TEST_URLS:
try:
logger.info(f"Crawling {url} with default configuration", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
logger.success(
message="Result: url={url}, success={success}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 2: Detailed custom configuration ========
async def test_custom_configuration():
"""
Test Scenario 2: Detailed custom configuration
This tests the case where the user provides detailed browser configuration
to customize the browser behavior.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Custom Configuration")
try:
# Create custom browser config
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1920,
viewport_height=1080,
user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
light_mode=True,
ignore_https_errors=True,
extra_args=["--disable-extensions"]
)
# Create custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle",
page_timeout=30000,
screenshot=True,
pdf=False,
screenshot_wait_for=0.5,
wait_for_images=True,
scan_full_page=True,
scroll_delay=0.2,
process_iframes=True,
remove_overlay_elements=True
)
# Create pipeline with custom configuration
pipeline = await create_pipeline(
browser_config=browser_config,
logger=logger
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Process each URL sequentially
for url in TEST_URLS:
try:
logger.info(f"Crawling {url} with custom configuration", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
has_screenshot = result.screenshot is not None
logger.success(
message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"screenshot": has_screenshot,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
# Get browser hub status from context
try:
# Run a dummy crawl to get the context with browser hub
context = await pipeline.process({"url": "about:blank", "config": crawler_config})
browser_hub = context.get("browser_hub")
if browser_hub:
status = await browser_hub.get_pool_status()
logger.info(
message="Browser hub status: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST")
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 3: Using pre-initialized browser hub ========
async def test_preinitalized_browser_hub():
"""
Test Scenario 3: Using pre-initialized browser hub
This tests the case where a browser hub is initialized separately
and then passed to the pipeline.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Pre-initialized Browser Hub")
browser_hub = None
try:
# Create and initialize browser hub separately
logger.info("Initializing browser hub separately", tag="TEST")
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
verbose=True
)
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id="test_preinitalized",
logger=logger,
max_browsers_per_config=2,
max_pages_per_browser=3,
initial_pool_size=2
)
# Display initial status
status = await browser_hub.get_pool_status()
logger.info(
message="Initial browser hub status: {status}",
tag="TEST",
params={"status": status}
)
# Create pipeline with pre-initialized browser hub
pipeline = await create_pipeline(
browser_hub=browser_hub,
logger=logger
)
# Create crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle",
screenshot=True
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Process URLs in parallel
async def crawl_url(url):
try:
logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
logger.success(f"Completed crawl of {url}", tag="TEST")
return result
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
return None
# Create tasks for all URLs
tasks = [crawl_url(url) for url in TEST_URLS]
# Execute all tasks in parallel and collect results
all_results = await asyncio.gather(*tasks)
results.results = [r for r in all_results if r is not None]
# End timing
results.end_time = asyncio.get_event_loop().time()
# Display final status
status = await browser_hub.get_pool_status()
logger.info(
message="Final browser hub status: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results, browser_hub
# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ========
async def test_parallel_pipelines():
"""
Test Scenario 4: Multiple parallel pipelines sharing browser hub
This tests the case where multiple pipelines share the same browser hub,
demonstrating resource sharing and parallel operation.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Parallel Pipelines")
# We'll reuse the browser hub from the previous test
_, browser_hub = await test_preinitalized_browser_hub()
try:
# Create 3 pipelines that all share the same browser hub
pipelines = []
for i in range(3):
pipeline = await create_pipeline(
browser_hub=browser_hub,
logger=logger
)
pipelines.append(pipeline)
logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST")
# Create crawler configs with different settings
configs = [
CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
CrawlerRunConfig(wait_until="networkidle", screenshot=True),
CrawlerRunConfig(wait_until="load", scan_full_page=True)
]
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Function to process URLs with a specific pipeline
async def process_with_pipeline(pipeline_idx, urls):
pipeline_results = []
for url in urls:
try:
logger.info(f"Pipeline {pipeline_idx} crawling {url}", tag="TEST")
result = await pipelines[pipeline_idx].crawl(
url=url,
config=configs[pipeline_idx]
)
pipeline_results.append(result)
logger.success(
message="Pipeline {idx} completed: url={url}, success={success}",
tag="TEST",
params={
"idx": pipeline_idx,
"url": url,
"success": result.success
}
)
except Exception as e:
logger.error(
message="Pipeline {idx} error: {error}",
tag="TEST",
params={
"idx": pipeline_idx,
"error": str(e)
}
)
results.errors.append(e)
return pipeline_results
# Distribute URLs among pipelines
pipeline_urls = [
TEST_URLS[:2],
TEST_URLS[2:4],
TEST_URLS[4:5] * 2 # Duplicate the last URL to have 2 for pipeline 3
]
# Execute all pipelines in parallel
tasks = [
process_with_pipeline(i, urls)
for i, urls in enumerate(pipeline_urls)
]
pipeline_results = await asyncio.gather(*tasks)
# Flatten results
for res_list in pipeline_results:
results.results.extend(res_list)
# End timing
results.end_time = asyncio.get_event_loop().time()
# Display browser hub status
status = await browser_hub.get_pool_status()
logger.info(
message="Browser hub status after parallel pipelines: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 5: Browser hub with connection string ========
async def test_connection_string():
"""
Test Scenario 5: Browser hub with connection string
This tests the case where a browser hub is initialized from a connection string,
simulating connecting to a running browser hub service.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Connection String")
try:
# Create pipeline with connection string
# Note: In a real implementation, this would connect to an existing service
# For this test, we're using a simulated connection
connection_string = "localhost:9222" # Simulated connection string
pipeline = await create_pipeline(
browser_hub_connection=connection_string,
logger=logger
)
# Create crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle"
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Test with a single URL
url = TEST_URLS[0]
try:
logger.info(f"Crawling {url} with connection string hub", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
logger.success(
message="Result: url={url}, success={success}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== RUN ALL TESTS ========
async def run_all_tests():
"""Run all test scenarios"""
logger = AsyncLogger(verbose=True)
logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN")
try:
# Run each test scenario
await test_default_configuration()
# await test_custom_configuration()
# await test_preinitalized_browser_hub()
# await test_parallel_pipelines()
# await test_connection_string()
except Exception as e:
logger.error(f"Test suite failed: {str(e)}", tag="MAIN")
finally:
# Clean up all browser hubs
logger.info("Shutting down all browser hubs...", tag="MAIN")
await BrowserHub.shutdown_all()
logger.success("All tests completed", tag="MAIN")
if __name__ == "__main__":
asyncio.run(run_all_tests())

View File

@@ -0,0 +1,163 @@
"""Test the Crawler class for batch crawling capabilities."""
import asyncio
import pytest
from typing import List, Dict, Any, Optional, Tuple
from crawl4ai import Crawler
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResult, CrawlResultContainer
from crawl4ai.browser import BrowserHub
from crawl4ai.cache_context import CacheMode
# Test URLs for crawling
SAFE_URLS = [
"https://example.com",
"https://httpbin.org/html",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
"https://httpstat.us/200",
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/comments/1",
"https://iana.org",
"https://www.python.org"
]
# Simple test for batch crawling
@pytest.mark.asyncio
async def test_batch_crawl_simple():
"""Test simple batch crawling with multiple URLs."""
# Use a few test URLs
urls = SAFE_URLS[:3]
# Custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded"
)
# Crawl multiple URLs using batch crawl
results = await Crawler.crawl(
urls,
crawler_config=crawler_config
)
# Verify the results
assert isinstance(results, dict)
assert len(results) == len(urls)
for url in urls:
assert url in results
assert results[url].success
assert results[url].html is not None
# Test parallel batch crawling
@pytest.mark.asyncio
async def test_parallel_batch_crawl():
"""Test parallel batch crawling with multiple URLs."""
# Use several URLs for parallel crawling
urls = SAFE_URLS[:5]
# Basic crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded"
)
# Crawl in parallel
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl(
urls,
crawler_config=crawler_config
)
end_time = asyncio.get_event_loop().time()
# Verify results
assert len(results) == len(urls)
successful = sum(1 for r in results.values() if r.success)
print(f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s")
print(f"Success rate: {successful}/{len(urls)}")
# At least 80% should succeed
assert successful / len(urls) >= 0.8
# Test batch crawling with different configurations
@pytest.mark.asyncio
async def test_batch_crawl_mixed_configs():
"""Test batch crawling with different configurations for different URLs."""
# Create URL batches with different configurations
batch1 = (SAFE_URLS[:2], CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False))
batch2 = (SAFE_URLS[2:4], CrawlerRunConfig(wait_until="networkidle", screenshot=True))
# Crawl with mixed configurations
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl([batch1, batch2])
end_time = asyncio.get_event_loop().time()
# Extract all URLs
all_urls = batch1[0] + batch2[0]
# Verify results
assert len(results) == len(all_urls)
# Check that screenshots are present only for batch2
for url in batch1[0]:
assert results[url].screenshot is None
for url in batch2[0]:
assert results[url].screenshot is not None
print(f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s")
# Test shared browser hub
@pytest.mark.asyncio
async def test_batch_crawl_shared_hub():
"""Test batch crawling with a shared browser hub."""
# Create and initialize a browser hub
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
max_browsers_per_config=3,
max_pages_per_browser=4,
initial_pool_size=1
)
try:
# Use the hub for parallel crawling
urls = SAFE_URLS[:3]
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl(
urls,
browser_hub=browser_hub,
crawler_config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded"
)
)
end_time = asyncio.get_event_loop().time()
# Verify results
assert len(results) == len(urls)
successful = sum(1 for r in results.values() if r.success)
print(f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s")
print(f"Success rate: {successful}/{len(urls)}")
# Get browser hub statistics
hub_stats = await browser_hub.get_pool_status()
print(f"Browser hub stats: {hub_stats}")
# At least 80% should succeed
assert successful / len(urls) >= 0.8
finally:
# Clean up the browser hub
await browser_hub.close()

View File

@@ -0,0 +1,447 @@
# test_crawler.py
import asyncio
import warnings
import pytest
import pytest_asyncio
from typing import Optional, Tuple
# Define test fixtures
@pytest_asyncio.fixture
async def clean_browser_hub():
"""Fixture to ensure clean browser hub state between tests."""
# Yield control to the test
yield
# After test, cleanup all browser hubs
from crawl4ai.browser import BrowserHub
try:
await BrowserHub.shutdown_all()
except Exception as e:
print(f"Error during browser cleanup: {e}")
from crawl4ai import Crawler
from crawl4ai import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.browser import BrowserHub
from crawl4ai.cache_context import CacheMode
import warnings
from pydantic import PydanticDeprecatedSince20
# Test URLs for crawling
SAFE_URLS = [
"https://example.com",
"https://httpbin.org/html",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
"https://httpstat.us/200",
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/comments/1",
"https://iana.org",
"https://www.python.org",
]
class TestCrawlerBasic:
"""Basic tests for the Crawler utility class"""
@pytest.mark.asyncio
async def test_simple_crawl_single_url(self, clean_browser_hub):
"""Test crawling a single URL with default configuration"""
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=Warning)
# Basic logger
logger = AsyncLogger(verbose=True)
# Basic single URL crawl with default configuration
url = "https://example.com"
result = await Crawler.crawl(url)
# Verify the result
assert isinstance(result, CrawlResultContainer)
assert result.success
assert result.url == url
assert result.html is not None
assert len(result.html) > 0
@pytest.mark.asyncio
async def test_crawl_with_custom_config(self, clean_browser_hub):
"""Test crawling with custom browser and crawler configuration"""
# Custom browser config
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1280,
viewport_height=800,
)
# Custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, wait_until="networkidle", screenshot=True
)
# Crawl with custom configuration
url = "https://httpbin.org/html"
result = await Crawler.crawl(
url, browser_config=browser_config, crawler_config=crawler_config
)
# Verify the result
assert result.success
assert result.url == url
assert result.screenshot is not None
@pytest.mark.asyncio
async def test_crawl_multiple_urls_sequential(self, clean_browser_hub):
"""Test crawling multiple URLs sequentially"""
# Use a few test URLs
urls = SAFE_URLS[:3]
# Custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
)
# Crawl multiple URLs sequentially
results = await Crawler.crawl(urls, crawler_config=crawler_config)
# Verify the results
assert isinstance(results, dict)
assert len(results) == len(urls)
for url in urls:
assert url in results
assert results[url].success
assert results[url].html is not None
@pytest.mark.asyncio
async def test_crawl_with_error_handling(self, clean_browser_hub):
"""Test error handling during crawling"""
# Include a valid URL and a non-existent URL
urls = ["https://example.com", "https://non-existent-domain-123456789.com"]
# Crawl with retries
results = await Crawler.crawl(urls, max_retries=2, retry_delay=1.0)
# Verify results for both URLs
assert len(results) == 2
# Valid URL should succeed
assert results[urls[0]].success
# Invalid URL should fail but be in results
assert urls[1] in results
assert not results[urls[1]].success
assert results[urls[1]].error_message is not None
class TestCrawlerParallel:
"""Tests for the parallel crawling capabilities of Crawler"""
@pytest.mark.asyncio
async def test_parallel_crawl_simple(self, clean_browser_hub):
"""Test basic parallel crawling with same configuration"""
# Use several URLs for parallel crawling
urls = SAFE_URLS[:5]
# Basic crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
)
# Crawl in parallel with default concurrency
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl(urls, crawler_config=crawler_config)
end_time = asyncio.get_event_loop().time()
# Verify results
assert len(results) == len(urls)
successful = sum(1 for r in results.values() if r.success)
print(
f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
)
print(f"Success rate: {successful}/{len(urls)}")
# At least 80% should succeed
assert successful / len(urls) >= 0.8
@pytest.mark.asyncio
async def test_parallel_crawl_with_concurrency_limit(self, clean_browser_hub):
"""Test parallel crawling with concurrency limit"""
# Use more URLs to test concurrency control
urls = SAFE_URLS[:8]
# Custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
)
# Limited concurrency
concurrency = 2
# Time the crawl
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl(
urls, crawler_config=crawler_config, concurrency=concurrency
)
end_time = asyncio.get_event_loop().time()
# Verify results
assert len(results) == len(urls)
successful = sum(1 for r in results.values() if r.success)
print(
f"Parallel crawl with concurrency={concurrency} of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
)
print(f"Success rate: {successful}/{len(urls)}")
# At least 80% should succeed
assert successful / len(urls) >= 0.8
@pytest.mark.asyncio
async def test_parallel_crawl_with_different_configs(self, clean_browser_hub):
"""Test parallel crawling with different configurations for different URLs"""
# Create URL batches with different configurations
batch1 = (
SAFE_URLS[:2],
CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
)
batch2 = (
SAFE_URLS[2:4],
CrawlerRunConfig(wait_until="networkidle", screenshot=True),
)
batch3 = (
SAFE_URLS[4:6],
CrawlerRunConfig(wait_until="load", scan_full_page=True),
)
# Crawl with mixed configurations
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl([batch1, batch2, batch3])
end_time = asyncio.get_event_loop().time()
# Extract all URLs
all_urls = batch1[0] + batch2[0] + batch3[0]
# Verify results
assert len(results) == len(all_urls)
# Check that screenshots are present only for batch2
for url in batch1[0]:
assert not results[url].screenshot
for url in batch2[0]:
assert results[url].screenshot
print(
f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s"
)
@pytest.mark.asyncio
async def test_parallel_crawl_with_shared_browser_hub(self, clean_browser_hub):
"""Test parallel crawling with a shared browser hub"""
# Create and initialize a browser hub
browser_config = BrowserConfig(browser_type="chromium", headless=True)
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
max_browsers_per_config=3,
max_pages_per_browser=4,
initial_pool_size=1,
)
try:
# Use the hub for parallel crawling
urls = SAFE_URLS[:6]
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl(
urls,
browser_hub=browser_hub,
crawler_config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
),
)
end_time = asyncio.get_event_loop().time()
# Verify results
# assert (len(results), len(urls))
assert len(results) == len(urls)
successful = sum(1 for r in results.values() if r.success)
print(
f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
)
print(f"Success rate: {successful}/{len(urls)}")
# Get browser hub statistics
hub_stats = await browser_hub.get_pool_status()
print(f"Browser hub stats: {hub_stats}")
# At least 80% should succeed
# assert (successful / len(urls), 0.8)
assert successful / len(urls) >= 0.8
finally:
# Clean up the browser hub
await browser_hub.close()
class TestCrawlerAdvanced:
"""Advanced tests for the Crawler utility class"""
@pytest.mark.asyncio
async def test_crawl_with_customized_batch_config(self, clean_browser_hub):
"""Test crawling with fully customized batch configuration"""
# Create URL batches with different browser and crawler configurations
browser_config1 = BrowserConfig(browser_type="chromium", headless=True)
browser_config2 = BrowserConfig(
browser_type="chromium", headless=False, viewport_width=1920
)
crawler_config1 = CrawlerRunConfig(wait_until="domcontentloaded")
crawler_config2 = CrawlerRunConfig(wait_until="networkidle", screenshot=True)
batch1 = (SAFE_URLS[:2], browser_config1, crawler_config1)
batch2 = (SAFE_URLS[2:4], browser_config2, crawler_config2)
# Crawl with mixed configurations
results = await Crawler.parallel_crawl([batch1, batch2])
# Extract all URLs
all_urls = batch1[0] + batch2[0]
# Verify results
# assert (len(results), len(all_urls))
assert len(results) == len(all_urls)
# Verify batch-specific processing
for url in batch1[0]:
assert results[url].screenshot is None # No screenshots for batch1
for url in batch2[0]:
assert results[url].screenshot is not None # Should have screenshots for batch2
@pytest.mark.asyncio
async def test_crawl_with_progress_callback(self, clean_browser_hub):
"""Test crawling with progress callback"""
# Use several URLs
urls = SAFE_URLS[:5]
# Track progress
progress_data = {"started": 0, "completed": 0, "failed": 0, "updates": []}
# Progress callback
async def on_progress(
status: str, url: str, result: Optional[CrawlResultContainer] = None
):
if status == "started":
progress_data["started"] += 1
elif status == "completed":
progress_data["completed"] += 1
if not result.success:
progress_data["failed"] += 1
progress_data["updates"].append((status, url))
print(f"Progress: {status} - {url}")
# Crawl with progress tracking
results = await Crawler.parallel_crawl(
urls,
crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
progress_callback=on_progress,
)
# Verify progress tracking
assert progress_data["started"] == len(urls)
assert progress_data["completed"] == len(urls)
assert len(progress_data["updates"]) == len(urls) * 2 # start + complete events
@pytest.mark.asyncio
async def test_crawl_with_dynamic_retry_strategy(self, clean_browser_hub):
"""Test crawling with a dynamic retry strategy"""
# Include URLs that might fail
urls = [
"https://example.com",
"https://httpstat.us/500",
"https://httpstat.us/404",
]
# Custom retry strategy
async def retry_strategy(
url: str, attempt: int, error: Exception
) -> Tuple[bool, float]:
# Only retry 500 errors, not 404s
if "500" in url:
return True, 1.0 # Retry with 1 second delay
return False, 0.0 # Don't retry other errors
# Crawl with custom retry strategy
results = await Crawler.parallel_crawl(
urls,
crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
retry_strategy=retry_strategy,
max_retries=3,
)
# Verify results
assert len(results) == len(urls)
# Example.com should succeed
assert results[urls[0]].success
# httpstat.us pages return content even for error status codes
# so our crawler marks them as successful since it got HTML content
# Verify that we got the expected status code
assert results[urls[1]].status_code == 500
# 404 should have the correct status code
assert results[urls[2]].status_code == 404
@pytest.mark.asyncio
async def test_crawl_with_very_large_batch(self, clean_browser_hub):
"""Test crawling with a very large batch of URLs"""
# Create a batch by repeating our safe URLs
# Note: In a real test, we'd use more URLs, but for simplicity we'll use a smaller set
large_batch = list(dict.fromkeys(SAFE_URLS[:5] * 2)) # ~10 unique URLs
# Set a reasonable concurrency limit
concurrency = 10
# Time the crawl
start_time = asyncio.get_event_loop().time()
results = await Crawler.parallel_crawl(
large_batch,
crawler_config=CrawlerRunConfig(
wait_until="domcontentloaded",
page_timeout=10000, # Shorter timeout for large batch
),
concurrency=concurrency,
)
end_time = asyncio.get_event_loop().time()
# Verify results
# assert (len(results), len(large_batch))
assert len(results) == len(large_batch)
successful = sum(1 for r in results.values() if r.success)
print(
f"Large batch crawl of {len(large_batch)} URLs completed in {end_time - start_time:.2f}s"
)
print(f"Success rate: {successful}/{len(large_batch)}")
print(
f"Average time per URL: {(end_time - start_time) / len(large_batch):.2f}s"
)
# At least 80% should succeed (from our unique URLs)
assert successful / len(results) >= 0.8
if __name__ == "__main__":
# Use pytest for async tests
pytest.main(["-xvs", __file__])

View File

@@ -0,0 +1,109 @@
import asyncio
from crawl4ai import (
BrowserConfig,
CrawlerRunConfig,
CacheMode,
DefaultMarkdownGenerator,
PruningContentFilter
)
from pipeline import Pipeline
async def main():
# Create configuration objects
browser_config = BrowserConfig(headless=True, verbose=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
)
# Create and use pipeline with context manager
async with Pipeline(browser_config=browser_config) as pipeline:
result = await pipeline.crawl(
url="https://www.example.com",
config=crawler_config
)
# Print the result
print(f"URL: {result.url}")
print(f"Success: {result.success}")
if result.success:
print("\nMarkdown excerpt:")
print(result.markdown.raw_markdown[:500] + "...")
else:
print(f"Error: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
class CrawlTarget:
def __init__(self, urls, config=None):
self.urls = urls
self.config = config
def __repr__(self):
return f"CrawlTarget(urls={self.urls}, config={self.config})"
# async def main():
# # Create configuration objects
# browser_config = BrowserConfig(headless=True, verbose=True)
# # Define different configurations
# config1 = CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
# markdown_generator=DefaultMarkdownGenerator(
# content_filter=PruningContentFilter(threshold=0.48)
# ),
# )
# config2 = CrawlerRunConfig(
# cache_mode=CacheMode.ENABLED,
# screenshot=True,
# pdf=True
# )
# # Create crawl targets
# targets = [
# CrawlTarget(
# urls=["https://www.example.com", "https://www.wikipedia.org"],
# config=config1
# ),
# CrawlTarget(
# urls="https://news.ycombinator.com",
# config=config2
# ),
# CrawlTarget(
# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
# config=None
# )
# ]
# # Create and use pipeline with context manager
# async with Pipeline(browser_config=browser_config) as pipeline:
# all_results = await pipeline.crawl_batch(targets)
# for target_key, results in all_results.items():
# print(f"\n===== Results for {target_key} =====")
# print(f"Number of URLs crawled: {len(results)}")
# for i, result in enumerate(results):
# print(f"\nURL {i+1}: {result.url}")
# print(f"Success: {result.success}")
# if result.success:
# print(f"Content length: {len(result.markdown.raw_markdown)} chars")
# else:
# print(f"Error: {result.error_message}")
# if __name__ == "__main__":
# asyncio.run(main())