From 85847ff13f11d0abc5fa0ee0736cc049b933ff6a Mon Sep 17 00:00:00 2001 From: Aravind Karnam Date: Tue, 28 Jan 2025 12:39:45 +0530 Subject: [PATCH] feat: 1. Make active_crawls into a dict instead of set and remove jobs array. Effective lookup and storage of active crawls and crawl control. 2. Put a lock on active_crawls, so simultaneous push and pop by coroutines doesn't cause a race condition 3. Move the depth check logic outside the child link for loop, as source_url doesn't change in the loop. --- crawl4ai/scraper/bfs_scraper_strategy.py | 65 ++++++++++-------------- docs/scraper/scraper_quickstart.py | 10 ++-- 2 files changed, 33 insertions(+), 42 deletions(-) diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py index 11568ed7..93100d1d 100644 --- a/crawl4ai/scraper/bfs_scraper_strategy.py +++ b/crawl4ai/scraper/bfs_scraper_strategy.py @@ -46,6 +46,7 @@ class BFSScraperStrategy(ScraperStrategy): self.stats = CrawlStats(start_time=datetime.now()) self._cancel_event = asyncio.Event() self.process_external_links = process_external_links + self._active_crawls_lock = asyncio.Lock() async def can_process_url(self, url: str, depth: int) -> bool: """Check if URL can be processed based on filters @@ -77,21 +78,25 @@ class BFSScraperStrategy(ScraperStrategy): self, result: CrawlResult, source_url: str, - depth: int, queue: asyncio.PriorityQueue, visited: Set[str], depths: Dict[str, int], ): """Process extracted links from crawl result. 
This is our link processor that: - Handles both internal and external links - Checks if URL can be processed - validates URL, applies Filters with can_process_url Checks depth limits + Handles both internal and external links + Checks if URL is visited already + Checks if URL can be processed - validates URL, applies Filters with can_process_url Scores URLs for priority - Updates depth tracking + Updates depth tracking dictionary Adds valid URLs to the queue Updates maximum depth statistics """ + next_depth = depths[source_url] + 1 + # If depth limit reached, exit without processing links + if next_depth > self.max_depth: + return links_to_process = result.links["internal"] if self.process_external_links: links_to_process += result.links["external"] @@ -99,17 +104,14 @@ class BFSScraperStrategy(ScraperStrategy): url = link["href"] if url in visited: continue - new_depth = depths[source_url] + 1 - if new_depth > self.max_depth: - continue - if not await self.can_process_url(url, new_depth): + if not await self.can_process_url(url, next_depth): self.stats.urls_skipped += 1 continue score = self.url_scorer.score(url) if self.url_scorer else 0 - await queue.put((score, new_depth, url)) - depths[url] = new_depth + await queue.put((score, next_depth, url)) + depths[url] = next_depth self.stats.total_depth_reached = max( - self.stats.total_depth_reached, new_depth + self.stats.total_depth_reached, next_depth ) async def ascrape( @@ -134,7 +136,7 @@ class BFSScraperStrategy(ScraperStrategy): await queue.put((0, 0, start_url)) visited: Set[str] = set() depths = {start_url: 0} - active_crawls = set() # Track URLs currently being processed + active_crawls = {} # Track URLs currently being processed with depth and score async with AsyncWebCrawler( config=browser_config, verbose=True, @@ -149,39 +151,31 @@ class BFSScraperStrategy(ScraperStrategy): - Or while there are active crawls still running (arun_many) - Can be interrupted via cancellation (not 
self._cancel_event.is_set()) """ - # Collect batch of jobs to process - jobs = [] - # Fill batch with available jobs - while len(jobs) < SCRAPER_BATCH_SIZE and not queue.empty(): - score, depth, url = await queue.get() - if ( - url not in active_crawls - ): # Only add if not currently processing - jobs.append((score, depth, url)) - active_crawls.add(url) + # Collect batch of URLs into active_crawls to process + async with self._active_crawls_lock: + while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty(): + score, depth, url = await queue.get() + active_crawls[url] = {"depth": depth, "score": score} self.stats.current_depth = depth - if not jobs: - # If no jobs but active crawls exist, wait a bit and continue - if active_crawls: - await asyncio.sleep(0.1) + if not active_crawls: + # If no active crawls exist, wait a bit and continue + await asyncio.sleep(0.1) continue # Process batch try: async for result in await crawler.arun_many( - urls=[url for _, _, url in jobs], + urls=list(active_crawls.keys()), config=crawler_config.clone(stream=True), ): - source_url, depth = next( - (url, depth) - for _, depth, url in jobs - if url == result.url - ) - active_crawls.remove(source_url) # Remove from active set + source_url = result.url + depth = active_crawls[source_url]["depth"] + async with self._active_crawls_lock: + active_crawls.pop(source_url, None) if result.success: await self._process_links( - result, source_url, depth, queue, visited, depths + result, source_url, queue, visited, depths ) yield result else: @@ -189,9 +183,6 @@ class BFSScraperStrategy(ScraperStrategy): f"Failed to crawl {result.url}: {result.error_message}" ) except Exception as e: - # Remove failed URLs from active set - for _, _, url in jobs: - active_crawls.discard(url) self.logger.error(f"Batch processing error: {e}") # Continue processing other batches continue diff --git a/docs/scraper/scraper_quickstart.py b/docs/scraper/scraper_quickstart.py index 893dd5ce..749914e0 100644 --- 
a/docs/scraper/scraper_quickstart.py +++ b/docs/scraper/scraper_quickstart.py @@ -188,11 +188,11 @@ if __name__ == "__main__": import time # Run basic example - # start_time = time.perf_counter() - # print("Running basic scraper example...") - # asyncio.run(basic_scraper_example()) - # end_time = time.perf_counter() - # print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds") + start_time = time.perf_counter() + print("Running basic scraper example...") + asyncio.run(basic_scraper_example()) + end_time = time.perf_counter() + print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds") # # Run advanced example print("\nRunning advanced scraper example...")