diff --git a/crawl4ai/deep_crawling/bff_strategy.py b/crawl4ai/deep_crawling/bff_strategy.py index 58209bcb..fdb96248 100644 --- a/crawl4ai/deep_crawling/bff_strategy.py +++ b/crawl4ai/deep_crawling/bff_strategy.py @@ -2,7 +2,7 @@ import asyncio import logging from datetime import datetime -from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple +from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable from urllib.parse import urlparse from ..models import TraversalStats @@ -41,6 +41,9 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy): include_external: bool = False, max_pages: int = infinity, logger: Optional[logging.Logger] = None, + # Optional resume/callback parameters for crash recovery + resume_state: Optional[Dict[str, Any]] = None, + on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None, ): self.max_depth = max_depth self.filter_chain = filter_chain @@ -57,6 +60,12 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy): self.stats = TraversalStats(start_time=datetime.now()) self._cancel_event = asyncio.Event() self._pages_crawled = 0 + # Store for use in arun methods + self._resume_state = resume_state + self._on_state_change = on_state_change + self._last_state: Optional[Dict[str, Any]] = None + # Shadow list for queue items (only used when on_state_change is set) + self._queue_shadow: Optional[List[Tuple[float, int, str, Optional[str]]]] = None async def can_process_url(self, url: str, depth: int) -> bool: """ @@ -135,16 +144,36 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy): ) -> AsyncGenerator[CrawlResult, None]: """ Core best-first crawl method using a priority queue. - + The queue items are tuples of (score, depth, url, parent_url). Lower scores are treated as higher priority. URLs are processed in batches for efficiency. """ queue: asyncio.PriorityQueue = asyncio.PriorityQueue() - # Push the initial URL with score 0 and depth 0. - initial_score = self.url_scorer.score(start_url) if self.url_scorer else 0 - await queue.put((-initial_score, 0, start_url, None)) - visited: Set[str] = set() - depths: Dict[str, int] = {start_url: 0} + + # Conditional state initialization for resume support + if self._resume_state: + visited = set(self._resume_state.get("visited", [])) + depths = dict(self._resume_state.get("depths", {})) + self._pages_crawled = self._resume_state.get("pages_crawled", 0) + # Restore queue from saved items + queue_items = self._resume_state.get("queue_items", []) + for item in queue_items: + await queue.put((item["score"], item["depth"], item["url"], item["parent_url"])) + # Initialize shadow list if callback is set + if self._on_state_change: + self._queue_shadow = [ + (item["score"], item["depth"], item["url"], item["parent_url"]) + for item in queue_items + ] + else: + # Original initialization + initial_score = self.url_scorer.score(start_url) if self.url_scorer else 0 + await queue.put((-initial_score, 0, start_url, None)) + visited: Set[str] = set() + depths: Dict[str, int] = {start_url: 0} + # Initialize shadow list if callback is set + if self._on_state_change: + self._queue_shadow = [(-initial_score, 0, start_url, None)] while not queue.empty() and not self._cancel_event.is_set(): # Stop if we've reached the max pages limit @@ -166,6 +195,12 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy): if queue.empty(): break item = await queue.get() + # Remove from shadow list if tracking + if self._on_state_change and self._queue_shadow is not None: + try: + self._queue_shadow.remove(item) + except ValueError: + pass # Item may have been removed already score, depth, url, parent_url = item if url in visited: continue @@ -210,7 +245,26 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy): for new_url, new_parent in new_links: new_depth = depths.get(new_url, depth + 1) new_score = self.url_scorer.score(new_url) if self.url_scorer else 0 - await queue.put((-new_score, new_depth, new_url, new_parent)) + queue_item = (-new_score, new_depth, new_url, new_parent) + await queue.put(queue_item) + # Add to shadow list if tracking + if self._on_state_change and self._queue_shadow is not None: + self._queue_shadow.append(queue_item) + + # Capture state after EACH URL processed (if callback set) + if self._on_state_change and self._queue_shadow is not None: + state = { + "strategy_type": "best_first", + "visited": list(visited), + "queue_items": [ + {"score": s, "depth": d, "url": u, "parent_url": p} + for s, d, u, p in self._queue_shadow + ], + "depths": depths, + "pages_crawled": self._pages_crawled, + } + self._last_state = state + await self._on_state_change(state) # End of crawl. @@ -269,3 +323,15 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy): """ self._cancel_event.set() self.stats.end_time = datetime.now() + + def export_state(self) -> Optional[Dict[str, Any]]: + """ + Export current crawl state for external persistence. + + Note: This returns the last captured state. For real-time state, + use the on_state_change callback. + + Returns: + Dict with strategy state, or None if no state captured yet. + """ + return self._last_state diff --git a/crawl4ai/deep_crawling/bfs_strategy.py b/crawl4ai/deep_crawling/bfs_strategy.py index eb699f82..35b66939 100644 --- a/crawl4ai/deep_crawling/bfs_strategy.py +++ b/crawl4ai/deep_crawling/bfs_strategy.py @@ -2,7 +2,7 @@ import asyncio import logging from datetime import datetime -from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple +from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable from urllib.parse import urlparse from ..models import TraversalStats @@ -26,11 +26,14 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): self, max_depth: int, filter_chain: FilterChain = FilterChain(), - url_scorer: Optional[URLScorer] = None, + url_scorer: Optional[URLScorer] = None, include_external: bool = False, score_threshold: float = -infinity, max_pages: int = infinity, logger: Optional[logging.Logger] = None, + # Optional resume/callback parameters for crash recovery + resume_state: Optional[Dict[str, Any]] = None, + on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None, ): self.max_depth = max_depth self.filter_chain = filter_chain @@ -48,6 +51,10 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): self.stats = TraversalStats(start_time=datetime.now()) self._cancel_event = asyncio.Event() self._pages_crawled = 0 + # Store for use in arun methods + self._resume_state = resume_state + self._on_state_change = on_state_change + self._last_state: Optional[Dict[str, Any]] = None async def can_process_url(self, url: str, depth: int) -> bool: """ @@ -155,10 +162,21 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): Batch (non-streaming) mode: Processes one BFS level at a time, then yields all the results. """ - visited: Set[str] = set() - # current_level holds tuples: (url, parent_url) - current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)] - depths: Dict[str, int] = {start_url: 0} + # Conditional state initialization for resume support + if self._resume_state: + visited = set(self._resume_state.get("visited", [])) + current_level = [ + (item["url"], item["parent_url"]) + for item in self._resume_state.get("pending", []) + ] + depths = dict(self._resume_state.get("depths", {})) + self._pages_crawled = self._resume_state.get("pages_crawled", 0) + else: + # Original initialization + visited: Set[str] = set() + # current_level holds tuples: (url, parent_url) + current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)] + depths: Dict[str, int] = {start_url: 0} results: List[CrawlResult] = [] @@ -174,11 +192,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): # Clone the config to disable deep crawling recursion and enforce batch mode. batch_config = config.clone(deep_crawl_strategy=None, stream=False) batch_results = await crawler.arun_many(urls=urls, config=batch_config) - - # Update pages crawled counter - count only successful crawls - successful_results = [r for r in batch_results if r.success] - self._pages_crawled += len(successful_results) - + for result in batch_results: url = result.url depth = depths.get(url, 0) @@ -187,12 +201,27 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): parent_url = next((parent for (u, parent) in current_level if u == url), None) result.metadata["parent_url"] = parent_url results.append(result) - + # Only discover links from successful crawls if result.success: + # Increment pages crawled per URL for accurate state tracking + self._pages_crawled += 1 + # Link discovery will handle the max pages limit internally await self.link_discovery(result, url, depth, visited, next_level, depths) + # Capture state after EACH URL processed (if callback set) + if self._on_state_change: + state = { + "strategy_type": "bfs", + "visited": list(visited), + "pending": [{"url": u, "parent_url": p} for u, p in next_level], + "depths": depths, + "pages_crawled": self._pages_crawled, + } + self._last_state = state + await self._on_state_change(state) + current_level = next_level return results @@ -207,9 +236,20 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): Streaming mode: Processes one BFS level at a time and yields results immediately as they arrive. """ - visited: Set[str] = set() - current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)] - depths: Dict[str, int] = {start_url: 0} + # Conditional state initialization for resume support + if self._resume_state: + visited = set(self._resume_state.get("visited", [])) + current_level = [ + (item["url"], item["parent_url"]) + for item in self._resume_state.get("pending", []) + ] + depths = dict(self._resume_state.get("depths", {})) + self._pages_crawled = self._resume_state.get("pages_crawled", 0) + else: + # Original initialization + visited: Set[str] = set() + current_level: List[Tuple[str, Optional[str]]] = [(start_url, None)] + depths: Dict[str, int] = {start_url: 0} while current_level and not self._cancel_event.is_set(): next_level: List[Tuple[str, Optional[str]]] = [] @@ -244,7 +284,19 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): if result.success: # Link discovery will handle the max pages limit internally await self.link_discovery(result, url, depth, visited, next_level, depths) - + + # Capture state after EACH URL processed (if callback set) + if self._on_state_change: + state = { + "strategy_type": "bfs", + "visited": list(visited), + "pending": [{"url": u, "parent_url": p} for u, p in next_level], + "depths": depths, + "pages_crawled": self._pages_crawled, + } + self._last_state = state + await self._on_state_change(state) + # If we didn't get results back (e.g. due to errors), avoid getting stuck in an infinite loop # by considering these URLs as visited but not counting them toward the max_pages limit if results_count == 0 and urls: @@ -258,3 +310,15 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): """ self._cancel_event.set() self.stats.end_time = datetime.now() + + def export_state(self) -> Optional[Dict[str, Any]]: + """ + Export current crawl state for external persistence. + + Note: This returns the last captured state. For real-time state, + use the on_state_change callback. + + Returns: + Dict with strategy state, or None if no state captured yet. + """ + return self._last_state diff --git a/crawl4ai/deep_crawling/dfs_strategy.py b/crawl4ai/deep_crawling/dfs_strategy.py index c710a2a5..d98d06a7 100644 --- a/crawl4ai/deep_crawling/dfs_strategy.py +++ b/crawl4ai/deep_crawling/dfs_strategy.py @@ -38,12 +38,25 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy): in control of traversal. Every successful page bumps ``_pages_crawled`` and seeds new stack items discovered via :meth:`link_discovery`. """ - visited: Set[str] = set() - # Stack items: (url, parent_url, depth) - stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] - depths: Dict[str, int] = {start_url: 0} - results: List[CrawlResult] = [] - self._reset_seen(start_url) + # Conditional state initialization for resume support + if self._resume_state: + visited = set(self._resume_state.get("visited", [])) + stack = [ + (item["url"], item["parent_url"], item["depth"]) + for item in self._resume_state.get("stack", []) + ] + depths = dict(self._resume_state.get("depths", {})) + self._pages_crawled = self._resume_state.get("pages_crawled", 0) + self._dfs_seen = set(self._resume_state.get("dfs_seen", [])) + results: List[CrawlResult] = [] + else: + # Original initialization + visited: Set[str] = set() + # Stack items: (url, parent_url, depth) + stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] + depths: Dict[str, int] = {start_url: 0} + results: List[CrawlResult] = [] + self._reset_seen(start_url) while stack and not self._cancel_event.is_set(): url, parent, depth = stack.pop() @@ -79,6 +92,22 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy): for new_url, new_parent in reversed(new_links): new_depth = depths.get(new_url, depth + 1) stack.append((new_url, new_parent, new_depth)) + + # Capture state after each URL processed (if callback set) + if self._on_state_change: + state = { + "strategy_type": "dfs", + "visited": list(visited), + "stack": [ + {"url": u, "parent_url": p, "depth": d} + for u, p, d in stack + ], + "depths": depths, + "pages_crawled": self._pages_crawled, + "dfs_seen": list(self._dfs_seen), + } + self._last_state = state + await self._on_state_change(state) return results async def _arun_stream( @@ -94,10 +123,22 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy): yielded before we even look at the next stack entry. Successful crawls still feed :meth:`link_discovery`, keeping DFS order intact. """ - visited: Set[str] = set() - stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] - depths: Dict[str, int] = {start_url: 0} - self._reset_seen(start_url) + # Conditional state initialization for resume support + if self._resume_state: + visited = set(self._resume_state.get("visited", [])) + stack = [ + (item["url"], item["parent_url"], item["depth"]) + for item in self._resume_state.get("stack", []) + ] + depths = dict(self._resume_state.get("depths", {})) + self._pages_crawled = self._resume_state.get("pages_crawled", 0) + self._dfs_seen = set(self._resume_state.get("dfs_seen", [])) + else: + # Original initialization + visited: Set[str] = set() + stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] + depths: Dict[str, int] = {start_url: 0} + self._reset_seen(start_url) while stack and not self._cancel_event.is_set(): url, parent, depth = stack.pop() @@ -130,6 +171,22 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy): new_depth = depths.get(new_url, depth + 1) stack.append((new_url, new_parent, new_depth)) + # Capture state after each URL processed (if callback set) + if self._on_state_change: + state = { + "strategy_type": "dfs", + "visited": list(visited), + "stack": [ + {"url": u, "parent_url": p, "depth": d} + for u, p, d in stack + ], + "depths": depths, + "pages_crawled": self._pages_crawled, + "dfs_seen": list(self._dfs_seen), + } + self._last_state = state + await self._on_state_change(state) + async def link_discovery( self, result: CrawlResult, diff --git a/tests/deep_crawling/__init__.py b/tests/deep_crawling/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/deep_crawling/test_deep_crawl_resume.py b/tests/deep_crawling/test_deep_crawl_resume.py new file mode 100644 index 00000000..4b41e0fe --- /dev/null +++ b/tests/deep_crawling/test_deep_crawl_resume.py @@ -0,0 +1,773 @@ +""" +Test Suite: Deep Crawl Resume/Crash Recovery Tests + +Tests that verify: +1. State export produces valid JSON-serializable data +2. Resume from checkpoint continues without duplicates +3. Simulated crash at various points recovers correctly +4. State callback fires at expected intervals +5. No damage to existing system behavior (regression tests) +""" + +import pytest +import asyncio +import json +from typing import Dict, Any, List +from unittest.mock import AsyncMock, MagicMock + +from crawl4ai.deep_crawling import ( + BFSDeepCrawlStrategy, + DFSDeepCrawlStrategy, + BestFirstCrawlingStrategy, + FilterChain, + URLPatternFilter, + DomainFilter, +) +from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer + + +# ============================================================================ +# Helper Functions for Mock Crawler +# ============================================================================ + +def create_mock_config(stream=False): + """Create a mock CrawlerRunConfig.""" + config = MagicMock() + config.clone = MagicMock(return_value=config) + config.stream = stream + return config + + +def create_mock_crawler_with_links(num_links: int = 3, include_keyword: bool = False): + """Create mock crawler that returns results with links.""" + call_count = 0 + + async def mock_arun_many(urls, config): + nonlocal call_count + results = [] + for url in urls: + call_count += 1 + result = MagicMock() + result.url = url + result.success = True + result.metadata = {} + + # Generate child links + links = [] + for i in range(num_links): + link_url = f"{url}/child{call_count}_{i}" + if include_keyword: + link_url = f"{url}/important-child{call_count}_{i}" + links.append({"href": link_url}) + + result.links = {"internal": links, "external": []} + results.append(result) + + # For streaming mode, return async generator + if config.stream: + async def gen(): + for r in results: + yield r + return gen() + return results + + crawler = MagicMock() + crawler.arun_many = mock_arun_many + return crawler + + +def create_mock_crawler_tracking(crawl_order: List[str], return_no_links: bool = False): + """Create mock crawler that tracks crawl order.""" + + async def mock_arun_many(urls, config): + results = [] + for url in urls: + crawl_order.append(url) + result = MagicMock() + result.url = url + result.success = True + result.metadata = {} + result.links = {"internal": [], "external": []} if return_no_links else {"internal": [{"href": f"{url}/child"}], "external": []} + results.append(result) + + # For streaming mode, return async generator + if config.stream: + async def gen(): + for r in results: + yield r + return gen() + return results + + crawler = MagicMock() + crawler.arun_many = mock_arun_many + return crawler + + +def create_simple_mock_crawler(): + """Basic mock crawler returning 1 result with 2 child links.""" + call_count = 0 + + async def mock_arun_many(urls, config): + nonlocal call_count + results = [] + for url in urls: + call_count += 1 + result = MagicMock() + result.url = url + result.success = True + result.metadata = {} + result.links = { + "internal": [ + {"href": f"{url}/child1"}, + {"href": f"{url}/child2"}, + ], + "external": [] + } + results.append(result) + + if config.stream: + async def gen(): + for r in results: + yield r + return gen() + return results + + crawler = MagicMock() + crawler.arun_many = mock_arun_many + return crawler + + +def create_mock_crawler_unlimited_links(): + """Mock crawler that always returns links (for testing limits).""" + async def mock_arun_many(urls, config): + results = [] + for url in urls: + result = MagicMock() + result.url = url + result.success = True + result.metadata = {} + result.links = { + "internal": [{"href": f"{url}/link{i}"} for i in range(10)], + "external": [] + } + results.append(result) + + if config.stream: + async def gen(): + for r in results: + yield r + return gen() + return results + + crawler = MagicMock() + crawler.arun_many = mock_arun_many + return crawler + + +# ============================================================================ +# TEST SUITE 1: Crash Recovery Tests +# ============================================================================ + +class TestBFSResume: + """BFS strategy resume tests.""" + + @pytest.mark.asyncio + async def test_state_export_json_serializable(self): + """Verify exported state can be JSON serialized.""" + captured_states: List[Dict] = [] + + async def capture_state(state: Dict[str, Any]): + # Verify JSON serializable + json_str = json.dumps(state) + parsed = json.loads(json_str) + captured_states.append(parsed) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=10, + on_state_change=capture_state, + ) + + # Create mock crawler that returns predictable results + mock_crawler = create_mock_crawler_with_links(num_links=3) + mock_config = create_mock_config() + + results = await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # Verify states were captured + assert len(captured_states) > 0 + + # Verify state structure + for state in captured_states: + assert state["strategy_type"] == "bfs" + assert "visited" in state + assert "pending" in state + assert "depths" in state + assert "pages_crawled" in state + assert isinstance(state["visited"], list) + assert isinstance(state["pending"], list) + assert isinstance(state["depths"], dict) + assert isinstance(state["pages_crawled"], int) + + @pytest.mark.asyncio + async def test_resume_continues_from_checkpoint(self): + """Verify resume starts from saved state, not beginning.""" + # Simulate state from previous crawl (visited 5 URLs, 3 pending) + saved_state = { + "strategy_type": "bfs", + "visited": [ + "https://example.com", + "https://example.com/page1", + "https://example.com/page2", + "https://example.com/page3", + "https://example.com/page4", + ], + "pending": [ + {"url": "https://example.com/page5", "parent_url": "https://example.com/page2"}, + {"url": "https://example.com/page6", "parent_url": "https://example.com/page3"}, + {"url": "https://example.com/page7", "parent_url": "https://example.com/page3"}, + ], + "depths": { + "https://example.com": 0, + "https://example.com/page1": 1, + "https://example.com/page2": 1, + "https://example.com/page3": 1, + "https://example.com/page4": 1, + "https://example.com/page5": 2, + "https://example.com/page6": 2, + "https://example.com/page7": 2, + }, + "pages_crawled": 5, + } + + crawled_urls: List[str] = [] + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=20, + resume_state=saved_state, + ) + + # Verify internal state was restored + assert strategy._resume_state == saved_state + + mock_crawler = create_mock_crawler_tracking(crawled_urls, return_no_links=True) + mock_config = create_mock_config() + + await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # Should NOT re-crawl already visited URLs + for visited_url in saved_state["visited"]: + assert visited_url not in crawled_urls, f"Re-crawled already visited: {visited_url}" + + # Should crawl pending URLs + for pending in saved_state["pending"]: + assert pending["url"] in crawled_urls, f"Did not crawl pending: {pending['url']}" + + @pytest.mark.asyncio + async def test_simulated_crash_mid_crawl(self): + """Simulate crash at URL N, verify resume continues from pending URLs.""" + crash_after = 3 + states_before_crash: List[Dict] = [] + + async def capture_until_crash(state: Dict[str, Any]): + states_before_crash.append(state) + if state["pages_crawled"] >= crash_after: + raise Exception("Simulated crash!") + + strategy1 = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=10, + on_state_change=capture_until_crash, + ) + + mock_crawler = create_mock_crawler_with_links(num_links=5) + mock_config = create_mock_config() + + # First crawl - crashes + with pytest.raises(Exception, match="Simulated crash"): + await strategy1._arun_batch("https://example.com", mock_crawler, mock_config) + + # Get last state before crash + last_state = states_before_crash[-1] + assert last_state["pages_crawled"] >= crash_after + + # Calculate which URLs were already crawled vs pending + pending_urls = {item["url"] for item in last_state["pending"]} + visited_urls = set(last_state["visited"]) + already_crawled_urls = visited_urls - pending_urls + + # Resume from checkpoint + crawled_in_resume: List[str] = [] + + strategy2 = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=10, + resume_state=last_state, + ) + + mock_crawler2 = create_mock_crawler_tracking(crawled_in_resume, return_no_links=True) + + await strategy2._arun_batch("https://example.com", mock_crawler2, mock_config) + + # Verify already-crawled URLs are not re-crawled + for crawled_url in already_crawled_urls: + assert crawled_url not in crawled_in_resume, f"Re-crawled already visited: {crawled_url}" + + # Verify pending URLs are crawled + for pending_url in pending_urls: + assert pending_url in crawled_in_resume, f"Did not crawl pending: {pending_url}" + + @pytest.mark.asyncio + async def test_callback_fires_per_url(self): + """Verify callback fires after each URL for maximum granularity.""" + callback_count = 0 + pages_crawled_sequence: List[int] = [] + + async def count_callbacks(state: Dict[str, Any]): + nonlocal callback_count + callback_count += 1 + pages_crawled_sequence.append(state["pages_crawled"]) + + strategy = BFSDeepCrawlStrategy( + max_depth=1, + max_pages=5, + on_state_change=count_callbacks, + ) + + mock_crawler = create_mock_crawler_with_links(num_links=2) + mock_config = create_mock_config() + + await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # Callback should fire once per successful URL + assert callback_count == strategy._pages_crawled, \ + f"Callback fired {callback_count} times, expected {strategy._pages_crawled} (per URL)" + + # pages_crawled should increment by 1 each callback + for i, count in enumerate(pages_crawled_sequence): + assert count == i + 1, f"Expected pages_crawled={i+1} at callback {i}, got {count}" + + @pytest.mark.asyncio + async def test_export_state_returns_last_captured(self): + """Verify export_state() returns last captured state.""" + last_state = None + + async def capture(state): + nonlocal last_state + last_state = state + + strategy = BFSDeepCrawlStrategy(max_depth=2, max_pages=5, on_state_change=capture) + + mock_crawler = create_mock_crawler_with_links(num_links=2) + mock_config = create_mock_config() + + await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + exported = strategy.export_state() + assert exported == last_state + + +class TestDFSResume: + """DFS strategy resume tests.""" + + @pytest.mark.asyncio + async def test_state_export_includes_stack_and_dfs_seen(self): + """Verify DFS state includes stack structure and _dfs_seen.""" + captured_states: List[Dict] = [] + + async def capture_state(state: Dict[str, Any]): + captured_states.append(state) + + strategy = DFSDeepCrawlStrategy( + max_depth=3, + max_pages=10, + on_state_change=capture_state, + ) + + mock_crawler = create_mock_crawler_with_links(num_links=2) + mock_config = create_mock_config() + + await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + assert len(captured_states) > 0 + + for state in captured_states: + assert state["strategy_type"] == "dfs" + assert "stack" in state + assert "dfs_seen" in state + # Stack items should have depth + for item in state["stack"]: + assert "url" in item + assert "parent_url" in item + assert "depth" in item + + @pytest.mark.asyncio + async def test_resume_restores_stack_order(self): + """Verify DFS stack order is preserved on resume.""" + saved_state = { + "strategy_type": "dfs", + "visited": ["https://example.com"], + "stack": [ + {"url": "https://example.com/deep3", "parent_url": "https://example.com/deep2", "depth": 3}, + {"url": "https://example.com/deep2", "parent_url": "https://example.com/deep1", "depth": 2}, + {"url": "https://example.com/page1", "parent_url": "https://example.com", "depth": 1}, + ], + "depths": {"https://example.com": 0}, + "pages_crawled": 1, + "dfs_seen": ["https://example.com", "https://example.com/deep3", "https://example.com/deep2", "https://example.com/page1"], + } + + crawl_order: List[str] = [] + + strategy = DFSDeepCrawlStrategy( + max_depth=3, + max_pages=10, + resume_state=saved_state, + ) + + mock_crawler = create_mock_crawler_tracking(crawl_order, return_no_links=True) + mock_config = create_mock_config() + + await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # DFS pops from end of stack, so order should be: page1, deep2, deep3 + assert crawl_order[0] == "https://example.com/page1" + assert crawl_order[1] == "https://example.com/deep2" + assert crawl_order[2] == "https://example.com/deep3" + + +class TestBestFirstResume: + """Best-First strategy resume tests.""" + + @pytest.mark.asyncio + async def test_state_export_includes_scored_queue(self): + """Verify Best-First state includes queue with scores.""" + captured_states: List[Dict] = [] + + async def capture_state(state: Dict[str, Any]): + captured_states.append(state) + + scorer = KeywordRelevanceScorer(keywords=["important"], weight=1.0) + + strategy = BestFirstCrawlingStrategy( + max_depth=2, + max_pages=10, + url_scorer=scorer, + on_state_change=capture_state, + ) + + mock_crawler = create_mock_crawler_with_links(num_links=3, include_keyword=True) + mock_config = create_mock_config(stream=True) + + async for _ in strategy._arun_stream("https://example.com", mock_crawler, mock_config): + pass + + assert len(captured_states) > 0 + + for state in captured_states: + assert state["strategy_type"] == "best_first" + assert "queue_items" in state + for item in state["queue_items"]: + assert "score" in item + assert "depth" in item + assert "url" in item + assert "parent_url" in item + + @pytest.mark.asyncio + async def test_resume_maintains_priority_order(self): + """Verify priority queue order is maintained on resume.""" + saved_state = { + "strategy_type": "best_first", + "visited": ["https://example.com"], + "queue_items": [ + {"score": -0.9, "depth": 1, "url": "https://example.com/high-priority", "parent_url": "https://example.com"}, + {"score": -0.5, "depth": 1, "url": "https://example.com/medium-priority", "parent_url": "https://example.com"}, + {"score": -0.1, "depth": 1, "url": "https://example.com/low-priority", "parent_url": "https://example.com"}, + ], + "depths": {"https://example.com": 0}, + "pages_crawled": 1, + } + + crawl_order: List[str] = [] + + strategy = BestFirstCrawlingStrategy( + max_depth=2, + max_pages=10, + resume_state=saved_state, + ) + + mock_crawler = create_mock_crawler_tracking(crawl_order, return_no_links=True) + mock_config = create_mock_config(stream=True) + + async for _ in strategy._arun_stream("https://example.com", mock_crawler, mock_config): + pass + + # Higher negative score = higher priority (min-heap) + # So -0.9 should be crawled first + assert crawl_order[0] == "https://example.com/high-priority" + + +class TestCrossStrategyResume: + """Tests that apply to all strategies.""" + + @pytest.mark.asyncio + @pytest.mark.parametrize("strategy_class,strategy_type", [ + (BFSDeepCrawlStrategy, "bfs"), + (DFSDeepCrawlStrategy, "dfs"), + (BestFirstCrawlingStrategy, "best_first"), + ]) + async def test_no_callback_means_no_overhead(self, strategy_class, strategy_type): + """Verify no state tracking when callback is None.""" + strategy = strategy_class(max_depth=2, max_pages=5) + + # _queue_shadow should be None for Best-First when no callback + if strategy_class == BestFirstCrawlingStrategy: + assert strategy._queue_shadow is None + + # _last_state should be None initially + assert strategy._last_state is None + + @pytest.mark.asyncio + @pytest.mark.parametrize("strategy_class", [ + BFSDeepCrawlStrategy, + DFSDeepCrawlStrategy, + BestFirstCrawlingStrategy, + ]) + async def test_export_state_returns_last_captured(self, strategy_class): + """Verify export_state() returns last captured state.""" + last_state = None + + async def capture(state): + nonlocal last_state + last_state = state + + strategy = strategy_class(max_depth=2, max_pages=5, on_state_change=capture) + + mock_crawler = create_mock_crawler_with_links(num_links=2) + + if strategy_class == BestFirstCrawlingStrategy: + mock_config = create_mock_config(stream=True) + async for _ in strategy._arun_stream("https://example.com", mock_crawler, mock_config): + pass + else: + mock_config = create_mock_config() + await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + exported = strategy.export_state() + assert exported == last_state + + +# ============================================================================ +# TEST SUITE 2: Regression Tests (No Damage to Current System) +# ============================================================================ + +class TestBFSRegressions: + """Ensure BFS works identically when new params not used.""" + + @pytest.mark.asyncio + async def test_default_params_unchanged(self): + """Constructor with only original params works.""" + strategy = BFSDeepCrawlStrategy( + max_depth=2, + include_external=False, + max_pages=10, + ) + + assert strategy.max_depth == 2 + assert strategy.include_external == False + assert strategy.max_pages == 10 + assert strategy._resume_state is None + assert strategy._on_state_change is None + + @pytest.mark.asyncio + async def test_filter_chain_still_works(self): + """FilterChain integration unchanged.""" + filter_chain = FilterChain([ + URLPatternFilter(patterns=["*/blog/*"]), + DomainFilter(allowed_domains=["example.com"]), + ]) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + filter_chain=filter_chain, + ) + + # Test filter still applies + assert await strategy.can_process_url("https://example.com/blog/post1", 1) == True + assert await strategy.can_process_url("https://other.com/blog/post1", 1) == False + + @pytest.mark.asyncio + async def test_url_scorer_still_works(self): + """URL scoring integration unchanged.""" + scorer = KeywordRelevanceScorer(keywords=["python", "tutorial"], weight=1.0) + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + url_scorer=scorer, + score_threshold=0.5, + ) + + assert strategy.url_scorer is not None + assert strategy.score_threshold == 0.5 + + # Scorer should work + score = scorer.score("https://example.com/python-tutorial") + assert score > 0 + + @pytest.mark.asyncio + async def test_batch_mode_returns_list(self): + """Batch mode still returns List[CrawlResult].""" + strategy = BFSDeepCrawlStrategy(max_depth=1, max_pages=5) + + mock_crawler = create_simple_mock_crawler() + mock_config = create_mock_config(stream=False) + + results = await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + assert isinstance(results, list) + assert len(results) > 0 + + @pytest.mark.asyncio + async def test_max_pages_limit_respected(self): + """max_pages limit still enforced.""" + strategy = BFSDeepCrawlStrategy(max_depth=10, max_pages=3) + + mock_crawler = create_mock_crawler_unlimited_links() + mock_config = create_mock_config() + + results = await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # Should stop at max_pages + assert strategy._pages_crawled <= 3 + + @pytest.mark.asyncio + async def test_max_depth_limit_respected(self): + """max_depth limit still enforced.""" + strategy = BFSDeepCrawlStrategy(max_depth=2, max_pages=100) + + mock_crawler = create_mock_crawler_unlimited_links() + mock_config = create_mock_config() + + results = await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # All results should have depth <= max_depth + for result in results: + assert result.metadata.get("depth", 0) <= 2 + + @pytest.mark.asyncio + async def test_metadata_depth_still_set(self): + """Result metadata still includes depth.""" + strategy = BFSDeepCrawlStrategy(max_depth=2, max_pages=5) + + mock_crawler = create_simple_mock_crawler() + mock_config = create_mock_config() + + results = await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + for result in results: + assert "depth" in result.metadata + assert isinstance(result.metadata["depth"], int) + + @pytest.mark.asyncio + async def test_metadata_parent_url_still_set(self): + """Result metadata still includes parent_url.""" + strategy = BFSDeepCrawlStrategy(max_depth=2, max_pages=5) + + mock_crawler = create_simple_mock_crawler() + mock_config = create_mock_config() + + results = await strategy._arun_batch("https://example.com", mock_crawler, mock_config) + + # First result (start URL) should have parent_url = None + assert results[0].metadata.get("parent_url") is None + + # Child results should have parent_url set + for result in results[1:]: + assert "parent_url" in result.metadata + + +class TestDFSRegressions: + """Ensure DFS works identically when new params not used.""" + + @pytest.mark.asyncio + async def test_inherits_bfs_params(self): + """DFS still inherits all BFS parameters.""" + strategy = DFSDeepCrawlStrategy( + max_depth=3, + include_external=True, + max_pages=20, + score_threshold=0.5, + ) + + assert strategy.max_depth == 3 + assert strategy.include_external == True + assert strategy.max_pages == 20 + assert strategy.score_threshold == 0.5 + + @pytest.mark.asyncio + async def test_dfs_seen_initialized(self): + """DFS _dfs_seen set still initialized.""" + strategy = DFSDeepCrawlStrategy(max_depth=2) + + assert hasattr(strategy, '_dfs_seen') + assert isinstance(strategy._dfs_seen, set) + + +class TestBestFirstRegressions: + """Ensure Best-First works identically when new params not used.""" + + @pytest.mark.asyncio + async def test_default_params_unchanged(self): + """Constructor with only original params works.""" + strategy = BestFirstCrawlingStrategy( + max_depth=2, + include_external=False, + max_pages=10, + ) + + assert strategy.max_depth == 2 + assert strategy.include_external == False + assert strategy.max_pages == 10 + assert strategy._resume_state is None + assert strategy._on_state_change is None + assert strategy._queue_shadow is None # Not initialized without callback + + @pytest.mark.asyncio + async def test_scorer_integration(self): + """URL scorer still affects crawl priority.""" + scorer = KeywordRelevanceScorer(keywords=["important"], weight=1.0) + + strategy = BestFirstCrawlingStrategy( + max_depth=2, + max_pages=10, + url_scorer=scorer, + ) + + assert strategy.url_scorer is scorer + + +class TestAPICompatibility: + """Ensure API/serialization compatibility.""" + + def test_strategy_signature_backward_compatible(self): + """Old code calling with positional/keyword args still works.""" + # Positional args (old style) + s1 = BFSDeepCrawlStrategy(2) + assert s1.max_depth == 2 + + # Keyword args (old style) + s2 = BFSDeepCrawlStrategy(max_depth=3, max_pages=10) + assert s2.max_depth == 3 + + # Mixed (old style) + s3 = BFSDeepCrawlStrategy(2, FilterChain(), None, False, float('-inf'), 100) + assert s3.max_depth == 2 + assert s3.max_pages == 100 + + def test_no_required_new_params(self): + """New params are optional, not required.""" + # Should not raise + BFSDeepCrawlStrategy(max_depth=2) + DFSDeepCrawlStrategy(max_depth=2) + BestFirstCrawlingStrategy(max_depth=2) diff --git a/tests/deep_crawling/test_deep_crawl_resume_integration.py b/tests/deep_crawling/test_deep_crawl_resume_integration.py new file mode 100644 index 00000000..1c7a8645 --- /dev/null +++ b/tests/deep_crawling/test_deep_crawl_resume_integration.py @@ -0,0 +1,162 @@ +""" +Integration Test: Deep Crawl Resume with Real URLs + +Tests the crash recovery feature using books.toscrape.com - a site +designed for scraping practice with a clear hierarchy: +- Home page → Category pages → Book detail pages +""" + +import pytest +import asyncio +import json +from typing import Dict, Any, List + +from crawl4ai import AsyncWebCrawler, CrawlerRunConfig +from crawl4ai.deep_crawling import BFSDeepCrawlStrategy + + +class TestBFSResumeIntegration: + """Integration tests for BFS resume with real crawling.""" + + @pytest.mark.asyncio + async def test_real_crawl_state_capture_and_resume(self): + """ + Test crash recovery with real URLs from books.toscrape.com. + + Flow: + 1. Start crawl with state callback + 2. Stop after N pages (simulated crash) + 3. Resume from saved state + 4. Verify no duplicate crawls + """ + # Phase 1: Initial crawl that "crashes" after 3 pages + crash_after = 3 + captured_states: List[Dict[str, Any]] = [] + crawled_urls_phase1: List[str] = [] + + async def capture_state_until_crash(state: Dict[str, Any]): + captured_states.append(state) + crawled_urls_phase1.clear() + crawled_urls_phase1.extend(state["visited"]) + + if state["pages_crawled"] >= crash_after: + raise Exception("Simulated crash!") + + strategy1 = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=10, + on_state_change=capture_state_until_crash, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy1, + stream=False, + verbose=False, + ) + + async with AsyncWebCrawler(verbose=False) as crawler: + # First crawl - will crash after 3 pages + with pytest.raises(Exception, match="Simulated crash"): + await crawler.arun("https://books.toscrape.com", config=config) + + # Verify we captured state before crash + assert len(captured_states) > 0, "No states captured before crash" + last_state = captured_states[-1] + + print(f"\n=== Phase 1: Crashed after {last_state['pages_crawled']} pages ===") + print(f"Visited URLs: {len(last_state['visited'])}") + print(f"Pending URLs: {len(last_state['pending'])}") + + # Verify state structure + assert last_state["strategy_type"] == "bfs" + assert last_state["pages_crawled"] >= crash_after + assert len(last_state["visited"]) > 0 + assert "pending" in last_state + assert "depths" in last_state + + # Verify state is JSON serializable (important for Redis/DB storage) + json_str = json.dumps(last_state) + restored_state = json.loads(json_str) + assert restored_state == last_state, "State not JSON round-trip safe" + + # Phase 2: Resume from checkpoint + crawled_urls_phase2: List[str] = [] + + async def track_resumed_crawl(state: Dict[str, Any]): + # Track what's being crawled in phase 2 + new_visited = set(state["visited"]) - set(last_state["visited"]) + for url in new_visited: + if url not in crawled_urls_phase2: + crawled_urls_phase2.append(url) + + strategy2 = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=10, + resume_state=restored_state, + on_state_change=track_resumed_crawl, + ) + + config2 = CrawlerRunConfig( + deep_crawl_strategy=strategy2, + stream=False, + verbose=False, + ) + + async with AsyncWebCrawler(verbose=False) as crawler: + results = await crawler.arun("https://books.toscrape.com", config=config2) + + print(f"\n=== Phase 2: Resumed crawl ===") + print(f"New URLs crawled: {len(crawled_urls_phase2)}") + print(f"Final pages_crawled: {strategy2._pages_crawled}") + + # Verify no duplicates - URLs from phase 1 should not be re-crawled + already_crawled = set(last_state["visited"]) - {item["url"] for item in last_state["pending"]} + duplicates = set(crawled_urls_phase2) & already_crawled + + assert len(duplicates) == 0, f"Duplicate crawls detected: {duplicates}" + + # Verify we made progress (crawled some of the pending URLs) + pending_urls = {item["url"] for item in last_state["pending"]} + crawled_pending = set(crawled_urls_phase2) & pending_urls + + print(f"Pending URLs crawled in phase 2: {len(crawled_pending)}") + + # Final state should show more pages crawled than before crash + final_state = strategy2.export_state() + if final_state: + assert final_state["pages_crawled"] >= last_state["pages_crawled"], \ + "Resume did not make progress" + + print("\n=== Integration test PASSED ===") + + @pytest.mark.asyncio + async def test_state_export_method(self): + """Test that export_state() returns valid state during crawl.""" + states_from_callback: List[Dict] = [] + + async def capture(state): + states_from_callback.append(state) + + strategy = BFSDeepCrawlStrategy( + max_depth=1, + max_pages=3, + on_state_change=capture, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + stream=False, + verbose=False, + ) + + async with AsyncWebCrawler(verbose=False) as crawler: + await crawler.arun("https://books.toscrape.com", config=config) + + # export_state should return the last captured state + exported = strategy.export_state() + + assert exported is not None, "export_state() returned None" + assert exported == states_from_callback[-1], "export_state() doesn't match last callback" + + print(f"\n=== export_state() test PASSED ===") + print(f"Final state: {exported['pages_crawled']} pages, {len(exported['visited'])} visited")