Compare commits
1 Commits
main
...
fix/deep-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66925eb1d6 |
@@ -80,12 +80,12 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
source_url: str,
|
source_url: str,
|
||||||
current_depth: int,
|
current_depth: int,
|
||||||
visited: Set[str],
|
visited: Set[str],
|
||||||
next_links: List[Tuple[str, Optional[str]]],
|
next_links: List[Tuple[str, Optional[str], float]],
|
||||||
depths: Dict[str, int],
|
depths: Dict[str, int],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Extract links from the crawl result, validate them, and append new URLs
|
Extract links from the crawl result, validate them, score them,
|
||||||
(with their parent references) to next_links.
|
and append the highest-scoring URLs (with their parent references and scores) to next_links.
|
||||||
Also updates the depths dictionary.
|
Also updates the depths dictionary.
|
||||||
"""
|
"""
|
||||||
new_depth = current_depth + 1
|
new_depth = current_depth + 1
|
||||||
@@ -103,8 +103,8 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
if self.include_external:
|
if self.include_external:
|
||||||
links += result.links.get("external", [])
|
links += result.links.get("external", [])
|
||||||
|
|
||||||
# If we have more links than remaining capacity, limit how many we'll process
|
# Collect and validate all links
|
||||||
valid_links = []
|
valid_links_with_scores = []
|
||||||
for link in links:
|
for link in links:
|
||||||
url = link.get("href")
|
url = link.get("href")
|
||||||
base_url = normalize_url_for_deep_crawl(url, source_url)
|
base_url = normalize_url_for_deep_crawl(url, source_url)
|
||||||
@@ -113,18 +113,23 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
if not await self.can_process_url(url, new_depth):
|
if not await self.can_process_url(url, new_depth):
|
||||||
self.stats.urls_skipped += 1
|
self.stats.urls_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
valid_links.append(base_url)
|
|
||||||
|
|
||||||
# If we have more valid links than capacity, limit them
|
# Score the URL
|
||||||
if len(valid_links) > remaining_capacity:
|
score = self.url_scorer.score(base_url) if self.url_scorer else 0.0
|
||||||
valid_links = valid_links[:remaining_capacity]
|
valid_links_with_scores.append((base_url, score))
|
||||||
self.logger.info(f"Limiting to {remaining_capacity} URLs due to max_pages limit")
|
|
||||||
|
# Sort by score descending (highest scores first)
|
||||||
|
valid_links_with_scores.sort(key=lambda x: x[1], reverse=True)
|
||||||
|
|
||||||
|
# If we have more valid links than capacity, keep only the highest-scoring ones
|
||||||
|
if len(valid_links_with_scores) > remaining_capacity:
|
||||||
|
self.logger.info(f"Keeping top {remaining_capacity} highest-scoring URLs out of {len(valid_links_with_scores)} valid links")
|
||||||
|
valid_links_with_scores = valid_links_with_scores[:remaining_capacity]
|
||||||
|
|
||||||
# Record the new depths and add to next_links
|
# Record the new depths and add to next_links with scores
|
||||||
for url in valid_links:
|
for url, score in valid_links_with_scores:
|
||||||
depths[url] = new_depth
|
depths[url] = new_depth
|
||||||
next_links.append((url, source_url))
|
next_links.append((url, source_url, score))
|
||||||
|
|
||||||
async def _arun_best_first(
|
async def _arun_best_first(
|
||||||
self,
|
self,
|
||||||
@@ -135,12 +140,13 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
"""
|
"""
|
||||||
Core best-first crawl method using a priority queue.
|
Core best-first crawl method using a priority queue.
|
||||||
|
|
||||||
The queue items are tuples of (score, depth, url, parent_url). Lower scores
|
The queue items are tuples of (priority, depth, url, parent_url, original_score).
|
||||||
are treated as higher priority. URLs are processed in batches for efficiency.
|
We use negative scores as priority to achieve max-heap behavior (higher scores = higher priority).
|
||||||
|
URLs are processed in batches for efficiency.
|
||||||
"""
|
"""
|
||||||
queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
|
queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
|
||||||
# Push the initial URL with score 0 and depth 0.
|
# Push the initial URL with priority 0 (will be processed first) and depth 0.
|
||||||
await queue.put((0, 0, start_url, None))
|
await queue.put((0, 0, start_url, None, 0.0))
|
||||||
visited: Set[str] = set()
|
visited: Set[str] = set()
|
||||||
depths: Dict[str, int] = {start_url: 0}
|
depths: Dict[str, int] = {start_url: 0}
|
||||||
|
|
||||||
@@ -158,17 +164,17 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
|
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
|
||||||
break
|
break
|
||||||
|
|
||||||
batch: List[Tuple[float, int, str, Optional[str]]] = []
|
batch: List[Tuple[float, int, str, Optional[str], float]] = []
|
||||||
# Retrieve up to BATCH_SIZE items from the priority queue.
|
# Retrieve up to BATCH_SIZE items from the priority queue.
|
||||||
for _ in range(BATCH_SIZE):
|
for _ in range(batch_size):
|
||||||
if queue.empty():
|
if queue.empty():
|
||||||
break
|
break
|
||||||
item = await queue.get()
|
item = await queue.get()
|
||||||
score, depth, url, parent_url = item
|
priority, depth, url, parent_url, original_score = item
|
||||||
if url in visited:
|
if url in visited:
|
||||||
continue
|
continue
|
||||||
visited.add(url)
|
visited.add(url)
|
||||||
batch.append(item)
|
batch.append((priority, depth, url, parent_url, original_score))
|
||||||
|
|
||||||
if not batch:
|
if not batch:
|
||||||
continue
|
continue
|
||||||
@@ -183,11 +189,11 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
corresponding = next((item for item in batch if item[2] == result_url), None)
|
corresponding = next((item for item in batch if item[2] == result_url), None)
|
||||||
if not corresponding:
|
if not corresponding:
|
||||||
continue
|
continue
|
||||||
score, depth, url, parent_url = corresponding
|
priority, depth, url, parent_url, original_score = corresponding
|
||||||
result.metadata = result.metadata or {}
|
result.metadata = result.metadata or {}
|
||||||
result.metadata["depth"] = depth
|
result.metadata["depth"] = depth
|
||||||
result.metadata["parent_url"] = parent_url
|
result.metadata["parent_url"] = parent_url
|
||||||
result.metadata["score"] = score
|
result.metadata["score"] = original_score
|
||||||
|
|
||||||
# Count only successful crawls toward max_pages limit
|
# Count only successful crawls toward max_pages limit
|
||||||
if result.success:
|
if result.success:
|
||||||
@@ -202,13 +208,14 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
|
|||||||
# Only discover links from successful crawls
|
# Only discover links from successful crawls
|
||||||
if result.success:
|
if result.success:
|
||||||
# Discover new links from this result
|
# Discover new links from this result
|
||||||
new_links: List[Tuple[str, Optional[str]]] = []
|
new_links: List[Tuple[str, Optional[str], float]] = []
|
||||||
await self.link_discovery(result, result_url, depth, visited, new_links, depths)
|
await self.link_discovery(result, result_url, depth, visited, new_links, depths)
|
||||||
|
|
||||||
for new_url, new_parent in new_links:
|
for new_url, new_parent, new_score in new_links:
|
||||||
new_depth = depths.get(new_url, depth + 1)
|
new_depth = depths.get(new_url, depth + 1)
|
||||||
new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
|
# Use negative score as priority for max-heap behavior
|
||||||
await queue.put((new_score, new_depth, new_url, new_parent))
|
priority = -new_score if new_score > 0 else 0
|
||||||
|
await queue.put((priority, new_depth, new_url, new_parent, new_score))
|
||||||
|
|
||||||
# End of crawl.
|
# End of crawl.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user