Compare commits

..

1 Commits

Author SHA1 Message Date
ntohidi
88a9fbbb7e fix(deep-crawl): BestFirst priority inversion; remove pre-scoring truncation. ref #1253
Use negative scores in PQ to visit high-score URLs first and drop link cap prior to scoring; add test for ordering.
2025-08-11 18:16:57 +08:00
2 changed files with 142 additions and 36 deletions

View File

@@ -80,12 +80,12 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
source_url: str,
current_depth: int,
visited: Set[str],
next_links: List[Tuple[str, Optional[str], float]],
next_links: List[Tuple[str, Optional[str]]],
depths: Dict[str, int],
) -> None:
"""
Extract links from the crawl result, validate them, score them,
and append the highest-scoring URLs (with their parent references and scores) to next_links.
Extract links from the crawl result, validate them, and append new URLs
(with their parent references) to next_links.
Also updates the depths dictionary.
"""
new_depth = current_depth + 1
@@ -103,8 +103,8 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
if self.include_external:
links += result.links.get("external", [])
# Collect and validate all links
valid_links_with_scores = []
# If we have more links than remaining capacity, limit how many we'll process
valid_links = []
for link in links:
url = link.get("href")
base_url = normalize_url_for_deep_crawl(url, source_url)
@@ -113,23 +113,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
if not await self.can_process_url(url, new_depth):
self.stats.urls_skipped += 1
continue
valid_links.append(base_url)
# Score the URL
score = self.url_scorer.score(base_url) if self.url_scorer else 0.0
valid_links_with_scores.append((base_url, score))
# Sort by score descending (highest scores first)
valid_links_with_scores.sort(key=lambda x: x[1], reverse=True)
# If we have more valid links than capacity, keep only the highest-scoring ones
if len(valid_links_with_scores) > remaining_capacity:
self.logger.info(f"Keeping top {remaining_capacity} highest-scoring URLs out of {len(valid_links_with_scores)} valid links")
valid_links_with_scores = valid_links_with_scores[:remaining_capacity]
# Record the new depths and add to next_links with scores
for url, score in valid_links_with_scores:
# Record the new depths and add to next_links
for url in valid_links:
depths[url] = new_depth
next_links.append((url, source_url, score))
next_links.append((url, source_url))
async def _arun_best_first(
self,
@@ -140,13 +130,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
"""
Core best-first crawl method using a priority queue.
The queue items are tuples of (priority, depth, url, parent_url, original_score).
We use negative scores as priority to achieve max-heap behavior (higher scores = higher priority).
URLs are processed in batches for efficiency.
The queue items are tuples of (score, depth, url, parent_url). Lower scores
are treated as higher priority. URLs are processed in batches for efficiency.
"""
queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
# Push the initial URL with priority 0 (will be processed first) and depth 0.
await queue.put((0, 0, start_url, None, 0.0))
# Push the initial URL with score 0 and depth 0.
initial_score = self.url_scorer.score(start_url) if self.url_scorer else 0
await queue.put((-initial_score, 0, start_url, None))
visited: Set[str] = set()
depths: Dict[str, int] = {start_url: 0}
@@ -164,17 +154,17 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
break
batch: List[Tuple[float, int, str, Optional[str], float]] = []
batch: List[Tuple[float, int, str, Optional[str]]] = []
# Retrieve up to BATCH_SIZE items from the priority queue.
for _ in range(batch_size):
for _ in range(BATCH_SIZE):
if queue.empty():
break
item = await queue.get()
priority, depth, url, parent_url, original_score = item
score, depth, url, parent_url = item
if url in visited:
continue
visited.add(url)
batch.append((priority, depth, url, parent_url, original_score))
batch.append(item)
if not batch:
continue
@@ -189,11 +179,11 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
corresponding = next((item for item in batch if item[2] == result_url), None)
if not corresponding:
continue
priority, depth, url, parent_url, original_score = corresponding
score, depth, url, parent_url = corresponding
result.metadata = result.metadata or {}
result.metadata["depth"] = depth
result.metadata["parent_url"] = parent_url
result.metadata["score"] = original_score
result.metadata["score"] = -score
# Count only successful crawls toward max_pages limit
if result.success:
@@ -208,14 +198,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
# Only discover links from successful crawls
if result.success:
# Discover new links from this result
new_links: List[Tuple[str, Optional[str], float]] = []
new_links: List[Tuple[str, Optional[str]]] = []
await self.link_discovery(result, result_url, depth, visited, new_links, depths)
for new_url, new_parent, new_score in new_links:
for new_url, new_parent in new_links:
new_depth = depths.get(new_url, depth + 1)
# Use negative score as priority for max-heap behavior
priority = -new_score if new_score > 0 else 0
await queue.put((priority, new_depth, new_url, new_parent, new_score))
new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
await queue.put((-new_score, new_depth, new_url, new_parent))
# End of crawl.

View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""
Simple test to verify BestFirstCrawlingStrategy fixes.
This test crawls a real website and shows that:
1. Higher-scoring pages are crawled first (priority queue fix)
2. Links are scored before truncation (link discovery fix)
"""
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.deep_crawling import BestFirstCrawlingStrategy
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
async def test_best_first_strategy():
"""Test BestFirstCrawlingStrategy with keyword scoring"""
print("=" * 70)
print("Testing BestFirstCrawlingStrategy with Real URL")
print("=" * 70)
print("\nThis test will:")
print("1. Crawl Python.org documentation")
print("2. Score pages based on keywords: 'tutorial', 'guide', 'reference'")
print("3. Show that higher-scoring pages are crawled first")
print("-" * 70)
# Create a keyword scorer that prioritizes tutorial/guide pages
scorer = KeywordRelevanceScorer(
keywords=["tutorial", "guide", "reference", "documentation"],
weight=1.0,
case_sensitive=False
)
# Create the strategy with scoring
strategy = BestFirstCrawlingStrategy(
max_depth=2, # Crawl 2 levels deep
max_pages=10, # Limit to 10 pages total
url_scorer=scorer, # Use keyword scoring
include_external=False # Only internal links
)
# Configure browser and crawler
browser_config = BrowserConfig(
headless=True, # Run in background
verbose=False # Reduce output noise
)
crawler_config = CrawlerRunConfig(
deep_crawl_strategy=strategy,
verbose=False
)
print("\nStarting crawl of https://docs.python.org/3/")
print("Looking for pages with keywords: tutorial, guide, reference, documentation")
print("-" * 70)
crawled_urls = []
async with AsyncWebCrawler(config=browser_config) as crawler:
# Crawl and collect results
results = await crawler.arun(
url="https://docs.python.org/3/",
config=crawler_config
)
# Process results
if isinstance(results, list):
for result in results:
score = result.metadata.get('score', 0) if result.metadata else 0
depth = result.metadata.get('depth', 0) if result.metadata else 0
crawled_urls.append({
'url': result.url,
'score': score,
'depth': depth,
'success': result.success
})
print("\n" + "=" * 70)
print("CRAWL RESULTS (in order of crawling)")
print("=" * 70)
for i, item in enumerate(crawled_urls, 1):
status = "" if item['success'] else ""
# Highlight high-scoring pages
if item['score'] > 0.5:
print(f"{i:2}. [{status}] Score: {item['score']:.2f} | Depth: {item['depth']} | {item['url']}")
print(f" ^ HIGH SCORE - Contains keywords!")
else:
print(f"{i:2}. [{status}] Score: {item['score']:.2f} | Depth: {item['depth']} | {item['url']}")
print("\n" + "=" * 70)
print("ANALYSIS")
print("=" * 70)
# Check if higher scores appear early in the crawl
scores = [item['score'] for item in crawled_urls[1:]] # Skip initial URL
high_score_indices = [i for i, s in enumerate(scores) if s > 0.3]
if high_score_indices and high_score_indices[0] < len(scores) / 2:
print("✅ SUCCESS: Higher-scoring pages (with keywords) were crawled early!")
print(" This confirms the priority queue fix is working.")
else:
print("⚠️ Check the crawl order above - higher scores should appear early")
# Show score distribution
print(f"\nScore Statistics:")
print(f" - Total pages crawled: {len(crawled_urls)}")
print(f" - Average score: {sum(item['score'] for item in crawled_urls) / len(crawled_urls):.2f}")
print(f" - Max score: {max(item['score'] for item in crawled_urls):.2f}")
print(f" - Pages with keywords: {sum(1 for item in crawled_urls if item['score'] > 0.3)}")
print("\n" + "=" * 70)
print("TEST COMPLETE")
print("=" * 70)
if __name__ == "__main__":
print("\n🔍 BestFirstCrawlingStrategy Simple Test\n")
asyncio.run(test_best_first_strategy())