diff --git a/crawl4ai/pipeline/demo_browser_hub_pipeline.py b/crawl4ai/pipeline/demo_browser_hub_pipeline.py new file mode 100644 index 00000000..32298f68 --- /dev/null +++ b/crawl4ai/pipeline/demo_browser_hub_pipeline.py @@ -0,0 +1,222 @@ +# demo_browser_hub.py + +import asyncio +from typing import List + +from crawl4ai.browser.browser_hub import BrowserHub +from pipeline import create_pipeline +from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig +from crawl4ai.async_logger import AsyncLogger +from crawl4ai.models import CrawlResultContainer +from crawl4ai.cache_context import CacheMode +from crawl4ai import DefaultMarkdownGenerator +from crawl4ai import PruningContentFilter + +async def create_prewarmed_browser_hub(urls_to_crawl: List[str]): + """Create a pre-warmed browser hub with 10 browsers and 5 pages each.""" + # Set up logging + logger = AsyncLogger(verbose=True) + logger.info("Setting up pre-warmed browser hub", tag="DEMO") + + # Create browser configuration + browser_config = BrowserConfig( + browser_type="chromium", + headless=True, # Set to False to see the browsers in action + viewport_width=1280, + viewport_height=800, + light_mode=True, # Optimize for performance + java_script_enabled=True + ) + + # Create crawler configurations for pre-warming with different user agents + # This allows pages to be ready for different scenarios + crawler_configs = [ + CrawlerRunConfig( + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + wait_until="networkidle" + ), + # CrawlerRunConfig( + # user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15", + # wait_until="networkidle" + # ), + # CrawlerRunConfig( + # user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", + # wait_until="networkidle" + # ) + ] + + # Number of browsers and 
pages per browser + num_browsers = 1 + pages_per_browser = 1 + + # Distribute pages across configurations + # We'll create a total of 50 pages (10 browsers × 5 pages) + page_configs = [] + total_pages = num_browsers * pages_per_browser + pages_per_config = total_pages // len(crawler_configs) + + for i, config in enumerate(crawler_configs): + # For the last config, add any remaining pages + if i == len(crawler_configs) - 1: + remaining = total_pages - (pages_per_config * (len(crawler_configs) - 1)) + page_configs.append((browser_config, config, remaining)) + else: + page_configs.append((browser_config, config, pages_per_config)) + + # Create browser hub with pre-warmed pages + start_time = asyncio.get_event_loop().time() + logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO") + + hub = await BrowserHub.get_browser_manager( + config=browser_config, + hub_id="demo_hub", + logger=logger, + max_browsers_per_config=num_browsers, + max_pages_per_browser=pages_per_browser, + initial_pool_size=num_browsers, + page_configs=page_configs + ) + + end_time = asyncio.get_event_loop().time() + logger.success( + message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds", + tag="DEMO", + params={ + "total_pages": total_pages, + "duration": end_time - start_time + } + ) + + # Get and display pool status + status = await hub.get_pool_status() + logger.info( + message="Browser pool status: {status}", + tag="DEMO", + params={"status": status} + ) + + return hub + +async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]: + """Crawl a list of URLs using a pre-warmed browser hub.""" + logger = AsyncLogger(verbose=True) + + # Create crawler configuration + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + markdown_generator=DefaultMarkdownGenerator( + content_filter=PruningContentFilter( + threshold=0.48, + threshold_type="fixed", + min_word_threshold=0 + ) + ), + 
wait_until="networkidle", + screenshot=True + ) + + # Create pipeline with the browser hub + pipeline = await create_pipeline( + browser_hub=hub, + logger=logger + ) + + results = [] + + # Crawl all URLs in parallel + async def crawl_url(url): + logger.info(f"Crawling {url}...", tag="CRAWL") + result = await pipeline.crawl(url=url, config=crawler_config) + logger.success(f"Completed crawl of {url}", tag="CRAWL") + return result + + # Create tasks for all URLs + tasks = [crawl_url(url) for url in urls] + + # Execute all tasks in parallel and collect results + results = await asyncio.gather(*tasks) + + return results + +async def main(): + """Main demo function.""" + # List of URLs to crawl + urls_to_crawl = [ + "https://example.com", + # "https://www.python.org", + # "https://httpbin.org/html", + # "https://news.ycombinator.com", + # "https://github.com", + # "https://pypi.org", + # "https://docs.python.org/3/", + # "https://opensource.org", + # "https://whatismyipaddress.com", + # "https://en.wikipedia.org/wiki/Web_scraping" + ] + + # Set up logging + logger = AsyncLogger(verbose=True) + logger.info("Starting browser hub demo", tag="DEMO") + + try: + # Create pre-warmed browser hub + hub = await create_prewarmed_browser_hub(urls_to_crawl) + + # Use hub to crawl URLs + logger.info("Crawling URLs in parallel...", tag="DEMO") + start_time = asyncio.get_event_loop().time() + + results = await crawl_urls_with_hub(hub, urls_to_crawl) + + end_time = asyncio.get_event_loop().time() + + # Display results + logger.success( + message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)", + tag="DEMO", + params={ + "count": len(results), + "duration": end_time - start_time, + "avg": (end_time - start_time) / len(results) + } + ) + + # Print summary of results + logger.info("Crawl results summary:", tag="DEMO") + for i, result in enumerate(results): + logger.info( + message="{idx}. 
{url}: Success={success}, Content length={length}", + tag="RESULT", + params={ + "idx": i+1, + "url": result.url, + "success": result.success, + "length": len(result.html) if result.html else 0 + } + ) + + if result.success and result.markdown and result.markdown.raw_markdown: + # Print a snippet of the markdown + markdown_snippet = result.markdown.raw_markdown[:150] + "..." + logger.info( + message=" Markdown: {snippet}", + tag="RESULT", + params={"snippet": markdown_snippet} + ) + + # Display final browser pool status + status = await hub.get_pool_status() + logger.info( + message="Final browser pool status: {status}", + tag="DEMO", + params={"status": status} + ) + + finally: + # Clean up + logger.info("Shutting down browser hub...", tag="DEMO") + await BrowserHub.shutdown_all() + logger.success("Demo completed", tag="DEMO") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/crawl4ai/pipeline/extended_browser_hub_tests.py b/crawl4ai/pipeline/extended_browser_hub_tests.py new file mode 100644 index 00000000..2f80d83b --- /dev/null +++ b/crawl4ai/pipeline/extended_browser_hub_tests.py @@ -0,0 +1,505 @@ +# extended_browser_hub_tests.py + +import asyncio + +from crawl4ai.browser.browser_hub import BrowserHub +from pipeline import create_pipeline +from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig +from crawl4ai.async_logger import AsyncLogger +from crawl4ai.cache_context import CacheMode + +# Common test URLs +TEST_URLS = [ + "https://example.com", + "https://example.com/page1", + "https://httpbin.org/html", + "https://httpbin.org/headers", + "https://httpbin.org/ip", + "https://httpstat.us/200" +] + +class TestResults: + """Simple container for test results""" + def __init__(self, name: str): + self.name = name + self.results = [] + self.start_time = None + self.end_time = None + self.errors = [] + + @property + def duration(self) -> float: + if self.start_time and self.end_time: + return self.end_time - 
self.start_time
+        return 0
+
+    @property
+    def success_rate(self) -> float:
+        if not self.results:
+            return 0
+        return sum(1 for r in self.results if r.success) / len(self.results) * 100
+
+    def log_summary(self, logger: AsyncLogger):
+        logger.info(f"=== Test: {self.name} ===", tag="SUMMARY")
+        logger.info(
+            message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: {count}",
+            tag="SUMMARY",
+            params={
+                "duration": self.duration,
+                "success_rate": self.success_rate,
+                "count": len(self.results)
+            }
+        )
+
+        if self.errors:
+            logger.error(
+                message="Errors ({count}): {errors}",
+                tag="SUMMARY",
+                params={
+                    "count": len(self.errors),
+                    "errors": "; ".join(str(e) for e in self.errors)
+                }
+            )
+
+# ======== TEST SCENARIO 1: Simple default configuration ========
+async def test_default_configuration():
+    """
+    Test Scenario 1: Simple default configuration
+
+    This tests the basic case where the user does not provide any specific
+    browser configuration, relying on default auto-setup.
+ """ + logger = AsyncLogger(verbose=True) + results = TestResults("Default Configuration") + + try: + # Create pipeline with no browser config + pipeline = await create_pipeline(logger=logger) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Create basic crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="domcontentloaded" + ) + + # Process each URL sequentially + for url in TEST_URLS: + try: + logger.info(f"Crawling {url} with default configuration", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + results.results.append(result) + + logger.success( + message="Result: url={url}, success={success}, content_length={length}", + tag="TEST", + params={ + "url": url, + "success": result.success, + "length": len(result.html) if result.html else 0 + } + ) + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") + results.errors.append(e) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== TEST SCENARIO 2: Detailed custom configuration ======== +async def test_custom_configuration(): + """ + Test Scenario 2: Detailed custom configuration + + This tests the case where the user provides detailed browser configuration + to customize the browser behavior. 
+ """ + logger = AsyncLogger(verbose=True) + results = TestResults("Custom Configuration") + + try: + # Create custom browser config + browser_config = BrowserConfig( + browser_type="chromium", + headless=True, + viewport_width=1920, + viewport_height=1080, + user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36", + light_mode=True, + ignore_https_errors=True, + extra_args=["--disable-extensions"] + ) + + # Create custom crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="networkidle", + page_timeout=30000, + screenshot=True, + pdf=False, + screenshot_wait_for=0.5, + wait_for_images=True, + scan_full_page=True, + scroll_delay=0.2, + process_iframes=True, + remove_overlay_elements=True + ) + + # Create pipeline with custom configuration + pipeline = await create_pipeline( + browser_config=browser_config, + logger=logger + ) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Process each URL sequentially + for url in TEST_URLS: + try: + logger.info(f"Crawling {url} with custom configuration", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + results.results.append(result) + + has_screenshot = result.screenshot is not None + + logger.success( + message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}", + tag="TEST", + params={ + "url": url, + "success": result.success, + "screenshot": has_screenshot, + "length": len(result.html) if result.html else 0 + } + ) + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") + results.errors.append(e) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + # Get browser hub status from context + try: + # Run a dummy crawl to get the context with browser hub + context = await pipeline.process({"url": "about:blank", "config": crawler_config}) + browser_hub = 
context.get("browser_hub") + if browser_hub: + status = await browser_hub.get_pool_status() + logger.info( + message="Browser hub status: {status}", + tag="TEST", + params={"status": status} + ) + except Exception as e: + logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST") + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== TEST SCENARIO 3: Using pre-initialized browser hub ======== +async def test_preinitalized_browser_hub(): + """ + Test Scenario 3: Using pre-initialized browser hub + + This tests the case where a browser hub is initialized separately + and then passed to the pipeline. + """ + logger = AsyncLogger(verbose=True) + results = TestResults("Pre-initialized Browser Hub") + + browser_hub = None + try: + # Create and initialize browser hub separately + logger.info("Initializing browser hub separately", tag="TEST") + + browser_config = BrowserConfig( + browser_type="chromium", + headless=True, + verbose=True + ) + + browser_hub = await BrowserHub.get_browser_manager( + config=browser_config, + hub_id="test_preinitalized", + logger=logger, + max_browsers_per_config=2, + max_pages_per_browser=3, + initial_pool_size=2 + ) + + # Display initial status + status = await browser_hub.get_pool_status() + logger.info( + message="Initial browser hub status: {status}", + tag="TEST", + params={"status": status} + ) + + # Create pipeline with pre-initialized browser hub + pipeline = await create_pipeline( + browser_hub=browser_hub, + logger=logger + ) + + # Create crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="networkidle", + screenshot=True + ) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Process URLs in parallel + async def crawl_url(url): + try: + logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST") + 
result = await pipeline.crawl(url=url, config=crawler_config) + logger.success(f"Completed crawl of {url}", tag="TEST") + return result + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") + results.errors.append(e) + return None + + # Create tasks for all URLs + tasks = [crawl_url(url) for url in TEST_URLS] + + # Execute all tasks in parallel and collect results + all_results = await asyncio.gather(*tasks) + results.results = [r for r in all_results if r is not None] + + # End timing + results.end_time = asyncio.get_event_loop().time() + + # Display final status + status = await browser_hub.get_pool_status() + logger.info( + message="Final browser hub status: {status}", + tag="TEST", + params={"status": status} + ) + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results, browser_hub + +# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ======== +async def test_parallel_pipelines(): + """ + Test Scenario 4: Multiple parallel pipelines sharing browser hub + + This tests the case where multiple pipelines share the same browser hub, + demonstrating resource sharing and parallel operation. 
+ """ + logger = AsyncLogger(verbose=True) + results = TestResults("Parallel Pipelines") + + # We'll reuse the browser hub from the previous test + _, browser_hub = await test_preinitalized_browser_hub() + + try: + # Create 3 pipelines that all share the same browser hub + pipelines = [] + for i in range(3): + pipeline = await create_pipeline( + browser_hub=browser_hub, + logger=logger + ) + pipelines.append(pipeline) + + logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST") + + # Create crawler configs with different settings + configs = [ + CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False), + CrawlerRunConfig(wait_until="networkidle", screenshot=True), + CrawlerRunConfig(wait_until="load", scan_full_page=True) + ] + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Function to process URLs with a specific pipeline + async def process_with_pipeline(pipeline_idx, urls): + pipeline_results = [] + for url in urls: + try: + logger.info(f"Pipeline {pipeline_idx} crawling {url}", tag="TEST") + result = await pipelines[pipeline_idx].crawl( + url=url, + config=configs[pipeline_idx] + ) + pipeline_results.append(result) + logger.success( + message="Pipeline {idx} completed: url={url}, success={success}", + tag="TEST", + params={ + "idx": pipeline_idx, + "url": url, + "success": result.success + } + ) + except Exception as e: + logger.error( + message="Pipeline {idx} error: {error}", + tag="TEST", + params={ + "idx": pipeline_idx, + "error": str(e) + } + ) + results.errors.append(e) + return pipeline_results + + # Distribute URLs among pipelines + pipeline_urls = [ + TEST_URLS[:2], + TEST_URLS[2:4], + TEST_URLS[4:5] * 2 # Duplicate the last URL to have 2 for pipeline 3 + ] + + # Execute all pipelines in parallel + tasks = [ + process_with_pipeline(i, urls) + for i, urls in enumerate(pipeline_urls) + ] + + pipeline_results = await asyncio.gather(*tasks) + + # Flatten results + for res_list in 
pipeline_results: + results.results.extend(res_list) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + # Display browser hub status + status = await browser_hub.get_pool_status() + logger.info( + message="Browser hub status after parallel pipelines: {status}", + tag="TEST", + params={"status": status} + ) + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== TEST SCENARIO 5: Browser hub with connection string ======== +async def test_connection_string(): + """ + Test Scenario 5: Browser hub with connection string + + This tests the case where a browser hub is initialized from a connection string, + simulating connecting to a running browser hub service. + """ + logger = AsyncLogger(verbose=True) + results = TestResults("Connection String") + + try: + # Create pipeline with connection string + # Note: In a real implementation, this would connect to an existing service + # For this test, we're using a simulated connection + connection_string = "localhost:9222" # Simulated connection string + + pipeline = await create_pipeline( + browser_hub_connection=connection_string, + logger=logger + ) + + # Create crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="networkidle" + ) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Test with a single URL + url = TEST_URLS[0] + try: + logger.info(f"Crawling {url} with connection string hub", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + results.results.append(result) + + logger.success( + message="Result: url={url}, success={success}, content_length={length}", + tag="TEST", + params={ + "url": url, + "success": result.success, + "length": len(result.html) if result.html else 0 + } + ) + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", 
tag="TEST") + results.errors.append(e) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== RUN ALL TESTS ======== +async def run_all_tests(): + """Run all test scenarios""" + logger = AsyncLogger(verbose=True) + logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN") + + try: + # Run each test scenario + await test_default_configuration() + # await test_custom_configuration() + # await test_preinitalized_browser_hub() + # await test_parallel_pipelines() + # await test_connection_string() + + except Exception as e: + logger.error(f"Test suite failed: {str(e)}", tag="MAIN") + finally: + # Clean up all browser hubs + logger.info("Shutting down all browser hubs...", tag="MAIN") + await BrowserHub.shutdown_all() + logger.success("All tests completed", tag="MAIN") + +if __name__ == "__main__": + asyncio.run(run_all_tests()) \ No newline at end of file diff --git a/crawl4ai/pipeline/middlewares.py b/crawl4ai/pipeline/middlewares.py new file mode 100644 index 00000000..40375a9f --- /dev/null +++ b/crawl4ai/pipeline/middlewares.py @@ -0,0 +1,702 @@ +import time +import sys +from typing import Dict, Any, List +import json + +from crawl4ai.models import ( + CrawlResult, + MarkdownGenerationResult, + ScrapingResult, + CrawlResultContainer, +) +from crawl4ai.async_database import async_db_manager +from crawl4ai.cache_context import CacheMode, CacheContext +from crawl4ai.utils import ( + sanitize_input_encode, + InvalidCSSSelectorError, + fast_format_html, + create_box_message, + get_error_context, +) + + +async def initialize_context_middleware(context: Dict[str, Any]) -> int: + """Initialize the context with basic configuration and validation""" + url = context.get("url") + config = context.get("config") + + if not isinstance(url, str) or not url: + 
context["error_message"] = "Invalid URL, make sure the URL is a non-empty string"
+        return 0
+
+    # Default to ENABLED if no cache mode specified
+    if config.cache_mode is None:
+        config.cache_mode = CacheMode.ENABLED
+
+    # Create cache context
+    context["cache_context"] = CacheContext(url, config.cache_mode, False)
+    context["start_time"] = time.perf_counter()
+
+    return 1
+
+# middlewares.py additions
+
+async def browser_hub_middleware(context: Dict[str, Any]) -> int:
+    """
+    Initialize or connect to a Browser-Hub and add it to the pipeline context.
+
+    This middleware handles browser hub initialization for all three scenarios:
+    1. Default configuration when nothing is specified
+    2. Custom configuration when browser_config is provided
+    3. Connection to existing hub when browser_hub_connection is provided
+
+    Args:
+        context: The pipeline context dictionary
+
+    Returns:
+        int: 1 for success, 0 for failure
+    """
+    from crawl4ai.browser.browser_hub import BrowserHub
+
+    try:
+        # Get configuration from context
+        browser_config = context.get("browser_config")
+        browser_hub_id = context.get("browser_hub_id")
+        browser_hub_connection = context.get("browser_hub_connection")
+        logger = context.get("logger")
+
+        # If we already have a browser hub in context, use it
+        if context.get("browser_hub"):
+            return 1
+
+        # Get or create Browser-Hub
+        browser_hub = await BrowserHub.get_browser_manager(
+            config=browser_config,
+            hub_id=browser_hub_id,
+            connection_info=browser_hub_connection,
+            logger=logger
+        )
+
+        # Add to context
+        context["browser_hub"] = browser_hub
+        return 1
+    except Exception as e:
+        context["error_message"] = f"Failed to initialize browser hub: {str(e)}"
+        return 0
+
+
+async def fetch_content_middleware(context: Dict[str, Any]) -> int:
+    """
+    Fetch content from the web using the browser hub.
+
+    This middleware uses the browser hub to get pages for crawling,
+    and properly releases them back to the pool when done.
+ + Args: + context: The pipeline context dictionary + + Returns: + int: 1 for success, 0 for failure + """ + url = context.get("url") + config = context.get("config") + browser_hub = context.get("browser_hub") + logger = context.get("logger") + + # Skip if using cached result + if context.get("cached_result") and context.get("html"): + return 1 + + try: + # Create crawler strategy without initializing its browser manager + from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy + + crawler_strategy = AsyncPlaywrightCrawlerStrategy( + browser_config=browser_hub.config if browser_hub else None, + logger=logger + ) + + # Replace the browser manager with our shared instance + crawler_strategy.browser_manager = browser_hub + + # Perform crawl without trying to initialize the browser + # The crawler will use the provided browser_manager to get pages + async_response = await crawler_strategy.crawl(url, config=config) + + # Store results in context + context["html"] = async_response.html + context["screenshot_data"] = async_response.screenshot + context["pdf_data"] = async_response.pdf_data + context["js_execution_result"] = async_response.js_execution_result + context["async_response"] = async_response + + return 1 + except Exception as e: + context["error_message"] = f"Error fetching content: {str(e)}" + return 0 + + +async def check_cache_middleware(context: Dict[str, Any]) -> int: + """Check if there's a cached result and load it if available""" + url = context.get("url") + config = context.get("config") + cache_context = context.get("cache_context") + logger = context.get("logger") + + # Initialize variables + context["cached_result"] = None + context["html"] = None + context["extracted_content"] = None + context["screenshot_data"] = None + context["pdf_data"] = None + + # Try to get cached result if appropriate + if cache_context.should_read(): + cached_result = await async_db_manager.aget_cached_url(url) + context["cached_result"] = cached_result 
+
+        if cached_result:
+            html = sanitize_input_encode(cached_result.html)
+            extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
+            extracted_content = None if not extracted_content or extracted_content == "[]" else extracted_content
+
+            # If screenshot is requested but it's not in cache, then set cache_result to None
+            screenshot_data = cached_result.screenshot
+            pdf_data = cached_result.pdf
+
+            if config.screenshot and not screenshot_data:
+                context["cached_result"] = None
+
+            if config.pdf and not pdf_data:
+                context["cached_result"] = None
+
+            context["html"] = html
+            context["extracted_content"] = extracted_content
+            context["screenshot_data"] = screenshot_data
+            context["pdf_data"] = pdf_data
+
+            logger.url_status(
+                url=cache_context.display_url,
+                success=bool(html),
+                timing=time.perf_counter() - context["start_time"],
+                tag="FETCH",
+            )
+
+    return 1
+
+
+async def configure_proxy_middleware(context: Dict[str, Any]) -> int:
+    """Configure proxy if a proxy rotation strategy is available"""
+    config = context.get("config")
+    logger = context.get("logger")
+
+    # Skip if using cached result
+    if context.get("cached_result") and context.get("html"):
+        return 1
+
+    # Update proxy configuration from rotation strategy if available
+    if config and config.proxy_rotation_strategy:
+        next_proxy = await config.proxy_rotation_strategy.get_next_proxy()
+        if next_proxy:
+            logger.info(
+                message="Switch proxy: {proxy}",
+                tag="PROXY",
+                params={"proxy": next_proxy.server},
+            )
+            config.proxy_config = next_proxy
+
+    return 1
+
+
+async def check_robots_txt_middleware(context: Dict[str, Any]) -> int:
+    """Check if the URL is allowed by robots.txt if enabled"""
+    url = context.get("url")
+    config = context.get("config")
+    browser_config = context.get("browser_config")
+    robots_parser = context.get("robots_parser")
+
+    # Skip if using cached result
+    if context.get("cached_result") and context.get("html"):
+        return 1
+
+    # Check robots.txt if enabled
+    
if config and config.check_robots_txt: + if not await robots_parser.can_fetch(url, browser_config.user_agent): + context["crawl_result"] = CrawlResult( + url=url, + html="", + success=False, + status_code=403, + error_message="Access denied by robots.txt", + response_headers={"X-Robots-Status": "Blocked by robots.txt"} + ) + return 0 + + return 1 + + +async def fetch_content_middleware_(context: Dict[str, Any]) -> int: + """Fetch content from the web using the crawler strategy""" + url = context.get("url") + config = context.get("config") + crawler_strategy = context.get("crawler_strategy") + logger = context.get("logger") + + # Skip if using cached result + if context.get("cached_result") and context.get("html"): + return 1 + + try: + t1 = time.perf_counter() + + if config.user_agent: + crawler_strategy.update_user_agent(config.user_agent) + + # Call CrawlerStrategy.crawl + async_response = await crawler_strategy.crawl(url, config=config) + + html = sanitize_input_encode(async_response.html) + screenshot_data = async_response.screenshot + pdf_data = async_response.pdf_data + js_execution_result = async_response.js_execution_result + + t2 = time.perf_counter() + logger.url_status( + url=context["cache_context"].display_url, + success=bool(html), + timing=t2 - t1, + tag="FETCH", + ) + + context["html"] = html + context["screenshot_data"] = screenshot_data + context["pdf_data"] = pdf_data + context["js_execution_result"] = js_execution_result + context["async_response"] = async_response + + return 1 + except Exception as e: + context["error_message"] = f"Error fetching content: {str(e)}" + return 0 + + +async def scrape_content_middleware(context: Dict[str, Any]) -> int: + """Apply scraping strategy to extract content""" + url = context.get("url") + html = context.get("html") + config = context.get("config") + extracted_content = context.get("extracted_content") + logger = context.get("logger") + + # Skip if already have a crawl result + if 
context.get("crawl_result"): + return 1 + + try: + _url = url if not context.get("is_raw_html", False) else "Raw HTML" + t1 = time.perf_counter() + + # Get scraping strategy and ensure it has a logger + scraping_strategy = config.scraping_strategy + if not scraping_strategy.logger: + scraping_strategy.logger = logger + + # Process HTML content + params = config.__dict__.copy() + params.pop("url", None) + # Add keys from kwargs to params that don't exist in params + kwargs = context.get("kwargs", {}) + params.update({k: v for k, v in kwargs.items() if k not in params.keys()}) + + # Scraping Strategy Execution + result: ScrapingResult = scraping_strategy.scrap(url, html, **params) + + if result is None: + raise ValueError(f"Process HTML, Failed to extract content from the website: {url}") + + # Extract results - handle both dict and ScrapingResult + if isinstance(result, dict): + cleaned_html = sanitize_input_encode(result.get("cleaned_html", "")) + media = result.get("media", {}) + links = result.get("links", {}) + metadata = result.get("metadata", {}) + else: + cleaned_html = sanitize_input_encode(result.cleaned_html) + media = result.media.model_dump() + links = result.links.model_dump() + metadata = result.metadata + + context["cleaned_html"] = cleaned_html + context["media"] = media + context["links"] = links + context["metadata"] = metadata + + # Log processing completion + logger.info( + message="{url:.50}... 
| Time: {timing}s", + tag="SCRAPE", + params={ + "url": _url, + "timing": int((time.perf_counter() - t1) * 1000) / 1000, + }, + ) + + return 1 + except InvalidCSSSelectorError as e: + context["error_message"] = str(e) + return 0 + except Exception as e: + context["error_message"] = f"Process HTML, Failed to extract content from the website: {url}, error: {str(e)}" + return 0 + + +async def generate_markdown_middleware(context: Dict[str, Any]) -> int: + """Generate markdown from cleaned HTML""" + url = context.get("url") + cleaned_html = context.get("cleaned_html") + config = context.get("config") + + # Skip if already have a crawl result + if context.get("crawl_result"): + return 1 + + # Generate Markdown + markdown_generator = config.markdown_generator + + markdown_result: MarkdownGenerationResult = markdown_generator.generate_markdown( + cleaned_html=cleaned_html, + base_url=url, + ) + + context["markdown_result"] = markdown_result + + return 1 + + +async def extract_structured_content_middleware(context: Dict[str, Any]) -> int: + """Extract structured content using extraction strategy""" + url = context.get("url") + extracted_content = context.get("extracted_content") + config = context.get("config") + markdown_result = context.get("markdown_result") + cleaned_html = context.get("cleaned_html") + logger = context.get("logger") + + # Skip if already have a crawl result or extracted content + if context.get("crawl_result") or bool(extracted_content): + return 1 + + from crawl4ai.chunking_strategy import IdentityChunking + from crawl4ai.extraction_strategy import NoExtractionStrategy + + if config.extraction_strategy and not isinstance(config.extraction_strategy, NoExtractionStrategy): + t1 = time.perf_counter() + _url = url if not context.get("is_raw_html", False) else "Raw HTML" + + # Choose content based on input_format + content_format = config.extraction_strategy.input_format + if content_format == "fit_markdown" and not markdown_result.fit_markdown: + 
            # fit_markdown was requested but the markdown generator produced
            # none — fall back to raw markdown rather than extracting nothing.
            logger.warning(
                message="Fit markdown requested but not available. Falling back to raw markdown.",
                tag="EXTRACT",
                params={"url": _url},
            )
            content_format = "markdown"

        # Select the extraction input; unknown formats fall back to raw markdown.
        content = {
            "markdown": markdown_result.raw_markdown,
            "html": context.get("html"),
            "cleaned_html": cleaned_html,
            "fit_markdown": markdown_result.fit_markdown,
        }.get(content_format, markdown_result.raw_markdown)

        # Use IdentityChunking for HTML input, otherwise use provided chunking strategy
        chunking = (
            IdentityChunking()
            if content_format in ["html", "cleaned_html"]
            else config.chunking_strategy
        )
        sections = chunking.chunk(content)
        extracted_content = config.extraction_strategy.run(url, sections)
        # Serialized as JSON so downstream consumers get a plain string;
        # default=str stringifies non-JSON-native values deliberately.
        extracted_content = json.dumps(
            extracted_content, indent=4, default=str, ensure_ascii=False
        )

        context["extracted_content"] = extracted_content

        # Log extraction completion
        logger.info(
            message="Completed for {url:.50}... | Time: {timing}s",
            tag="EXTRACT",
            params={"url": _url, "timing": time.perf_counter() - t1},
        )

    return 1


async def format_html_middleware(context: Dict[str, Any]) -> int:
    """Pretty-print context["cleaned_html"] when the run config asks for it.

    Returns 1 always; formatting is cosmetic and never fails the pipeline.
    """
    config = context.get("config")
    cleaned_html = context.get("cleaned_html")

    # Skip if already have a crawl result
    if context.get("crawl_result"):
        return 1

    # Apply HTML formatting if requested.
    # NOTE: "prettiify" (double i) is the attribute's actual spelling on
    # CrawlerRunConfig — do not "correct" it here.
    if config.prettiify and cleaned_html:
        context["cleaned_html"] = fast_format_html(cleaned_html)

    return 1


async def write_cache_middleware(context: Dict[str, Any]) -> int:
    """Placeholder cache-write step.

    Intentionally a no-op beyond the guard checks: the CrawlResult is built
    and cached in build_result_middleware to avoid constructing it twice.
    Returns 1 always.
    """
    cache_context = context.get("cache_context")
    cached_result = context.get("cached_result")

    # Skip if already have a crawl result or not using cache
    if context.get("crawl_result") or not cache_context.should_write() or bool(cached_result):
        return 1

    # We'll create the CrawlResult in build_result_middleware and cache it there
    # to avoid creating it twice

    return 1


async def build_result_middleware(context: Dict[str, Any]) -> int:
    """Build the final CrawlResult and store it in context["final_result"].

    Resolves the result from one of three sources, in priority order:
    1. an early result set by a previous middleware (e.g. robots.txt check),
    2. a cached result (when HTML is present),
    3. a freshly assembled CrawlResult from the scrape/markdown/extract steps.

    Always returns 1; unexpected failures are converted into a failure
    CrawlResult rather than aborting the pipeline.
    """
    url = context.get("url")
    html = context.get("html", "")
    cache_context = context.get("cache_context")
    cached_result = context.get("cached_result")
    config = context.get("config")
    logger = context.get("logger")

    # If we already have a crawl result (from an earlier middleware like robots.txt check)
    if context.get("crawl_result"):
        result = context["crawl_result"]
        context["final_result"] = CrawlResultContainer(result)
        return 1

    # If we have a cached result, reuse it instead of rebuilding.
    # NOTE(review): assumes "start_time" was placed in context by an earlier
    # initialization middleware — confirm against the middleware chain.
    if cached_result and html:
        logger.success(
            message="{url:.50}... | Status: {status} | Total: {timing}",
            tag="COMPLETE",
            params={
                "url": cache_context.display_url,
                "status": True,
                "timing": f"{time.perf_counter() - context['start_time']:.2f}s",
            },
            colors={"status": "green", "timing": "yellow"},
        )

        # Refresh per-run fields on the cached object before returning it.
        cached_result.success = bool(html)
        cached_result.session_id = getattr(config, "session_id", None)
        cached_result.redirected_url = cached_result.redirected_url or url
        context["final_result"] = CrawlResultContainer(cached_result)
        return 1

    # Build a new result
    try:
        # Get all necessary components from context
        cleaned_html = context.get("cleaned_html", "")
        markdown_result = context.get("markdown_result")
        media = context.get("media", {})
        links = context.get("links", {})
        metadata = context.get("metadata", {})
        screenshot_data = context.get("screenshot_data")
        pdf_data = context.get("pdf_data")
        extracted_content = context.get("extracted_content")
        async_response = context.get("async_response")

        # Create the CrawlResult; success is defined as "we got any HTML".
        crawl_result = CrawlResult(
            url=url,
            html=html,
            cleaned_html=cleaned_html,
            markdown=markdown_result,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=screenshot_data,
            pdf=pdf_data,
            extracted_content=extracted_content,
            success=bool(html),
            error_message="",
        )

        # Add response details if available
        if async_response:
            crawl_result.status_code = async_response.status_code
            crawl_result.redirected_url = async_response.redirected_url or url
            crawl_result.response_headers = async_response.response_headers
            crawl_result.downloaded_files = async_response.downloaded_files
            crawl_result.js_execution_result = context.get("js_execution_result")
            crawl_result.ssl_certificate = async_response.ssl_certificate

        crawl_result.session_id = getattr(config, "session_id", None)

        # Log completion
        logger.success(
            message="{url:.50}... | Status: {status} | Total: {timing}",
            tag="COMPLETE",
            params={
                "url": cache_context.display_url,
                "status": crawl_result.success,
                "timing": f"{time.perf_counter() - context['start_time']:.2f}s",
            },
            colors={
                "status": "green" if crawl_result.success else "red",
                "timing": "yellow",
            },
        )

        # Update cache if appropriate (cache writing deferred here from
        # write_cache_middleware so the CrawlResult is only built once).
        if cache_context.should_write() and not bool(cached_result):
            await async_db_manager.acache_url(crawl_result)

        context["final_result"] = CrawlResultContainer(crawl_result)
        return 1
    except Exception as e:
        error_context = get_error_context(sys.exc_info())

        error_message = (
            f"Unexpected error in build_result at line {error_context['line_no']} "
            f"in {error_context['function']} ({error_context['filename']}):\n"
            f"Error: {str(e)}\n\n"
            f"Code context:\n{error_context['code_context']}"
        )

        logger.error_status(
            url=url,
            error=create_box_message(error_message, type="error"),
            tag="ERROR",
        )

        # Still return 1: the failure is encoded in the result object itself.
        context["final_result"] = CrawlResultContainer(
            CrawlResult(
                url=url, html="", success=False, error_message=error_message
            )
        )
        return 1


async def handle_error_middleware(context: Dict[str, Any]) -> Dict[str, Any]:
    """Terminal error handler: log the failure and synthesize a failed result.

    Unlike the regular middlewares this returns the context dict, because it
    is invoked by the pipeline's error path rather than the normal chain.
    """
    url = context.get("url", "")
    error_message = context.get("error_message", "Unknown error")
    logger = context.get("logger")

    # Log the error
    if logger:
        logger.error_status(
            url=url,
error=create_box_message(error_message, type="error"), + tag="ERROR", + ) + + # Create a failure result + context["final_result"] = CrawlResultContainer( + CrawlResult( + url=url, html="", success=False, error_message=error_message + ) + ) + + return context + + +# Custom middlewares as requested + +async def sentiment_analysis_middleware(context: Dict[str, Any]) -> int: + """Analyze sentiment of generated markdown using TextBlob""" + from textblob import TextBlob + + markdown_result = context.get("markdown_result") + + # Skip if no markdown or already failed + if not markdown_result or not context.get("success", True): + return 1 + + try: + # Get raw markdown text + raw_markdown = markdown_result.raw_markdown + + # Analyze sentiment + blob = TextBlob(raw_markdown) + sentiment = blob.sentiment + + # Add sentiment to context + context["sentiment_analysis"] = { + "polarity": sentiment.polarity, # -1.0 to 1.0 (negative to positive) + "subjectivity": sentiment.subjectivity, # 0.0 to 1.0 (objective to subjective) + "classification": "positive" if sentiment.polarity > 0.1 else + "negative" if sentiment.polarity < -0.1 else "neutral" + } + + return 1 + except Exception as e: + # Don't fail the pipeline on sentiment analysis failure + context["sentiment_analysis_error"] = str(e) + return 1 + + +async def log_timing_middleware(context: Dict[str, Any], name: str) -> int: + """Log timing information for a specific point in the pipeline""" + context[f"_timing_mark_{name}"] = time.perf_counter() + + # Calculate duration if we have a start time + start_key = f"_timing_start_{name}" + if start_key in context: + duration = context[f"_timing_mark_{name}"] - context[start_key] + context[f"_timing_duration_{name}"] = duration + + # Log the timing if we have a logger + logger = context.get("logger") + if logger: + logger.info( + message="{name} completed in {duration:.2f}s", + tag="TIMING", + params={"name": name, "duration": duration}, + ) + + return 1 + + +async def 
validate_url_middleware(context: Dict[str, Any], patterns: List[str]) -> int: + """Validate URL against glob patterns""" + import fnmatch + url = context.get("url", "") + + # If no patterns provided, allow all + if not patterns: + return 1 + + # Check if URL matches any of the allowed patterns + for pattern in patterns: + if fnmatch.fnmatch(url, pattern): + return 1 + + # If we get here, URL didn't match any patterns + context["error_message"] = f"URL '{url}' does not match any allowed patterns" + return 0 + + +# Update the default middleware list function +def create_default_middleware_list(): + """Return the default list of middleware functions for the pipeline.""" + return [ + initialize_context_middleware, + check_cache_middleware, + browser_hub_middleware, # Add browser hub middleware before fetch_content + configure_proxy_middleware, + check_robots_txt_middleware, + fetch_content_middleware, + scrape_content_middleware, + generate_markdown_middleware, + extract_structured_content_middleware, + format_html_middleware, + build_result_middleware + ] diff --git a/crawl4ai/pipeline/pipeline.py b/crawl4ai/pipeline/pipeline.py new file mode 100644 index 00000000..d4fc1aeb --- /dev/null +++ b/crawl4ai/pipeline/pipeline.py @@ -0,0 +1,281 @@ + +import time +from typing import Callable, Dict, List, Any, Optional, Awaitable + +from middlewares import create_default_middleware_list, handle_error_middleware +from crawl4ai.models import CrawlResultContainer +from crawl4ai.async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy +from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig +from crawl4ai.async_logger import AsyncLogger + + +class Pipeline: + """ + A pipeline processor that executes a series of async middleware functions. + Each middleware function receives a context dictionary, updates it, + and returns 1 for success or 0 for failure. 
+ """ + + def __init__( + self, + middleware: List[Callable[[Dict[str, Any]], Awaitable[int]]] = None, + error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None, + after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None, + crawler_strategy: Optional[AsyncCrawlerStrategy] = None, + browser_config: Optional[BrowserConfig] = None, + logger: Optional[AsyncLogger] = None, + _initial_context: Optional[Dict[str, Any]] = None + ): + self.middleware = middleware or create_default_middleware_list() + self.error_handler = error_handler or handle_error_middleware + self.after_middleware_callback = after_middleware_callback + self.browser_config = browser_config or BrowserConfig() + self.logger = logger or AsyncLogger(verbose=self.browser_config.verbose) + self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy( + browser_config=self.browser_config, + logger=self.logger + ) + self._initial_context = _initial_context + self._strategy_initialized = False + + async def _initialize_strategy__(self): + """Initialize the crawler strategy if not already initialized""" + if not self.crawler_strategy: + self.crawler_strategy = AsyncPlaywrightCrawlerStrategy( + browser_config=self.browser_config, + logger=self.logger + ) + + if not self._strategy_initialized: + await self.crawler_strategy.__aenter__() + self._strategy_initialized = True + + async def _initialize_strategy(self): + """Initialize the crawler strategy if not already initialized""" + # With our new approach, we don't need to create the crawler strategy here + # as it will be created on-demand in fetch_content_middleware + + # Just ensure browser hub is available if needed + if hasattr(self, "_initial_context") and "browser_hub" not in self._initial_context: + # If a browser_config was provided but no browser_hub yet, + # we'll let the browser_hub_middleware handle creating it + pass + + # Mark as initialized to prevent repeated 
initialization attempts + self._strategy_initialized = True + + async def start(self): + """Start the crawler strategy and prepare it for use""" + if not self._strategy_initialized: + await self._initialize_strategy() + self._strategy_initialized = True + if self.crawler_strategy: + await self.crawler_strategy.__aenter__() + self._strategy_initialized = True + else: + raise ValueError("Crawler strategy is not initialized.") + + async def close(self): + """Close the crawler strategy and clean up resources""" + await self.stop() + + async def stop(self): + """Close the crawler strategy and clean up resources""" + if self._strategy_initialized and self.crawler_strategy: + await self.crawler_strategy.__aexit__(None, None, None) + self._strategy_initialized = False + + async def __aenter__(self): + await self.start() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def crawl(self, url: str, config: Optional[CrawlerRunConfig] = None, **kwargs) -> CrawlResultContainer: + """ + Crawl a URL and process it through the pipeline. + + Args: + url: The URL to crawl + config: Optional configuration for the crawl + **kwargs: Additional arguments to pass to the middleware + + Returns: + CrawlResultContainer: The result of the crawl + """ + # Initialize strategy if needed + await self._initialize_strategy() + + # Create the initial context + context = { + "url": url, + "config": config or CrawlerRunConfig(), + "browser_config": self.browser_config, + "logger": self.logger, + "crawler_strategy": self.crawler_strategy, + "kwargs": kwargs + } + + # Process the pipeline + result_context = await self.process(context) + + # Return the final result + return result_context.get("final_result") + + async def process(self, initial_context: Dict[str, Any] = None) -> Dict[str, Any]: + """ + Process all middleware functions with the given context. 
+ + Args: + initial_context: Initial context dictionary, defaults to empty dict + + Returns: + Updated context dictionary after all middleware have been processed + """ + context = {**self._initial_context} + if initial_context: + context.update(initial_context) + + # Record pipeline start time + context["_pipeline_start_time"] = time.perf_counter() + + for middleware_fn in self.middleware: + # Get middleware name for logging + middleware_name = getattr(middleware_fn, '__name__', str(middleware_fn)) + + # Record start time for this middleware + start_time = time.perf_counter() + context[f"_timing_start_{middleware_name}"] = start_time + + try: + # Execute middleware (all middleware functions are async) + result = await middleware_fn(context) + + # Record completion time + end_time = time.perf_counter() + context[f"_timing_end_{middleware_name}"] = end_time + context[f"_timing_duration_{middleware_name}"] = end_time - start_time + + # Execute after-middleware callback if provided + if self.after_middleware_callback: + await self.after_middleware_callback(middleware_name, context) + + # Convert boolean returns to int (True->1, False->0) + if isinstance(result, bool): + result = 1 if result else 0 + + # Handle failure + if result == 0: + if self.error_handler: + context["_error_in"] = middleware_name + context["_error_at"] = time.perf_counter() + return await self._handle_error(context) + else: + context["success"] = False + context["error_message"] = f"Pipeline failed at {middleware_name}" + break + except Exception as e: + # Record error information + context["_error_in"] = middleware_name + context["_error_at"] = time.perf_counter() + context["_exception"] = e + context["success"] = False + context["error_message"] = f"Exception in {middleware_name}: {str(e)}" + + # Call error handler if available + if self.error_handler: + return await self._handle_error(context) + break + + # Record pipeline completion time + pipeline_end_time = time.perf_counter() + 
context["_pipeline_end_time"] = pipeline_end_time + context["_pipeline_duration"] = pipeline_end_time - context["_pipeline_start_time"] + + # Set success to True if not already set (no failures) + if "success" not in context: + context["success"] = True + + return context + + async def _handle_error(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Handle errors by calling the error handler""" + try: + return await self.error_handler(context) + except Exception as e: + # If error handler fails, update context with this new error + context["_error_handler_exception"] = e + context["error_message"] = f"Error handler failed: {str(e)}" + return context + + + +async def create_pipeline( + middleware_list=None, + error_handler=None, + after_middleware_callback=None, + browser_config=None, + browser_hub_id=None, + browser_hub_connection=None, + browser_hub=None, + logger=None +) -> Pipeline: + """ + Factory function to create a pipeline with Browser-Hub integration. + + Args: + middleware_list: List of middleware functions + error_handler: Error handler middleware + after_middleware_callback: Callback after middleware execution + browser_config: Configuration for the browser + browser_hub_id: ID for browser hub instance + browser_hub_connection: Connection string for existing browser hub + browser_hub: Existing browser hub instance to use + logger: Logger instance + + Returns: + Pipeline: Configured pipeline instance + """ + # Use default middleware list if none provided + middleware = middleware_list or create_default_middleware_list() + + # Create the pipeline + pipeline = Pipeline( + middleware=middleware, + error_handler=error_handler, + after_middleware_callback=after_middleware_callback, + logger=logger + ) + + # Set browser-related attributes in the initial context + pipeline._initial_context = { + "browser_config": browser_config, + "browser_hub_id": browser_hub_id, + "browser_hub_connection": browser_hub_connection, + "browser_hub": browser_hub, + "logger": 
logger + } + + return pipeline + + + + +# async def create_pipeline( +# middleware_list: Optional[List[Callable[[Dict[str, Any]], Awaitable[int]]]] = None, +# error_handler: Optional[Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = None, +# after_middleware_callback: Optional[Callable[[str, Dict[str, Any]], Awaitable[None]]] = None, +# crawler_strategy = None, +# browser_config = None, +# logger = None +# ) -> Pipeline: +# """Factory function to create a pipeline with the given middleware""" +# return Pipeline( +# middleware=middleware_list, +# error_handler=error_handler, +# after_middleware_callback=after_middleware_callback, +# crawler_strategy=crawler_strategy, +# browser_config=browser_config, +# logger=logger +# ) \ No newline at end of file diff --git a/crawl4ai/pipeline/test_pipeline.py b/crawl4ai/pipeline/test_pipeline.py new file mode 100644 index 00000000..bdeca8a7 --- /dev/null +++ b/crawl4ai/pipeline/test_pipeline.py @@ -0,0 +1,109 @@ +import asyncio +from crawl4ai import ( + BrowserConfig, + CrawlerRunConfig, + CacheMode, + DefaultMarkdownGenerator, + PruningContentFilter +) +from pipeline import Pipeline + +async def main(): + # Create configuration objects + browser_config = BrowserConfig(headless=True, verbose=True) + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + markdown_generator=DefaultMarkdownGenerator( + content_filter=PruningContentFilter( + threshold=0.48, + threshold_type="fixed", + min_word_threshold=0 + ) + ), + ) + + # Create and use pipeline with context manager + async with Pipeline(browser_config=browser_config) as pipeline: + result = await pipeline.crawl( + url="https://www.example.com", + config=crawler_config + ) + + # Print the result + print(f"URL: {result.url}") + print(f"Success: {result.success}") + + if result.success: + print("\nMarkdown excerpt:") + print(result.markdown.raw_markdown[:500] + "...") + else: + print(f"Error: {result.error_message}") + +if __name__ == "__main__": + 
    asyncio.run(main())


class CrawlTarget:
    # Lightweight value object pairing a URL (or list of URLs) with an
    # optional CrawlerRunConfig; used by the commented-out batch demo below.
    def __init__(self, urls, config=None):
        self.urls = urls
        self.config = config

    def __repr__(self):
        return f"CrawlTarget(urls={self.urls}, config={self.config})"




# async def main():
#     # Create configuration objects
#     browser_config = BrowserConfig(headless=True, verbose=True)

#     # Define different configurations
#     config1 = CrawlerRunConfig(
#         cache_mode=CacheMode.BYPASS,
#         markdown_generator=DefaultMarkdownGenerator(
#             content_filter=PruningContentFilter(threshold=0.48)
#         ),
#     )

#     config2 = CrawlerRunConfig(
#         cache_mode=CacheMode.ENABLED,
#         screenshot=True,
#         pdf=True
#     )

#     # Create crawl targets
#     targets = [
#         CrawlTarget(
#             urls=["https://www.example.com", "https://www.wikipedia.org"],
#             config=config1
#         ),
#         CrawlTarget(
#             urls="https://news.ycombinator.com",
#             config=config2
#         ),
#         CrawlTarget(
#             urls=["https://github.com", "https://stackoverflow.com", "https://python.org"],
#             config=None
#         )
#     ]

#     # Create and use pipeline with context manager
#     async with Pipeline(browser_config=browser_config) as pipeline:
#         all_results = await pipeline.crawl_batch(targets)

#         for target_key, results in all_results.items():
#             print(f"\n===== Results for {target_key} =====")
#             print(f"Number of URLs crawled: {len(results)}")

#             for i, result in enumerate(results):
#                 print(f"\nURL {i+1}: {result.url}")
#                 print(f"Success: {result.success}")

#                 if result.success:
#                     print(f"Content length: {len(result.markdown.raw_markdown)} chars")
#                 else:
#                     print(f"Error: {result.error_message}")

# if __name__ == "__main__":
#     asyncio.run(main())
# diff --git a/tests/pipeline/demo_browser_hub_pipeline.py b/tests/pipeline/demo_browser_hub_pipeline.py
# new file mode 100644
# index 00000000..32298f68
# --- /dev/null
# +++ b/tests/pipeline/demo_browser_hub_pipeline.py
# @@ -0,0 +1,222 @@
# demo_browser_hub.py

import asyncio
from typing import List

from crawl4ai.browser.browser_hub import BrowserHub
from pipeline import create_pipeline
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
from crawl4ai.models import CrawlResultContainer
from crawl4ai.cache_context import CacheMode
from crawl4ai import DefaultMarkdownGenerator
from crawl4ai import PruningContentFilter

async def create_prewarmed_browser_hub(urls_to_crawl: List[str]):
    """Create a pre-warmed browser hub with 10 browsers and 5 pages each."""
    # Set up logging
    logger = AsyncLogger(verbose=True)
    logger.info("Setting up pre-warmed browser hub", tag="DEMO")

    # Create browser configuration
    browser_config = BrowserConfig(
        browser_type="chromium",
        headless=True,  # Set to False to see the browsers in action
        viewport_width=1280,
        viewport_height=800,
        light_mode=True,  # Optimize for performance
        java_script_enabled=True
    )

    # Create crawler configurations for pre-warming with different user agents
    # This allows pages to be ready for different scenarios
    crawler_configs = [
        CrawlerRunConfig(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            wait_until="networkidle"
        ),
        # CrawlerRunConfig(
        #     user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15",
        #     wait_until="networkidle"
        # ),
        # CrawlerRunConfig(
        #     user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
        #     wait_until="networkidle"
        # )
    ]

    # Number of browsers and pages per browser
    num_browsers = 1
    pages_per_browser = 1

    # Distribute pages across configurations.
    # Total pages = num_browsers * pages_per_browser (currently 1 x 1 for the
    # demo; scale the two constants above to pre-warm a larger pool).
    page_configs = []
    total_pages = num_browsers * pages_per_browser
    pages_per_config = total_pages // len(crawler_configs)

    for i, config in enumerate(crawler_configs):
        # For the last config, add any remaining pages
        if i == len(crawler_configs) - 1:
            remaining = total_pages - (pages_per_config * (len(crawler_configs) - 1))
            page_configs.append((browser_config, config, remaining))
        else:
            page_configs.append((browser_config, config, pages_per_config))

    # Create browser hub with pre-warmed pages
    start_time = asyncio.get_event_loop().time()
    logger.info("Initializing browser hub with pre-warmed pages...", tag="DEMO")

    hub = await BrowserHub.get_browser_manager(
        config=browser_config,
        hub_id="demo_hub",
        logger=logger,
        max_browsers_per_config=num_browsers,
        max_pages_per_browser=pages_per_browser,
        initial_pool_size=num_browsers,
        page_configs=page_configs
    )

    end_time = asyncio.get_event_loop().time()
    logger.success(
        message="Browser hub initialized with {total_pages} pre-warmed pages in {duration:.2f} seconds",
        tag="DEMO",
        params={
            "total_pages": total_pages,
            "duration": end_time - start_time
        }
    )

    # Get and display pool status
    status = await hub.get_pool_status()
    logger.info(
        message="Browser pool status: {status}",
        tag="DEMO",
        params={"status": status}
    )

    return hub

async def crawl_urls_with_hub(hub, urls: List[str]) -> List[CrawlResultContainer]:
    """Crawl a list of URLs using a pre-warmed browser hub."""
    logger = AsyncLogger(verbose=True)

    # Create crawler configuration
    crawler_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        markdown_generator=DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(
                threshold=0.48,
                threshold_type="fixed",
                min_word_threshold=0
            )
        ),
        wait_until="networkidle",
        screenshot=True
    )

    # Create pipeline with the browser hub
    pipeline = await create_pipeline(
        browser_hub=hub,
        logger=logger
    )

    results = []

    # Crawl all URLs in parallel
    async def crawl_url(url):
        logger.info(f"Crawling {url}...", tag="CRAWL")
        result = await pipeline.crawl(url=url, config=crawler_config)
        logger.success(f"Completed crawl of {url}", tag="CRAWL")
        return result

    # Create tasks for all URLs
    tasks = [crawl_url(url) for url in urls]

    # Execute all tasks in parallel and collect results
    results = await asyncio.gather(*tasks)

    return results

async def main():
    """Main demo function."""
    # List of URLs to crawl
    urls_to_crawl = [
        "https://example.com",
        # "https://www.python.org",
        # "https://httpbin.org/html",
        # "https://news.ycombinator.com",
        # "https://github.com",
        # "https://pypi.org",
        # "https://docs.python.org/3/",
        # "https://opensource.org",
        # "https://whatismyipaddress.com",
        # "https://en.wikipedia.org/wiki/Web_scraping"
    ]

    # Set up logging
    logger = AsyncLogger(verbose=True)
    logger.info("Starting browser hub demo", tag="DEMO")

    try:
        # Create pre-warmed browser hub
        hub = await create_prewarmed_browser_hub(urls_to_crawl)

        # Use hub to crawl URLs
        logger.info("Crawling URLs in parallel...", tag="DEMO")
        start_time = asyncio.get_event_loop().time()

        results = await crawl_urls_with_hub(hub, urls_to_crawl)

        end_time = asyncio.get_event_loop().time()

        # Display results
        logger.success(
            message="Crawled {count} URLs in {duration:.2f} seconds (average: {avg:.2f} seconds per URL)",
            tag="DEMO",
            params={
                "count": len(results),
                "duration": end_time - start_time,
                "avg": (end_time - start_time) / len(results)
            }
        )

        # Print summary of results
        logger.info("Crawl results summary:", tag="DEMO")
        for i, result in enumerate(results):
            logger.info(
                message="{idx}. {url}: Success={success}, Content length={length}",
                tag="RESULT",
                params={
                    "idx": i+1,
                    "url": result.url,
                    "success": result.success,
                    "length": len(result.html) if result.html else 0
                }
            )

            if result.success and result.markdown and result.markdown.raw_markdown:
                # Print a snippet of the markdown
                markdown_snippet = result.markdown.raw_markdown[:150] + "..."
+ logger.info( + message=" Markdown: {snippet}", + tag="RESULT", + params={"snippet": markdown_snippet} + ) + + # Display final browser pool status + status = await hub.get_pool_status() + logger.info( + message="Final browser pool status: {status}", + tag="DEMO", + params={"status": status} + ) + + finally: + # Clean up + logger.info("Shutting down browser hub...", tag="DEMO") + await BrowserHub.shutdown_all() + logger.success("Demo completed", tag="DEMO") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/pipeline/extended_browser_hub_tests.py b/tests/pipeline/extended_browser_hub_tests.py new file mode 100644 index 00000000..2f80d83b --- /dev/null +++ b/tests/pipeline/extended_browser_hub_tests.py @@ -0,0 +1,505 @@ +# extended_browser_hub_tests.py + +import asyncio + +from crawl4ai.browser.browser_hub import BrowserHub +from pipeline import create_pipeline +from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig +from crawl4ai.async_logger import AsyncLogger +from crawl4ai.cache_context import CacheMode + +# Common test URLs +TEST_URLS = [ + "https://example.com", + "https://example.com/page1", + "https://httpbin.org/html", + "https://httpbin.org/headers", + "https://httpbin.org/ip", + "https://httpstat.us/200" +] + +class TestResults: + """Simple container for test results""" + def __init__(self, name: str): + self.name = name + self.results = [] + self.start_time = None + self.end_time = None + self.errors = [] + + @property + def duration(self) -> float: + if self.start_time and self.end_time: + return self.end_time - self.start_time + return 0 + + @property + def success_rate(self) -> float: + if not self.results: + return 0 + return sum(1 for r in self.results if r.success) / len(self.results) * 100 + + def log_summary(self, logger: AsyncLogger): + logger.info(f"=== Test: {self.name} ===", tag="SUMMARY") + logger.info( + message="Duration: {duration:.2f}s, Success rate: {success_rate:.1f}%, Results: 
{count}", + tag="SUMMARY", + params={ + "duration": self.duration, + "success_rate": self.success_rate, + "count": len(self.results) + } + ) + + if self.errors: + logger.error( + message="Errors ({count}): {errors}", + tag="SUMMARY", + params={ + "count": len(self.errors), + "errors": "; ".join(str(e) for e in self.errors) + } + ) + +# ======== TEST SCENARIO 1: Simple default configuration ======== +async def test_default_configuration(): + """ + Test Scenario 1: Simple default configuration + + This tests the basic case where the user does not provide any specific + browser configuration, relying on default auto-setup. + """ + logger = AsyncLogger(verbose=True) + results = TestResults("Default Configuration") + + try: + # Create pipeline with no browser config + pipeline = await create_pipeline(logger=logger) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Create basic crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="domcontentloaded" + ) + + # Process each URL sequentially + for url in TEST_URLS: + try: + logger.info(f"Crawling {url} with default configuration", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + results.results.append(result) + + logger.success( + message="Result: url={url}, success={success}, content_length={length}", + tag="TEST", + params={ + "url": url, + "success": result.success, + "length": len(result.html) if result.html else 0 + } + ) + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") + results.errors.append(e) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== TEST SCENARIO 2: Detailed custom configuration ======== +async def test_custom_configuration(): + """ + Test Scenario 2: 
Detailed custom configuration + + This tests the case where the user provides detailed browser configuration + to customize the browser behavior. + """ + logger = AsyncLogger(verbose=True) + results = TestResults("Custom Configuration") + + try: + # Create custom browser config + browser_config = BrowserConfig( + browser_type="chromium", + headless=True, + viewport_width=1920, + viewport_height=1080, + user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36", + light_mode=True, + ignore_https_errors=True, + extra_args=["--disable-extensions"] + ) + + # Create custom crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="networkidle", + page_timeout=30000, + screenshot=True, + pdf=False, + screenshot_wait_for=0.5, + wait_for_images=True, + scan_full_page=True, + scroll_delay=0.2, + process_iframes=True, + remove_overlay_elements=True + ) + + # Create pipeline with custom configuration + pipeline = await create_pipeline( + browser_config=browser_config, + logger=logger + ) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Process each URL sequentially + for url in TEST_URLS: + try: + logger.info(f"Crawling {url} with custom configuration", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + results.results.append(result) + + has_screenshot = result.screenshot is not None + + logger.success( + message="Result: url={url}, success={success}, screenshot={screenshot}, content_length={length}", + tag="TEST", + params={ + "url": url, + "success": result.success, + "screenshot": has_screenshot, + "length": len(result.html) if result.html else 0 + } + ) + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") + results.errors.append(e) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + # Get browser hub status from context + try: + # Run a dummy crawl to get 
the context with browser hub + context = await pipeline.process({"url": "about:blank", "config": crawler_config}) + browser_hub = context.get("browser_hub") + if browser_hub: + status = await browser_hub.get_pool_status() + logger.info( + message="Browser hub status: {status}", + tag="TEST", + params={"status": status} + ) + except Exception as e: + logger.error(f"Failed to get browser hub status: {str(e)}", tag="TEST") + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== TEST SCENARIO 3: Using pre-initialized browser hub ======== +async def test_preinitalized_browser_hub(): + """ + Test Scenario 3: Using pre-initialized browser hub + + This tests the case where a browser hub is initialized separately + and then passed to the pipeline. + """ + logger = AsyncLogger(verbose=True) + results = TestResults("Pre-initialized Browser Hub") + + browser_hub = None + try: + # Create and initialize browser hub separately + logger.info("Initializing browser hub separately", tag="TEST") + + browser_config = BrowserConfig( + browser_type="chromium", + headless=True, + verbose=True + ) + + browser_hub = await BrowserHub.get_browser_manager( + config=browser_config, + hub_id="test_preinitalized", + logger=logger, + max_browsers_per_config=2, + max_pages_per_browser=3, + initial_pool_size=2 + ) + + # Display initial status + status = await browser_hub.get_pool_status() + logger.info( + message="Initial browser hub status: {status}", + tag="TEST", + params={"status": status} + ) + + # Create pipeline with pre-initialized browser hub + pipeline = await create_pipeline( + browser_hub=browser_hub, + logger=logger + ) + + # Create crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="networkidle", + screenshot=True + ) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # 
Process URLs in parallel + async def crawl_url(url): + try: + logger.info(f"Crawling {url} with pre-initialized hub", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + logger.success(f"Completed crawl of {url}", tag="TEST") + return result + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", tag="TEST") + results.errors.append(e) + return None + + # Create tasks for all URLs + tasks = [crawl_url(url) for url in TEST_URLS] + + # Execute all tasks in parallel and collect results + all_results = await asyncio.gather(*tasks) + results.results = [r for r in all_results if r is not None] + + # End timing + results.end_time = asyncio.get_event_loop().time() + + # Display final status + status = await browser_hub.get_pool_status() + logger.info( + message="Final browser hub status: {status}", + tag="TEST", + params={"status": status} + ) + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results, browser_hub + +# ======== TEST SCENARIO 4: Parallel pipelines sharing browser hub ======== +async def test_parallel_pipelines(): + """ + Test Scenario 4: Multiple parallel pipelines sharing browser hub + + This tests the case where multiple pipelines share the same browser hub, + demonstrating resource sharing and parallel operation. 
+ """ + logger = AsyncLogger(verbose=True) + results = TestResults("Parallel Pipelines") + + # We'll reuse the browser hub from the previous test + _, browser_hub = await test_preinitalized_browser_hub() + + try: + # Create 3 pipelines that all share the same browser hub + pipelines = [] + for i in range(3): + pipeline = await create_pipeline( + browser_hub=browser_hub, + logger=logger + ) + pipelines.append(pipeline) + + logger.info(f"Created {len(pipelines)} pipelines sharing the same browser hub", tag="TEST") + + # Create crawler configs with different settings + configs = [ + CrawlerRunConfig(wait_until="domcontentloaded", screenshot=False), + CrawlerRunConfig(wait_until="networkidle", screenshot=True), + CrawlerRunConfig(wait_until="load", scan_full_page=True) + ] + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Function to process URLs with a specific pipeline + async def process_with_pipeline(pipeline_idx, urls): + pipeline_results = [] + for url in urls: + try: + logger.info(f"Pipeline {pipeline_idx} crawling {url}", tag="TEST") + result = await pipelines[pipeline_idx].crawl( + url=url, + config=configs[pipeline_idx] + ) + pipeline_results.append(result) + logger.success( + message="Pipeline {idx} completed: url={url}, success={success}", + tag="TEST", + params={ + "idx": pipeline_idx, + "url": url, + "success": result.success + } + ) + except Exception as e: + logger.error( + message="Pipeline {idx} error: {error}", + tag="TEST", + params={ + "idx": pipeline_idx, + "error": str(e) + } + ) + results.errors.append(e) + return pipeline_results + + # Distribute URLs among pipelines + pipeline_urls = [ + TEST_URLS[:2], + TEST_URLS[2:4], + TEST_URLS[4:5] * 2 # Duplicate the last URL to have 2 for pipeline 3 + ] + + # Execute all pipelines in parallel + tasks = [ + process_with_pipeline(i, urls) + for i, urls in enumerate(pipeline_urls) + ] + + pipeline_results = await asyncio.gather(*tasks) + + # Flatten results + for res_list in 
pipeline_results: + results.results.extend(res_list) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + # Display browser hub status + status = await browser_hub.get_pool_status() + logger.info( + message="Browser hub status after parallel pipelines: {status}", + tag="TEST", + params={"status": status} + ) + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== TEST SCENARIO 5: Browser hub with connection string ======== +async def test_connection_string(): + """ + Test Scenario 5: Browser hub with connection string + + This tests the case where a browser hub is initialized from a connection string, + simulating connecting to a running browser hub service. + """ + logger = AsyncLogger(verbose=True) + results = TestResults("Connection String") + + try: + # Create pipeline with connection string + # Note: In a real implementation, this would connect to an existing service + # For this test, we're using a simulated connection + connection_string = "localhost:9222" # Simulated connection string + + pipeline = await create_pipeline( + browser_hub_connection=connection_string, + logger=logger + ) + + # Create crawler config + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_until="networkidle" + ) + + # Start timing + results.start_time = asyncio.get_event_loop().time() + + # Test with a single URL + url = TEST_URLS[0] + try: + logger.info(f"Crawling {url} with connection string hub", tag="TEST") + result = await pipeline.crawl(url=url, config=crawler_config) + results.results.append(result) + + logger.success( + message="Result: url={url}, success={success}, content_length={length}", + tag="TEST", + params={ + "url": url, + "success": result.success, + "length": len(result.html) if result.html else 0 + } + ) + except Exception as e: + logger.error(f"Error crawling {url}: {str(e)}", 
tag="TEST") + results.errors.append(e) + + # End timing + results.end_time = asyncio.get_event_loop().time() + + except Exception as e: + logger.error(f"Test failed with error: {str(e)}", tag="TEST") + results.errors.append(e) + + # Log summary + results.log_summary(logger) + + return results + +# ======== RUN ALL TESTS ======== +async def run_all_tests(): + """Run all test scenarios""" + logger = AsyncLogger(verbose=True) + logger.info("=== STARTING BROWSER HUB TESTS ===", tag="MAIN") + + try: + # Run each test scenario + await test_default_configuration() + # await test_custom_configuration() + # await test_preinitalized_browser_hub() + # await test_parallel_pipelines() + # await test_connection_string() + + except Exception as e: + logger.error(f"Test suite failed: {str(e)}", tag="MAIN") + finally: + # Clean up all browser hubs + logger.info("Shutting down all browser hubs...", tag="MAIN") + await BrowserHub.shutdown_all() + logger.success("All tests completed", tag="MAIN") + +if __name__ == "__main__": + asyncio.run(run_all_tests()) \ No newline at end of file diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py new file mode 100644 index 00000000..bdeca8a7 --- /dev/null +++ b/tests/pipeline/test_pipeline.py @@ -0,0 +1,109 @@ +import asyncio +from crawl4ai import ( + BrowserConfig, + CrawlerRunConfig, + CacheMode, + DefaultMarkdownGenerator, + PruningContentFilter +) +from pipeline import Pipeline + +async def main(): + # Create configuration objects + browser_config = BrowserConfig(headless=True, verbose=True) + crawler_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + markdown_generator=DefaultMarkdownGenerator( + content_filter=PruningContentFilter( + threshold=0.48, + threshold_type="fixed", + min_word_threshold=0 + ) + ), + ) + + # Create and use pipeline with context manager + async with Pipeline(browser_config=browser_config) as pipeline: + result = await pipeline.crawl( + url="https://www.example.com", + 
config=crawler_config + ) + + # Print the result + print(f"URL: {result.url}") + print(f"Success: {result.success}") + + if result.success: + print("\nMarkdown excerpt:") + print(result.markdown.raw_markdown[:500] + "...") + else: + print(f"Error: {result.error_message}") + +if __name__ == "__main__": + asyncio.run(main()) + + +class CrawlTarget: + def __init__(self, urls, config=None): + self.urls = urls + self.config = config + + def __repr__(self): + return f"CrawlTarget(urls={self.urls}, config={self.config})" + + + + +# async def main(): +# # Create configuration objects +# browser_config = BrowserConfig(headless=True, verbose=True) + +# # Define different configurations +# config1 = CrawlerRunConfig( +# cache_mode=CacheMode.BYPASS, +# markdown_generator=DefaultMarkdownGenerator( +# content_filter=PruningContentFilter(threshold=0.48) +# ), +# ) + +# config2 = CrawlerRunConfig( +# cache_mode=CacheMode.ENABLED, +# screenshot=True, +# pdf=True +# ) + +# # Create crawl targets +# targets = [ +# CrawlTarget( +# urls=["https://www.example.com", "https://www.wikipedia.org"], +# config=config1 +# ), +# CrawlTarget( +# urls="https://news.ycombinator.com", +# config=config2 +# ), +# CrawlTarget( +# urls=["https://github.com", "https://stackoverflow.com", "https://python.org"], +# config=None +# ) +# ] + +# # Create and use pipeline with context manager +# async with Pipeline(browser_config=browser_config) as pipeline: +# all_results = await pipeline.crawl_batch(targets) + +# for target_key, results in all_results.items(): +# print(f"\n===== Results for {target_key} =====") +# print(f"Number of URLs crawled: {len(results)}") + +# for i, result in enumerate(results): +# print(f"\nURL {i+1}: {result.url}") +# print(f"Success: {result.success}") + +# if result.success: +# print(f"Content length: {len(result.markdown.raw_markdown)} chars") +# else: +# print(f"Error: {result.error_message}") + +# if __name__ == "__main__": +# asyncio.run(main()) \ No newline at end of file