fix(deep_crawling): fix priority queue ordering and link truncation in BestFirstCrawlingStrategy - ref #1253

This commit fixes two critical issues in BestFirstCrawlingStrategy:

1. Priority Queue Ordering:
   - Problem: asyncio.PriorityQueue is a min-heap, causing lower-scored URLs to be crawled first
   - Solution: Use negative scores as priority to achieve max-heap behavior (see the sketch after this list)
   - Result: Higher-scoring pages are now correctly prioritized and crawled first

2. Link Discovery Truncation:
   - Problem: Links were truncated before scoring, potentially discarding high-value pages
   - Solution: Score all valid links first, sort by score, then truncate to capacity
   - Result: Only the highest-scoring links are kept when capacity limits are reached
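A minimal, self-contained sketch (illustrative only, not project code) of why the negation is needed: asyncio.PriorityQueue always pops the smallest tuple first, so pushing raw scores surfaces the lowest-value URL unless the score is negated.

    import asyncio

    async def demo() -> None:
        scored = [("https://example.com/low", 0.1), ("https://example.com/high", 0.9)]

        q: asyncio.PriorityQueue = asyncio.PriorityQueue()
        for url, score in scored:
            await q.put((score, url))          # naive: raw score as priority
        print(await q.get())                   # (0.1, '.../low') -- min-heap picks the worst page first

        q = asyncio.PriorityQueue()
        for url, score in scored:
            await q.put((-score, url))         # fix: negate the score
        print(await q.get())                   # (-0.9, '.../high') -- best page now comes out first

    asyncio.run(demo())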

Changes:
- Modified link_discovery to score URLs before truncation (see the sketch after this list)
- Updated priority queue to use negative scores for proper ordering
- Changed tuple structure to include pre-calculated scores
- Added logging to show when high-scoring URLs are being kept
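
A hedged standalone sketch of the new ordering in link_discovery (toy code, not the project's method: score_url stands in for self.url_scorer.score, and the capacity value is arbitrary): score every valid link, sort descending, then truncate.

    from typing import List, Tuple

    def score_url(url: str) -> float:
        # Toy stand-in for self.url_scorer.score: prefer shorter URLs.
        return 1.0 / len(url)

    def select_links(valid_urls: List[str], remaining_capacity: int) -> List[Tuple[str, float]]:
        scored = [(url, score_url(url)) for url in valid_urls]
        scored.sort(key=lambda x: x[1], reverse=True)   # highest scores first
        return scored[:remaining_capacity]              # truncate only after scoring

    links = ["https://a.io/very/deep/low-value/page", "https://a.io/docs", "https://a.io"]
    print(select_links(links, 2))                       # keeps the two highest-scoring URLs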

This ensures best-first crawling behavior works as documented, with higher-value
pages being prioritized over lower-value ones.
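
For reference, a hedged usage sketch of how the fixed behavior would typically be exercised (the constructor parameters and scorer class follow crawl4ai's documented API, but the URL, keywords, and limits here are illustrative assumptions):

    import asyncio
    from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
    from crawl4ai.deep_crawling import BestFirstCrawlingStrategy
    from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer

    async def main() -> None:
        strategy = BestFirstCrawlingStrategy(
            max_depth=2,
            max_pages=10,   # small capacity so the truncation path is exercised
            url_scorer=KeywordRelevanceScorer(keywords=["crawl", "async"]),
        )
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun(
                "https://example.com", config=CrawlerRunConfig(deep_crawl_strategy=strategy)
            )
            # With the fix, higher metadata["score"] values should tend to appear earlier.
            for r in results:
                print(r.metadata.get("score"), r.url)

    asyncio.run(main())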
Author: ntohidi
Date:   2025-08-07 15:28:43 +08:00
Parent: a5bcac4c9d
Commit: 66925eb1d6


@@ -80,12 +80,12 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
source_url: str,
current_depth: int,
visited: Set[str],
-next_links: List[Tuple[str, Optional[str]]],
+next_links: List[Tuple[str, Optional[str], float]],
depths: Dict[str, int],
) -> None:
"""
-Extract links from the crawl result, validate them, and append new URLs
-(with their parent references) to next_links.
+Extract links from the crawl result, validate them, score them,
+and append the highest-scoring URLs (with their parent references and scores) to next_links.
Also updates the depths dictionary.
"""
new_depth = current_depth + 1
@@ -103,8 +103,8 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
if self.include_external:
links += result.links.get("external", [])
-# If we have more links than remaining capacity, limit how many we'll process
-valid_links = []
+# Collect and validate all links
+valid_links_with_scores = []
for link in links:
url = link.get("href")
base_url = normalize_url_for_deep_crawl(url, source_url)
@@ -113,18 +113,23 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
if not await self.can_process_url(url, new_depth):
self.stats.urls_skipped += 1
continue
-valid_links.append(base_url)
-# If we have more valid links than capacity, limit them
-if len(valid_links) > remaining_capacity:
-valid_links = valid_links[:remaining_capacity]
-self.logger.info(f"Limiting to {remaining_capacity} URLs due to max_pages limit")
+# Score the URL
+score = self.url_scorer.score(base_url) if self.url_scorer else 0.0
+valid_links_with_scores.append((base_url, score))
+# Sort by score descending (highest scores first)
+valid_links_with_scores.sort(key=lambda x: x[1], reverse=True)
+# If we have more valid links than capacity, keep only the highest-scoring ones
+if len(valid_links_with_scores) > remaining_capacity:
+self.logger.info(f"Keeping top {remaining_capacity} highest-scoring URLs out of {len(valid_links_with_scores)} valid links")
+valid_links_with_scores = valid_links_with_scores[:remaining_capacity]
-# Record the new depths and add to next_links
-for url in valid_links:
+# Record the new depths and add to next_links with scores
+for url, score in valid_links_with_scores:
depths[url] = new_depth
-next_links.append((url, source_url))
+next_links.append((url, source_url, score))
async def _arun_best_first(
self,
@@ -135,12 +140,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
"""
Core best-first crawl method using a priority queue.
-The queue items are tuples of (score, depth, url, parent_url). Lower scores
-are treated as higher priority. URLs are processed in batches for efficiency.
+The queue items are tuples of (priority, depth, url, parent_url, original_score).
+We use negative scores as priority to achieve max-heap behavior (higher scores = higher priority).
+URLs are processed in batches for efficiency.
"""
queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
-# Push the initial URL with score 0 and depth 0.
-await queue.put((0, 0, start_url, None))
+# Push the initial URL with priority 0 (will be processed first) and depth 0.
+await queue.put((0, 0, start_url, None, 0.0))
visited: Set[str] = set()
depths: Dict[str, int] = {start_url: 0}
@@ -158,17 +164,17 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
break
-batch: List[Tuple[float, int, str, Optional[str]]] = []
+batch: List[Tuple[float, int, str, Optional[str], float]] = []
# Retrieve up to BATCH_SIZE items from the priority queue.
-for _ in range(BATCH_SIZE):
+for _ in range(batch_size):
if queue.empty():
break
item = await queue.get()
-score, depth, url, parent_url = item
+priority, depth, url, parent_url, original_score = item
if url in visited:
continue
visited.add(url)
-batch.append(item)
+batch.append((priority, depth, url, parent_url, original_score))
if not batch:
continue
@@ -183,11 +189,11 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
corresponding = next((item for item in batch if item[2] == result_url), None)
if not corresponding:
continue
-score, depth, url, parent_url = corresponding
+priority, depth, url, parent_url, original_score = corresponding
result.metadata = result.metadata or {}
result.metadata["depth"] = depth
result.metadata["parent_url"] = parent_url
result.metadata["score"] = score
result.metadata["score"] = original_score
# Count only successful crawls toward max_pages limit
if result.success:
@@ -202,13 +208,14 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
# Only discover links from successful crawls
if result.success:
# Discover new links from this result
-new_links: List[Tuple[str, Optional[str]]] = []
+new_links: List[Tuple[str, Optional[str], float]] = []
await self.link_discovery(result, result_url, depth, visited, new_links, depths)
-for new_url, new_parent in new_links:
+for new_url, new_parent, new_score in new_links:
new_depth = depths.get(new_url, depth + 1)
-new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
-await queue.put((new_score, new_depth, new_url, new_parent))
+# Use negative score as priority for max-heap behavior
+priority = -new_score if new_score > 0 else 0
+await queue.put((priority, new_depth, new_url, new_parent, new_score))
# End of crawl.