Compare commits
3 Commits
feature/ag
...
next-2-bat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
72d8e679ad | ||
|
|
67a790b4a6 | ||
|
|
d95b2dc9f2 |
@@ -4,6 +4,12 @@ import warnings
|
|||||||
from .async_webcrawler import AsyncWebCrawler, CacheMode
|
from .async_webcrawler import AsyncWebCrawler, CacheMode
|
||||||
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig
|
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig
|
||||||
|
|
||||||
|
from .pipeline.pipeline import (
|
||||||
|
Pipeline,
|
||||||
|
create_pipeline,
|
||||||
|
)
|
||||||
|
from .pipeline.crawler import Crawler
|
||||||
|
|
||||||
from .content_scraping_strategy import (
|
from .content_scraping_strategy import (
|
||||||
ContentScrapingStrategy,
|
ContentScrapingStrategy,
|
||||||
WebScrapingStrategy,
|
WebScrapingStrategy,
|
||||||
@@ -65,7 +71,14 @@ from .deep_crawling import (
|
|||||||
DeepCrawlDecorator,
|
DeepCrawlDecorator,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from .async_crawler_strategy import AsyncPlaywrightCrawlerStrategy, AsyncHTTPCrawlerStrategy
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
"Pipeline",
|
||||||
|
"AsyncPlaywrightCrawlerStrategy",
|
||||||
|
"AsyncHTTPCrawlerStrategy",
|
||||||
|
"create_pipeline",
|
||||||
|
"Crawler",
|
||||||
"AsyncLoggerBase",
|
"AsyncLoggerBase",
|
||||||
"AsyncLogger",
|
"AsyncLogger",
|
||||||
"AsyncWebCrawler",
|
"AsyncWebCrawler",
|
||||||
|
|||||||
@@ -270,7 +270,7 @@ class BrowserConfig:
|
|||||||
host: str = "localhost",
|
host: str = "localhost",
|
||||||
):
|
):
|
||||||
self.browser_type = browser_type
|
self.browser_type = browser_type
|
||||||
self.headless = headless or True
|
self.headless = headless
|
||||||
self.browser_mode = browser_mode
|
self.browser_mode = browser_mode
|
||||||
self.use_managed_browser = use_managed_browser
|
self.use_managed_browser = use_managed_browser
|
||||||
self.cdp_url = cdp_url
|
self.cdp_url = cdp_url
|
||||||
|
|||||||
@@ -625,7 +625,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
|||||||
except Error:
|
except Error:
|
||||||
visibility_info = await self.check_visibility(page)
|
visibility_info = await self.check_visibility(page)
|
||||||
|
|
||||||
if self.config.verbose:
|
if config.verbose:
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
message="Body visibility info: {info}",
|
message="Body visibility info: {info}",
|
||||||
tag="DEBUG",
|
tag="DEBUG",
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from .profiles import BrowserProfileManager
|
|||||||
from .models import DockerConfig
|
from .models import DockerConfig
|
||||||
from .docker_registry import DockerRegistry
|
from .docker_registry import DockerRegistry
|
||||||
from .docker_utils import DockerUtils
|
from .docker_utils import DockerUtils
|
||||||
|
from .browser_hub import BrowserHub
|
||||||
from .strategies import (
|
from .strategies import (
|
||||||
BaseBrowserStrategy,
|
BaseBrowserStrategy,
|
||||||
PlaywrightBrowserStrategy,
|
PlaywrightBrowserStrategy,
|
||||||
@@ -19,4 +20,4 @@ from .strategies import (
|
|||||||
|
|
||||||
__all__ = ['BrowserManager', 'BrowserProfileManager', 'DockerConfig', 'DockerRegistry', 'DockerUtils', 'BaseBrowserStrategy',
|
__all__ = ['BrowserManager', 'BrowserProfileManager', 'DockerConfig', 'DockerRegistry', 'DockerUtils', 'BaseBrowserStrategy',
|
||||||
'PlaywrightBrowserStrategy', 'CDPBrowserStrategy', 'BuiltinBrowserStrategy',
|
'PlaywrightBrowserStrategy', 'CDPBrowserStrategy', 'BuiltinBrowserStrategy',
|
||||||
'DockerBrowserStrategy']
|
'DockerBrowserStrategy', 'BrowserHub']
|
||||||
@@ -672,7 +672,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
|
|||||||
block["error"] = False
|
block["error"] = False
|
||||||
except Exception:
|
except Exception:
|
||||||
parsed, unparsed = split_and_parse_json_objects(
|
parsed, unparsed = split_and_parse_json_objects(
|
||||||
response.choices[0].message.content
|
response
|
||||||
)
|
)
|
||||||
blocks = parsed
|
blocks = parsed
|
||||||
if unparsed:
|
if unparsed:
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from pydantic import BaseModel, HttpUrl, PrivateAttr
|
from pydantic import BaseModel, HttpUrl, PrivateAttr, ConfigDict
|
||||||
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
|
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
|
||||||
from typing import AsyncGenerator
|
from typing import AsyncGenerator
|
||||||
from typing import Generic, TypeVar
|
from typing import Generic, TypeVar
|
||||||
@@ -146,8 +146,9 @@ class CrawlResult(BaseModel):
|
|||||||
dispatch_result: Optional[DispatchResult] = None
|
dispatch_result: Optional[DispatchResult] = None
|
||||||
redirected_url: Optional[str] = None
|
redirected_url: Optional[str] = None
|
||||||
|
|
||||||
class Config:
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||||
arbitrary_types_allowed = True
|
# class Config:
|
||||||
|
# arbitrary_types_allowed = True
|
||||||
|
|
||||||
# NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters,
|
# NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters,
|
||||||
# and model_dump override all exist to support a smooth transition from markdown as a string
|
# and model_dump override all exist to support a smooth transition from markdown as a string
|
||||||
@@ -312,8 +313,9 @@ class AsyncCrawlResponse(BaseModel):
|
|||||||
ssl_certificate: Optional[SSLCertificate] = None
|
ssl_certificate: Optional[SSLCertificate] = None
|
||||||
redirected_url: Optional[str] = None
|
redirected_url: Optional[str] = None
|
||||||
|
|
||||||
class Config:
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||||
arbitrary_types_allowed = True
|
# class Config:
|
||||||
|
# arbitrary_types_allowed = True
|
||||||
|
|
||||||
###############################
|
###############################
|
||||||
# Scraping Models
|
# Scraping Models
|
||||||
|
|||||||
6
crawl4ai/pipeline/__init__.py
Normal file
6
crawl4ai/pipeline/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""Pipeline module providing high-level crawling functionality."""
|
||||||
|
|
||||||
|
from .pipeline import Pipeline, create_pipeline
|
||||||
|
from .crawler import Crawler
|
||||||
|
|
||||||
|
__all__ = ["Pipeline", "create_pipeline", "Crawler"]
|
||||||
406
crawl4ai/pipeline/crawler.py
Normal file
406
crawl4ai/pipeline/crawler.py
Normal file
@@ -0,0 +1,406 @@
|
|||||||
|
"""Crawler utility class for simplified crawling operations.
|
||||||
|
|
||||||
|
This module provides a high-level utility class for crawling web pages
|
||||||
|
with support for both single and multiple URL processing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from typing import Dict, List, Optional, Tuple, Union, Callable
|
||||||
|
|
||||||
|
from crawl4ai.models import CrawlResultContainer, CrawlResult
|
||||||
|
from crawl4ai.pipeline.pipeline import create_pipeline
|
||||||
|
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
|
||||||
|
from crawl4ai.async_logger import AsyncLogger
|
||||||
|
from crawl4ai.browser.browser_hub import BrowserHub
|
||||||
|
|
||||||
|
# Type definitions
|
||||||
|
UrlList = List[str]
|
||||||
|
UrlBatch = Tuple[List[str], CrawlerRunConfig]
|
||||||
|
UrlFullBatch = Tuple[List[str], BrowserConfig, CrawlerRunConfig]
|
||||||
|
BatchType = Union[UrlList, UrlBatch, UrlFullBatch]
|
||||||
|
ProgressCallback = Callable[[str, str, Optional[CrawlResultContainer]], None]
|
||||||
|
RetryStrategy = Callable[[str, int, Exception], Tuple[bool, float]]
|
||||||
|
|
||||||
|
class Crawler:
|
||||||
|
"""High-level utility class for crawling web pages.
|
||||||
|
|
||||||
|
This class provides simplified methods for crawling both single URLs
|
||||||
|
and batches of URLs, with parallel processing capabilities.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def crawl(
|
||||||
|
cls,
|
||||||
|
urls: Union[str, List[str]],
|
||||||
|
browser_config: Optional[BrowserConfig] = None,
|
||||||
|
crawler_config: Optional[CrawlerRunConfig] = None,
|
||||||
|
browser_hub: Optional[BrowserHub] = None,
|
||||||
|
logger: Optional[AsyncLogger] = None,
|
||||||
|
max_retries: int = 0,
|
||||||
|
retry_delay: float = 1.0,
|
||||||
|
use_new_loop: bool = True # By default use a new loop for safety
|
||||||
|
) -> Union[CrawlResultContainer, Dict[str, CrawlResultContainer]]:
|
||||||
|
"""Crawl one or more URLs with the specified configurations.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
urls: Single URL or list of URLs to crawl
|
||||||
|
browser_config: Optional browser configuration
|
||||||
|
crawler_config: Optional crawler run configuration
|
||||||
|
browser_hub: Optional shared browser hub
|
||||||
|
logger: Optional logger instance
|
||||||
|
max_retries: Maximum number of retries for failed requests
|
||||||
|
retry_delay: Delay between retries in seconds
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
For a single URL: CrawlResultContainer with crawl results
|
||||||
|
For multiple URLs: Dict mapping URLs to their CrawlResultContainer results
|
||||||
|
"""
|
||||||
|
# Handle single URL case
|
||||||
|
if isinstance(urls, str):
|
||||||
|
return await cls._crawl_single_url(
|
||||||
|
urls,
|
||||||
|
browser_config,
|
||||||
|
crawler_config,
|
||||||
|
browser_hub,
|
||||||
|
logger,
|
||||||
|
max_retries,
|
||||||
|
retry_delay,
|
||||||
|
use_new_loop
|
||||||
|
)
|
||||||
|
|
||||||
|
# Handle multiple URLs case (sequential processing)
|
||||||
|
results = {}
|
||||||
|
for url in urls:
|
||||||
|
results[url] = await cls._crawl_single_url(
|
||||||
|
url,
|
||||||
|
browser_config,
|
||||||
|
crawler_config,
|
||||||
|
browser_hub,
|
||||||
|
logger,
|
||||||
|
max_retries,
|
||||||
|
retry_delay,
|
||||||
|
use_new_loop
|
||||||
|
)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def _crawl_single_url(
|
||||||
|
cls,
|
||||||
|
url: str,
|
||||||
|
browser_config: Optional[BrowserConfig] = None,
|
||||||
|
crawler_config: Optional[CrawlerRunConfig] = None,
|
||||||
|
browser_hub: Optional[BrowserHub] = None,
|
||||||
|
logger: Optional[AsyncLogger] = None,
|
||||||
|
max_retries: int = 0,
|
||||||
|
retry_delay: float = 1.0,
|
||||||
|
use_new_loop: bool = False
|
||||||
|
) -> CrawlResultContainer:
|
||||||
|
"""Internal method to crawl a single URL with retry logic."""
|
||||||
|
# Create a logger if none provided
|
||||||
|
if logger is None:
|
||||||
|
logger = AsyncLogger(verbose=True)
|
||||||
|
|
||||||
|
# Create or use the provided crawler config
|
||||||
|
if crawler_config is None:
|
||||||
|
crawler_config = CrawlerRunConfig()
|
||||||
|
|
||||||
|
attempts = 0
|
||||||
|
last_error = None
|
||||||
|
|
||||||
|
# For testing purposes, each crawler gets a new event loop to avoid conflicts
|
||||||
|
# This is especially important in test suites where multiple tests run in sequence
|
||||||
|
if use_new_loop:
|
||||||
|
old_loop = asyncio.get_event_loop()
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
|
while attempts <= max_retries:
|
||||||
|
try:
|
||||||
|
# Create a pipeline
|
||||||
|
pipeline_args = {}
|
||||||
|
if browser_config:
|
||||||
|
pipeline_args["browser_config"] = browser_config
|
||||||
|
if browser_hub:
|
||||||
|
pipeline_args["browser_hub"] = browser_hub
|
||||||
|
if logger:
|
||||||
|
pipeline_args["logger"] = logger
|
||||||
|
|
||||||
|
pipeline = await create_pipeline(**pipeline_args)
|
||||||
|
|
||||||
|
# Perform the crawl
|
||||||
|
result = await pipeline.crawl(url=url, config=crawler_config)
|
||||||
|
|
||||||
|
# Close the pipeline if we created it (not using a shared hub)
|
||||||
|
if not browser_hub:
|
||||||
|
await pipeline.close()
|
||||||
|
|
||||||
|
# Restore the original event loop if we created a new one
|
||||||
|
if use_new_loop:
|
||||||
|
asyncio.set_event_loop(old_loop)
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
last_error = e
|
||||||
|
attempts += 1
|
||||||
|
|
||||||
|
if attempts <= max_retries:
|
||||||
|
logger.warning(
|
||||||
|
message="Crawl attempt {attempt} failed for {url}: {error}. Retrying in {delay}s...",
|
||||||
|
tag="RETRY",
|
||||||
|
params={
|
||||||
|
"attempt": attempts,
|
||||||
|
"url": url,
|
||||||
|
"error": str(e),
|
||||||
|
"delay": retry_delay
|
||||||
|
}
|
||||||
|
)
|
||||||
|
await asyncio.sleep(retry_delay)
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
message="All {attempts} crawl attempts failed for {url}: {error}",
|
||||||
|
tag="FAILED",
|
||||||
|
params={
|
||||||
|
"attempts": attempts,
|
||||||
|
"url": url,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# If we get here, all attempts failed
|
||||||
|
result = CrawlResultContainer(
|
||||||
|
CrawlResult(
|
||||||
|
url=url,
|
||||||
|
html="",
|
||||||
|
success=False,
|
||||||
|
error_message=f"All {attempts} crawl attempts failed: {str(last_error)}"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Restore the original event loop if we created a new one
|
||||||
|
if use_new_loop:
|
||||||
|
asyncio.set_event_loop(old_loop)
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def parallel_crawl(
|
||||||
|
cls,
|
||||||
|
url_batches: Union[List[str], List[Union[UrlBatch, UrlFullBatch]]],
|
||||||
|
browser_config: Optional[BrowserConfig] = None,
|
||||||
|
crawler_config: Optional[CrawlerRunConfig] = None,
|
||||||
|
browser_hub: Optional[BrowserHub] = None,
|
||||||
|
logger: Optional[AsyncLogger] = None,
|
||||||
|
concurrency: int = 5,
|
||||||
|
max_retries: int = 0,
|
||||||
|
retry_delay: float = 1.0,
|
||||||
|
retry_strategy: Optional[RetryStrategy] = None,
|
||||||
|
progress_callback: Optional[ProgressCallback] = None,
|
||||||
|
use_new_loop: bool = True # By default use a new loop for safety
|
||||||
|
) -> Dict[str, CrawlResultContainer]:
|
||||||
|
"""Crawl multiple URLs in parallel with concurrency control.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url_batches: List of URLs or list of URL batches with configurations
|
||||||
|
browser_config: Default browser configuration (used if not in batch)
|
||||||
|
crawler_config: Default crawler configuration (used if not in batch)
|
||||||
|
browser_hub: Optional shared browser hub for resource efficiency
|
||||||
|
logger: Optional logger instance
|
||||||
|
concurrency: Maximum number of concurrent crawls
|
||||||
|
max_retries: Maximum number of retries for failed requests
|
||||||
|
retry_delay: Delay between retries in seconds
|
||||||
|
retry_strategy: Optional custom retry strategy function
|
||||||
|
progress_callback: Optional callback for progress reporting
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict mapping URLs to their CrawlResultContainer results
|
||||||
|
"""
|
||||||
|
# Create a logger if none provided
|
||||||
|
if logger is None:
|
||||||
|
logger = AsyncLogger(verbose=True)
|
||||||
|
|
||||||
|
# For testing purposes, each crawler gets a new event loop to avoid conflicts
|
||||||
|
# This is especially important in test suites where multiple tests run in sequence
|
||||||
|
if use_new_loop:
|
||||||
|
old_loop = asyncio.get_event_loop()
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
|
# Process batches to consistent format
|
||||||
|
processed_batches = cls._process_url_batches(
|
||||||
|
url_batches, browser_config, crawler_config
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize results dictionary
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
# Create semaphore for concurrency control
|
||||||
|
semaphore = asyncio.Semaphore(concurrency)
|
||||||
|
|
||||||
|
# Create shared browser hub if not provided
|
||||||
|
shared_hub = browser_hub
|
||||||
|
if not shared_hub:
|
||||||
|
shared_hub = await BrowserHub.get_browser_manager(
|
||||||
|
config=browser_config or BrowserConfig(),
|
||||||
|
logger=logger,
|
||||||
|
max_browsers_per_config=concurrency,
|
||||||
|
max_pages_per_browser=1,
|
||||||
|
initial_pool_size=min(concurrency, 3) # Start with a reasonable number
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Create worker function for each URL
|
||||||
|
async def process_url(url, b_config, c_config):
|
||||||
|
async with semaphore:
|
||||||
|
# Report start if callback provided
|
||||||
|
if progress_callback:
|
||||||
|
await progress_callback("started", url)
|
||||||
|
|
||||||
|
attempts = 0
|
||||||
|
last_error = None
|
||||||
|
|
||||||
|
while attempts <= max_retries:
|
||||||
|
try:
|
||||||
|
# Create a pipeline using the shared hub
|
||||||
|
pipeline = await create_pipeline(
|
||||||
|
browser_config=b_config,
|
||||||
|
browser_hub=shared_hub,
|
||||||
|
logger=logger
|
||||||
|
)
|
||||||
|
|
||||||
|
# Perform the crawl
|
||||||
|
result = await pipeline.crawl(url=url, config=c_config)
|
||||||
|
|
||||||
|
# Report completion if callback provided
|
||||||
|
if progress_callback:
|
||||||
|
await progress_callback("completed", url, result)
|
||||||
|
|
||||||
|
return url, result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
last_error = e
|
||||||
|
attempts += 1
|
||||||
|
|
||||||
|
# Determine if we should retry and with what delay
|
||||||
|
should_retry = attempts <= max_retries
|
||||||
|
delay = retry_delay
|
||||||
|
|
||||||
|
# Use custom retry strategy if provided
|
||||||
|
if retry_strategy and should_retry:
|
||||||
|
try:
|
||||||
|
should_retry, delay = await retry_strategy(url, attempts, e)
|
||||||
|
except Exception as strategy_error:
|
||||||
|
logger.error(
|
||||||
|
message="Error in retry strategy: {error}",
|
||||||
|
tag="RETRY",
|
||||||
|
params={"error": str(strategy_error)}
|
||||||
|
)
|
||||||
|
|
||||||
|
if should_retry:
|
||||||
|
logger.warning(
|
||||||
|
message="Crawl attempt {attempt} failed for {url}: {error}. Retrying in {delay}s...",
|
||||||
|
tag="RETRY",
|
||||||
|
params={
|
||||||
|
"attempt": attempts,
|
||||||
|
"url": url,
|
||||||
|
"error": str(e),
|
||||||
|
"delay": delay
|
||||||
|
}
|
||||||
|
)
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
message="All {attempts} crawl attempts failed for {url}: {error}",
|
||||||
|
tag="FAILED",
|
||||||
|
params={
|
||||||
|
"attempts": attempts,
|
||||||
|
"url": url,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
# If we get here, all attempts failed
|
||||||
|
error_result = CrawlResultContainer(
|
||||||
|
CrawlResult(
|
||||||
|
url=url,
|
||||||
|
html="",
|
||||||
|
success=False,
|
||||||
|
error_message=f"All {attempts} crawl attempts failed: {str(last_error)}"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Report completion with error if callback provided
|
||||||
|
if progress_callback:
|
||||||
|
await progress_callback("completed", url, error_result)
|
||||||
|
|
||||||
|
return url, error_result
|
||||||
|
|
||||||
|
# Create tasks for all URLs
|
||||||
|
tasks = []
|
||||||
|
for urls, b_config, c_config in processed_batches:
|
||||||
|
for url in urls:
|
||||||
|
tasks.append(process_url(url, b_config, c_config))
|
||||||
|
|
||||||
|
# Run all tasks and collect results
|
||||||
|
for completed_task in asyncio.as_completed(tasks):
|
||||||
|
url, result = await completed_task
|
||||||
|
results[url] = result
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Clean up the hub only if we created it
|
||||||
|
if not browser_hub and shared_hub:
|
||||||
|
await shared_hub.close()
|
||||||
|
|
||||||
|
# Restore the original event loop if we created a new one
|
||||||
|
if use_new_loop:
|
||||||
|
asyncio.set_event_loop(old_loop)
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _process_url_batches(
|
||||||
|
cls,
|
||||||
|
url_batches: Union[List[str], List[Union[UrlBatch, UrlFullBatch]]],
|
||||||
|
default_browser_config: Optional[BrowserConfig],
|
||||||
|
default_crawler_config: Optional[CrawlerRunConfig]
|
||||||
|
) -> List[Tuple[List[str], BrowserConfig, CrawlerRunConfig]]:
|
||||||
|
"""Process URL batches into a consistent format.
|
||||||
|
|
||||||
|
Converts various input formats into a consistent list of
|
||||||
|
(urls, browser_config, crawler_config) tuples.
|
||||||
|
"""
|
||||||
|
processed_batches = []
|
||||||
|
|
||||||
|
# Handle case where input is just a list of URLs
|
||||||
|
if all(isinstance(item, str) for item in url_batches):
|
||||||
|
urls = url_batches
|
||||||
|
browser_config = default_browser_config or BrowserConfig()
|
||||||
|
crawler_config = default_crawler_config or CrawlerRunConfig()
|
||||||
|
processed_batches.append((urls, browser_config, crawler_config))
|
||||||
|
return processed_batches
|
||||||
|
|
||||||
|
# Process each batch
|
||||||
|
for batch in url_batches:
|
||||||
|
# Handle case: (urls, crawler_config)
|
||||||
|
if len(batch) == 2 and isinstance(batch[1], CrawlerRunConfig):
|
||||||
|
urls, c_config = batch
|
||||||
|
b_config = default_browser_config or BrowserConfig()
|
||||||
|
processed_batches.append((urls, b_config, c_config))
|
||||||
|
|
||||||
|
# Handle case: (urls, browser_config, crawler_config)
|
||||||
|
elif len(batch) == 3 and isinstance(batch[1], BrowserConfig) and isinstance(batch[2], CrawlerRunConfig):
|
||||||
|
processed_batches.append(batch)
|
||||||
|
|
||||||
|
# Fallback for unknown formats - assume it's just a list of URLs
|
||||||
|
else:
|
||||||
|
urls = batch
|
||||||
|
browser_config = default_browser_config or BrowserConfig()
|
||||||
|
crawler_config = default_crawler_config or CrawlerRunConfig()
|
||||||
|
processed_batches.append((urls, browser_config, crawler_config))
|
||||||
|
|
||||||
|
return processed_batches
|
||||||
702
crawl4ai/pipeline/middlewares.py
Normal file
702
crawl4ai/pipeline/middlewares.py
Normal file
@@ -0,0 +1,702 @@
|
|||||||
|
import time
|
||||||
|
import sys
|
||||||
|
from typing import Dict, Any, List
|
||||||
|
import json
|
||||||
|
|
||||||
|
from crawl4ai.models import (
|
||||||
|
CrawlResult,
|
||||||
|
MarkdownGenerationResult,
|
||||||
|
ScrapingResult,
|
||||||
|
CrawlResultContainer,
|
||||||
|
)
|
||||||
|
from crawl4ai.async_database import async_db_manager
|
||||||
|
from crawl4ai.cache_context import CacheMode, CacheContext
|
||||||
|
from crawl4ai.utils import (
|
||||||
|
sanitize_input_encode,
|
||||||
|
InvalidCSSSelectorError,
|
||||||
|
fast_format_html,
|
||||||
|
create_box_message,
|
||||||
|
get_error_context,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def initialize_context_middleware(context: Dict[str, Any]) -> int:
|
||||||
|
"""Initialize the context with basic configuration and validation"""
|
||||||
|
url = context.get("url")
|
||||||
|
config = context.get("config")
|
||||||
|
|
||||||
|
if not isinstance(url, str) or not url:
|
||||||
|
context["error_message"] = "Invalid URL, make sure the URL is a non-empty string"
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Default to ENABLED if no cache mode specified
|
||||||
|
if config.cache_mode is None:
|
||||||
|
config.cache_mode = CacheMode.ENABLED
|
||||||
|
|
||||||
|
# Create cache context
|
||||||
|
context["cache_context"] = CacheContext(url, config.cache_mode, False)
|
||||||
|
context["start_time"] = time.perf_counter()
|
||||||
|
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# middlewares.py additions
|
||||||
|
|
||||||
|
async def browser_hub_middleware(context: Dict[str, Any]) -> int:
|
||||||
|
"""
|
||||||
|
Initialize or connect to a Browser-Hub and add it to the pipeline context.
|
||||||
|
|
||||||
|
This middleware handles browser hub initialization for all three scenarios:
|
||||||
|
1. Default configuration when nothing is specified
|
||||||
|
2. Custom configuration when browser_config is provided
|
||||||
|
3. Connection to existing hub when browser_hub_connection is provided
|
||||||
|
|
||||||
|
Args:
|
||||||
|
context: The pipeline context dictionary
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 1 for success, 0 for failure
|
||||||
|
"""
|
||||||
|
from crawl4ai.browser.browser_hub import BrowserHub
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get configuration from context
|
||||||
|
browser_config = context.get("browser_config")
|
||||||
|
browser_hub_id = context.get("browser_hub_id")
|
||||||
|
browser_hub_connection = context.get("browser_hub_connection")
|
||||||
|
logger = context.get("logger")
|
||||||
|
|
||||||
|
# If we already have a browser hub in context, use it
|
||||||
|
if context.get("browser_hub"):
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# Get or create Browser-Hub
|
||||||
|
browser_hub = await BrowserHub.get_browser_manager(
|
||||||
|
config=browser_config,
|
||||||
|
hub_id=browser_hub_id,
|
||||||
|
connection_info=browser_hub_connection,
|
||||||
|
logger=logger
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add to context
|
||||||
|
context["browser_hub"] = browser_hub
|
||||||
|
return 1
|
||||||
|
except Exception as e:
|
||||||
|
context["error_message"] = f"Failed to initialize browser hub: {str(e)}"
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_content_middleware(context: Dict[str, Any]) -> int:
|
||||||
|
"""
|
||||||
|
Fetch content from the web using the browser hub.
|
||||||
|
|
||||||
|
This middleware uses the browser hub to get pages for crawling,
|
||||||
|
and properly releases them back to the pool when done.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
context: The pipeline context dictionary
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 1 for success, 0 for failure
|
||||||
|
"""
|
||||||
|
url = context.get("url")
|
||||||
|
config = context.get("config")
|
||||||
|
browser_hub = context.get("browser_hub")
|
||||||
|
logger = context.get("logger")
|
||||||
|
|
||||||
|
# Skip if using cached result
|
||||||
|
if context.get("cached_result") and context.get("html"):
|
||||||
|
return 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Create crawler strategy without initializing its browser manager
|
||||||
|
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
|
||||||
|
|
||||||
|
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
|
||||||
|
browser_config=browser_hub.config if browser_hub else None,
|
||||||
|
logger=logger
|
||||||
|
)
|
||||||
|
|
||||||
|
# Replace the browser manager with our shared instance
|
||||||
|
crawler_strategy.browser_manager = browser_hub
|
||||||
|
|
||||||
|
# Perform crawl without trying to initialize the browser
|
||||||
|
# The crawler will use the provided browser_manager to get pages
|
||||||
|
async_response = await crawler_strategy.crawl(url, config=config)
|
||||||
|
|
||||||
|
# Store results in context
|
||||||
|
context["html"] = async_response.html
|
||||||
|
context["screenshot_data"] = async_response.screenshot
|
||||||
|
context["pdf_data"] = async_response.pdf_data
|
||||||
|
context["js_execution_result"] = async_response.js_execution_result
|
||||||
|
context["async_response"] = async_response
|
||||||
|
|
||||||
|
return 1
|
||||||
|
except Exception as e:
|
||||||
|
context["error_message"] = f"Error fetching content: {str(e)}"
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
async def check_cache_middleware(context: Dict[str, Any]) -> int:
|
||||||
|
"""Check if there's a cached result and load it if available"""
|
||||||
|
url = context.get("url")
|
||||||
|
config = context.get("config")
|
||||||
|
cache_context = context.get("cache_context")
|
||||||
|
logger = context.get("logger")
|
||||||
|
|
||||||
|
# Initialize variables
|
||||||
|
context["cached_result"] = None
|
||||||
|
context["html"] = None
|
||||||
|
context["extracted_content"] = None
|
||||||
|
context["screenshot_data"] = None
|
||||||
|
context["pdf_data"] = None
|
||||||
|
|
||||||
|
# Try to get cached result if appropriate
|
||||||
|
if cache_context.should_read():
|
||||||
|
cached_result = await async_db_manager.aget_cached_url(url)
|
||||||
|
context["cached_result"] = cached_result
|
||||||
|
|
||||||
|
if cached_result:
|
||||||
|
html = sanitize_input_encode(cached_result.html)
|
||||||
|
extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
|
||||||
|
extracted_content = None if not extracted_content or extracted_content == "[]" else extracted_content
|
||||||
|
|
||||||
|
# If screenshot is requested but its not in cache, then set cache_result to None
|
||||||
|
screenshot_data = cached_result.screenshot
|
||||||
|
pdf_data = cached_result.pdf
|
||||||
|
|
||||||
|
if config.screenshot and not screenshot_data:
|
||||||
|
context["cached_result"] = None
|
||||||
|
|
||||||
|
if config.pdf and not pdf_data:
|
||||||
|
context["cached_result"] = None
|
||||||
|
|
||||||
|
context["html"] = html
|
||||||
|
context["extracted_content"] = extracted_content
|
||||||
|
context["screenshot_data"] = screenshot_data
|
||||||
|
context["pdf_data"] = pdf_data
|
||||||
|
|
||||||
|
logger.url_status(
|
||||||
|
url=cache_context.display_url,
|
||||||
|
success=bool(html),
|
||||||
|
timing=time.perf_counter() - context["start_time"],
|
||||||
|
tag="FETCH",
|
||||||
|
)
|
||||||
|
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
async def configure_proxy_middleware(context: Dict[str, Any]) -> int:
|
||||||
|
"""Configure proxy if a proxy rotation strategy is available"""
|
||||||
|
config = context.get("config")
|
||||||
|
logger = context.get("logger")
|
||||||
|
|
||||||
|
# Skip if using cached result
|
||||||
|
if context.get("cached_result") and context.get("html"):
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# Update proxy configuration from rotation strategy if available
|
||||||
|
if config and config.proxy_rotation_strategy:
|
||||||
|
next_proxy = await config.proxy_rotation_strategy.get_next_proxy()
|
||||||
|
if next_proxy:
|
||||||
|
logger.info(
|
||||||
|
message="Switch proxy: {proxy}",
|
||||||
|
tag="PROXY",
|
||||||
|
params={"proxy": next_proxy.server},
|
||||||
|
)
|
||||||
|
config.proxy_config = next_proxy
|
||||||
|
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
async def check_robots_txt_middleware(context: Dict[str, Any]) -> int:
|
||||||
|
"""Check if the URL is allowed by robots.txt if enabled"""
|
||||||
|
url = context.get("url")
|
||||||
|
config = context.get("config")
|
||||||
|
browser_config = context.get("browser_config")
|
||||||
|
robots_parser = context.get("robots_parser")
|
||||||
|
|
||||||
|
# Skip if using cached result
|
||||||
|
if context.get("cached_result") and context.get("html"):
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# Check robots.txt if enabled
|
||||||
|
if config and config.check_robots_txt:
|
||||||
|
if not await robots_parser.can_fetch(url, browser_config.user_agent):
|
||||||
|
context["crawl_result"] = CrawlResult(
|
||||||
|
url=url,
|
||||||
|
html="",
|
||||||
|
success=False,
|
||||||
|
status_code=403,
|
||||||
|
error_message="Access denied by robots.txt",
|
||||||
|
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
|
||||||
|
)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_content_middleware_(context: Dict[str, Any]) -> int:
    """Fetch the page via the crawler strategy and stash the raw artifacts.

    Populates context["html"], screenshot/pdf/js results, and the raw async
    response. Returns 1 on success, 0 on any fetch error (error recorded in
    context["error_message"]).

    NOTE(review): the trailing underscore looks like a leftover rename —
    `create_default_middleware_list` wires in `fetch_content_middleware`;
    confirm which variant is the live implementation.
    """
    # A cache hit means there is nothing to fetch.
    if context.get("cached_result") and context.get("html"):
        return 1

    url = context.get("url")
    config = context.get("config")
    crawler_strategy = context.get("crawler_strategy")
    logger = context.get("logger")

    try:
        started = time.perf_counter()

        if config.user_agent:
            crawler_strategy.update_user_agent(config.user_agent)

        # Delegate the actual network work to the strategy.
        async_response = await crawler_strategy.crawl(url, config=config)

        html = sanitize_input_encode(async_response.html)
        finished = time.perf_counter()

        logger.url_status(
            url=context["cache_context"].display_url,
            success=bool(html),
            timing=finished - started,
            tag="FETCH",
        )

        context["html"] = html
        context["screenshot_data"] = async_response.screenshot
        context["pdf_data"] = async_response.pdf_data
        context["js_execution_result"] = async_response.js_execution_result
        context["async_response"] = async_response

        return 1
    except Exception as e:
        context["error_message"] = f"Error fetching content: {str(e)}"
        return 0
|
async def scrape_content_middleware(context: Dict[str, Any]) -> int:
    """Apply the configured scraping strategy to extract content from the HTML.

    Stores cleaned_html / media / links / metadata in the context.
    Returns 1 on success (or when a crawl_result already exists), 0 on failure
    with context["error_message"] set.
    """
    url = context.get("url")
    html = context.get("html")
    config = context.get("config")
    extracted_content = context.get("extracted_content")
    logger = context.get("logger")

    # Skip if already have a crawl result (e.g. robots.txt denial upstream)
    if context.get("crawl_result"):
        return 1

    try:
        _url = url if not context.get("is_raw_html", False) else "Raw HTML"
        t1 = time.perf_counter()

        # Get scraping strategy and ensure it has a logger
        scraping_strategy = config.scraping_strategy
        if not scraping_strategy.logger:
            scraping_strategy.logger = logger

        # Process HTML content: pass the whole run-config (minus url) as params
        params = config.__dict__.copy()
        params.pop("url", None)
        # Add keys from kwargs to params that don't exist in params
        kwargs = context.get("kwargs", {})
        params.update({k: v for k, v in kwargs.items() if k not in params.keys()})

        # Scraping Strategy Execution
        result: ScrapingResult = scraping_strategy.scrap(url, html, **params)

        if result is None:
            raise ValueError(f"Process HTML, Failed to extract content from the website: {url}")

        # Extract results - handle both dict and ScrapingResult return shapes
        if isinstance(result, dict):
            cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
            media = result.get("media", {})
            links = result.get("links", {})
            metadata = result.get("metadata", {})
        else:
            cleaned_html = sanitize_input_encode(result.cleaned_html)
            media = result.media.model_dump()
            links = result.links.model_dump()
            metadata = result.metadata

        context["cleaned_html"] = cleaned_html
        context["media"] = media
        context["links"] = links
        context["metadata"] = metadata

        # Log processing completion (timing truncated to millisecond precision)
        logger.info(
            message="{url:.50}... | Time: {timing}s",
            tag="SCRAPE",
            params={
                "url": _url,
                "timing": int((time.perf_counter() - t1) * 1000) / 1000,
            },
        )

        return 1
    except InvalidCSSSelectorError as e:
        # Selector errors carry a user-actionable message; pass it through as-is.
        context["error_message"] = str(e)
        return 0
    except Exception as e:
        context["error_message"] = f"Process HTML, Failed to extract content from the website: {url}, error: {str(e)}"
        return 0
|
async def generate_markdown_middleware(context: Dict[str, Any]) -> int:
    """Generate markdown from the cleaned HTML and store it in the context."""
    # A pre-built crawl result short-circuits content processing.
    if context.get("crawl_result"):
        return 1

    config = context.get("config")
    generator = config.markdown_generator

    context["markdown_result"] = generator.generate_markdown(
        cleaned_html=context.get("cleaned_html"),
        base_url=context.get("url"),
    )

    return 1
|
async def extract_structured_content_middleware(context: Dict[str, Any]) -> int:
    """Run the configured extraction strategy over the page content.

    Selects the input representation from the strategy's `input_format`
    (markdown / fit_markdown / html / cleaned_html), chunks it, runs the
    strategy, and stores the JSON-encoded result in
    context["extracted_content"]. Returns 1 on every non-exceptional path.
    """
    url = context.get("url")
    extracted_content = context.get("extracted_content")
    config = context.get("config")
    markdown_result = context.get("markdown_result")
    cleaned_html = context.get("cleaned_html")
    logger = context.get("logger")

    # Skip if already have a crawl result or extracted content
    if context.get("crawl_result") or bool(extracted_content):
        return 1

    # Imported here rather than at module top — presumably to avoid an
    # import cycle; confirm before hoisting.
    from crawl4ai.chunking_strategy import IdentityChunking
    from crawl4ai.extraction_strategy import NoExtractionStrategy

    if config.extraction_strategy and not isinstance(config.extraction_strategy, NoExtractionStrategy):
        t1 = time.perf_counter()
        _url = url if not context.get("is_raw_html", False) else "Raw HTML"

        # Choose content based on input_format
        content_format = config.extraction_strategy.input_format
        if content_format == "fit_markdown" and not markdown_result.fit_markdown:
            # Degrade gracefully when the content filter produced no fit_markdown.
            logger.warning(
                message="Fit markdown requested but not available. Falling back to raw markdown.",
                tag="EXTRACT",
                params={"url": _url},
            )
            content_format = "markdown"

        # Unknown formats fall back to raw markdown via dict.get's default.
        content = {
            "markdown": markdown_result.raw_markdown,
            "html": context.get("html"),
            "cleaned_html": cleaned_html,
            "fit_markdown": markdown_result.fit_markdown,
        }.get(content_format, markdown_result.raw_markdown)

        # Use IdentityChunking for HTML input, otherwise use provided chunking strategy
        chunking = (
            IdentityChunking()
            if content_format in ["html", "cleaned_html"]
            else config.chunking_strategy
        )
        sections = chunking.chunk(content)
        extracted_content = config.extraction_strategy.run(url, sections)
        # default=str keeps non-JSON-serializable values from raising here.
        extracted_content = json.dumps(
            extracted_content, indent=4, default=str, ensure_ascii=False
        )

        context["extracted_content"] = extracted_content

        # Log extraction completion
        logger.info(
            message="Completed for {url:.50}... | Time: {timing}s",
            tag="EXTRACT",
            params={"url": _url, "timing": time.perf_counter() - t1},
        )

    return 1
|
async def format_html_middleware(context: Dict[str, Any]) -> int:
    """Pretty-print the cleaned HTML when the run config requests it."""
    # A pre-built crawl result means there is nothing left to format.
    if context.get("crawl_result"):
        return 1

    config = context.get("config")
    cleaned_html = context.get("cleaned_html")

    # NOTE: `prettiify` (double i) is the attribute name used by the config.
    if config.prettiify and cleaned_html:
        context["cleaned_html"] = fast_format_html(cleaned_html)

    return 1
|
async def write_cache_middleware(context: Dict[str, Any]) -> int:
    """Placeholder cache-write step; always returns 1.

    The CrawlResult is actually created AND cached in build_result_middleware
    so it is only constructed once; this hook keeps the cache-write stage
    visible in the middleware sequence.
    """
    cache_context = context.get("cache_context")
    cached_result = context.get("cached_result")

    # Same skip conditions build_result_middleware uses for caching.
    if context.get("crawl_result") or not cache_context.should_write() or bool(cached_result):
        return 1

    return 1
|
async def build_result_middleware(context: Dict[str, Any]) -> int:
    """Build the final CrawlResult object.

    Resolution order:
      1. A pre-existing context["crawl_result"] (e.g. a robots.txt denial)
         is wrapped and returned as-is.
      2. A cache hit (cached_result + html) is refreshed and returned.
      3. Otherwise a fresh CrawlResult is assembled from everything earlier
         middlewares stored in the context, cached when permitted, and wrapped.
    Always returns 1; build failures are reported via a failed final_result.
    """
    url = context.get("url")
    html = context.get("html", "")
    cache_context = context.get("cache_context")
    cached_result = context.get("cached_result")
    config = context.get("config")
    logger = context.get("logger")

    # If we already have a crawl result (from an earlier middleware like robots.txt check)
    if context.get("crawl_result"):
        result = context["crawl_result"]
        context["final_result"] = CrawlResultContainer(result)
        return 1

    # If we have a cached result
    if cached_result and html:
        logger.success(
            message="{url:.50}... | Status: {status} | Total: {timing}",
            tag="COMPLETE",
            params={
                "url": cache_context.display_url,
                "status": True,
                "timing": f"{time.perf_counter() - context['start_time']:.2f}s",
            },
            colors={"status": "green", "timing": "yellow"},
        )

        # Refresh volatile fields before handing the cached result back.
        cached_result.success = bool(html)
        cached_result.session_id = getattr(config, "session_id", None)
        cached_result.redirected_url = cached_result.redirected_url or url
        context["final_result"] = CrawlResultContainer(cached_result)
        return 1

    # Build a new result
    try:
        # Get all necessary components from context
        cleaned_html = context.get("cleaned_html", "")
        markdown_result = context.get("markdown_result")
        media = context.get("media", {})
        links = context.get("links", {})
        metadata = context.get("metadata", {})
        screenshot_data = context.get("screenshot_data")
        pdf_data = context.get("pdf_data")
        extracted_content = context.get("extracted_content")
        async_response = context.get("async_response")

        # Create the CrawlResult; success is defined as "we got some HTML"
        crawl_result = CrawlResult(
            url=url,
            html=html,
            cleaned_html=cleaned_html,
            markdown=markdown_result,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=screenshot_data,
            pdf=pdf_data,
            extracted_content=extracted_content,
            success=bool(html),
            error_message="",
        )

        # Add response details if available (network fetch happened)
        if async_response:
            crawl_result.status_code = async_response.status_code
            crawl_result.redirected_url = async_response.redirected_url or url
            crawl_result.response_headers = async_response.response_headers
            crawl_result.downloaded_files = async_response.downloaded_files
            crawl_result.js_execution_result = context.get("js_execution_result")
            crawl_result.ssl_certificate = async_response.ssl_certificate

        crawl_result.session_id = getattr(config, "session_id", None)

        # Log completion
        logger.success(
            message="{url:.50}... | Status: {status} | Total: {timing}",
            tag="COMPLETE",
            params={
                "url": cache_context.display_url,
                "status": crawl_result.success,
                "timing": f"{time.perf_counter() - context['start_time']:.2f}s",
            },
            colors={
                "status": "green" if crawl_result.success else "red",
                "timing": "yellow",
            },
        )

        # Update cache if appropriate (fresh result only, never re-cache a hit)
        if cache_context.should_write() and not bool(cached_result):
            await async_db_manager.acache_url(crawl_result)

        context["final_result"] = CrawlResultContainer(crawl_result)
        return 1
    except Exception as e:
        error_context = get_error_context(sys.exc_info())

        error_message = (
            f"Unexpected error in build_result at line {error_context['line_no']} "
            f"in {error_context['function']} ({error_context['filename']}):\n"
            f"Error: {str(e)}\n\n"
            f"Code context:\n{error_context['code_context']}"
        )

        logger.error_status(
            url=url,
            error=create_box_message(error_message, type="error"),
            tag="ERROR",
        )

        # Still return 1: the pipeline completes with a failed final_result.
        context["final_result"] = CrawlResultContainer(
            CrawlResult(
                url=url, html="", success=False, error_message=error_message
            )
        )
        return 1
|
async def handle_error_middleware(context: Dict[str, Any]) -> Dict[str, Any]:
    """Default error handler: log the failure and attach a failed CrawlResult.

    Unlike regular middlewares, this returns the updated context dict.
    """
    url = context.get("url", "")
    error_message = context.get("error_message", "Unknown error")

    logger = context.get("logger")
    if logger:
        logger.error_status(
            url=url,
            error=create_box_message(error_message, type="error"),
            tag="ERROR",
        )

    # Wrap a failed result so downstream consumers always get a final_result.
    failure = CrawlResult(
        url=url, html="", success=False, error_message=error_message
    )
    context["final_result"] = CrawlResultContainer(failure)

    return context
|
||||||
|
# Custom middlewares as requested
|
||||||
|
|
||||||
|
async def sentiment_analysis_middleware(context: Dict[str, Any]) -> int:
    """Analyze sentiment of the generated markdown using TextBlob.

    Best-effort step: stores a `sentiment_analysis` dict on success or a
    `sentiment_analysis_error` message on failure, and always returns 1 so
    the pipeline never fails because of sentiment analysis.
    """
    markdown_result = context.get("markdown_result")

    # Nothing to analyze, or the pipeline already failed.
    if not markdown_result or not context.get("success", True):
        return 1

    try:
        # Fix: imported lazily inside the try (it used to run before the skip
        # check and outside the try), so a missing/broken textblob install
        # degrades gracefully instead of crashing every crawl — matching this
        # function's stated "don't fail the pipeline" contract.
        from textblob import TextBlob

        # Analyze sentiment of the raw markdown text
        sentiment = TextBlob(markdown_result.raw_markdown).sentiment

        context["sentiment_analysis"] = {
            "polarity": sentiment.polarity,  # -1.0 to 1.0 (negative to positive)
            "subjectivity": sentiment.subjectivity,  # 0.0 to 1.0 (objective to subjective)
            "classification": "positive" if sentiment.polarity > 0.1 else
                              "negative" if sentiment.polarity < -0.1 else "neutral"
        }

        return 1
    except Exception as e:
        # Don't fail the pipeline on sentiment analysis failure
        context["sentiment_analysis_error"] = str(e)
        return 1
|
async def log_timing_middleware(context: Dict[str, Any], name: str) -> int:
    """Record a timing mark for *name*, and log the duration when a matching
    `_timing_start_{name}` entry exists in the context."""
    now = time.perf_counter()
    context[f"_timing_mark_{name}"] = now

    start_key = f"_timing_start_{name}"
    if start_key in context:
        duration = now - context[start_key]
        context[f"_timing_duration_{name}"] = duration

        # Log the timing if we have a logger
        logger = context.get("logger")
        if logger:
            logger.info(
                message="{name} completed in {duration:.2f}s",
                tag="TIMING",
                params={"name": name, "duration": duration},
            )

    return 1
|
async def validate_url_middleware(context: Dict[str, Any], patterns: List[str]) -> int:
    """Allow the crawl only if the URL matches one of the glob *patterns*.

    An empty pattern list allows everything. On rejection, records an error
    message in the context and returns 0.
    """
    import fnmatch

    url = context.get("url", "")

    # No patterns configured means no restriction.
    if not patterns:
        return 1

    if any(fnmatch.fnmatch(url, pattern) for pattern in patterns):
        return 1

    # URL didn't match any allowed pattern: reject.
    context["error_message"] = f"URL '{url}' does not match any allowed patterns"
    return 0
|
||||||
|
# Update the default middleware list function
|
||||||
|
def create_default_middleware_list():
    """Return the default, ordered list of middleware functions for the pipeline.

    Order matters: the context is initialized and the cache consulted before
    any network setup, the browser hub is prepared before fetching, and the
    final CrawlResult is assembled last from whatever earlier steps stored
    in the context.
    """
    return [
        initialize_context_middleware,
        check_cache_middleware,
        browser_hub_middleware,  # Browser hub must be ready before fetch_content
        configure_proxy_middleware,
        check_robots_txt_middleware,
        # NOTE(review): this module also defines `fetch_content_middleware_`
        # (trailing underscore); confirm the name below resolves to the
        # intended implementation.
        fetch_content_middleware,
        scrape_content_middleware,
        generate_markdown_middleware,
        extract_structured_content_middleware,
        format_html_middleware,
        build_result_middleware
    ]
||||||
297
crawl4ai/pipeline/pipeline.py
Normal file
297
crawl4ai/pipeline/pipeline.py
Normal file
@@ -0,0 +1,297 @@
|
|||||||
|
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from typing import Callable, Dict, List, Any, Optional, Awaitable, Union, TypedDict, Tuple, Coroutine
|
||||||
|
|
||||||
|
from .middlewares import create_default_middleware_list, handle_error_middleware
|
||||||
|
from crawl4ai.models import CrawlResultContainer, CrawlResult
|
||||||
|
from crawl4ai.async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy
|
||||||
|
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
|
||||||
|
from crawl4ai.async_logger import AsyncLogger
|
||||||
|
|
||||||
|
|
||||||
|
class CrawlSpec(TypedDict, total=False):
    """Specification for a single crawl operation in batch_crawl."""
    # Target URL to crawl.
    url: str
    # Per-crawl run configuration; optional (total=False).
    config: Optional[CrawlerRunConfig]
    # Browser configuration override for this crawl; optional.
    browser_config: Optional[BrowserConfig]
|
class BatchStatus(TypedDict, total=False):
    """Status information for batch crawl operations."""
    # Total number of crawl operations in the batch.
    total: int
    # Number of operations finished (success or failure).
    processed: int
    # Number of operations that completed successfully.
    succeeded: int
    # Number of operations that failed.
    failed: int
    # Number of operations currently running.
    in_progress: int
    # Elapsed batch time — presumably seconds; confirm against batch_crawl.
    duration: float
|
class Pipeline:
    """
    A pipeline processor that executes a series of async middleware functions.

    Each middleware function receives a shared context dictionary, updates it,
    and returns 1 for success or 0 for failure. A 0 return (or a raised
    exception) routes the context through the configured error handler.
    """

    def __init__(
        self,
        middleware: List[Callable[[Dict[str, Any]], Awaitable[int]]] = None,
        error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None,
        after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None,
        crawler_strategy: Optional[AsyncCrawlerStrategy] = None,
        browser_config: Optional[BrowserConfig] = None,
        logger: Optional[AsyncLogger] = None,
        _initial_context: Optional[Dict[str, Any]] = None
    ):
        """
        Args:
            middleware: Ordered middleware functions; defaults to the standard list.
            error_handler: Awaited with the context when a middleware fails.
            after_middleware_callback: Awaited after each middleware with (name, context).
            crawler_strategy: Strategy used to fetch pages; a Playwright strategy
                is created from `browser_config` when omitted.
            browser_config: Browser configuration for the default strategy.
            logger: Logger; created from `browser_config.verbose` when omitted.
            _initial_context: Base key/values merged into every `process()` context.
        """
        self.middleware = middleware or create_default_middleware_list()
        self.error_handler = error_handler or handle_error_middleware
        self.after_middleware_callback = after_middleware_callback
        self.browser_config = browser_config or BrowserConfig()
        self.logger = logger or AsyncLogger(verbose=self.browser_config.verbose)
        self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
            browser_config=self.browser_config,
            logger=self.logger
        )
        # Fix: normalize to a dict so process()/_initialize_strategy() never
        # hit `{**None}` / `"x" in None` TypeErrors when the pipeline is
        # constructed directly instead of via create_pipeline().
        self._initial_context = _initial_context if _initial_context is not None else {}
        self._strategy_initialized = False

    async def _initialize_strategy__(self):
        """Legacy eager initializer: create and enter the crawler strategy.

        NOTE(review): superseded by `_initialize_strategy`; kept only for
        backward compatibility — confirm no callers before deleting.
        """
        if not self.crawler_strategy:
            self.crawler_strategy = AsyncPlaywrightCrawlerStrategy(
                browser_config=self.browser_config,
                logger=self.logger
            )

        if not self._strategy_initialized:
            await self.crawler_strategy.__aenter__()
            self._strategy_initialized = True

    async def _initialize_strategy(self):
        """Mark the pipeline as initialized.

        The crawler strategy / browser hub are created on demand by the
        middlewares (browser_hub_middleware, fetch_content_middleware), so
        nothing needs to be constructed eagerly here.
        """
        # Fix: the previous implementation evaluated
        # `"browser_hub" not in self._initial_context`, which raised TypeError
        # when _initial_context was None; the membership test guarded a no-op
        # anyway, so it has been dropped.
        self._strategy_initialized = True

    async def start(self):
        """Start the crawler strategy and prepare it for use."""
        if not self._strategy_initialized:
            await self._initialize_strategy()
        if not self.crawler_strategy:
            raise ValueError("Crawler strategy is not initialized.")
        await self.crawler_strategy.__aenter__()
        self._strategy_initialized = True

    async def close(self):
        """Close the crawler strategy and clean up resources."""
        await self.stop()

    async def stop(self):
        """Close the crawler strategy and clean up resources."""
        if self._strategy_initialized and self.crawler_strategy:
            await self.crawler_strategy.__aexit__(None, None, None)
            self._strategy_initialized = False

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def crawl(self, url: str, config: Optional[CrawlerRunConfig] = None, **kwargs) -> CrawlResultContainer:
        """
        Crawl a URL and process it through the pipeline.

        Args:
            url: The URL to crawl
            config: Optional configuration for the crawl
            **kwargs: Additional arguments to pass to the middleware

        Returns:
            CrawlResultContainer: The result of the crawl
        """
        # Initialize strategy if needed
        await self._initialize_strategy()

        # Create the initial per-crawl context
        context = {
            "url": url,
            "config": config or CrawlerRunConfig(),
            "browser_config": self.browser_config,
            "logger": self.logger,
            "crawler_strategy": self.crawler_strategy,
            "kwargs": kwargs
        }

        # Run the middleware chain and return whatever it produced
        result_context = await self.process(context)
        return result_context.get("final_result")

    async def process(self, initial_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Process all middleware functions with the given context.

        Args:
            initial_context: Initial context dictionary, defaults to empty dict

        Returns:
            Updated context dictionary after all middleware have been processed
        """
        # Fix: tolerate a None _initial_context (direct construction path).
        context = dict(self._initial_context or {})
        if initial_context:
            context.update(initial_context)

        # Record pipeline start time
        context["_pipeline_start_time"] = time.perf_counter()

        for middleware_fn in self.middleware:
            middleware_name = getattr(middleware_fn, '__name__', str(middleware_fn))

            # Record start time for this middleware
            start_time = time.perf_counter()
            context[f"_timing_start_{middleware_name}"] = start_time

            try:
                # Execute middleware (all middleware functions are async)
                result = await middleware_fn(context)

                # Record completion time
                end_time = time.perf_counter()
                context[f"_timing_end_{middleware_name}"] = end_time
                context[f"_timing_duration_{middleware_name}"] = end_time - start_time

                # Execute after-middleware callback if provided
                if self.after_middleware_callback:
                    await self.after_middleware_callback(middleware_name, context)

                # Convert boolean returns to int (True->1, False->0)
                if isinstance(result, bool):
                    result = 1 if result else 0

                # Handle failure
                if result == 0:
                    if self.error_handler:
                        context["_error_in"] = middleware_name
                        context["_error_at"] = time.perf_counter()
                        return await self._handle_error(context)
                    else:
                        context["success"] = False
                        context["error_message"] = f"Pipeline failed at {middleware_name}"
                        break
            except Exception as e:
                # Record error information
                context["_error_in"] = middleware_name
                context["_error_at"] = time.perf_counter()
                context["_exception"] = e
                context["success"] = False
                context["error_message"] = f"Exception in {middleware_name}: {str(e)}"

                # Call error handler if available
                if self.error_handler:
                    return await self._handle_error(context)
                break

        # Record pipeline completion time
        pipeline_end_time = time.perf_counter()
        context["_pipeline_end_time"] = pipeline_end_time
        context["_pipeline_duration"] = pipeline_end_time - context["_pipeline_start_time"]

        # Set success to True if not already set (no failures)
        if "success" not in context:
            context["success"] = True

        return context

    async def _handle_error(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Handle errors by delegating to the configured error handler."""
        try:
            return await self.error_handler(context)
        except Exception as e:
            # If the error handler itself fails, surface that on the context.
            context["_error_handler_exception"] = e
            context["error_message"] = f"Error handler failed: {str(e)}"
            return context
||||||
|
async def create_pipeline(
    middleware_list=None,
    error_handler=None,
    after_middleware_callback=None,
    browser_config=None,
    browser_hub_id=None,
    browser_hub_connection=None,
    browser_hub=None,
    logger=None
) -> Pipeline:
    """
    Factory function to create a pipeline with Browser-Hub integration.

    Args:
        middleware_list: List of middleware functions
        error_handler: Error handler middleware
        after_middleware_callback: Callback after middleware execution
        browser_config: Configuration for the browser
        browser_hub_id: ID for browser hub instance
        browser_hub_connection: Connection string for existing browser hub
        browser_hub: Existing browser hub instance to use
        logger: Logger instance

    Returns:
        Pipeline: Configured pipeline instance
    """
    # Use default middleware list if none provided
    middleware = middleware_list or create_default_middleware_list()

    # Browser-related values the middlewares read from every crawl context.
    initial_context = {
        "browser_config": browser_config,
        "browser_hub_id": browser_hub_id,
        "browser_hub_connection": browser_hub_connection,
        "browser_hub": browser_hub,
        "logger": logger
    }

    # Fix: forward browser_config to the Pipeline (it was previously dropped,
    # leaving the pipeline on a default BrowserConfig) and pass the initial
    # context through the constructor instead of patching it on afterwards.
    return Pipeline(
        middleware=middleware,
        error_handler=error_handler,
        after_middleware_callback=after_middleware_callback,
        browser_config=browser_config,
        logger=logger,
        _initial_context=initial_context
    )
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# async def create_pipeline(
|
||||||
|
# middleware_list: Optional[List[Callable[[Dict[str, Any]], Awaitable[int]]]] = None,
|
||||||
|
# error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None,
|
||||||
|
# after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None,
|
||||||
|
# crawler_strategy = None,
|
||||||
|
# browser_config = None,
|
||||||
|
# logger = None
|
||||||
|
# ) -> Pipeline:
|
||||||
|
# """Factory function to create a pipeline with the given middleware"""
|
||||||
|
# return Pipeline(
|
||||||
|
# middleware=middleware_list,
|
||||||
|
# error_handler=error_handler,
|
||||||
|
# after_middleware_callback=after_middleware_callback,
|
||||||
|
# crawler_strategy=crawler_strategy,
|
||||||
|
# browser_config=browser_config,
|
||||||
|
# logger=logger
|
||||||
|
# )
|
||||||
109
crawl4ai/pipeline/test_pipeline.py
Normal file
109
crawl4ai/pipeline/test_pipeline.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
import asyncio
|
||||||
|
from crawl4ai import (
|
||||||
|
BrowserConfig,
|
||||||
|
CrawlerRunConfig,
|
||||||
|
CacheMode,
|
||||||
|
DefaultMarkdownGenerator,
|
||||||
|
PruningContentFilter
|
||||||
|
)
|
||||||
|
from pipeline import Pipeline
|
||||||
|
|
||||||
|
async def main():
    """Demo: crawl example.com through a Pipeline and print the outcome."""
    browser_cfg = BrowserConfig(headless=True, verbose=True)
    run_cfg = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        markdown_generator=DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(
                threshold=0.48,
                threshold_type="fixed",
                min_word_threshold=0
            )
        ),
    )

    # The async context manager starts and stops the crawler strategy.
    async with Pipeline(browser_config=browser_cfg) as pipeline:
        result = await pipeline.crawl(
            url="https://www.example.com",
            config=run_cfg
        )

        print(f"URL: {result.url}")
        print(f"Success: {result.success}")

        if result.success:
            print("\nMarkdown excerpt:")
            print(result.markdown.raw_markdown[:500] + "...")
        else:
            print(f"Error: {result.error_message}")


if __name__ == "__main__":
    asyncio.run(main())
||||||
|
class CrawlTarget:
    """Pairs one or more URLs with the run configuration used to crawl them.

    Attributes:
        urls: A URL string or a list of URL strings.
        config: Optional CrawlerRunConfig applied to every URL in this target.
    """

    def __init__(self, urls, config=None):
        self.urls = urls
        self.config = config

    def __repr__(self):
        return f"CrawlTarget(urls={self.urls}, config={self.config})"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# async def main():
|
||||||
|
# # Create configuration objects
|
||||||
|
# browser_config = BrowserConfig(headless=True, verbose=True)
|
||||||
|
|
||||||
|
# # Define different configurations
|
||||||
|
# config1 = CrawlerRunConfig(
|
||||||
|
# cache_mode=CacheMode.BYPASS,
|
||||||
|
# markdown_generator=DefaultMarkdownGenerator(
|
||||||
|
# content_filter=PruningContentFilter(threshold=0.48)
|
||||||
|
# ),
|
||||||
|
# )
|
||||||
|
|
||||||
|
# config2 = CrawlerRunConfig(
|
||||||
|
# cache_mode=CacheMode.ENABLED,
|
||||||
|
# screenshot=True,
|
||||||
|
# pdf=True
|
||||||
|
# )
|
||||||
|
|
||||||
|
# # Create crawl targets
|
||||||
|
# targets = [
|
||||||
|
# CrawlTarget(
|
||||||
|
# urls=["https://www.example.com", "https://www.wikipedia.org"],
|
||||||
|
# config=config1
|
||||||
|
# ),
|
||||||
|
# CrawlTarget(
|
||||||
|
# urls="https://news.ycombinator.com",
|
||||||
|
# config=config2
|
||||||
|
# ),
|
||||||
|
# CrawlTarget(
|
||||||
|
# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
|
||||||
|
# config=None
|
||||||
|
# )
|
||||||
|
# ]
|
||||||
|
|
||||||
|
# # Create and use pipeline with context manager
|
||||||
|
# async with Pipeline(browser_config=browser_config) as pipeline:
|
||||||
|
# all_results = await pipeline.crawl_batch(targets)
|
||||||
|
|
||||||
|
# for target_key, results in all_results.items():
|
||||||
|
# print(f"\n===== Results for {target_key} =====")
|
||||||
|
# print(f"Number of URLs crawled: {len(results)}")
|
||||||
|
|
||||||
|
# for i, result in enumerate(results):
|
||||||
|
# print(f"\nURL {i+1}: {result.url}")
|
||||||
|
# print(f"Success: {result.success}")
|
||||||
|
|
||||||
|
# if result.success:
|
||||||
|
# print(f"Content length: {len(result.markdown.raw_markdown)} chars")
|
||||||
|
# else:
|
||||||
|
# print(f"Error: {result.error_message}")
|
||||||
|
|
||||||
|
# if __name__ == "__main__":
|
||||||
|
# asyncio.run(main())
|
||||||
222
tests/pipeline/demo_browser_hub_pipeline.py
Normal file
222
tests/pipeline/demo_browser_hub_pipeline.py
Normal file
@@ -0,0 +1,222 @@
|
|||||||
|
# demo_browser_hub.py
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from crawl4ai.browser.browser_hub import BrowserHub
|
||||||
|
from pipeline import create_pipeline
|
||||||
|
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
|
||||||
|
from crawl4ai.async_logger import AsyncLogger
|
||||||
|
from crawl4ai.models import CrawlResultContainer
|
||||||
|
from crawl4ai.cache_context import CacheMode
|
||||||
|
from crawl4ai import DefaultMarkdownGenerator
|
||||||
|
from crawl4ai import PruningContentFilter
|
||||||
|
|
||||||
|
async def create_prewarmed_browser_hub(urls_to_crawl: List[str]):
    """Create a browser hub pre-warmed with ``num_browsers`` x ``pages_per_browser`` pages.

    NOTE(review): *urls_to_crawl* is not used inside this function — it only
    documents the intended workload; confirm before removing the parameter.
    """
    logger = AsyncLogger(verbose=True)
    logger.info("Setting up pre-warmed browser hub", tag="DEMO")

    browser_config = BrowserConfig(
        browser_type="chromium",
        headless=True,  # set to False to watch the browsers work
        viewport_width=1280,
        viewport_height=800,
        light_mode=True,  # performance-oriented settings
        java_script_enabled=True
    )

    # Pre-warm pages under distinct crawl configs (one per user agent) so the
    # pool is ready for several scenarios. Extra UA variants kept for reference:
    # CrawlerRunConfig(user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15", wait_until="networkidle")
    # CrawlerRunConfig(user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", wait_until="networkidle")
    crawler_configs = [
        CrawlerRunConfig(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            wait_until="networkidle"
        ),
    ]

    num_browsers = 1
    pages_per_browser = 1

    # Spread the total page budget evenly across configs; the final config
    # absorbs any remainder left by the integer division.
    total_pages = num_browsers * pages_per_browser
    per_config = total_pages // len(crawler_configs)
    last = len(crawler_configs) - 1
    page_configs = []
    for idx, cfg in enumerate(crawler_configs):
        count = total_pages - per_config * last if idx == last else per_config
        page_configs.append((browser_config, cfg, count))

    t0 = asyncio.get_event_loop().time()
    logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO")

    hub = await BrowserHub.get_browser_manager(
        config=browser_config,
        hub_id="demo_hub",
        logger=logger,
        max_browsers_per_config=num_browsers,
        max_pages_per_browser=pages_per_browser,
        initial_pool_size=num_browsers,
        page_configs=page_configs
    )

    t1 = asyncio.get_event_loop().time()
    logger.success(
        message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds",
        tag="DEMO",
        params={
            "total_pages": total_pages,
            "duration": t1 - t0
        }
    )

    # Report pool health before handing the hub back.
    status = await hub.get_pool_status()
    logger.info(
        message="Browser pool status: {status}",
        tag="DEMO",
        params={"status": status}
    )

    return hub
|
||||||
|
|
||||||
|
async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]:
    """Crawl *urls* concurrently through a pipeline backed by *hub*."""
    logger = AsyncLogger(verbose=True)

    # One run configuration shared by every crawl task.
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        markdown_generator=DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(
                threshold=0.48,
                threshold_type="fixed",
                min_word_threshold=0
            )
        ),
        wait_until="networkidle",
        screenshot=True
    )

    # The pipeline borrows pre-warmed pages from the hub rather than
    # launching its own browser.
    pipeline = await create_pipeline(
        browser_hub=hub,
        logger=logger
    )

    async def _fetch(target):
        logger.info(f"Crawling {target}...", tag="CRAWL")
        outcome = await pipeline.crawl(url=target, config=run_config)
        logger.success(f"Completed crawl of {target}", tag="CRAWL")
        return outcome

    # Launch every URL at once and wait for all of them to finish.
    return await asyncio.gather(*(_fetch(u) for u in urls))
|
||||||
|
|
||||||
|
async def main():
    """Demo driver: build a pre-warmed hub, crawl a URL list, report stats.

    The hub is always shut down in the ``finally`` block, even when setup or
    crawling fails.
    """
    # URLs to crawl; extras kept commented out for quick experimentation.
    urls_to_crawl = [
        "https://example.com",
        # "https://www.python.org",
        # "https://httpbin.org/html",
        # "https://news.ycombinator.com",
        # "https://github.com",
        # "https://pypi.org",
        # "https://docs.python.org/3/",
        # "https://opensource.org",
        # "https://whatismyipaddress.com",
        # "https://en.wikipedia.org/wiki/Web_scraping"
    ]

    logger = AsyncLogger(verbose=True)
    logger.info("Starting browser hub demo", tag="DEMO")

    try:
        # Create pre-warmed browser hub.
        hub = await create_prewarmed_browser_hub(urls_to_crawl)

        logger.info("Crawling URLs in parallel...", tag="DEMO")
        start_time = asyncio.get_event_loop().time()

        results = await crawl_urls_with_hub(hub, urls_to_crawl)

        end_time = asyncio.get_event_loop().time()

        # Guard the average against an empty result list: the original code
        # divided by len(results) unconditionally, which raises
        # ZeroDivisionError when no URLs are crawled.
        logger.success(
            message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)",
            tag="DEMO",
            params={
                "count": len(results),
                "duration": end_time - start_time,
                "avg": (end_time - start_time) / max(len(results), 1)
            }
        )

        # Per-URL summary.
        logger.info("Crawl results summary:", tag="DEMO")
        for i, result in enumerate(results):
            logger.info(
                message="{idx}. {url}: Success={success}, Content length={length}",
                tag="RESULT",
                params={
                    "idx": i + 1,
                    "url": result.url,
                    "success": result.success,
                    "length": len(result.html) if result.html else 0
                }
            )

            if result.success and result.markdown and result.markdown.raw_markdown:
                # Show a short excerpt of the generated markdown.
                markdown_snippet = result.markdown.raw_markdown[:150] + "..."
                logger.info(
                    message=" Markdown: {snippet}",
                    tag="RESULT",
                    params={"snippet": markdown_snippet}
                )

        # Final pool health report.
        status = await hub.get_pool_status()
        logger.info(
            message="Final browser pool status: {status}",
            tag="DEMO",
            params={"status": status}
        )

    finally:
        # Always release browser resources, even on failure.
        logger.info("Shutting down browser hub...", tag="DEMO")
        await BrowserHub.shutdown_all()
        logger.success("Demo completed", tag="DEMO")


if __name__ == "__main__":
    asyncio.run(main())
|
||||||
505
tests/pipeline/extended_browser_hub_tests.py
Normal file
505
tests/pipeline/extended_browser_hub_tests.py
Normal file
@@ -0,0 +1,505 @@
|
|||||||
|
# extended_browser_hub_tests.py
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from crawl4ai.browser.browser_hub import BrowserHub
|
||||||
|
from pipeline import create_pipeline
|
||||||
|
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
|
||||||
|
from crawl4ai.async_logger import AsyncLogger
|
||||||
|
from crawl4ai.cache_context import CacheMode
|
||||||
|
|
||||||
|
# Common test URLs
|
||||||
|
TEST_URLS = [
|
||||||
|
"https://example.com",
|
||||||
|
"https://example.com/page1",
|
||||||
|
"https://httpbin.org/html",
|
||||||
|
"https://httpbin.org/headers",
|
||||||
|
"https://httpbin.org/ip",
|
||||||
|
"https://httpstat.us/200"
|
||||||
|
]
|
||||||
|
|
||||||
|
class TestResults:
    """Accumulator for one test scenario: results, timing, and errors.

    Attributes:
        name: Human-readable scenario name used in the summary.
        results: Collected crawl result objects (each exposing ``.success``).
        start_time: Event-loop timestamp set by the scenario, or None.
        end_time: Event-loop timestamp set by the scenario, or None.
        errors: Exceptions captured while the scenario ran.
    """

    def __init__(self, name: str):
        self.name = name
        self.results = []
        self.start_time = None
        self.end_time = None
        self.errors = []

    @property
    def duration(self) -> float:
        """Elapsed seconds between start and end, or 0 if either is unset.

        Compares against None explicitly: a timestamp of exactly 0.0 is a
        valid reading, and the original truthiness check would have silently
        discarded it.
        """
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return 0

    @property
    def success_rate(self) -> float:
        """Percentage (0-100) of collected results whose ``.success`` is true."""
        if not self.results:
            return 0
        return sum(1 for r in self.results if r.success) / len(self.results) * 100

    def log_summary(self, logger: "AsyncLogger"):
        """Emit a one-shot summary (duration, success rate, errors) to *logger*."""
        logger.info(f"=== Test: {self.name} ===", tag="SUMMARY")
        logger.info(
            message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: {count}",
            tag="SUMMARY",
            params={
                "duration": self.duration,
                "success_rate": self.success_rate,
                "count": len(self.results)
            }
        )

        if self.errors:
            logger.error(
                message="Errors ({count}): {errors}",
                tag="SUMMARY",
                params={
                    "count": len(self.errors),
                    "errors": "; ".join(str(e) for e in self.errors)
                }
            )
|
||||||
|
|
||||||
|
# ======== TEST SCENARIO 1: Simple default configuration ========
|
||||||
|
async def test_default_configuration():
    """Scenario 1: pipeline created with no browser config (auto-setup).

    Verifies that the pipeline bootstraps its own browser defaults when the
    caller supplies nothing but a logger.
    """
    logger = AsyncLogger(verbose=True)
    outcome = TestResults("Default Configuration")

    try:
        # No BrowserConfig at all — rely on defaults.
        pipeline = await create_pipeline(logger=logger)

        outcome.start_time = asyncio.get_event_loop().time()

        run_cfg = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            wait_until="domcontentloaded"
        )

        # Crawl the URLs one after another, collecting per-URL failures.
        for url in TEST_URLS:
            try:
                logger.info(f"Crawling {url} with default configuration", tag="TEST")
                res = await pipeline.crawl(url=url, config=run_cfg)
                outcome.results.append(res)

                logger.success(
                    message="Result: url={url}, success={success}, content_length={length}",
                    tag="TEST",
                    params={
                        "url": url,
                        "success": res.success,
                        "length": len(res.html) if res.html else 0
                    }
                )
            except Exception as e:
                logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
                outcome.errors.append(e)

        outcome.end_time = asyncio.get_event_loop().time()

    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}", tag="TEST")
        outcome.errors.append(e)

    outcome.log_summary(logger)
    return outcome
|
||||||
|
|
||||||
|
# ======== TEST SCENARIO 2: Detailed custom configuration ========
|
||||||
|
async def test_custom_configuration():
    """Scenario 2: user supplies a detailed BrowserConfig + CrawlerRunConfig.

    Exercises the fully-specified path: custom viewport, user agent, and
    heavyweight run options (screenshots, full-page scan, iframes).
    """
    logger = AsyncLogger(verbose=True)
    outcome = TestResults("Custom Configuration")

    try:
        # Fully specified browser setup.
        browser_cfg = BrowserConfig(
            browser_type="chromium",
            headless=True,
            viewport_width=1920,
            viewport_height=1080,
            user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
            light_mode=True,
            ignore_https_errors=True,
            extra_args=["--disable-extensions"]
        )

        # Heavyweight run options.
        run_cfg = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            wait_until="networkidle",
            page_timeout=30000,
            screenshot=True,
            pdf=False,
            screenshot_wait_for=0.5,
            wait_for_images=True,
            scan_full_page=True,
            scroll_delay=0.2,
            process_iframes=True,
            remove_overlay_elements=True
        )

        pipeline = await create_pipeline(
            browser_config=browser_cfg,
            logger=logger
        )

        outcome.start_time = asyncio.get_event_loop().time()

        # Crawl sequentially, recording per-URL failures without aborting.
        for url in TEST_URLS:
            try:
                logger.info(f"Crawling {url} with custom configuration", tag="TEST")
                res = await pipeline.crawl(url=url, config=run_cfg)
                outcome.results.append(res)

                logger.success(
                    message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}",
                    tag="TEST",
                    params={
                        "url": url,
                        "success": res.success,
                        "screenshot": res.screenshot is not None,
                        "length": len(res.html) if res.html else 0
                    }
                )
            except Exception as e:
                logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
                outcome.errors.append(e)

        outcome.end_time = asyncio.get_event_loop().time()

        # Probe the pipeline context (via a dummy crawl) for the browser hub
        # so pool state can be reported.
        try:
            ctx = await pipeline.process({"url": "about:blank", "config": run_cfg})
            hub = ctx.get("browser_hub")
            if hub:
                status = await hub.get_pool_status()
                logger.info(
                    message="Browser hub status: {status}",
                    tag="TEST",
                    params={"status": status}
                )
        except Exception as e:
            logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST")

    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}", tag="TEST")
        outcome.errors.append(e)

    outcome.log_summary(logger)
    return outcome
|
||||||
|
|
||||||
|
# ======== TEST SCENARIO 3: Using pre-initialized browser hub ========
|
||||||
|
async def test_preinitalized_browser_hub():
    """Scenario 3: hub created up-front and handed to the pipeline.

    Returns:
        (TestResults, browser_hub) — the hub (or None on early failure) is
        returned so later scenarios can reuse it.
    """
    logger = AsyncLogger(verbose=True)
    outcome = TestResults("Pre-initialized Browser Hub")

    hub = None
    try:
        logger.info("Initializing browser hub separately", tag="TEST")

        browser_cfg = BrowserConfig(
            browser_type="chromium",
            headless=True,
            verbose=True
        )

        hub = await BrowserHub.get_browser_manager(
            config=browser_cfg,
            hub_id="test_preinitalized",
            logger=logger,
            max_browsers_per_config=2,
            max_pages_per_browser=3,
            initial_pool_size=2
        )

        status = await hub.get_pool_status()
        logger.info(
            message="Initial browser hub status: {status}",
            tag="TEST",
            params={"status": status}
        )

        # Hand the live hub to the pipeline.
        pipeline = await create_pipeline(
            browser_hub=hub,
            logger=logger
        )

        run_cfg = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            wait_until="networkidle",
            screenshot=True
        )

        outcome.start_time = asyncio.get_event_loop().time()

        async def _one(url):
            # Failures are collected, not raised, so gather() never aborts.
            try:
                logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST")
                res = await pipeline.crawl(url=url, config=run_cfg)
                logger.success(f"Completed crawl of {url}", tag="TEST")
                return res
            except Exception as e:
                logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
                outcome.errors.append(e)
                return None

        gathered = await asyncio.gather(*(_one(u) for u in TEST_URLS))
        outcome.results = [r for r in gathered if r is not None]

        outcome.end_time = asyncio.get_event_loop().time()

        status = await hub.get_pool_status()
        logger.info(
            message="Final browser hub status: {status}",
            tag="TEST",
            params={"status": status}
        )

    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}", tag="TEST")
        outcome.errors.append(e)

    outcome.log_summary(logger)

    return outcome, hub
|
||||||
|
|
||||||
|
# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ========
|
||||||
|
async def test_parallel_pipelines():
    """Scenario 4: three pipelines share one browser hub and run in parallel.

    Reuses the hub built by ``test_preinitalized_browser_hub`` to demonstrate
    resource sharing across pipelines.
    """
    logger = AsyncLogger(verbose=True)
    outcome = TestResults("Parallel Pipelines")

    # Reuse the hub from the previous scenario.
    _, shared_hub = await test_preinitalized_browser_hub()

    try:
        # Three pipelines, all bound to the same hub.
        pipelines = [
            await create_pipeline(browser_hub=shared_hub, logger=logger)
            for _ in range(3)
        ]

        logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST")

        # One distinct run config per pipeline.
        configs = [
            CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
            CrawlerRunConfig(wait_until="networkidle", screenshot=True),
            CrawlerRunConfig(wait_until="load", scan_full_page=True)
        ]

        outcome.start_time = asyncio.get_event_loop().time()

        async def run_pipeline(idx, batch):
            # Sequentially crawl this pipeline's batch, collecting failures.
            collected = []
            for url in batch:
                try:
                    logger.info(f"Pipeline {idx} crawling {url}", tag="TEST")
                    res = await pipelines[idx].crawl(
                        url=url,
                        config=configs[idx]
                    )
                    collected.append(res)
                    logger.success(
                        message="Pipeline {idx} completed: url={url}, success={success}",
                        tag="TEST",
                        params={
                            "idx": idx,
                            "url": url,
                            "success": res.success
                        }
                    )
                except Exception as e:
                    logger.error(
                        message="Pipeline {idx} error: {error}",
                        tag="TEST",
                        params={
                            "idx": idx,
                            "error": str(e)
                        }
                    )
                    outcome.errors.append(e)
            return collected

        # Split the URL list across the three pipelines.
        url_batches = [
            TEST_URLS[:2],
            TEST_URLS[2:4],
            TEST_URLS[4:5] * 2  # duplicate one URL so pipeline 3 also gets two
        ]

        per_pipeline = await asyncio.gather(
            *(run_pipeline(i, batch) for i, batch in enumerate(url_batches))
        )

        for batch_results in per_pipeline:
            outcome.results.extend(batch_results)

        outcome.end_time = asyncio.get_event_loop().time()

        status = await shared_hub.get_pool_status()
        logger.info(
            message="Browser hub status after parallel pipelines: {status}",
            tag="TEST",
            params={"status": status}
        )

    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}", tag="TEST")
        outcome.errors.append(e)

    outcome.log_summary(logger)
    return outcome
|
||||||
|
|
||||||
|
# ======== TEST SCENARIO 5: Browser hub with connection string ========
|
||||||
|
async def test_connection_string():
    """Scenario 5: pipeline attaches to a hub via a connection string.

    Simulates connecting to a running browser-hub service; no real service
    is contacted in this test.
    """
    logger = AsyncLogger(verbose=True)
    outcome = TestResults("Connection String")

    try:
        # Simulated endpoint; a real deployment would point at a live hub.
        connection_string = "localhost:9222"

        pipeline = await create_pipeline(
            browser_hub_connection=connection_string,
            logger=logger
        )

        run_cfg = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            wait_until="networkidle"
        )

        outcome.start_time = asyncio.get_event_loop().time()

        # Exercise a single URL through the connected hub.
        url = TEST_URLS[0]
        try:
            logger.info(f"Crawling {url} with connection string hub", tag="TEST")
            res = await pipeline.crawl(url=url, config=run_cfg)
            outcome.results.append(res)

            logger.success(
                message="Result: url={url}, success={success}, content_length={length}",
                tag="TEST",
                params={
                    "url": url,
                    "success": res.success,
                    "length": len(res.html) if res.html else 0
                }
            )
        except Exception as e:
            logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
            outcome.errors.append(e)

        outcome.end_time = asyncio.get_event_loop().time()

    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}", tag="TEST")
        outcome.errors.append(e)

    outcome.log_summary(logger)
    return outcome
|
||||||
|
|
||||||
|
# ======== RUN ALL TESTS ========
|
||||||
|
async def run_all_tests():
    """Execute the enabled scenarios, then tear down every browser hub."""
    logger = AsyncLogger(verbose=True)
    logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN")

    try:
        await test_default_configuration()
        # Remaining scenarios are currently disabled:
        # await test_custom_configuration()
        # await test_preinitalized_browser_hub()
        # await test_parallel_pipelines()
        # await test_connection_string()
    except Exception as e:
        logger.error(f"Test suite failed: {str(e)}", tag="MAIN")
    finally:
        # Hubs are process-global; always clean them up.
        logger.info("Shutting down all browser hubs...", tag="MAIN")
        await BrowserHub.shutdown_all()
        logger.success("All tests completed", tag="MAIN")


if __name__ == "__main__":
    asyncio.run(run_all_tests())
|
||||||
163
tests/pipeline/test_batch_crawl.py
Normal file
163
tests/pipeline/test_batch_crawl.py
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
"""Test the Crawler class for batch crawling capabilities."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import pytest
|
||||||
|
from typing import List, Dict, Any, Optional, Tuple
|
||||||
|
|
||||||
|
from crawl4ai import Crawler
|
||||||
|
from crawl4ai import BrowserConfig, CrawlerRunConfig
|
||||||
|
from crawl4ai.async_logger import AsyncLogger
|
||||||
|
from crawl4ai.models import CrawlResult, CrawlResultContainer
|
||||||
|
from crawl4ai.browser import BrowserHub
|
||||||
|
from crawl4ai.cache_context import CacheMode
|
||||||
|
|
||||||
|
# Test URLs for crawling
|
||||||
|
SAFE_URLS = [
|
||||||
|
"https://example.com",
|
||||||
|
"https://httpbin.org/html",
|
||||||
|
"https://httpbin.org/headers",
|
||||||
|
"https://httpbin.org/ip",
|
||||||
|
"https://httpbin.org/user-agent",
|
||||||
|
"https://httpstat.us/200",
|
||||||
|
"https://jsonplaceholder.typicode.com/posts/1",
|
||||||
|
"https://jsonplaceholder.typicode.com/comments/1",
|
||||||
|
"https://iana.org",
|
||||||
|
"https://www.python.org"
|
||||||
|
]
|
||||||
|
|
||||||
|
# Simple test for batch crawling
|
||||||
|
@pytest.mark.asyncio
async def test_batch_crawl_simple():
    """Test simple batch crawling with multiple URLs."""
    target_urls = SAFE_URLS[:3]

    run_cfg = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        wait_until="domcontentloaded"
    )

    # A single batch call returns a url -> result mapping.
    results = await Crawler.crawl(
        target_urls,
        crawler_config=run_cfg
    )

    assert isinstance(results, dict)
    assert len(results) == len(target_urls)

    for url in target_urls:
        assert url in results
        assert results[url].success
        assert results[url].html is not None
|
||||||
|
|
||||||
|
# Test parallel batch crawling
@pytest.mark.asyncio
async def test_parallel_batch_crawl():
    """Crawl several URLs in parallel and require a >=80% success rate."""
    urls = SAFE_URLS[:5]

    # Cache disabled so each URL is fetched fresh.
    config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
    )

    # Time the parallel run with the running loop's clock.
    loop = asyncio.get_event_loop()
    start_time = loop.time()
    results = await Crawler.parallel_crawl(urls, crawler_config=config)
    end_time = loop.time()

    # One result per URL; count how many succeeded.
    assert len(results) == len(urls)
    successful = sum(1 for r in results.values() if r.success)

    print(f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s")
    print(f"Success rate: {successful}/{len(urls)}")

    # Network flakiness tolerated: at least 80% should succeed.
    assert successful / len(urls) >= 0.8
|
||||||
|
# Test batch crawling with different configurations
@pytest.mark.asyncio
async def test_batch_crawl_mixed_configs():
    """Crawl two URL batches, each with its own run configuration."""
    # batch1 takes no screenshots; batch2 does (and waits for network idle).
    batch1 = (
        SAFE_URLS[:2],
        CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
    )
    batch2 = (
        SAFE_URLS[2:4],
        CrawlerRunConfig(wait_until="networkidle", screenshot=True),
    )

    loop = asyncio.get_event_loop()
    start_time = loop.time()
    results = await Crawler.parallel_crawl([batch1, batch2])
    end_time = loop.time()

    # Every URL from both batches must appear in the result map.
    all_urls = batch1[0] + batch2[0]
    assert len(results) == len(all_urls)

    # Screenshots must follow each batch's own configuration.
    for url in batch1[0]:
        assert results[url].screenshot is None
    for url in batch2[0]:
        assert results[url].screenshot is not None

    print(f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s")
|
||||||
|
# Test shared browser hub
@pytest.mark.asyncio
async def test_batch_crawl_shared_hub():
    """Run a parallel crawl through one shared BrowserHub instance."""
    # A single headless Chromium pool shared by all crawl tasks.
    browser_config = BrowserConfig(browser_type="chromium", headless=True)

    browser_hub = await BrowserHub.get_browser_manager(
        config=browser_config,
        max_browsers_per_config=3,
        max_pages_per_browser=4,
        initial_pool_size=1,
    )

    try:
        urls = SAFE_URLS[:3]

        loop = asyncio.get_event_loop()
        start_time = loop.time()
        results = await Crawler.parallel_crawl(
            urls,
            browser_hub=browser_hub,
            crawler_config=CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
            ),
        )
        end_time = loop.time()

        # One result per URL; tally successes.
        assert len(results) == len(urls)
        successful = sum(1 for r in results.values() if r.success)

        print(f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s")
        print(f"Success rate: {successful}/{len(urls)}")

        # Inspect pool usage for diagnostics.
        hub_stats = await browser_hub.get_pool_status()
        print(f"Browser hub stats: {hub_stats}")

        # Network flakiness tolerated: at least 80% should succeed.
        assert successful / len(urls) >= 0.8

    finally:
        # Always release the shared hub's browsers.
        await browser_hub.close()
||||||
447
tests/pipeline/test_crawler.py
Normal file
447
tests/pipeline/test_crawler.py
Normal file
@@ -0,0 +1,447 @@
|
|||||||
|
# test_crawler.py
|
||||||
|
import asyncio
|
||||||
|
import warnings
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
# Define test fixtures
@pytest_asyncio.fixture
async def clean_browser_hub():
    """Yield to the test, then shut down every BrowserHub it may have created.

    Cleanup is best-effort: shutdown failures are reported but never fail
    the test that just ran.
    """
    yield

    # Imported lazily so the fixture module loads even before crawl4ai does.
    from crawl4ai.browser import BrowserHub

    try:
        await BrowserHub.shutdown_all()
    except Exception as e:
        print(f"Error during browser cleanup: {e}")
|
||||||
|
from crawl4ai import Crawler
|
||||||
|
from crawl4ai import BrowserConfig, CrawlerRunConfig
|
||||||
|
from crawl4ai.async_logger import AsyncLogger
|
||||||
|
from crawl4ai.models import CrawlResultContainer
|
||||||
|
from crawl4ai.browser import BrowserHub
|
||||||
|
from crawl4ai.cache_context import CacheMode
|
||||||
|
|
||||||
|
import warnings
|
||||||
|
from pydantic import PydanticDeprecatedSince20
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Test URLs for crawling: stable public endpoints safe for repeated hits.
_HTTPBIN_BASE = "https://httpbin.org"
_PLACEHOLDER_BASE = "https://jsonplaceholder.typicode.com"

SAFE_URLS = [
    "https://example.com",
    f"{_HTTPBIN_BASE}/html",
    f"{_HTTPBIN_BASE}/headers",
    f"{_HTTPBIN_BASE}/ip",
    f"{_HTTPBIN_BASE}/user-agent",
    "https://httpstat.us/200",
    f"{_PLACEHOLDER_BASE}/posts/1",
    f"{_PLACEHOLDER_BASE}/comments/1",
    "https://iana.org",
    "https://www.python.org",
]
|
||||||
|
|
||||||
|
|
||||||
|
class TestCrawlerBasic:
    """Basic tests for the Crawler utility class.

    All tests hit live public endpoints and rely on the ``clean_browser_hub``
    fixture to tear down any BrowserHub instances afterwards.
    """

    @pytest.mark.asyncio
    async def test_simple_crawl_single_url(self, clean_browser_hub):
        """Test crawling a single URL with default configuration"""
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=Warning)
            # Basic logger
            # NOTE(review): logger is created but never passed to Crawler.crawl
            # — confirm whether it was meant to be wired in.
            logger = AsyncLogger(verbose=True)

            # Basic single URL crawl with default configuration
            url = "https://example.com"
            result = await Crawler.crawl(url)

            # Verify the result: a single-URL crawl returns one container,
            # not a dict keyed by URL.
            assert isinstance(result, CrawlResultContainer)
            assert result.success
            assert result.url == url
            assert result.html is not None
            assert len(result.html) > 0

    @pytest.mark.asyncio
    async def test_crawl_with_custom_config(self, clean_browser_hub):
        """Test crawling with custom browser and crawler configuration"""
        # Custom browser config
        browser_config = BrowserConfig(
            browser_type="chromium",
            headless=True,
            viewport_width=1280,
            viewport_height=800,
        )

        # Custom crawler config: bypass cache, wait for network idle, and
        # capture a screenshot so we can assert on it below.
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="networkidle", screenshot=True
        )

        # Crawl with custom configuration
        url = "https://httpbin.org/html"
        result = await Crawler.crawl(
            url, browser_config=browser_config, crawler_config=crawler_config
        )

        # Verify the result, including that the screenshot was taken.
        assert result.success
        assert result.url == url
        assert result.screenshot is not None

    @pytest.mark.asyncio
    async def test_crawl_multiple_urls_sequential(self, clean_browser_hub):
        """Test crawling multiple URLs sequentially"""
        # Use a few test URLs
        urls = SAFE_URLS[:3]

        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )

        # Crawl multiple URLs sequentially; with a list input,
        # Crawler.crawl returns a dict keyed by URL.
        results = await Crawler.crawl(urls, crawler_config=crawler_config)

        # Verify the results
        assert isinstance(results, dict)
        assert len(results) == len(urls)

        for url in urls:
            assert url in results
            assert results[url].success
            assert results[url].html is not None

    @pytest.mark.asyncio
    async def test_crawl_with_error_handling(self, clean_browser_hub):
        """Test error handling during crawling"""
        # Include a valid URL and a non-existent URL
        urls = ["https://example.com", "https://non-existent-domain-123456789.com"]

        # Crawl with retries
        results = await Crawler.crawl(urls, max_retries=2, retry_delay=1.0)

        # Verify results for both URLs: failures must still produce entries.
        assert len(results) == 2

        # Valid URL should succeed
        assert results[urls[0]].success

        # Invalid URL should fail but be in results
        assert urls[1] in results
        assert not results[urls[1]].success
        assert results[urls[1]].error_message is not None
|
||||||
|
|
||||||
|
class TestCrawlerParallel:
    """Tests for the parallel crawling capabilities of Crawler.

    These tests hit live endpoints; assertions tolerate up to 20% network
    failures where noted.
    """

    @pytest.mark.asyncio
    async def test_parallel_crawl_simple(self, clean_browser_hub):
        """Test basic parallel crawling with same configuration"""
        # Use several URLs for parallel crawling
        urls = SAFE_URLS[:5]

        # Basic crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )

        # Crawl in parallel with default concurrency
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(urls, crawler_config=crawler_config)
        end_time = asyncio.get_event_loop().time()

        # Verify results
        assert len(results) == len(urls)
        successful = sum(1 for r in results.values() if r.success)

        print(
            f"Parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(urls)}")

        # At least 80% should succeed
        assert successful / len(urls) >= 0.8

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_concurrency_limit(self, clean_browser_hub):
        """Test parallel crawling with concurrency limit"""
        # Use more URLs to test concurrency control
        urls = SAFE_URLS[:8]

        # Custom crawler config
        crawler_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
        )

        # Limited concurrency: at most 2 URLs in flight at once
        concurrency = 2

        # Time the crawl
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(
            urls, crawler_config=crawler_config, concurrency=concurrency
        )
        end_time = asyncio.get_event_loop().time()

        # Verify results
        assert len(results) == len(urls)
        successful = sum(1 for r in results.values() if r.success)

        print(
            f"Parallel crawl with concurrency={concurrency} of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(urls)}")

        # At least 80% should succeed
        assert successful / len(urls) >= 0.8

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_different_configs(self, clean_browser_hub):
        """Test parallel crawling with different configurations for different URLs"""
        # Create URL batches with different configurations.
        # Each batch is a (urls, CrawlerRunConfig) tuple.
        batch1 = (
            SAFE_URLS[:2],
            CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
        )
        batch2 = (
            SAFE_URLS[2:4],
            CrawlerRunConfig(wait_until="networkidle", screenshot=True),
        )
        batch3 = (
            SAFE_URLS[4:6],
            CrawlerRunConfig(wait_until="load", scan_full_page=True),
        )

        # Crawl with mixed configurations
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl([batch1, batch2, batch3])
        end_time = asyncio.get_event_loop().time()

        # Extract all URLs
        all_urls = batch1[0] + batch2[0] + batch3[0]

        # Verify results
        assert len(results) == len(all_urls)

        # Check that screenshots are present only for batch2
        # (batch1 disables them; batch3 never asks for them).
        for url in batch1[0]:
            assert not results[url].screenshot

        for url in batch2[0]:
            assert results[url].screenshot

        print(
            f"Mixed-config parallel crawl of {len(all_urls)} URLs completed in {end_time - start_time:.2f}s"
        )

    @pytest.mark.asyncio
    async def test_parallel_crawl_with_shared_browser_hub(self, clean_browser_hub):
        """Test parallel crawling with a shared browser hub"""
        # Create and initialize a browser hub
        browser_config = BrowserConfig(browser_type="chromium", headless=True)

        browser_hub = await BrowserHub.get_browser_manager(
            config=browser_config,
            max_browsers_per_config=3,
            max_pages_per_browser=4,
            initial_pool_size=1,
        )

        try:
            # Use the hub for parallel crawling
            urls = SAFE_URLS[:6]

            start_time = asyncio.get_event_loop().time()
            results = await Crawler.parallel_crawl(
                urls,
                browser_hub=browser_hub,
                crawler_config=CrawlerRunConfig(
                    cache_mode=CacheMode.BYPASS, wait_until="domcontentloaded"
                ),
            )
            end_time = asyncio.get_event_loop().time()

            # Verify results
            assert len(results) == len(urls)
            successful = sum(1 for r in results.values() if r.success)

            print(
                f"Shared hub parallel crawl of {len(urls)} URLs completed in {end_time - start_time:.2f}s"
            )
            print(f"Success rate: {successful}/{len(urls)}")

            # Get browser hub statistics (diagnostic only)
            hub_stats = await browser_hub.get_pool_status()
            print(f"Browser hub stats: {hub_stats}")

            # At least 80% should succeed
            assert successful / len(urls) >= 0.8

        finally:
            # Clean up the browser hub
            await browser_hub.close()
||||||
|
|
||||||
|
class TestCrawlerAdvanced:
    """Advanced tests for the Crawler utility class.

    Covers per-batch browser configs, progress callbacks, custom retry
    strategies, and larger batches. All tests hit live endpoints.
    """

    @pytest.mark.asyncio
    async def test_crawl_with_customized_batch_config(self, clean_browser_hub):
        """Test crawling with fully customized batch configuration"""
        # Create URL batches with different browser and crawler configurations.
        # Here each batch is a (urls, BrowserConfig, CrawlerRunConfig) triple.
        browser_config1 = BrowserConfig(browser_type="chromium", headless=True)
        browser_config2 = BrowserConfig(
            browser_type="chromium", headless=False, viewport_width=1920
        )

        crawler_config1 = CrawlerRunConfig(wait_until="domcontentloaded")
        crawler_config2 = CrawlerRunConfig(wait_until="networkidle", screenshot=True)

        batch1 = (SAFE_URLS[:2], browser_config1, crawler_config1)
        batch2 = (SAFE_URLS[2:4], browser_config2, crawler_config2)

        # Crawl with mixed configurations
        results = await Crawler.parallel_crawl([batch1, batch2])

        # Extract all URLs
        all_urls = batch1[0] + batch2[0]

        # Verify results
        assert len(results) == len(all_urls)

        # Verify batch-specific processing
        for url in batch1[0]:
            assert results[url].screenshot is None  # No screenshots for batch1

        for url in batch2[0]:
            assert results[url].screenshot is not None  # Should have screenshots for batch2

    @pytest.mark.asyncio
    async def test_crawl_with_progress_callback(self, clean_browser_hub):
        """Test crawling with progress callback"""
        # Use several URLs
        urls = SAFE_URLS[:5]

        # Track progress via a shared dict mutated by the callback closure.
        progress_data = {"started": 0, "completed": 0, "failed": 0, "updates": []}

        # Progress callback: invoked once per URL with "started", then again
        # with "completed" (carrying the result).
        async def on_progress(
            status: str, url: str, result: Optional[CrawlResultContainer] = None
        ):
            if status == "started":
                progress_data["started"] += 1
            elif status == "completed":
                progress_data["completed"] += 1
                if not result.success:
                    progress_data["failed"] += 1

            progress_data["updates"].append((status, url))
            print(f"Progress: {status} - {url}")

        # Crawl with progress tracking
        results = await Crawler.parallel_crawl(
            urls,
            crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
            progress_callback=on_progress,
        )

        # Verify progress tracking
        assert progress_data["started"] == len(urls)
        assert progress_data["completed"] == len(urls)
        assert len(progress_data["updates"]) == len(urls) * 2  # start + complete events

    @pytest.mark.asyncio
    async def test_crawl_with_dynamic_retry_strategy(self, clean_browser_hub):
        """Test crawling with a dynamic retry strategy"""
        # Include URLs that might fail
        urls = [
            "https://example.com",
            "https://httpstat.us/500",
            "https://httpstat.us/404",
        ]

        # Custom retry strategy: returns (should_retry, delay_seconds).
        async def retry_strategy(
            url: str, attempt: int, error: Exception
        ) -> Tuple[bool, float]:
            # Only retry 500 errors, not 404s
            if "500" in url:
                return True, 1.0  # Retry with 1 second delay
            return False, 0.0  # Don't retry other errors

        # Crawl with custom retry strategy
        results = await Crawler.parallel_crawl(
            urls,
            crawler_config=CrawlerRunConfig(wait_until="domcontentloaded"),
            retry_strategy=retry_strategy,
            max_retries=3,
        )

        # Verify results
        assert len(results) == len(urls)

        # Example.com should succeed
        assert results[urls[0]].success

        # httpstat.us pages return content even for error status codes
        # so our crawler marks them as successful since it got HTML content
        # Verify that we got the expected status code
        assert results[urls[1]].status_code == 500

        # 404 should have the correct status code
        assert results[urls[2]].status_code == 404

    @pytest.mark.asyncio
    async def test_crawl_with_very_large_batch(self, clean_browser_hub):
        """Test crawling with a very large batch of URLs"""
        # Create a batch by repeating our safe URLs.
        # NOTE(review): dict.fromkeys dedupes, so SAFE_URLS[:5] * 2 collapses
        # back to 5 unique URLs — not ~10 as originally intended.
        large_batch = list(dict.fromkeys(SAFE_URLS[:5] * 2))

        # Set a reasonable concurrency limit
        concurrency = 10

        # Time the crawl
        start_time = asyncio.get_event_loop().time()
        results = await Crawler.parallel_crawl(
            large_batch,
            crawler_config=CrawlerRunConfig(
                wait_until="domcontentloaded",
                page_timeout=10000,  # Shorter timeout for large batch
            ),
            concurrency=concurrency,
        )
        end_time = asyncio.get_event_loop().time()

        # Verify results
        assert len(results) == len(large_batch)
        successful = sum(1 for r in results.values() if r.success)

        print(
            f"Large batch crawl of {len(large_batch)} URLs completed in {end_time - start_time:.2f}s"
        )
        print(f"Success rate: {successful}/{len(large_batch)}")
        print(
            f"Average time per URL: {(end_time - start_time) / len(large_batch):.2f}s"
        )

        # At least 80% should succeed (from our unique URLs)
        assert successful / len(results) >= 0.8
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Use pytest for async tests: -x stop on first failure, -v verbose,
    # -s do not capture stdout (keeps the progress prints visible).
    pytest.main(["-xvs", __file__])
|
||||||
109
tests/pipeline/test_pipeline.py
Normal file
109
tests/pipeline/test_pipeline.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
import asyncio
from crawl4ai import (
    BrowserConfig,
    CrawlerRunConfig,
    CacheMode,
    DefaultMarkdownGenerator,
    PruningContentFilter,
    Pipeline,  # FIX: Pipeline is exported by crawl4ai; a bare `pipeline` top-level module does not exist
)


async def main():
    """Demonstrate a single Pipeline crawl with pruning-based markdown generation."""
    # Create configuration objects
    browser_config = BrowserConfig(headless=True, verbose=True)
    crawler_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        markdown_generator=DefaultMarkdownGenerator(
            # Prune low-signal content before generating markdown.
            content_filter=PruningContentFilter(
                threshold=0.48,
                threshold_type="fixed",
                min_word_threshold=0
            )
        ),
    )

    # Create and use pipeline with context manager so browser resources
    # are released even if the crawl raises.
    async with Pipeline(browser_config=browser_config) as pipeline:
        result = await pipeline.crawl(
            url="https://www.example.com",
            config=crawler_config
        )

        # Print the result
        print(f"URL: {result.url}")
        print(f"Success: {result.success}")

        if result.success:
            print("\nMarkdown excerpt:")
            print(result.markdown.raw_markdown[:500] + "...")
        else:
            print(f"Error: {result.error_message}")


if __name__ == "__main__":
    asyncio.run(main())
|
||||||
|
|
||||||
|
|
||||||
|
class CrawlTarget:
    """Pairs a collection of URLs with an optional crawl configuration."""

    def __init__(self, urls, config=None):
        # URLs may be a list or a single string; config defaults to None,
        # meaning "use the pipeline's default configuration".
        self.urls = urls
        self.config = config

    def __repr__(self):
        return "CrawlTarget(urls={}, config={})".format(self.urls, self.config)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# async def main():
|
||||||
|
# # Create configuration objects
|
||||||
|
# browser_config = BrowserConfig(headless=True, verbose=True)
|
||||||
|
|
||||||
|
# # Define different configurations
|
||||||
|
# config1 = CrawlerRunConfig(
|
||||||
|
# cache_mode=CacheMode.BYPASS,
|
||||||
|
# markdown_generator=DefaultMarkdownGenerator(
|
||||||
|
# content_filter=PruningContentFilter(threshold=0.48)
|
||||||
|
# ),
|
||||||
|
# )
|
||||||
|
|
||||||
|
# config2 = CrawlerRunConfig(
|
||||||
|
# cache_mode=CacheMode.ENABLED,
|
||||||
|
# screenshot=True,
|
||||||
|
# pdf=True
|
||||||
|
# )
|
||||||
|
|
||||||
|
# # Create crawl targets
|
||||||
|
# targets = [
|
||||||
|
# CrawlTarget(
|
||||||
|
# urls=["https://www.example.com", "https://www.wikipedia.org"],
|
||||||
|
# config=config1
|
||||||
|
# ),
|
||||||
|
# CrawlTarget(
|
||||||
|
# urls="https://news.ycombinator.com",
|
||||||
|
# config=config2
|
||||||
|
# ),
|
||||||
|
# CrawlTarget(
|
||||||
|
# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
|
||||||
|
# config=None
|
||||||
|
# )
|
||||||
|
# ]
|
||||||
|
|
||||||
|
# # Create and use pipeline with context manager
|
||||||
|
# async with Pipeline(browser_config=browser_config) as pipeline:
|
||||||
|
# all_results = await pipeline.crawl_batch(targets)
|
||||||
|
|
||||||
|
# for target_key, results in all_results.items():
|
||||||
|
# print(f"\n===== Results for {target_key} =====")
|
||||||
|
# print(f"Number of URLs crawled: {len(results)}")
|
||||||
|
|
||||||
|
# for i, result in enumerate(results):
|
||||||
|
# print(f"\nURL {i+1}: {result.url}")
|
||||||
|
# print(f"Success: {result.success}")
|
||||||
|
|
||||||
|
# if result.success:
|
||||||
|
# print(f"Content length: {len(result.markdown.raw_markdown)} chars")
|
||||||
|
# else:
|
||||||
|
# print(f"Error: {result.error_message}")
|
||||||
|
|
||||||
|
# if __name__ == "__main__":
|
||||||
|
# asyncio.run(main())
|
||||||
Reference in New Issue
Block a user