Some refactoring, movie pipelin submodule folder into the main.

This commit is contained in:
UncleCode
2025-04-06 18:28:28 +08:00
parent 591f55edc7
commit d95b2dc9f2
8 changed files with 2655 additions and 0 deletions

View File

@@ -0,0 +1,222 @@
# demo_browser_hub.py
import asyncio
from typing import List
from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.cache_context import CacheMode
from crawl4ai import DefaultMarkdownGenerator
from crawl4ai import PruningContentFilter
async def create_prewarmed_browser_hub(urls_to_crawl: List[str]):
"""Create a pre-warmed browser hub with 10 browsers and 5 pages each."""
# Set up logging
logger = AsyncLogger(verbose=True)
logger.info("Setting up pre-warmed browser hub", tag="DEMO")
# Create browser configuration
browser_config = BrowserConfig(
browser_type="chromium",
headless=True, # Set to False to see the browsers in action
viewport_width=1280,
viewport_height=800,
light_mode=True, # Optimize for performance
java_script_enabled=True
)
# Create crawler configurations for pre-warming with different user agents
# This allows pages to be ready for different scenarios
crawler_configs = [
CrawlerRunConfig(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
wait_until="networkidle"
),
# CrawlerRunConfig(
# user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15",
# wait_until="networkidle"
# ),
# CrawlerRunConfig(
# user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
# wait_until="networkidle"
# )
]
# Number of browsers and pages per browser
num_browsers = 1
pages_per_browser = 1
# Distribute pages across configurations
# We'll create a total of 50 pages (10 browsers × 5 pages)
page_configs = []
total_pages = num_browsers * pages_per_browser
pages_per_config = total_pages // len(crawler_configs)
for i, config in enumerate(crawler_configs):
# For the last config, add any remaining pages
if i == len(crawler_configs) - 1:
remaining = total_pages - (pages_per_config * (len(crawler_configs) - 1))
page_configs.append((browser_config, config, remaining))
else:
page_configs.append((browser_config, config, pages_per_config))
# Create browser hub with pre-warmed pages
start_time = asyncio.get_event_loop().time()
logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO")
hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id="demo_hub",
logger=logger,
max_browsers_per_config=num_browsers,
max_pages_per_browser=pages_per_browser,
initial_pool_size=num_browsers,
page_configs=page_configs
)
end_time = asyncio.get_event_loop().time()
logger.success(
message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds",
tag="DEMO",
params={
"total_pages": total_pages,
"duration": end_time - start_time
}
)
# Get and display pool status
status = await hub.get_pool_status()
logger.info(
message="Browser pool status: {status}",
tag="DEMO",
params={"status": status}
)
return hub
async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]:
"""Crawl a list of URLs using a pre-warmed browser hub."""
logger = AsyncLogger(verbose=True)
# Create crawler configuration
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
wait_until="networkidle",
screenshot=True
)
# Create pipeline with the browser hub
pipeline = await create_pipeline(
browser_hub=hub,
logger=logger
)
results = []
# Crawl all URLs in parallel
async def crawl_url(url):
logger.info(f"Crawling {url}...", tag="CRAWL")
result = await pipeline.crawl(url=url, config=crawler_config)
logger.success(f"Completed crawl of {url}", tag="CRAWL")
return result
# Create tasks for all URLs
tasks = [crawl_url(url) for url in urls]
# Execute all tasks in parallel and collect results
results = await asyncio.gather(*tasks)
return results
async def main():
"""Main demo function."""
# List of URLs to crawl
urls_to_crawl = [
"https://example.com",
# "https://www.python.org",
# "https://httpbin.org/html",
# "https://news.ycombinator.com",
# "https://github.com",
# "https://pypi.org",
# "https://docs.python.org/3/",
# "https://opensource.org",
# "https://whatismyipaddress.com",
# "https://en.wikipedia.org/wiki/Web_scraping"
]
# Set up logging
logger = AsyncLogger(verbose=True)
logger.info("Starting browser hub demo", tag="DEMO")
try:
# Create pre-warmed browser hub
hub = await create_prewarmed_browser_hub(urls_to_crawl)
# Use hub to crawl URLs
logger.info("Crawling URLs in parallel...", tag="DEMO")
start_time = asyncio.get_event_loop().time()
results = await crawl_urls_with_hub(hub, urls_to_crawl)
end_time = asyncio.get_event_loop().time()
# Display results
logger.success(
message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)",
tag="DEMO",
params={
"count": len(results),
"duration": end_time - start_time,
"avg": (end_time - start_time) / len(results)
}
)
# Print summary of results
logger.info("Crawl results summary:", tag="DEMO")
for i, result in enumerate(results):
logger.info(
message="{idx}. {url}: Success={success}, Content length={length}",
tag="RESULT",
params={
"idx": i+1,
"url": result.url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
if result.success and result.markdown and result.markdown.raw_markdown:
# Print a snippet of the markdown
markdown_snippet = result.markdown.raw_markdown[:150] + "..."
logger.info(
message=" Markdown: {snippet}",
tag="RESULT",
params={"snippet": markdown_snippet}
)
# Display final browser pool status
status = await hub.get_pool_status()
logger.info(
message="Final browser pool status: {status}",
tag="DEMO",
params={"status": status}
)
finally:
# Clean up
logger.info("Shutting down browser hub...", tag="DEMO")
await BrowserHub.shutdown_all()
logger.success("Demo completed", tag="DEMO")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,505 @@
# extended_browser_hub_tests.py
import asyncio
from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.cache_context import CacheMode
# Common test URLs
TEST_URLS = [
"https://example.com",
"https://example.com/page1",
"https://httpbin.org/html",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
"https://httpstat.us/200"
]
class TestResults:
"""Simple container for test results"""
def __init__(self, name: str):
self.name = name
self.results = []
self.start_time = None
self.end_time = None
self.errors = []
@property
def duration(self) -> float:
if self.start_time and self.end_time:
return self.end_time - self.start_time
return 0
@property
def success_rate(self) -> float:
if not self.results:
return 0
return sum(1 for r in self.results if r.success) / len(self.results) * 100
def log_summary(self, logger: AsyncLogger):
logger.info(f"=== Test: {self.name} ===", tag="SUMMARY")
logger.info(
message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: {count}",
tag="SUMMARY",
params={
"duration": self.duration,
"success_rate": self.success_rate,
"count": len(self.results)
}
)
if self.errors:
logger.error(
message="Errors ({count}): {errors}",
tag="SUMMARY",
params={
"count": len(self.errors),
"errors": "; ".join(str(e) for e in self.errors)
}
)
# ======== TEST SCENARIO 1: Simple default configuration ========
async def test_default_configuration():
"""
Test Scenario 1: Simple default configuration
This tests the basic case where the user does not provide any specific
browser configuration, relying on default auto-setup.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Default Configuration")
try:
# Create pipeline with no browser config
pipeline = await create_pipeline(logger=logger)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Create basic crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded"
)
# Process each URL sequentially
for url in TEST_URLS:
try:
logger.info(f"Crawling {url} with default configuration", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
logger.success(
message="Result: url={url}, success={success}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 2: Detailed custom configuration ========
async def test_custom_configuration():
"""
Test Scenario 2: Detailed custom configuration
This tests the case where the user provides detailed browser configuration
to customize the browser behavior.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Custom Configuration")
try:
# Create custom browser config
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1920,
viewport_height=1080,
user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
light_mode=True,
ignore_https_errors=True,
extra_args=["--disable-extensions"]
)
# Create custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle",
page_timeout=30000,
screenshot=True,
pdf=False,
screenshot_wait_for=0.5,
wait_for_images=True,
scan_full_page=True,
scroll_delay=0.2,
process_iframes=True,
remove_overlay_elements=True
)
# Create pipeline with custom configuration
pipeline = await create_pipeline(
browser_config=browser_config,
logger=logger
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Process each URL sequentially
for url in TEST_URLS:
try:
logger.info(f"Crawling {url} with custom configuration", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
has_screenshot = result.screenshot is not None
logger.success(
message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"screenshot": has_screenshot,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
# Get browser hub status from context
try:
# Run a dummy crawl to get the context with browser hub
context = await pipeline.process({"url": "about:blank", "config": crawler_config})
browser_hub = context.get("browser_hub")
if browser_hub:
status = await browser_hub.get_pool_status()
logger.info(
message="Browser hub status: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST")
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 3: Using pre-initialized browser hub ========
async def test_preinitalized_browser_hub():
"""
Test Scenario 3: Using pre-initialized browser hub
This tests the case where a browser hub is initialized separately
and then passed to the pipeline.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Pre-initialized Browser Hub")
browser_hub = None
try:
# Create and initialize browser hub separately
logger.info("Initializing browser hub separately", tag="TEST")
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
verbose=True
)
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id="test_preinitalized",
logger=logger,
max_browsers_per_config=2,
max_pages_per_browser=3,
initial_pool_size=2
)
# Display initial status
status = await browser_hub.get_pool_status()
logger.info(
message="Initial browser hub status: {status}",
tag="TEST",
params={"status": status}
)
# Create pipeline with pre-initialized browser hub
pipeline = await create_pipeline(
browser_hub=browser_hub,
logger=logger
)
# Create crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle",
screenshot=True
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Process URLs in parallel
async def crawl_url(url):
try:
logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
logger.success(f"Completed crawl of {url}", tag="TEST")
return result
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
return None
# Create tasks for all URLs
tasks = [crawl_url(url) for url in TEST_URLS]
# Execute all tasks in parallel and collect results
all_results = await asyncio.gather(*tasks)
results.results = [r for r in all_results if r is not None]
# End timing
results.end_time = asyncio.get_event_loop().time()
# Display final status
status = await browser_hub.get_pool_status()
logger.info(
message="Final browser hub status: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results, browser_hub
# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ========
async def test_parallel_pipelines():
"""
Test Scenario 4: Multiple parallel pipelines sharing browser hub
This tests the case where multiple pipelines share the same browser hub,
demonstrating resource sharing and parallel operation.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Parallel Pipelines")
# We'll reuse the browser hub from the previous test
_, browser_hub = await test_preinitalized_browser_hub()
try:
# Create 3 pipelines that all share the same browser hub
pipelines = []
for i in range(3):
pipeline = await create_pipeline(
browser_hub=browser_hub,
logger=logger
)
pipelines.append(pipeline)
logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST")
# Create crawler configs with different settings
configs = [
CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
CrawlerRunConfig(wait_until="networkidle", screenshot=True),
CrawlerRunConfig(wait_until="load", scan_full_page=True)
]
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Function to process URLs with a specific pipeline
async def process_with_pipeline(pipeline_idx, urls):
pipeline_results = []
for url in urls:
try:
logger.info(f"Pipeline {pipeline_idx} crawling {url}", tag="TEST")
result = await pipelines[pipeline_idx].crawl(
url=url,
config=configs[pipeline_idx]
)
pipeline_results.append(result)
logger.success(
message="Pipeline {idx} completed: url={url}, success={success}",
tag="TEST",
params={
"idx": pipeline_idx,
"url": url,
"success": result.success
}
)
except Exception as e:
logger.error(
message="Pipeline {idx} error: {error}",
tag="TEST",
params={
"idx": pipeline_idx,
"error": str(e)
}
)
results.errors.append(e)
return pipeline_results
# Distribute URLs among pipelines
pipeline_urls = [
TEST_URLS[:2],
TEST_URLS[2:4],
TEST_URLS[4:5] * 2 # Duplicate the last URL to have 2 for pipeline 3
]
# Execute all pipelines in parallel
tasks = [
process_with_pipeline(i, urls)
for i, urls in enumerate(pipeline_urls)
]
pipeline_results = await asyncio.gather(*tasks)
# Flatten results
for res_list in pipeline_results:
results.results.extend(res_list)
# End timing
results.end_time = asyncio.get_event_loop().time()
# Display browser hub status
status = await browser_hub.get_pool_status()
logger.info(
message="Browser hub status after parallel pipelines: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 5: Browser hub with connection string ========
async def test_connection_string():
"""
Test Scenario 5: Browser hub with connection string
This tests the case where a browser hub is initialized from a connection string,
simulating connecting to a running browser hub service.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Connection String")
try:
# Create pipeline with connection string
# Note: In a real implementation, this would connect to an existing service
# For this test, we're using a simulated connection
connection_string = "localhost:9222" # Simulated connection string
pipeline = await create_pipeline(
browser_hub_connection=connection_string,
logger=logger
)
# Create crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle"
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Test with a single URL
url = TEST_URLS[0]
try:
logger.info(f"Crawling {url} with connection string hub", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
logger.success(
message="Result: url={url}, success={success}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== RUN ALL TESTS ========
async def run_all_tests():
"""Run all test scenarios"""
logger = AsyncLogger(verbose=True)
logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN")
try:
# Run each test scenario
await test_default_configuration()
# await test_custom_configuration()
# await test_preinitalized_browser_hub()
# await test_parallel_pipelines()
# await test_connection_string()
except Exception as e:
logger.error(f"Test suite failed: {str(e)}", tag="MAIN")
finally:
# Clean up all browser hubs
logger.info("Shutting down all browser hubs...", tag="MAIN")
await BrowserHub.shutdown_all()
logger.success("All tests completed", tag="MAIN")
if __name__ == "__main__":
asyncio.run(run_all_tests())

View File

@@ -0,0 +1,702 @@
import time
import sys
from typing import Dict, Any, List
import json
from crawl4ai.models import (
CrawlResult,
MarkdownGenerationResult,
ScrapingResult,
CrawlResultContainer,
)
from crawl4ai.async_database import async_db_manager
from crawl4ai.cache_context import CacheMode, CacheContext
from crawl4ai.utils import (
sanitize_input_encode,
InvalidCSSSelectorError,
fast_format_html,
create_box_message,
get_error_context,
)
async def initialize_context_middleware(context: Dict[str, Any]) -> int:
"""Initialize the context with basic configuration and validation"""
url = context.get("url")
config = context.get("config")
if not isinstance(url, str) or not url:
context["error_message"] = "Invalid URL, make sure the URL is a non-empty string"
return 0
# Default to ENABLED if no cache mode specified
if config.cache_mode is None:
config.cache_mode = CacheMode.ENABLED
# Create cache context
context["cache_context"] = CacheContext(url, config.cache_mode, False)
context["start_time"] = time.perf_counter()
return 1
# middlewares.py additions
async def browser_hub_middleware(context: Dict[str, Any]) -> int:
"""
Initialize or connect to a Browser-Hub and add it to the pipeline context.
This middleware handles browser hub initialization for all three scenarios:
1. Default configuration when nothing is specified
2. Custom configuration when browser_config is provided
3. Connection to existing hub when browser_hub_connection is provided
Args:
context: The pipeline context dictionary
Returns:
int: 1 for success, 0 for failure
"""
from crawl4ai.browser.browser_hub import BrowserHub
try:
# Get configuration from context
browser_config = context.get("browser_config")
browser_hub_id = context.get("browser_hub_id")
browser_hub_connection = context.get("browser_hub_connection")
logger = context.get("logger")
# If we already have a browser hub in context, use it
if context.get("browser_hub"):
return 1
# Get or create Browser-Hub
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id=browser_hub_id,
connection_info=browser_hub_connection,
logger=logger
)
# Add to context
context["browser_hub"] = browser_hub
return 1
except Exception as e:
context["error_message"] = f"Failed to initialize browser hub: {str(e)}"
return 0
async def fetch_content_middleware(context: Dict[str, Any]) -> int:
"""
Fetch content from the web using the browser hub.
This middleware uses the browser hub to get pages for crawling,
and properly releases them back to the pool when done.
Args:
context: The pipeline context dictionary
Returns:
int: 1 for success, 0 for failure
"""
url = context.get("url")
config = context.get("config")
browser_hub = context.get("browser_hub")
logger = context.get("logger")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
try:
# Create crawler strategy without initializing its browser manager
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=browser_hub.config if browser_hub else None,
logger=logger
)
# Replace the browser manager with our shared instance
crawler_strategy.browser_manager = browser_hub
# Perform crawl without trying to initialize the browser
# The crawler will use the provided browser_manager to get pages
async_response = await crawler_strategy.crawl(url, config=config)
# Store results in context
context["html"] = async_response.html
context["screenshot_data"] = async_response.screenshot
context["pdf_data"] = async_response.pdf_data
context["js_execution_result"] = async_response.js_execution_result
context["async_response"] = async_response
return 1
except Exception as e:
context["error_message"] = f"Error fetching content: {str(e)}"
return 0
async def check_cache_middleware(context: Dict[str, Any]) -> int:
"""Check if there's a cached result and load it if available"""
url = context.get("url")
config = context.get("config")
cache_context = context.get("cache_context")
logger = context.get("logger")
# Initialize variables
context["cached_result"] = None
context["html"] = None
context["extracted_content"] = None
context["screenshot_data"] = None
context["pdf_data"] = None
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
context["cached_result"] = cached_result
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
extracted_content = None if not extracted_content or extracted_content == "[]" else extracted_content
# If screenshot is requested but its not in cache, then set cache_result to None
screenshot_data = cached_result.screenshot
pdf_data = cached_result.pdf
if config.screenshot and not screenshot_data:
context["cached_result"] = None
if config.pdf and not pdf_data:
context["cached_result"] = None
context["html"] = html
context["extracted_content"] = extracted_content
context["screenshot_data"] = screenshot_data
context["pdf_data"] = pdf_data
logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - context["start_time"],
tag="FETCH",
)
return 1
async def configure_proxy_middleware(context: Dict[str, Any]) -> int:
"""Configure proxy if a proxy rotation strategy is available"""
config = context.get("config")
logger = context.get("logger")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
# Update proxy configuration from rotation strategy if available
if config and config.proxy_rotation_strategy:
next_proxy = await config.proxy_rotation_strategy.get_next_proxy()
if next_proxy:
logger.info(
message="Switch proxy: {proxy}",
tag="PROXY",
params={"proxy": next_proxy.server},
)
config.proxy_config = next_proxy
return 1
async def check_robots_txt_middleware(context: Dict[str, Any]) -> int:
"""Check if the URL is allowed by robots.txt if enabled"""
url = context.get("url")
config = context.get("config")
browser_config = context.get("browser_config")
robots_parser = context.get("robots_parser")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await robots_parser.can_fetch(url, browser_config.user_agent):
context["crawl_result"] = CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
)
return 0
return 1
async def fetch_content_middleware_(context: Dict[str, Any]) -> int:
"""Fetch content from the web using the crawler strategy"""
url = context.get("url")
config = context.get("config")
crawler_strategy = context.get("crawler_strategy")
logger = context.get("logger")
# Skip if using cached result
if context.get("cached_result") and context.get("html"):
return 1
try:
t1 = time.perf_counter()
if config.user_agent:
crawler_strategy.update_user_agent(config.user_agent)
# Call CrawlerStrategy.crawl
async_response = await crawler_strategy.crawl(url, config=config)
html = sanitize_input_encode(async_response.html)
screenshot_data = async_response.screenshot
pdf_data = async_response.pdf_data
js_execution_result = async_response.js_execution_result
t2 = time.perf_counter()
logger.url_status(
url=context["cache_context"].display_url,
success=bool(html),
timing=t2 - t1,
tag="FETCH",
)
context["html"] = html
context["screenshot_data"] = screenshot_data
context["pdf_data"] = pdf_data
context["js_execution_result"] = js_execution_result
context["async_response"] = async_response
return 1
except Exception as e:
context["error_message"] = f"Error fetching content: {str(e)}"
return 0
async def scrape_content_middleware(context: Dict[str, Any]) -> int:
"""Apply scraping strategy to extract content"""
url = context.get("url")
html = context.get("html")
config = context.get("config")
extracted_content = context.get("extracted_content")
logger = context.get("logger")
# Skip if already have a crawl result
if context.get("crawl_result"):
return 1
try:
_url = url if not context.get("is_raw_html", False) else "Raw HTML"
t1 = time.perf_counter()
# Get scraping strategy and ensure it has a logger
scraping_strategy = config.scraping_strategy
if not scraping_strategy.logger:
scraping_strategy.logger = logger
# Process HTML content
params = config.__dict__.copy()
params.pop("url", None)
# Add keys from kwargs to params that don't exist in params
kwargs = context.get("kwargs", {})
params.update({k: v for k, v in kwargs.items() if k not in params.keys()})
# Scraping Strategy Execution
result: ScrapingResult = scraping_strategy.scrap(url, html, **params)
if result is None:
raise ValueError(f"Process HTML, Failed to extract content from the website: {url}")
# Extract results - handle both dict and ScrapingResult
if isinstance(result, dict):
cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
media = result.get("media", {})
links = result.get("links", {})
metadata = result.get("metadata", {})
else:
cleaned_html = sanitize_input_encode(result.cleaned_html)
media = result.media.model_dump()
links = result.links.model_dump()
metadata = result.metadata
context["cleaned_html"] = cleaned_html
context["media"] = media
context["links"] = links
context["metadata"] = metadata
# Log processing completion
logger.info(
message="{url:.50}... | Time: {timing}s",
tag="SCRAPE",
params={
"url": _url,
"timing": int((time.perf_counter() - t1) * 1000) / 1000,
},
)
return 1
except InvalidCSSSelectorError as e:
context["error_message"] = str(e)
return 0
except Exception as e:
context["error_message"] = f"Process HTML, Failed to extract content from the website: {url}, error: {str(e)}"
return 0
async def generate_markdown_middleware(context: Dict[str, Any]) -> int:
"""Generate markdown from cleaned HTML"""
url = context.get("url")
cleaned_html = context.get("cleaned_html")
config = context.get("config")
# Skip if already have a crawl result
if context.get("crawl_result"):
return 1
# Generate Markdown
markdown_generator = config.markdown_generator
markdown_result: MarkdownGenerationResult = markdown_generator.generate_markdown(
cleaned_html=cleaned_html,
base_url=url,
)
context["markdown_result"] = markdown_result
return 1
async def extract_structured_content_middleware(context: Dict[str, Any]) -> int:
"""Extract structured content using extraction strategy"""
url = context.get("url")
extracted_content = context.get("extracted_content")
config = context.get("config")
markdown_result = context.get("markdown_result")
cleaned_html = context.get("cleaned_html")
logger = context.get("logger")
# Skip if already have a crawl result or extracted content
if context.get("crawl_result") or bool(extracted_content):
return 1
from crawl4ai.chunking_strategy import IdentityChunking
from crawl4ai.extraction_strategy import NoExtractionStrategy
if config.extraction_strategy and not isinstance(config.extraction_strategy, NoExtractionStrategy):
t1 = time.perf_counter()
_url = url if not context.get("is_raw_html", False) else "Raw HTML"
# Choose content based on input_format
content_format = config.extraction_strategy.input_format
if content_format == "fit_markdown" and not markdown_result.fit_markdown:
logger.warning(
message="Fit markdown requested but not available. Falling back to raw markdown.",
tag="EXTRACT",
params={"url": _url},
)
content_format = "markdown"
content = {
"markdown": markdown_result.raw_markdown,
"html": context.get("html"),
"cleaned_html": cleaned_html,
"fit_markdown": markdown_result.fit_markdown,
}.get(content_format, markdown_result.raw_markdown)
# Use IdentityChunking for HTML input, otherwise use provided chunking strategy
chunking = (
IdentityChunking()
if content_format in ["html", "cleaned_html"]
else config.chunking_strategy
)
sections = chunking.chunk(content)
extracted_content = config.extraction_strategy.run(url, sections)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)
context["extracted_content"] = extracted_content
# Log extraction completion
logger.info(
message="Completed for {url:.50}... | Time: {timing}s",
tag="EXTRACT",
params={"url": _url, "timing": time.perf_counter() - t1},
)
return 1
async def format_html_middleware(context: Dict[str, Any]) -> int:
"""Format HTML if prettify is enabled"""
config = context.get("config")
cleaned_html = context.get("cleaned_html")
# Skip if already have a crawl result
if context.get("crawl_result"):
return 1
# Apply HTML formatting if requested
if config.prettiify and cleaned_html:
context["cleaned_html"] = fast_format_html(cleaned_html)
return 1
async def write_cache_middleware(context: Dict[str, Any]) -> int:
"""Write result to cache if appropriate"""
cache_context = context.get("cache_context")
cached_result = context.get("cached_result")
# Skip if already have a crawl result or not using cache
if context.get("crawl_result") or not cache_context.should_write() or bool(cached_result):
return 1
# We'll create the CrawlResult in build_result_middleware and cache it there
# to avoid creating it twice
return 1
async def build_result_middleware(context: Dict[str, Any]) -> int:
"""Build the final CrawlResult object"""
url = context.get("url")
html = context.get("html", "")
cache_context = context.get("cache_context")
cached_result = context.get("cached_result")
config = context.get("config")
logger = context.get("logger")
# If we already have a crawl result (from an earlier middleware like robots.txt check)
if context.get("crawl_result"):
result = context["crawl_result"]
context["final_result"] = CrawlResultContainer(result)
return 1
# If we have a cached result
if cached_result and html:
logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": True,
"timing": f"{time.perf_counter() - context['start_time']:.2f}s",
},
colors={"status": "green", "timing": "yellow"},
)
cached_result.success = bool(html)
cached_result.session_id = getattr(config, "session_id", None)
cached_result.redirected_url = cached_result.redirected_url or url
context["final_result"] = CrawlResultContainer(cached_result)
return 1
# Build a new result
try:
# Get all necessary components from context
cleaned_html = context.get("cleaned_html", "")
markdown_result = context.get("markdown_result")
media = context.get("media", {})
links = context.get("links", {})
metadata = context.get("metadata", {})
screenshot_data = context.get("screenshot_data")
pdf_data = context.get("pdf_data")
extracted_content = context.get("extracted_content")
async_response = context.get("async_response")
# Create the CrawlResult
crawl_result = CrawlResult(
url=url,
html=html,
cleaned_html=cleaned_html,
markdown=markdown_result,
media=media,
links=links,
metadata=metadata,
screenshot=screenshot_data,
pdf=pdf_data,
extracted_content=extracted_content,
success=bool(html),
error_message="",
)
# Add response details if available
if async_response:
crawl_result.status_code = async_response.status_code
crawl_result.redirected_url = async_response.redirected_url or url
crawl_result.response_headers = async_response.response_headers
crawl_result.downloaded_files = async_response.downloaded_files
crawl_result.js_execution_result = context.get("js_execution_result")
crawl_result.ssl_certificate = async_response.ssl_certificate
crawl_result.session_id = getattr(config, "session_id", None)
# Log completion
logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": crawl_result.success,
"timing": f"{time.perf_counter() - context['start_time']:.2f}s",
},
colors={
"status": "green" if crawl_result.success else "red",
"timing": "yellow",
},
)
# Update cache if appropriate
if cache_context.should_write() and not bool(cached_result):
await async_db_manager.acache_url(crawl_result)
context["final_result"] = CrawlResultContainer(crawl_result)
return 1
except Exception as e:
error_context = get_error_context(sys.exc_info())
error_message = (
f"Unexpected error in build_result at line {error_context['line_no']} "
f"in {error_context['function']} ({error_context['filename']}):\n"
f"Error: {str(e)}\n\n"
f"Code context:\n{error_context['code_context']}"
)
logger.error_status(
url=url,
error=create_box_message(error_message, type="error"),
tag="ERROR",
)
context["final_result"] = CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
)
return 1
async def handle_error_middleware(context: Dict[str, Any]) -> Dict[str, Any]:
"""Error handler middleware"""
url = context.get("url", "")
error_message = context.get("error_message", "Unknown error")
logger = context.get("logger")
# Log the error
if logger:
logger.error_status(
url=url,
error=create_box_message(error_message, type="error"),
tag="ERROR",
)
# Create a failure result
context["final_result"] = CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
)
return context
# Custom middlewares as requested
async def sentiment_analysis_middleware(context: Dict[str, Any]) -> int:
"""Analyze sentiment of generated markdown using TextBlob"""
from textblob import TextBlob
markdown_result = context.get("markdown_result")
# Skip if no markdown or already failed
if not markdown_result or not context.get("success", True):
return 1
try:
# Get raw markdown text
raw_markdown = markdown_result.raw_markdown
# Analyze sentiment
blob = TextBlob(raw_markdown)
sentiment = blob.sentiment
# Add sentiment to context
context["sentiment_analysis"] = {
"polarity": sentiment.polarity, # -1.0 to 1.0 (negative to positive)
"subjectivity": sentiment.subjectivity, # 0.0 to 1.0 (objective to subjective)
"classification": "positive" if sentiment.polarity > 0.1 else
"negative" if sentiment.polarity < -0.1 else "neutral"
}
return 1
except Exception as e:
# Don't fail the pipeline on sentiment analysis failure
context["sentiment_analysis_error"] = str(e)
return 1
async def log_timing_middleware(context: Dict[str, Any], name: str) -> int:
"""Log timing information for a specific point in the pipeline"""
context[f"_timing_mark_{name}"] = time.perf_counter()
# Calculate duration if we have a start time
start_key = f"_timing_start_{name}"
if start_key in context:
duration = context[f"_timing_mark_{name}"] - context[start_key]
context[f"_timing_duration_{name}"] = duration
# Log the timing if we have a logger
logger = context.get("logger")
if logger:
logger.info(
message="{name} completed in {duration:.2f}s",
tag="TIMING",
params={"name": name, "duration": duration},
)
return 1
async def validate_url_middleware(context: Dict[str, Any], patterns: List[str]) -> int:
"""Validate URL against glob patterns"""
import fnmatch
url = context.get("url", "")
# If no patterns provided, allow all
if not patterns:
return 1
# Check if URL matches any of the allowed patterns
for pattern in patterns:
if fnmatch.fnmatch(url, pattern):
return 1
# If we get here, URL didn't match any patterns
context["error_message"] = f"URL '{url}' does not match any allowed patterns"
return 0
# Update the default middleware list function
def create_default_middleware_list():
"""Return the default list of middleware functions for the pipeline."""
return [
initialize_context_middleware,
check_cache_middleware,
browser_hub_middleware, # Add browser hub middleware before fetch_content
configure_proxy_middleware,
check_robots_txt_middleware,
fetch_content_middleware,
scrape_content_middleware,
generate_markdown_middleware,
extract_structured_content_middleware,
format_html_middleware,
build_result_middleware
]

View File

@@ -0,0 +1,281 @@
import time
from typing import Callable, Dict, List, Any, Optional, Awaitable
from middlewares import create_default_middleware_list, handle_error_middleware
from crawl4ai.models import CrawlResultContainer
from crawl4ai.async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
class Pipeline:
"""
A pipeline processor that executes a series of async middleware functions.
Each middleware function receives a context dictionary, updates it,
and returns 1 for success or 0 for failure.
"""
def __init__(
self,
middleware: List[Callable[[Dict[str, Any]], Awaitable[int]]] = None,
error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None,
after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None,
crawler_strategy: Optional[AsyncCrawlerStrategy] = None,
browser_config: Optional[BrowserConfig] = None,
logger: Optional[AsyncLogger] = None,
_initial_context: Optional[Dict[str, Any]] = None
):
self.middleware = middleware or create_default_middleware_list()
self.error_handler = error_handler or handle_error_middleware
self.after_middleware_callback = after_middleware_callback
self.browser_config = browser_config or BrowserConfig()
self.logger = logger or AsyncLogger(verbose=self.browser_config.verbose)
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
browser_config=self.browser_config,
logger=self.logger
)
self._initial_context = _initial_context
self._strategy_initialized = False
async def _initialize_strategy__(self):
"""Initialize the crawler strategy if not already initialized"""
if not self.crawler_strategy:
self.crawler_strategy = AsyncPlaywrightCrawlerStrategy(
browser_config=self.browser_config,
logger=self.logger
)
if not self._strategy_initialized:
await self.crawler_strategy.__aenter__()
self._strategy_initialized = True
async def _initialize_strategy(self):
"""Initialize the crawler strategy if not already initialized"""
# With our new approach, we don't need to create the crawler strategy here
# as it will be created on-demand in fetch_content_middleware
# Just ensure browser hub is available if needed
if hasattr(self, "_initial_context") and "browser_hub" not in self._initial_context:
# If a browser_config was provided but no browser_hub yet,
# we'll let the browser_hub_middleware handle creating it
pass
# Mark as initialized to prevent repeated initialization attempts
self._strategy_initialized = True
async def start(self):
"""Start the crawler strategy and prepare it for use"""
if not self._strategy_initialized:
await self._initialize_strategy()
self._strategy_initialized = True
if self.crawler_strategy:
await self.crawler_strategy.__aenter__()
self._strategy_initialized = True
else:
raise ValueError("Crawler strategy is not initialized.")
async def close(self):
"""Close the crawler strategy and clean up resources"""
await self.stop()
async def stop(self):
"""Close the crawler strategy and clean up resources"""
if self._strategy_initialized and self.crawler_strategy:
await self.crawler_strategy.__aexit__(None, None, None)
self._strategy_initialized = False
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def crawl(self, url: str, config: Optional[CrawlerRunConfig] = None, **kwargs) -> CrawlResultContainer:
"""
Crawl a URL and process it through the pipeline.
Args:
url: The URL to crawl
config: Optional configuration for the crawl
**kwargs: Additional arguments to pass to the middleware
Returns:
CrawlResultContainer: The result of the crawl
"""
# Initialize strategy if needed
await self._initialize_strategy()
# Create the initial context
context = {
"url": url,
"config": config or CrawlerRunConfig(),
"browser_config": self.browser_config,
"logger": self.logger,
"crawler_strategy": self.crawler_strategy,
"kwargs": kwargs
}
# Process the pipeline
result_context = await self.process(context)
# Return the final result
return result_context.get("final_result")
async def process(self, initial_context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Process all middleware functions with the given context.
Args:
initial_context: Initial context dictionary, defaults to empty dict
Returns:
Updated context dictionary after all middleware have been processed
"""
context = {**self._initial_context}
if initial_context:
context.update(initial_context)
# Record pipeline start time
context["_pipeline_start_time"] = time.perf_counter()
for middleware_fn in self.middleware:
# Get middleware name for logging
middleware_name = getattr(middleware_fn, '__name__', str(middleware_fn))
# Record start time for this middleware
start_time = time.perf_counter()
context[f"_timing_start_{middleware_name}"] = start_time
try:
# Execute middleware (all middleware functions are async)
result = await middleware_fn(context)
# Record completion time
end_time = time.perf_counter()
context[f"_timing_end_{middleware_name}"] = end_time
context[f"_timing_duration_{middleware_name}"] = end_time - start_time
# Execute after-middleware callback if provided
if self.after_middleware_callback:
await self.after_middleware_callback(middleware_name, context)
# Convert boolean returns to int (True->1, False->0)
if isinstance(result, bool):
result = 1 if result else 0
# Handle failure
if result == 0:
if self.error_handler:
context["_error_in"] = middleware_name
context["_error_at"] = time.perf_counter()
return await self._handle_error(context)
else:
context["success"] = False
context["error_message"] = f"Pipeline failed at {middleware_name}"
break
except Exception as e:
# Record error information
context["_error_in"] = middleware_name
context["_error_at"] = time.perf_counter()
context["_exception"] = e
context["success"] = False
context["error_message"] = f"Exception in {middleware_name}: {str(e)}"
# Call error handler if available
if self.error_handler:
return await self._handle_error(context)
break
# Record pipeline completion time
pipeline_end_time = time.perf_counter()
context["_pipeline_end_time"] = pipeline_end_time
context["_pipeline_duration"] = pipeline_end_time - context["_pipeline_start_time"]
# Set success to True if not already set (no failures)
if "success" not in context:
context["success"] = True
return context
async def _handle_error(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle errors by calling the error handler"""
try:
return await self.error_handler(context)
except Exception as e:
# If error handler fails, update context with this new error
context["_error_handler_exception"] = e
context["error_message"] = f"Error handler failed: {str(e)}"
return context
async def create_pipeline(
middleware_list=None,
error_handler=None,
after_middleware_callback=None,
browser_config=None,
browser_hub_id=None,
browser_hub_connection=None,
browser_hub=None,
logger=None
) -> Pipeline:
"""
Factory function to create a pipeline with Browser-Hub integration.
Args:
middleware_list: List of middleware functions
error_handler: Error handler middleware
after_middleware_callback: Callback after middleware execution
browser_config: Configuration for the browser
browser_hub_id: ID for browser hub instance
browser_hub_connection: Connection string for existing browser hub
browser_hub: Existing browser hub instance to use
logger: Logger instance
Returns:
Pipeline: Configured pipeline instance
"""
# Use default middleware list if none provided
middleware = middleware_list or create_default_middleware_list()
# Create the pipeline
pipeline = Pipeline(
middleware=middleware,
error_handler=error_handler,
after_middleware_callback=after_middleware_callback,
logger=logger
)
# Set browser-related attributes in the initial context
pipeline._initial_context = {
"browser_config": browser_config,
"browser_hub_id": browser_hub_id,
"browser_hub_connection": browser_hub_connection,
"browser_hub": browser_hub,
"logger": logger
}
return pipeline
# async def create_pipeline(
# middleware_list: Optional[List[Callable[[Dict[str, Any]], Awaitable[int]]]] = None,
# error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None,
# after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None,
# crawler_strategy = None,
# browser_config = None,
# logger = None
# ) -> Pipeline:
# """Factory function to create a pipeline with the given middleware"""
# return Pipeline(
# middleware=middleware_list,
# error_handler=error_handler,
# after_middleware_callback=after_middleware_callback,
# crawler_strategy=crawler_strategy,
# browser_config=browser_config,
# logger=logger
# )

View File

@@ -0,0 +1,109 @@
import asyncio
from crawl4ai import (
BrowserConfig,
CrawlerRunConfig,
CacheMode,
DefaultMarkdownGenerator,
PruningContentFilter
)
from pipeline import Pipeline
async def main():
# Create configuration objects
browser_config = BrowserConfig(headless=True, verbose=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
)
# Create and use pipeline with context manager
async with Pipeline(browser_config=browser_config) as pipeline:
result = await pipeline.crawl(
url="https://www.example.com",
config=crawler_config
)
# Print the result
print(f"URL: {result.url}")
print(f"Success: {result.success}")
if result.success:
print("\nMarkdown excerpt:")
print(result.markdown.raw_markdown[:500] + "...")
else:
print(f"Error: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
class CrawlTarget:
def __init__(self, urls, config=None):
self.urls = urls
self.config = config
def __repr__(self):
return f"CrawlTarget(urls={self.urls}, config={self.config})"
# async def main():
# # Create configuration objects
# browser_config = BrowserConfig(headless=True, verbose=True)
# # Define different configurations
# config1 = CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
# markdown_generator=DefaultMarkdownGenerator(
# content_filter=PruningContentFilter(threshold=0.48)
# ),
# )
# config2 = CrawlerRunConfig(
# cache_mode=CacheMode.ENABLED,
# screenshot=True,
# pdf=True
# )
# # Create crawl targets
# targets = [
# CrawlTarget(
# urls=["https://www.example.com", "https://www.wikipedia.org"],
# config=config1
# ),
# CrawlTarget(
# urls="https://news.ycombinator.com",
# config=config2
# ),
# CrawlTarget(
# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
# config=None
# )
# ]
# # Create and use pipeline with context manager
# async with Pipeline(browser_config=browser_config) as pipeline:
# all_results = await pipeline.crawl_batch(targets)
# for target_key, results in all_results.items():
# print(f"\n===== Results for {target_key} =====")
# print(f"Number of URLs crawled: {len(results)}")
# for i, result in enumerate(results):
# print(f"\nURL {i+1}: {result.url}")
# print(f"Success: {result.success}")
# if result.success:
# print(f"Content length: {len(result.markdown.raw_markdown)} chars")
# else:
# print(f"Error: {result.error_message}")
# if __name__ == "__main__":
# asyncio.run(main())

View File

@@ -0,0 +1,222 @@
# demo_browser_hub.py
import asyncio
from typing import List
from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.cache_context import CacheMode
from crawl4ai import DefaultMarkdownGenerator
from crawl4ai import PruningContentFilter
async def create_prewarmed_browser_hub(urls_to_crawl: List[str]):
"""Create a pre-warmed browser hub with 10 browsers and 5 pages each."""
# Set up logging
logger = AsyncLogger(verbose=True)
logger.info("Setting up pre-warmed browser hub", tag="DEMO")
# Create browser configuration
browser_config = BrowserConfig(
browser_type="chromium",
headless=True, # Set to False to see the browsers in action
viewport_width=1280,
viewport_height=800,
light_mode=True, # Optimize for performance
java_script_enabled=True
)
# Create crawler configurations for pre-warming with different user agents
# This allows pages to be ready for different scenarios
crawler_configs = [
CrawlerRunConfig(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
wait_until="networkidle"
),
# CrawlerRunConfig(
# user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15",
# wait_until="networkidle"
# ),
# CrawlerRunConfig(
# user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
# wait_until="networkidle"
# )
]
# Number of browsers and pages per browser
num_browsers = 1
pages_per_browser = 1
# Distribute pages across configurations
# We'll create a total of 50 pages (10 browsers × 5 pages)
page_configs = []
total_pages = num_browsers * pages_per_browser
pages_per_config = total_pages // len(crawler_configs)
for i, config in enumerate(crawler_configs):
# For the last config, add any remaining pages
if i == len(crawler_configs) - 1:
remaining = total_pages - (pages_per_config * (len(crawler_configs) - 1))
page_configs.append((browser_config, config, remaining))
else:
page_configs.append((browser_config, config, pages_per_config))
# Create browser hub with pre-warmed pages
start_time = asyncio.get_event_loop().time()
logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO")
hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id="demo_hub",
logger=logger,
max_browsers_per_config=num_browsers,
max_pages_per_browser=pages_per_browser,
initial_pool_size=num_browsers,
page_configs=page_configs
)
end_time = asyncio.get_event_loop().time()
logger.success(
message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds",
tag="DEMO",
params={
"total_pages": total_pages,
"duration": end_time - start_time
}
)
# Get and display pool status
status = await hub.get_pool_status()
logger.info(
message="Browser pool status: {status}",
tag="DEMO",
params={"status": status}
)
return hub
async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]:
"""Crawl a list of URLs using a pre-warmed browser hub."""
logger = AsyncLogger(verbose=True)
# Create crawler configuration
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
wait_until="networkidle",
screenshot=True
)
# Create pipeline with the browser hub
pipeline = await create_pipeline(
browser_hub=hub,
logger=logger
)
results = []
# Crawl all URLs in parallel
async def crawl_url(url):
logger.info(f"Crawling {url}...", tag="CRAWL")
result = await pipeline.crawl(url=url, config=crawler_config)
logger.success(f"Completed crawl of {url}", tag="CRAWL")
return result
# Create tasks for all URLs
tasks = [crawl_url(url) for url in urls]
# Execute all tasks in parallel and collect results
results = await asyncio.gather(*tasks)
return results
async def main():
"""Main demo function."""
# List of URLs to crawl
urls_to_crawl = [
"https://example.com",
# "https://www.python.org",
# "https://httpbin.org/html",
# "https://news.ycombinator.com",
# "https://github.com",
# "https://pypi.org",
# "https://docs.python.org/3/",
# "https://opensource.org",
# "https://whatismyipaddress.com",
# "https://en.wikipedia.org/wiki/Web_scraping"
]
# Set up logging
logger = AsyncLogger(verbose=True)
logger.info("Starting browser hub demo", tag="DEMO")
try:
# Create pre-warmed browser hub
hub = await create_prewarmed_browser_hub(urls_to_crawl)
# Use hub to crawl URLs
logger.info("Crawling URLs in parallel...", tag="DEMO")
start_time = asyncio.get_event_loop().time()
results = await crawl_urls_with_hub(hub, urls_to_crawl)
end_time = asyncio.get_event_loop().time()
# Display results
logger.success(
message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)",
tag="DEMO",
params={
"count": len(results),
"duration": end_time - start_time,
"avg": (end_time - start_time) / len(results)
}
)
# Print summary of results
logger.info("Crawl results summary:", tag="DEMO")
for i, result in enumerate(results):
logger.info(
message="{idx}. {url}: Success={success}, Content length={length}",
tag="RESULT",
params={
"idx": i+1,
"url": result.url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
if result.success and result.markdown and result.markdown.raw_markdown:
# Print a snippet of the markdown
markdown_snippet = result.markdown.raw_markdown[:150] + "..."
logger.info(
message=" Markdown: {snippet}",
tag="RESULT",
params={"snippet": markdown_snippet}
)
# Display final browser pool status
status = await hub.get_pool_status()
logger.info(
message="Final browser pool status: {status}",
tag="DEMO",
params={"status": status}
)
finally:
# Clean up
logger.info("Shutting down browser hub...", tag="DEMO")
await BrowserHub.shutdown_all()
logger.success("Demo completed", tag="DEMO")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,505 @@
# extended_browser_hub_tests.py
import asyncio
from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.cache_context import CacheMode
# Common test URLs
TEST_URLS = [
"https://example.com",
"https://example.com/page1",
"https://httpbin.org/html",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
"https://httpstat.us/200"
]
class TestResults:
"""Simple container for test results"""
def __init__(self, name: str):
self.name = name
self.results = []
self.start_time = None
self.end_time = None
self.errors = []
@property
def duration(self) -> float:
if self.start_time and self.end_time:
return self.end_time - self.start_time
return 0
@property
def success_rate(self) -> float:
if not self.results:
return 0
return sum(1 for r in self.results if r.success) / len(self.results) * 100
def log_summary(self, logger: AsyncLogger):
logger.info(f"=== Test: {self.name} ===", tag="SUMMARY")
logger.info(
message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: {count}",
tag="SUMMARY",
params={
"duration": self.duration,
"success_rate": self.success_rate,
"count": len(self.results)
}
)
if self.errors:
logger.error(
message="Errors ({count}): {errors}",
tag="SUMMARY",
params={
"count": len(self.errors),
"errors": "; ".join(str(e) for e in self.errors)
}
)
# ======== TEST SCENARIO 1: Simple default configuration ========
async def test_default_configuration():
"""
Test Scenario 1: Simple default configuration
This tests the basic case where the user does not provide any specific
browser configuration, relying on default auto-setup.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Default Configuration")
try:
# Create pipeline with no browser config
pipeline = await create_pipeline(logger=logger)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Create basic crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded"
)
# Process each URL sequentially
for url in TEST_URLS:
try:
logger.info(f"Crawling {url} with default configuration", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
logger.success(
message="Result: url={url}, success={success}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 2: Detailed custom configuration ========
async def test_custom_configuration():
"""
Test Scenario 2: Detailed custom configuration
This tests the case where the user provides detailed browser configuration
to customize the browser behavior.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Custom Configuration")
try:
# Create custom browser config
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1920,
viewport_height=1080,
user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
light_mode=True,
ignore_https_errors=True,
extra_args=["--disable-extensions"]
)
# Create custom crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle",
page_timeout=30000,
screenshot=True,
pdf=False,
screenshot_wait_for=0.5,
wait_for_images=True,
scan_full_page=True,
scroll_delay=0.2,
process_iframes=True,
remove_overlay_elements=True
)
# Create pipeline with custom configuration
pipeline = await create_pipeline(
browser_config=browser_config,
logger=logger
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Process each URL sequentially
for url in TEST_URLS:
try:
logger.info(f"Crawling {url} with custom configuration", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
has_screenshot = result.screenshot is not None
logger.success(
message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"screenshot": has_screenshot,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
# Get browser hub status from context
try:
# Run a dummy crawl to get the context with browser hub
context = await pipeline.process({"url": "about:blank", "config": crawler_config})
browser_hub = context.get("browser_hub")
if browser_hub:
status = await browser_hub.get_pool_status()
logger.info(
message="Browser hub status: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST")
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 3: Using pre-initialized browser hub ========
async def test_preinitalized_browser_hub():
"""
Test Scenario 3: Using pre-initialized browser hub
This tests the case where a browser hub is initialized separately
and then passed to the pipeline.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Pre-initialized Browser Hub")
browser_hub = None
try:
# Create and initialize browser hub separately
logger.info("Initializing browser hub separately", tag="TEST")
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
verbose=True
)
browser_hub = await BrowserHub.get_browser_manager(
config=browser_config,
hub_id="test_preinitalized",
logger=logger,
max_browsers_per_config=2,
max_pages_per_browser=3,
initial_pool_size=2
)
# Display initial status
status = await browser_hub.get_pool_status()
logger.info(
message="Initial browser hub status: {status}",
tag="TEST",
params={"status": status}
)
# Create pipeline with pre-initialized browser hub
pipeline = await create_pipeline(
browser_hub=browser_hub,
logger=logger
)
# Create crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle",
screenshot=True
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Process URLs in parallel
async def crawl_url(url):
try:
logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
logger.success(f"Completed crawl of {url}", tag="TEST")
return result
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
return None
# Create tasks for all URLs
tasks = [crawl_url(url) for url in TEST_URLS]
# Execute all tasks in parallel and collect results
all_results = await asyncio.gather(*tasks)
results.results = [r for r in all_results if r is not None]
# End timing
results.end_time = asyncio.get_event_loop().time()
# Display final status
status = await browser_hub.get_pool_status()
logger.info(
message="Final browser hub status: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results, browser_hub
# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ========
async def test_parallel_pipelines():
"""
Test Scenario 4: Multiple parallel pipelines sharing browser hub
This tests the case where multiple pipelines share the same browser hub,
demonstrating resource sharing and parallel operation.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Parallel Pipelines")
# We'll reuse the browser hub from the previous test
_, browser_hub = await test_preinitalized_browser_hub()
try:
# Create 3 pipelines that all share the same browser hub
pipelines = []
for i in range(3):
pipeline = await create_pipeline(
browser_hub=browser_hub,
logger=logger
)
pipelines.append(pipeline)
logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST")
# Create crawler configs with different settings
configs = [
CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False),
CrawlerRunConfig(wait_until="networkidle", screenshot=True),
CrawlerRunConfig(wait_until="load", scan_full_page=True)
]
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Function to process URLs with a specific pipeline
async def process_with_pipeline(pipeline_idx, urls):
pipeline_results = []
for url in urls:
try:
logger.info(f"Pipeline {pipeline_idx} crawling {url}", tag="TEST")
result = await pipelines[pipeline_idx].crawl(
url=url,
config=configs[pipeline_idx]
)
pipeline_results.append(result)
logger.success(
message="Pipeline {idx} completed: url={url}, success={success}",
tag="TEST",
params={
"idx": pipeline_idx,
"url": url,
"success": result.success
}
)
except Exception as e:
logger.error(
message="Pipeline {idx} error: {error}",
tag="TEST",
params={
"idx": pipeline_idx,
"error": str(e)
}
)
results.errors.append(e)
return pipeline_results
# Distribute URLs among pipelines
pipeline_urls = [
TEST_URLS[:2],
TEST_URLS[2:4],
TEST_URLS[4:5] * 2 # Duplicate the last URL to have 2 for pipeline 3
]
# Execute all pipelines in parallel
tasks = [
process_with_pipeline(i, urls)
for i, urls in enumerate(pipeline_urls)
]
pipeline_results = await asyncio.gather(*tasks)
# Flatten results
for res_list in pipeline_results:
results.results.extend(res_list)
# End timing
results.end_time = asyncio.get_event_loop().time()
# Display browser hub status
status = await browser_hub.get_pool_status()
logger.info(
message="Browser hub status after parallel pipelines: {status}",
tag="TEST",
params={"status": status}
)
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== TEST SCENARIO 5: Browser hub with connection string ========
async def test_connection_string():
"""
Test Scenario 5: Browser hub with connection string
This tests the case where a browser hub is initialized from a connection string,
simulating connecting to a running browser hub service.
"""
logger = AsyncLogger(verbose=True)
results = TestResults("Connection String")
try:
# Create pipeline with connection string
# Note: In a real implementation, this would connect to an existing service
# For this test, we're using a simulated connection
connection_string = "localhost:9222" # Simulated connection string
pipeline = await create_pipeline(
browser_hub_connection=connection_string,
logger=logger
)
# Create crawler config
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_until="networkidle"
)
# Start timing
results.start_time = asyncio.get_event_loop().time()
# Test with a single URL
url = TEST_URLS[0]
try:
logger.info(f"Crawling {url} with connection string hub", tag="TEST")
result = await pipeline.crawl(url=url, config=crawler_config)
results.results.append(result)
logger.success(
message="Result: url={url}, success={success}, content_length={length}",
tag="TEST",
params={
"url": url,
"success": result.success,
"length": len(result.html) if result.html else 0
}
)
except Exception as e:
logger.error(f"Error crawling {url}: {str(e)}", tag="TEST")
results.errors.append(e)
# End timing
results.end_time = asyncio.get_event_loop().time()
except Exception as e:
logger.error(f"Test failed with error: {str(e)}", tag="TEST")
results.errors.append(e)
# Log summary
results.log_summary(logger)
return results
# ======== RUN ALL TESTS ========
async def run_all_tests():
"""Run all test scenarios"""
logger = AsyncLogger(verbose=True)
logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN")
try:
# Run each test scenario
await test_default_configuration()
# await test_custom_configuration()
# await test_preinitalized_browser_hub()
# await test_parallel_pipelines()
# await test_connection_string()
except Exception as e:
logger.error(f"Test suite failed: {str(e)}", tag="MAIN")
finally:
# Clean up all browser hubs
logger.info("Shutting down all browser hubs...", tag="MAIN")
await BrowserHub.shutdown_all()
logger.success("All tests completed", tag="MAIN")
if __name__ == "__main__":
asyncio.run(run_all_tests())

View File

@@ -0,0 +1,109 @@
import asyncio
from crawl4ai import (
BrowserConfig,
CrawlerRunConfig,
CacheMode,
DefaultMarkdownGenerator,
PruningContentFilter
)
from pipeline import Pipeline
async def main():
# Create configuration objects
browser_config = BrowserConfig(headless=True, verbose=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
)
# Create and use pipeline with context manager
async with Pipeline(browser_config=browser_config) as pipeline:
result = await pipeline.crawl(
url="https://www.example.com",
config=crawler_config
)
# Print the result
print(f"URL: {result.url}")
print(f"Success: {result.success}")
if result.success:
print("\nMarkdown excerpt:")
print(result.markdown.raw_markdown[:500] + "...")
else:
print(f"Error: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
class CrawlTarget:
def __init__(self, urls, config=None):
self.urls = urls
self.config = config
def __repr__(self):
return f"CrawlTarget(urls={self.urls}, config={self.config})"
# async def main():
# # Create configuration objects
# browser_config = BrowserConfig(headless=True, verbose=True)
# # Define different configurations
# config1 = CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
# markdown_generator=DefaultMarkdownGenerator(
# content_filter=PruningContentFilter(threshold=0.48)
# ),
# )
# config2 = CrawlerRunConfig(
# cache_mode=CacheMode.ENABLED,
# screenshot=True,
# pdf=True
# )
# # Create crawl targets
# targets = [
# CrawlTarget(
# urls=["https://www.example.com", "https://www.wikipedia.org"],
# config=config1
# ),
# CrawlTarget(
# urls="https://news.ycombinator.com",
# config=config2
# ),
# CrawlTarget(
# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
# config=None
# )
# ]
# # Create and use pipeline with context manager
# async with Pipeline(browser_config=browser_config) as pipeline:
# all_results = await pipeline.crawl_batch(targets)
# for target_key, results in all_results.items():
# print(f"\n===== Results for {target_key} =====")
# print(f"Number of URLs crawled: {len(results)}")
# for i, result in enumerate(results):
# print(f"\nURL {i+1}: {result.url}")
# print(f"Success: {result.success}")
# if result.success:
# print(f"Content length: {len(result.markdown.raw_markdown)} chars")
# else:
# print(f"Error: {result.error_message}")
# if __name__ == "__main__":
# asyncio.run(main())