refactor(deep-crawl): add max_pages limit and improve crawl control

Add max_pages parameter to all deep crawling strategies to limit total pages crawled.
Add score_threshold parameter to BFS/DFS strategies for quality control.
Remove legacy parameter handling in AsyncWebCrawler.
Improve error handling and logging in crawl strategies.

BREAKING CHANGE: Removed support for legacy parameters in AsyncWebCrawler.run_many()
This commit is contained in:
UncleCode
2025-03-03 21:51:11 +08:00
parent c612f9a852
commit d024749633
7 changed files with 372 additions and 91 deletions

View File

@@ -24,17 +24,22 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self,
max_depth: int,
filter_chain: FilterChain = FilterChain(),
url_scorer: Optional[URLScorer] = None,
url_scorer: Optional[URLScorer] = None,
include_external: bool = False,
score_threshold: float = float('-inf'),
max_pages: int = float('inf'),
logger: Optional[logging.Logger] = None,
):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.include_external = include_external
self.score_threshold = score_threshold
self.max_pages = max_pages
self.logger = logger or logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0
async def can_process_url(self, url: str, depth: int) -> bool:
"""
@@ -72,16 +77,25 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
prepares the next level of URLs.
Each valid URL is appended to next_level as a tuple (url, parent_url)
and its depth is tracked.
"""
"""
next_depth = current_depth + 1
if next_depth > self.max_depth:
return
# If we've reached the max pages limit, don't discover new links
remaining_capacity = self.max_pages - self._pages_crawled
if remaining_capacity <= 0:
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping link discovery")
return
# Get internal links and, if enabled, external links.
links = result.links.get("internal", [])
if self.include_external:
links += result.links.get("external", [])
valid_links = []
# First collect all valid links
for link in links:
url = link.get("href")
if url in visited:
@@ -90,10 +104,29 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self.stats.urls_skipped += 1
continue
# Score the URL if a scorer is provided. In this simple BFS
# the score is not used for ordering.
# Score the URL if a scorer is provided
score = self.url_scorer.score(url) if self.url_scorer else 0
# attach the score to metadata if needed.
# Skip URLs with scores below the threshold
if score < self.score_threshold:
self.logger.debug(f"URL {url} skipped: score {score} below threshold {self.score_threshold}")
self.stats.urls_skipped += 1
continue
valid_links.append((url, score))
# If we have more valid links than capacity, sort by score and take the top ones
if len(valid_links) > remaining_capacity:
if self.url_scorer:
# Sort by score in descending order
valid_links.sort(key=lambda x: x[1], reverse=True)
# Take only as many as we have capacity for
valid_links = valid_links[:remaining_capacity]
self.logger.info(f"Limiting to {remaining_capacity} URLs due to max_pages limit")
# Process the final selected links
for url, score in valid_links:
# attach the score to metadata if needed
if score:
result.metadata = result.metadata or {}
result.metadata["score"] = score
@@ -125,7 +158,11 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
# Clone the config to disable deep crawling recursion and enforce batch mode.
batch_config = config.clone(deep_crawl_strategy=None, stream=False)
batch_results = await crawler.arun_many(urls=urls, config=batch_config)
# Update pages crawled counter - count only successful crawls
successful_results = [r for r in batch_results if r.success]
self._pages_crawled += len(successful_results)
for result in batch_results:
url = result.url
depth = depths.get(url, 0)
@@ -134,7 +171,11 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
parent_url = next((parent for (u, parent) in current_level if u == url), None)
result.metadata["parent_url"] = parent_url
results.append(result)
await self.link_discovery(result, url, depth, visited, next_level, depths)
# Only discover links from successful crawls
if result.success:
# Link discovery will handle the max pages limit internally
await self.link_discovery(result, url, depth, visited, next_level, depths)
current_level = next_level
@@ -161,6 +202,9 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
stream_config = config.clone(deep_crawl_strategy=None, stream=True)
stream_gen = await crawler.arun_many(urls=urls, config=stream_config)
# Keep track of processed results for this batch
results_count = 0
async for result in stream_gen:
url = result.url
depth = depths.get(url, 0)
@@ -168,9 +212,24 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
result.metadata["depth"] = depth
parent_url = next((parent for (u, parent) in current_level if u == url), None)
result.metadata["parent_url"] = parent_url
# Count only successful crawls
if result.success:
self._pages_crawled += 1
results_count += 1
yield result
await self.link_discovery(result, url, depth, visited, next_level, depths)
# Only discover links from successful crawls
if result.success:
# Link discovery will handle the max pages limit internally
await self.link_discovery(result, url, depth, visited, next_level, depths)
# If we didn't get results back (e.g. due to errors), avoid getting stuck in an infinite loop
# by considering these URLs as visited but not counting them toward the max_pages limit
if results_count == 0 and urls:
self.logger.warning(f"No results returned for {len(urls)} URLs, marking as visited")
current_level = next_level
async def shutdown(self) -> None: