Add cancellation support for deep crawl strategies
- Add should_cancel callback parameter to BFS, DFS, and BestFirst strategies
- Add cancel() method for immediate cancellation (thread-safe)
- Add cancelled property to check cancellation status
- Add _check_cancellation() internal method supporting both sync/async callbacks
- Reset cancel event on strategy reuse for multiple crawls
- Include cancelled flag in state notifications via on_state_change
- Handle callback exceptions gracefully (fail-open, log warning)
- Add comprehensive test suite with 26 tests covering all edge cases

This enables external callers (e.g., cloud platforms) to stop a running deep crawl mid-execution and retrieve partial results.
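For illustration, a minimal usage sketch of the new API (the import path and constructor arguments are assumptions; only max_depth, should_cancel, cancel(), and cancelled come from this change):

    import asyncio
    import time

    from crawl4ai.deep_crawling import BFSDeepCrawlStrategy  # import path assumed

    # Sync callback: enforce a 30-second wall-clock budget on the crawl.
    deadline = time.monotonic() + 30.0
    strategy = BFSDeepCrawlStrategy(
        max_depth=2,  # other constructor arguments omitted
        should_cancel=lambda: time.monotonic() > deadline,
    )

    async def stop_after(seconds: float) -> None:
        """Imperative alternative: cancel from a separate task or thread."""
        await asyncio.sleep(seconds)
        strategy.cancel()           # crawl stops before processing the next URL
        assert strategy.cancelled   # flag is observable immediately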
@@ -2,7 +2,7 @@
 import asyncio
 import logging
 from datetime import datetime
-from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable
+from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union
 from urllib.parse import urlparse
 
 from ..models import TraversalStats
@@ -34,6 +34,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         # Optional resume/callback parameters for crash recovery
         resume_state: Optional[Dict[str, Any]] = None,
         on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
+        # Optional cancellation callback - checked before each URL is processed
+        should_cancel: Optional[Callable[[], Union[bool, Awaitable[bool]]]] = None,
     ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
@@ -54,6 +56,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         # Store for use in arun methods
         self._resume_state = resume_state
         self._on_state_change = on_state_change
+        self._should_cancel = should_cancel
         self._last_state: Optional[Dict[str, Any]] = None
 
     async def can_process_url(self, url: str, depth: int) -> bool:
@@ -78,6 +81,55 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
 
         return True
 
+    def cancel(self) -> None:
+        """
+        Cancel the crawl. Thread-safe, can be called from any context.
+
+        The crawl will stop before processing the next URL. The current URL
+        being processed (if any) will complete before the crawl stops.
+        """
+        self._cancel_event.set()
+
+    @property
+    def cancelled(self) -> bool:
+        """
+        Check if the crawl was/is cancelled. Thread-safe.
+
+        Returns:
+            True if the crawl has been cancelled, False otherwise.
+        """
+        return self._cancel_event.is_set()
+
+    async def _check_cancellation(self) -> bool:
+        """
+        Check if crawl should be cancelled.
+
+        Handles both internal cancel flag and external should_cancel callback.
+        Supports both sync and async callbacks.
+
+        Returns:
+            True if crawl should be cancelled, False otherwise.
+        """
+        if self._cancel_event.is_set():
+            return True
+
+        if self._should_cancel:
+            try:
+                # Handle both sync and async callbacks
+                result = self._should_cancel()
+                if asyncio.iscoroutine(result):
+                    result = await result
+
+                if result:
+                    self._cancel_event.set()
+                    self.stats.end_time = datetime.now()
+                    return True
+            except Exception as e:
+                # Fail-open: log warning and continue crawling
+                self.logger.warning(f"should_cancel callback error: {e}")
+
+        return False
+
     async def link_discovery(
         self,
         result: CrawlResult,
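As the docstring above states, the callback may be synchronous or asynchronous: _check_cancellation() awaits whatever coroutine the callback returns. A hypothetical async callback, with an asyncio.Event standing in for an external kill switch:

    import asyncio

    from crawl4ai.deep_crawling import BFSDeepCrawlStrategy  # import path assumed

    kill_switch = asyncio.Event()  # a real caller might poll a database or an API

    async def should_cancel() -> bool:
        # Returning a coroutine is fine: _check_cancellation() awaits it.
        # If this raises, the strategy logs a warning and keeps crawling (fail-open).
        return kill_switch.is_set()

    strategy = BFSDeepCrawlStrategy(max_depth=2, should_cancel=should_cancel)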
@@ -162,6 +214,9 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         Batch (non-streaming) mode:
         Processes one BFS level at a time, then yields all the results.
         """
+        # Reset cancel event for strategy reuse
+        self._cancel_event = asyncio.Event()
+
         # Conditional state initialization for resume support
         if self._resume_state:
             visited = set(self._resume_state.get("visited", []))
@@ -185,7 +240,12 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
             if self._pages_crawled >= self.max_pages:
                 self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
                 break
 
+            # Check external cancellation callback before processing this level
+            if await self._check_cancellation():
+                self.logger.info("Crawl cancelled by user")
+                break
+
             next_level: List[Tuple[str, Optional[str]]] = []
             urls = [url for url, _ in current_level]
 
@@ -218,12 +278,26 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
                     "pending": [{"url": u, "parent_url": p} for u, p in next_level],
                     "depths": depths,
                     "pages_crawled": self._pages_crawled,
+                    "cancelled": self._cancel_event.is_set(),
                 }
                 self._last_state = state
                 await self._on_state_change(state)
 
             current_level = next_level
 
+        # Final state update if cancelled
+        if self._cancel_event.is_set() and self._on_state_change:
+            state = {
+                "strategy_type": "bfs",
+                "visited": list(visited),
+                "pending": [{"url": u, "parent_url": p} for u, p in current_level],
+                "depths": depths,
+                "pages_crawled": self._pages_crawled,
+                "cancelled": True,
+            }
+            self._last_state = state
+            await self._on_state_change(state)
+
         return results
 
     async def _arun_stream(
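Since every state notification now carries the cancelled flag, an on_state_change handler can checkpoint each BFS level and detect an early stop. A minimal sketch (the JSON file destination is an assumption):

    import json
    from typing import Any, Dict

    async def on_state_change(state: Dict[str, Any]) -> None:
        # Persist a checkpoint after every level; "cancelled" is True on the
        # final notification emitted when the crawl was stopped early.
        with open("crawl_state.json", "w") as f:
            json.dump(state, f)
        if state["cancelled"]:
            print(f'stopped after {state["pages_crawled"]} pages; '
                  f'{len(state["pending"])} URLs left in the frontier')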
@@ -236,6 +310,9 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         Streaming mode:
         Processes one BFS level at a time and yields results immediately as they arrive.
         """
+        # Reset cancel event for strategy reuse
+        self._cancel_event = asyncio.Event()
+
         # Conditional state initialization for resume support
         if self._resume_state:
             visited = set(self._resume_state.get("visited", []))
@@ -252,6 +329,11 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         depths: Dict[str, int] = {start_url: 0}
 
         while current_level and not self._cancel_event.is_set():
+            # Check external cancellation callback before processing this level
+            if await self._check_cancellation():
+                self.logger.info("Crawl cancelled by user")
+                break
+
             next_level: List[Tuple[str, Optional[str]]] = []
             urls = [url for url, _ in current_level]
             visited.update(urls)
@@ -293,6 +375,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
                     "pending": [{"url": u, "parent_url": p} for u, p in next_level],
                     "depths": depths,
                     "pages_crawled": self._pages_crawled,
+                    "cancelled": self._cancel_event.is_set(),
                 }
                 self._last_state = state
                 await self._on_state_change(state)
@@ -301,9 +384,22 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
             # by considering these URLs as visited but not counting them toward the max_pages limit
             if results_count == 0 and urls:
                 self.logger.warning(f"No results returned for {len(urls)} URLs, marking as visited")
 
             current_level = next_level
 
+        # Final state update if cancelled
+        if self._cancel_event.is_set() and self._on_state_change:
+            state = {
+                "strategy_type": "bfs",
+                "visited": list(visited),
+                "pending": [{"url": u, "parent_url": p} for u, p in current_level],
+                "depths": depths,
+                "pages_crawled": self._pages_crawled,
+                "cancelled": True,
+            }
+            self._last_state = state
+            await self._on_state_change(state)
+
     async def shutdown(self) -> None:
         """
         Clean up resources and signal cancellation of the crawl.
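Because a cancelled crawl emits its pending frontier in the final state, a caller can hand that checkpoint back through the existing resume_state parameter and continue later; this is the "retrieve partial results" path the commit message describes. A sketch under the same assumptions as above:

    import json

    from crawl4ai.deep_crawling import BFSDeepCrawlStrategy  # import path assumed

    # Load the checkpoint written by on_state_change; the saved "pending" list
    # becomes the next BFS level when the crawl is resumed.
    with open("crawl_state.json") as f:
        saved_state = json.load(f)

    resumed = BFSDeepCrawlStrategy(
        max_depth=2,  # other constructor arguments omitted
        resume_state=saved_state,
    )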