From f7ce2d42c996c3eba06cfd70a57c24326b35637a Mon Sep 17 00:00:00 2001
From: Aravind Karnam
Date: Thu, 30 Jan 2025 17:49:58 +0530
Subject: [PATCH] feat: Add deep crawl capabilities to arun_many function

---
 crawl4ai/async_webcrawler.py                   |  37 +++++++
 .../deep_crawl/bfs_deep_crawl_strategy.py      |   1 -
 docs/deep_crawl/deep_crawl_quickstart.py       | 104 +++++++++++++++---
 3 files changed, 124 insertions(+), 18 deletions(-)

diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index 3c9026ee..2550ee63 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -798,6 +798,22 @@ class AsyncWebCrawler:
             ):
                 print(f"Processed {result.url}: {len(result.markdown)} chars")
         """
+
+        async def merge_async_generators(generators):
+            tasks = {asyncio.create_task(gen.__anext__()): gen for gen in generators}
+            while tasks:
+                done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+                for task in done:
+                    gen = tasks.pop(task)  # Get the generator associated with this task
+
+                    try:
+                        result = task.result()
+                        yield result  # Yield the result as soon as it is available
+                        tasks[asyncio.create_task(gen.__anext__())] = gen  # Schedule this generator's next item
+                    except StopAsyncIteration:
+                        pass  # Generator is exhausted, don't add it back to the tasks
+
         if config is None:
             config = CrawlerRunConfig(
                 word_count_threshold=word_count_threshold,
@@ -838,6 +854,27 @@ class AsyncWebCrawler:
 
         stream = config.stream
 
+        if config.deep_crawl_strategy:
+            if config.stream:
+                generators = []  # One deep-crawl generator per start URL
+                for url in urls:
+                    generators.append(
+                        config.deep_crawl_strategy.arun(
+                            start_url=url, crawler=self, crawler_run_config=config
+                        )
+                    )
+                return merge_async_generators(generators)  # Interleave results as they arrive
+            else:
+                results = []  # One list of results per start URL
+                for url in urls:
+                    url_results = []
+                    async for result in config.deep_crawl_strategy.arun(
+                        start_url=url, crawler=self, crawler_run_config=config
+                    ):
+                        url_results.append(result)
+                    results.append(url_results)
+                return results
+
         if stream:
 
             async def result_transformer():
diff --git a/crawl4ai/deep_crawl/bfs_deep_crawl_strategy.py b/crawl4ai/deep_crawl/bfs_deep_crawl_strategy.py
index da70c6d9..3cab442a 100644
--- a/crawl4ai/deep_crawl/bfs_deep_crawl_strategy.py
+++ b/crawl4ai/deep_crawl/bfs_deep_crawl_strategy.py
@@ -187,7 +187,6 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         finally:
             self.stats.end_time = datetime.now()
-            await crawler.close()
 
     async def shutdown(self):
         """Clean up resources and stop crawling"""
diff --git a/docs/deep_crawl/deep_crawl_quickstart.py b/docs/deep_crawl/deep_crawl_quickstart.py
index 7fe604e3..ea9b7d7f 100644
--- a/docs/deep_crawl/deep_crawl_quickstart.py
+++ b/docs/deep_crawl/deep_crawl_quickstart.py
@@ -1,4 +1,3 @@
-# basic_scraper_example.py
 from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig
 from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
 from crawl4ai.deep_crawl import (
@@ -20,9 +19,9 @@ import logging
 
 browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600)
 
-async def basic_scraper_example():
+async def basic_example():
     """
-    Basic example: Scrape a blog site for articles
+    Basic example: Deep crawl a blog site for articles
     - Crawls only HTML pages
     - Stays within the blog section
     - Collects all results at once
@@ -45,14 +44,15 @@
         process_external_links=True,
     )
 
-    # Create the crawler and scraper
+    # Create the crawler
    async with AsyncWebCrawler(
        config=browser_config,
    ) as crawler:
        # Start scraping
        try:
            results = await crawler.arun(
-                "https://crawl4ai.com/mkdocs", CrawlerRunConfig(deep_crawl_strategy=bfs_strategy)
+                "https://crawl4ai.com/mkdocs",
+                CrawlerRunConfig(deep_crawl_strategy=bfs_strategy),
            )
            # Process results
            print(f"Crawled {len(results)} pages:")
@@ -62,9 +62,10 @@
        except Exception as e:
            print(f"Error during scraping: {e}")
 
-async def advanced_scraper_example():
+
+async def advanced_example():
     """
-    Advanced example: Intelligent news site scraping
+    Advanced example: Intelligent news site crawling
     - Uses all filter types
     - Implements sophisticated scoring
     - Streams results
@@ -72,7 +73,7 @@
     """
     # Set up logging
     logging.basicConfig(level=logging.INFO)
-    logger = logging.getLogger("advanced_scraper")
+    logger = logging.getLogger("advanced_deep_crawler")
 
     # Create sophisticated filter chain
     filter_chain = FilterChain(
@@ -115,7 +116,7 @@
         max_depth=2, filter_chain=filter_chain, url_scorer=scorer
     )
 
-    # Create crawler and scraper
+    # Create crawler
    async with AsyncWebCrawler(
        config=browser_config,
    ) as crawler:
@@ -128,8 +129,7 @@
        results = []
        result_generator = await crawler.arun(
            "https://techcrunch.com",
-            config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy,
-            stream=True)
+            config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy, stream=True),
        )
        async for result in result_generator:
            stats["processed"] += 1
@@ -174,17 +174,87 @@
     )
 
+async def basic_example_many_urls():
+    filter_chain = FilterChain(
+        [
+            URLPatternFilter("*/basic/*"),
+            ContentTypeFilter(["text/html"]),
+        ]
+    )
+    # Initialize the strategy with basic configuration
+    bfs_strategy = BFSDeepCrawlStrategy(
+        max_depth=2,  # Only go 2 levels deep
+        filter_chain=filter_chain,
+        url_scorer=None,  # Use default scoring
+        process_external_links=False,
+    )
+
+    # Create the crawler
+    async with AsyncWebCrawler(
+        config=browser_config,
+    ) as crawler:
+        # Start crawling
+        try:
+            results = await crawler.arun_many(
+                urls=["https://crawl4ai.com/mkdocs", "https://aravindkarnam.com"],
+                config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy),
+            )
+            # Process results: one list of page results per start URL
+            print(f"Crawled {sum(len(r) for r in results)} pages:")
+            for url_result in results:
+                for result in url_result:
+                    print(f"- {result.url}: {len(result.html)} bytes")
+
+        except Exception as e:
+            print(f"Error during crawling: {e}")
+
+
+async def basic_example_many_urls_stream():
+    filter_chain = FilterChain(
+        [
+            URLPatternFilter("*/basic/*"),
+            ContentTypeFilter(["text/html"]),
+        ]
+    )
+    # Initialize the strategy with basic configuration
+    bfs_strategy = BFSDeepCrawlStrategy(
+        max_depth=2,  # Only go 2 levels deep
+        filter_chain=filter_chain,
+        url_scorer=None,  # Use default scoring
+        process_external_links=False,
+    )
+
+    # Create the crawler
+    async with AsyncWebCrawler(
+        config=browser_config,
+    ) as crawler:
+        # Start crawling
+        try:
+            async for result in await crawler.arun_many(
+                urls=["https://crawl4ai.com/mkdocs", "https://aravindkarnam.com"],
+                config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy, stream=True),
+            ):
+                # Process each result as soon as it arrives
+                print(f"- {result.url}: {len(result.html)} bytes")
+        except Exception as e:
+            print(f"Error during crawling: {e}")
 
 if __name__ == "__main__":
     import asyncio
     import time
 
     # Run basic example
     start_time = time.perf_counter()
-    print("Running basic scraper example...")
-    asyncio.run(basic_scraper_example())
+    print("Running basic deep crawl example...")
+    asyncio.run(basic_example())
     end_time = time.perf_counter()
-    print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds")
+    print(f"Basic deep crawl example completed in {end_time - start_time:.2f} seconds")
 
-    # # Run advanced example
-    print("\nRunning advanced scraper example...")
-    asyncio.run(advanced_scraper_example())
+    # Run advanced example
+    print("\nRunning advanced deep crawl example...")
+    asyncio.run(advanced_example())
+
+    print("\nRunning basic deep crawl example with arun_many...")
+    asyncio.run(basic_example_many_urls())
+
+    print("\nRunning basic deep crawl example with arun_many and streaming enabled...")
+    asyncio.run(basic_example_many_urls_stream())
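
Note on the streaming path: with stream=True and a deep_crawl_strategy set, arun_many now returns a single async iterator that interleaves results from all start URLs, built on the merge_async_generators helper added above. The following self-contained sketch shows that merge pattern in isolation; fake_crawl and main are illustrative stand-ins for deep_crawl_strategy.arun and the caller, not part of the patch:

    import asyncio

    async def merge_async_generators(generators):
        # Same pattern as the helper in arun_many: schedule one __anext__()
        # task per generator and yield whichever result completes first
        tasks = {asyncio.create_task(gen.__anext__()): gen for gen in generators}
        while tasks:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                gen = tasks.pop(task)
                try:
                    yield task.result()
                    tasks[asyncio.create_task(gen.__anext__())] = gen  # Re-arm this generator
                except StopAsyncIteration:
                    pass  # This generator is exhausted; drop it

    async def fake_crawl(name, delay, count):
        # Toy stand-in for deep_crawl_strategy.arun(start_url=..., ...)
        for i in range(count):
            await asyncio.sleep(delay)
            yield f"{name}: page {i}"

    async def main():
        merged = merge_async_generators(
            [fake_crawl("site-a", 0.03, 3), fake_crawl("site-b", 0.05, 2)]
        )
        async for item in merged:
            print(item)  # Interleaved in completion order, not grouped by site

    asyncio.run(main())

Because asyncio.wait returns whichever task finishes first, results arrive in completion order rather than grouped by start URL, so a slow site never blocks results from a fast one.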