Add crash recovery for deep crawl strategies
Add optional resume_state and on_state_change parameters to all deep crawl strategies (BFS, DFS, Best-First) for cloud deployment crash recovery. Features: - resume_state: Pass saved state to resume from checkpoint - on_state_change: Async callback fired after each URL for real-time state persistence to external storage (Redis, DB, etc.) - export_state(): Get last captured state manually - Zero overhead when features are disabled (None defaults) State includes visited URLs, pending queue/stack, depths, and pages_crawled count. All state is JSON-serializable.
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
|
||||
from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from ..models import TraversalStats
|
||||
@@ -26,11 +26,14 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
self,
|
||||
max_depth: int,
|
||||
filter_chain: FilterChain = FilterChain(),
|
||||
url_scorer: Optional[URLScorer] = None,
|
||||
url_scorer: Optional[URLScorer] = None,
|
||||
include_external: bool = False,
|
||||
score_threshold: float = -infinity,
|
||||
max_pages: int = infinity,
|
||||
logger: Optional[logging.Logger] = None,
|
||||
# Optional resume/callback parameters for crash recovery
|
||||
resume_state: Optional[Dict[str, Any]] = None,
|
||||
on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
|
||||
):
|
||||
self.max_depth = max_depth
|
||||
self.filter_chain = filter_chain
|
||||
@@ -48,6 +51,10 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
self.stats = TraversalStats(start_time=datetime.now())
|
||||
self._cancel_event = asyncio.Event()
|
||||
self._pages_crawled = 0
|
||||
# Store for use in arun methods
|
||||
self._resume_state = resume_state
|
||||
self._on_state_change = on_state_change
|
||||
self._last_state: Optional[Dict[str, Any]] = None
|
||||
|
||||
async def can_process_url(self, url: str, depth: int) -> bool:
|
||||
"""
|
||||
@@ -155,10 +162,21 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
Batch (non-streaming) mode:
|
||||
Processes one BFS level at a time, then yields all the results.
|
||||
"""
|
||||
visited: Set[str] = set()
|
||||
# current_level holds tuples: (url, parent_url)
|
||||
current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)]
|
||||
depths: Dict[str, int] = {start_url: 0}
|
||||
# Conditional state initialization for resume support
|
||||
if self._resume_state:
|
||||
visited = set(self._resume_state.get("visited", []))
|
||||
current_level = [
|
||||
(item["url"], item["parent_url"])
|
||||
for item in self._resume_state.get("pending", [])
|
||||
]
|
||||
depths = dict(self._resume_state.get("depths", {}))
|
||||
self._pages_crawled = self._resume_state.get("pages_crawled", 0)
|
||||
else:
|
||||
# Original initialization
|
||||
visited: Set[str] = set()
|
||||
# current_level holds tuples: (url, parent_url)
|
||||
current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)]
|
||||
depths: Dict[str, int] = {start_url: 0}
|
||||
|
||||
results: List[CrawlResult] = []
|
||||
|
||||
@@ -174,11 +192,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
# Clone the config to disable deep crawling recursion and enforce batch mode.
|
||||
batch_config = config.clone(deep_crawl_strategy=None, stream=False)
|
||||
batch_results = await crawler.arun_many(urls=urls, config=batch_config)
|
||||
|
||||
# Update pages crawled counter - count only successful crawls
|
||||
successful_results = [r for r in batch_results if r.success]
|
||||
self._pages_crawled += len(successful_results)
|
||||
|
||||
|
||||
for result in batch_results:
|
||||
url = result.url
|
||||
depth = depths.get(url, 0)
|
||||
@@ -187,12 +201,27 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
parent_url = next((parent for (u, parent) in current_level if u == url), None)
|
||||
result.metadata["parent_url"] = parent_url
|
||||
results.append(result)
|
||||
|
||||
|
||||
# Only discover links from successful crawls
|
||||
if result.success:
|
||||
# Increment pages crawled per URL for accurate state tracking
|
||||
self._pages_crawled += 1
|
||||
|
||||
# Link discovery will handle the max pages limit internally
|
||||
await self.link_discovery(result, url, depth, visited, next_level, depths)
|
||||
|
||||
# Capture state after EACH URL processed (if callback set)
|
||||
if self._on_state_change:
|
||||
state = {
|
||||
"strategy_type": "bfs",
|
||||
"visited": list(visited),
|
||||
"pending": [{"url": u, "parent_url": p} for u, p in next_level],
|
||||
"depths": depths,
|
||||
"pages_crawled": self._pages_crawled,
|
||||
}
|
||||
self._last_state = state
|
||||
await self._on_state_change(state)
|
||||
|
||||
current_level = next_level
|
||||
|
||||
return results
|
||||
@@ -207,9 +236,20 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
Streaming mode:
|
||||
Processes one BFS level at a time and yields results immediately as they arrive.
|
||||
"""
|
||||
visited: Set[str] = set()
|
||||
current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)]
|
||||
depths: Dict[str, int] = {start_url: 0}
|
||||
# Conditional state initialization for resume support
|
||||
if self._resume_state:
|
||||
visited = set(self._resume_state.get("visited", []))
|
||||
current_level = [
|
||||
(item["url"], item["parent_url"])
|
||||
for item in self._resume_state.get("pending", [])
|
||||
]
|
||||
depths = dict(self._resume_state.get("depths", {}))
|
||||
self._pages_crawled = self._resume_state.get("pages_crawled", 0)
|
||||
else:
|
||||
# Original initialization
|
||||
visited: Set[str] = set()
|
||||
current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)]
|
||||
depths: Dict[str, int] = {start_url: 0}
|
||||
|
||||
while current_level and not self._cancel_event.is_set():
|
||||
next_level: List[Tuple[str, Optional[str]]] = []
|
||||
@@ -244,7 +284,19 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
if result.success:
|
||||
# Link discovery will handle the max pages limit internally
|
||||
await self.link_discovery(result, url, depth, visited, next_level, depths)
|
||||
|
||||
|
||||
# Capture state after EACH URL processed (if callback set)
|
||||
if self._on_state_change:
|
||||
state = {
|
||||
"strategy_type": "bfs",
|
||||
"visited": list(visited),
|
||||
"pending": [{"url": u, "parent_url": p} for u, p in next_level],
|
||||
"depths": depths,
|
||||
"pages_crawled": self._pages_crawled,
|
||||
}
|
||||
self._last_state = state
|
||||
await self._on_state_change(state)
|
||||
|
||||
# If we didn't get results back (e.g. due to errors), avoid getting stuck in an infinite loop
|
||||
# by considering these URLs as visited but not counting them toward the max_pages limit
|
||||
if results_count == 0 and urls:
|
||||
@@ -258,3 +310,15 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
|
||||
"""
|
||||
self._cancel_event.set()
|
||||
self.stats.end_time = datetime.now()
|
||||
|
||||
def export_state(self) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Export current crawl state for external persistence.
|
||||
|
||||
Note: This returns the last captured state. For real-time state,
|
||||
use the on_state_change callback.
|
||||
|
||||
Returns:
|
||||
Dict with strategy state, or None if no state captured yet.
|
||||
"""
|
||||
return self._last_state
|
||||
|
||||
Reference in New Issue
Block a user