From 66925eb1d6a84e81ab29f2818981b63319f8ee4f Mon Sep 17 00:00:00 2001
From: ntohidi
Date: Thu, 7 Aug 2025 15:28:43 +0800
Subject: [PATCH] fix(deep_crawling): fix priority queue ordering and link truncation in BestFirstCrawlingStrategy - ref #1253

This commit fixes two critical issues in BestFirstCrawlingStrategy:

1. Priority Queue Ordering:
   - Problem: asyncio.PriorityQueue is a min-heap, causing lower-scored URLs to be crawled first
   - Solution: Use negative scores as priority to achieve max-heap behavior
   - Result: Higher-scoring pages are now correctly prioritized and crawled first

2. Link Discovery Truncation:
   - Problem: Links were truncated before scoring, potentially discarding high-value pages
   - Solution: Score all valid links first, sort by score, then truncate to capacity
   - Result: Only the highest-scoring links are kept when capacity limits are reached

Changes:
- Modified link_discovery to score URLs before truncation
- Updated priority queue to use negative scores for proper ordering
- Changed tuple structure to include pre-calculated scores
- Added logging to show when high-scoring URLs are being kept

This ensures best-first crawling behavior works as documented, with
higher-value pages being prioritized over lower-value ones.
---
 crawl4ai/deep_crawling/bff_strategy.py | 63 ++++++++++++++------------
 1 file changed, 35 insertions(+), 28 deletions(-)

diff --git a/crawl4ai/deep_crawling/bff_strategy.py b/crawl4ai/deep_crawling/bff_strategy.py
index 7779c9f4..280fe95a 100644
--- a/crawl4ai/deep_crawling/bff_strategy.py
+++ b/crawl4ai/deep_crawling/bff_strategy.py
@@ -80,12 +80,12 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
         source_url: str,
         current_depth: int,
         visited: Set[str],
-        next_links: List[Tuple[str, Optional[str]]],
+        next_links: List[Tuple[str, Optional[str], float]],
         depths: Dict[str, int],
     ) -> None:
         """
-        Extract links from the crawl result, validate them, and append new URLs
-        (with their parent references) to next_links.
+        Extract links from the crawl result, validate them, score them, and append
+        the highest-scoring URLs (with their parent references and scores) to next_links.
         Also updates the depths dictionary.
         """
         new_depth = current_depth + 1
@@ -103,8 +103,8 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
         if self.include_external:
             links += result.links.get("external", [])
 
-        # If we have more links than remaining capacity, limit how many we'll process
-        valid_links = []
+        # Collect and validate all links
+        valid_links_with_scores = []
         for link in links:
             url = link.get("href")
             base_url = normalize_url_for_deep_crawl(url, source_url)
@@ -113,18 +113,23 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
             if not await self.can_process_url(url, new_depth):
                 self.stats.urls_skipped += 1
                 continue
-
-            valid_links.append(base_url)
 
-        # If we have more valid links than capacity, limit them
-        if len(valid_links) > remaining_capacity:
-            valid_links = valid_links[:remaining_capacity]
-            self.logger.info(f"Limiting to {remaining_capacity} URLs due to max_pages limit")
+            # Score the URL
+            score = self.url_scorer.score(base_url) if self.url_scorer else 0.0
+            valid_links_with_scores.append((base_url, score))
+
+        # Sort by score descending (highest scores first)
+        valid_links_with_scores.sort(key=lambda x: x[1], reverse=True)
+
+        # If we have more valid links than capacity, keep only the highest-scoring ones
+        if len(valid_links_with_scores) > remaining_capacity:
+            self.logger.info(f"Keeping top {remaining_capacity} highest-scoring URLs out of {len(valid_links_with_scores)} valid links")
+            valid_links_with_scores = valid_links_with_scores[:remaining_capacity]
 
-        # Record the new depths and add to next_links
-        for url in valid_links:
+        # Record the new depths and add to next_links with scores
+        for url, score in valid_links_with_scores:
             depths[url] = new_depth
-            next_links.append((url, source_url))
+            next_links.append((url, source_url, score))
 
     async def _arun_best_first(
         self,
@@ -135,12 +140,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
         """
         Core best-first crawl method using a priority queue.
 
-        The queue items are tuples of (score, depth, url, parent_url). Lower scores
-        are treated as higher priority. URLs are processed in batches for efficiency.
+        The queue items are tuples of (priority, depth, url, parent_url, original_score).
+        We use negative scores as priority to achieve max-heap behavior (higher scores = higher priority).
+        URLs are processed in batches for efficiency.
        """
        queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
-        # Push the initial URL with score 0 and depth 0.
-        await queue.put((0, 0, start_url, None))
+        # Push the initial URL with priority 0 (will be processed first) and depth 0.
+        await queue.put((0, 0, start_url, None, 0.0))
 
         visited: Set[str] = set()
         depths: Dict[str, int] = {start_url: 0}
@@ -158,17 +164,17 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
                 self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
                 break
 
-            batch: List[Tuple[float, int, str, Optional[str]]] = []
+            batch: List[Tuple[float, int, str, Optional[str], float]] = []
 
             # Retrieve up to BATCH_SIZE items from the priority queue.
-            for _ in range(BATCH_SIZE):
+            for _ in range(batch_size):
                 if queue.empty():
                     break
                 item = await queue.get()
-                score, depth, url, parent_url = item
+                priority, depth, url, parent_url, original_score = item
                 if url in visited:
                     continue
                 visited.add(url)
-                batch.append(item)
+                batch.append((priority, depth, url, parent_url, original_score))
 
             if not batch:
                 continue
@@ -183,11 +189,11 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
                 corresponding = next((item for item in batch if item[2] == result_url), None)
                 if not corresponding:
                     continue
-                score, depth, url, parent_url = corresponding
+                priority, depth, url, parent_url, original_score = corresponding
                 result.metadata = result.metadata or {}
                 result.metadata["depth"] = depth
                 result.metadata["parent_url"] = parent_url
-                result.metadata["score"] = score
+                result.metadata["score"] = original_score
 
                 # Count only successful crawls toward max_pages limit
                 if result.success:
@@ -202,13 +208,14 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
                 # Only discover links from successful crawls
                 if result.success:
                     # Discover new links from this result
-                    new_links: List[Tuple[str, Optional[str]]] = []
+                    new_links: List[Tuple[str, Optional[str], float]] = []
                     await self.link_discovery(result, result_url, depth, visited, new_links, depths)
 
-                    for new_url, new_parent in new_links:
+                    for new_url, new_parent, new_score in new_links:
                         new_depth = depths.get(new_url, depth + 1)
-                        new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
-                        await queue.put((new_score, new_depth, new_url, new_parent))
+                        # Use negative score as priority for max-heap behavior
+                        priority = -new_score if new_score > 0 else 0
+                        await queue.put((priority, new_depth, new_url, new_parent, new_score))
 
         # End of crawl.
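
Note (illustrative, not part of the patch): the short Python sketch below shows the two
ideas the commit relies on: scoring links before truncation, and negating scores so that
asyncio.PriorityQueue, which is a min-heap, pops the highest-scoring URL first. The
function name keep_highest_scoring, the example URLs, scores, and capacity value are
invented for this sketch and do not come from crawl4ai.

    import asyncio
    from typing import List, Tuple

    def keep_highest_scoring(scored: List[Tuple[str, float]], capacity: int) -> List[Tuple[str, float]]:
        # Sort by score descending, then truncate, so truncation can no longer
        # discard the most valuable pages (the link_discovery fix).
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:capacity]

    async def demo() -> None:
        scored = [
            ("https://example.com/low", 0.2),
            ("https://example.com/high", 0.9),
            ("https://example.com/mid", 0.5),
        ]
        kept = keep_highest_scoring(scored, capacity=2)  # drops the 0.2 link

        # asyncio.PriorityQueue is a min-heap: it always pops the smallest item first.
        queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        for url, score in kept:
            # Negating the score makes the min-heap yield the highest score first
            # (the _arun_best_first fix); the original score rides along as the
            # last element, mirroring what ends up in result.metadata["score"].
            priority = -score if score > 0 else 0
            await queue.put((priority, url, score))

        while not queue.empty():
            priority, url, score = await queue.get()
            print(f"crawl {url} (score={score})")
        # Output order: /high (0.9) first, then /mid (0.5).

    asyncio.run(demo())

Carrying the original score as the final tuple element is what lets the crawler keep
reporting the true score while the negated first element alone drives the queue ordering.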