From 1e2b7fe7e6b9a405c07ea78c81b433c5aa5679d9 Mon Sep 17 00:00:00 2001 From: unclecode Date: Thu, 22 Jan 2026 06:10:54 +0000 Subject: [PATCH] Add documentation and example for deep crawl cancellation - Add Section 11 "Cancellation Support for Deep Crawls" to deep-crawling.md - Document should_cancel callback, cancel() method, and cancelled property - Include complete example for cloud platform job cancellation - Add docs/examples/deep_crawl_cancellation.py with 6 comprehensive examples - Update summary section to mention cancellation feature --- docs/examples/deep_crawl_cancellation.py | 416 +++++++++++++++++++++++ docs/md_v2/core/deep-crawling.md | 176 +++++++++- 2 files changed, 585 insertions(+), 7 deletions(-) create mode 100644 docs/examples/deep_crawl_cancellation.py diff --git a/docs/examples/deep_crawl_cancellation.py b/docs/examples/deep_crawl_cancellation.py new file mode 100644 index 00000000..b709fb08 --- /dev/null +++ b/docs/examples/deep_crawl_cancellation.py @@ -0,0 +1,416 @@ +""" +Deep Crawl Cancellation Example + +This example demonstrates how to implement cancellable deep crawls in Crawl4AI. +Useful for cloud platforms, job management systems, or any scenario where you +need to stop a running crawl mid-execution and retrieve partial results. + +Features demonstrated: +1. Callback-based cancellation (check external source like Redis) +2. Direct cancellation via cancel() method +3. Checking cancellation status +4. State tracking with cancelled flag +5. 
Strategy reuse after cancellation + +Requirements: + pip install crawl4ai (Redis is only simulated in Example 6; add `redis` when wiring up a real store) +""" + +import asyncio +import json +from typing import Dict, Any, List +from crawl4ai import AsyncWebCrawler, CrawlerRunConfig +from crawl4ai.deep_crawling import ( + BFSDeepCrawlStrategy, + DFSDeepCrawlStrategy, + BestFirstCrawlingStrategy, +) + + +# ============================================================================= +# Example 1: Basic Cancellation with Callback +# ============================================================================= + +async def example_callback_cancellation(): + """ + Cancel a crawl after reaching a certain number of pages. + This simulates checking an external cancellation source. + """ + print("\n" + "="*60) + print("Example 1: Callback-based Cancellation") + print("="*60) + + pages_crawled = 0 + max_before_cancel = 5 + + # This callback is checked before each URL is processed + async def should_cancel(): + # In production, you might check Redis, a database, or an API: + # status = await redis.get(f"job:{job_id}:status") + # return status == b"cancelled" + return pages_crawled >= max_before_cancel + + # Track progress via state changes + async def on_state_change(state: Dict[str, Any]): + nonlocal pages_crawled + pages_crawled = state.get("pages_crawled", 0) + cancelled = state.get("cancelled", False) + print(f" Progress: {pages_crawled} pages | Cancelled: {cancelled}") + + strategy = BFSDeepCrawlStrategy( + max_depth=3, + max_pages=100, # Would crawl up to 100, but we'll cancel at 5 + should_cancel=should_cancel, + on_state_change=on_state_change, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + verbose=False, + ) + + print(f"Starting crawl (will cancel after {max_before_cancel} pages)...") + + async with AsyncWebCrawler() as crawler: + results = await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ) + + print(f"\nResults:") + print(f" - Crawled {len(results)} pages") + print(f" - Strategy cancelled: 
{strategy.cancelled}") + print(f" - Pages crawled counter: {strategy._pages_crawled}") + + return results + + +# ============================================================================= +# Example 2: Direct Cancellation via cancel() Method +# ============================================================================= + +async def example_direct_cancellation(): + """ + Cancel a crawl directly using the cancel() method. + This is useful when you have direct access to the strategy instance. + """ + print("\n" + "="*60) + print("Example 2: Direct Cancellation via cancel()") + print("="*60) + + strategy = BFSDeepCrawlStrategy( + max_depth=3, + max_pages=100, + ) + + # Cancel after 3 seconds + async def cancel_after_delay(): + await asyncio.sleep(3) + print(" Calling strategy.cancel()...") + strategy.cancel() + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + verbose=False, + ) + + print("Starting crawl (will cancel after 3 seconds)...") + + async with AsyncWebCrawler() as crawler: + # Start cancellation timer in background + cancel_task = asyncio.create_task(cancel_after_delay()) + + try: + results = await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ) + finally: + cancel_task.cancel() + try: + await cancel_task + except asyncio.CancelledError: + pass + + print(f"\nResults:") + print(f" - Crawled {len(results)} pages") + print(f" - Strategy cancelled: {strategy.cancelled}") + + return results + + +# ============================================================================= +# Example 3: Streaming Mode with Cancellation +# ============================================================================= + +async def example_streaming_cancellation(): + """ + Cancel a streaming crawl and process partial results as they arrive. 
+ """ + print("\n" + "="*60) + print("Example 3: Streaming Mode with Cancellation") + print("="*60) + + results_count = 0 + cancel_after = 3 + + async def should_cancel(): + return results_count >= cancel_after + + strategy = DFSDeepCrawlStrategy( + max_depth=5, + max_pages=50, + should_cancel=should_cancel, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + stream=True, # Enable streaming + verbose=False, + ) + + print(f"Starting streaming crawl (will cancel after {cancel_after} results)...") + + results = [] + async with AsyncWebCrawler() as crawler: + async for result in await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ): + results_count += 1 + results.append(result) + print(f" Received result {results_count}: {result.url[:60]}...") + + print(f"\nResults:") + print(f" - Received {len(results)} results") + print(f" - Strategy cancelled: {strategy.cancelled}") + + return results + + +# ============================================================================= +# Example 4: Strategy Reuse After Cancellation +# ============================================================================= + +async def example_strategy_reuse(): + """ + Demonstrate that a strategy can be reused after cancellation. + The cancel flag is automatically reset on the next crawl. 
+ """ + print("\n" + "="*60) + print("Example 4: Strategy Reuse After Cancellation") + print("="*60) + + crawl_number = 0 + + async def cancel_first_crawl_only(): + # Only cancel during the first crawl + return crawl_number == 1 + + strategy = BFSDeepCrawlStrategy( + max_depth=1, + max_pages=10, + should_cancel=cancel_first_crawl_only, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + verbose=False, + ) + + async with AsyncWebCrawler() as crawler: + # First crawl - will be cancelled immediately + crawl_number = 1 + print("First crawl (will be cancelled)...") + results1 = await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ) + print(f" - Results: {len(results1)}, Cancelled: {strategy.cancelled}") + + # Second crawl - should work normally + crawl_number = 2 + print("\nSecond crawl (should complete normally)...") + results2 = await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ) + print(f" - Results: {len(results2)}, Cancelled: {strategy.cancelled}") + + print(f"\nSummary:") + print(f" - First crawl: {len(results1)} results (cancelled)") + print(f" - Second crawl: {len(results2)} results (completed)") + + +# ============================================================================= +# Example 5: Best-First Strategy with Cancellation +# ============================================================================= + +async def example_best_first_cancellation(): + """ + Cancel a Best-First crawl that prioritizes URLs by relevance score. 
+ """ + print("\n" + "="*60) + print("Example 5: Best-First Strategy with Cancellation") + print("="*60) + + from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer + + pages_crawled = 0 + + async def should_cancel(): + return pages_crawled >= 3 + + async def track_progress(state: Dict[str, Any]): + nonlocal pages_crawled + pages_crawled = state.get("pages_crawled", 0) + print(f" Pages: {pages_crawled}, Cancelled: {state.get('cancelled', False)}") + + scorer = KeywordRelevanceScorer( + keywords=["api", "example", "tutorial"], + weight=0.8 + ) + + strategy = BestFirstCrawlingStrategy( + max_depth=2, + max_pages=50, + url_scorer=scorer, + should_cancel=should_cancel, + on_state_change=track_progress, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + stream=True, + verbose=False, + ) + + print("Starting Best-First crawl (will cancel after 3 pages)...") + + results = [] + async with AsyncWebCrawler() as crawler: + async for result in await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ): + results.append(result) + score = result.metadata.get("score", 0) + print(f" Result: {result.url[:50]}... (score: {score:.2f})") + + print(f"\nResults:") + print(f" - Crawled {len(results)} high-priority pages") + print(f" - Strategy cancelled: {strategy.cancelled}") + + +# ============================================================================= +# Example 6: Production Pattern - Redis-Based Cancellation (Simulated) +# ============================================================================= + +async def example_production_pattern(): + """ + Simulate a production pattern where cancellation is checked from Redis. + This pattern is suitable for cloud platforms with job management. 
+ """ + print("\n" + "="*60) + print("Example 6: Production Pattern (Simulated Redis)") + print("="*60) + + # Simulate Redis storage + redis_storage: Dict[str, str] = {} + + job_id = "crawl-job-12345" + + # Simulate Redis operations + async def redis_get(key: str) -> str: + return redis_storage.get(key) + + async def redis_set(key: str, value: str): + redis_storage[key] = value + + # Initialize job status + await redis_set(f"{job_id}:status", "running") + + # Cancellation check + async def check_cancelled(): + status = await redis_get(f"{job_id}:status") + return status == "cancelled" + + # Progress tracking + async def save_progress(state: Dict[str, Any]): + await redis_set(f"{job_id}:state", json.dumps(state)) + await redis_set(f"{job_id}:pages", str(state["pages_crawled"])) + print(f" Saved progress: {state['pages_crawled']} pages") + + strategy = BFSDeepCrawlStrategy( + max_depth=2, + max_pages=20, + should_cancel=check_cancelled, + on_state_change=save_progress, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + verbose=False, + ) + + # Simulate external cancellation after 2 seconds + async def external_cancel(): + await asyncio.sleep(2) + print("\n [External] Setting job status to 'cancelled'...") + await redis_set(f"{job_id}:status", "cancelled") + + print("Starting crawl with simulated Redis job management...") + + async with AsyncWebCrawler() as crawler: + cancel_task = asyncio.create_task(external_cancel()) + + try: + results = await crawler.arun( + "https://docs.crawl4ai.com", + config=config + ) + finally: + cancel_task.cancel() + try: + await cancel_task + except asyncio.CancelledError: + pass + + # Final status + final_status = "cancelled" if strategy.cancelled else "completed" + await redis_set(f"{job_id}:status", final_status) + + print(f"\nJob completed:") + print(f" - Final status: {final_status}") + print(f" - Pages crawled: {await redis_get(f'{job_id}:pages')}") + print(f" - Results returned: {len(results)}") + + # Show final 
state + final_state = json.loads(await redis_get(f"{job_id}:state")) + print(f" - State saved: {len(final_state.get('visited', []))} URLs visited") + + +# ============================================================================= +# Main +# ============================================================================= + +async def main(): + """Run all examples.""" + print("="*60) + print("Deep Crawl Cancellation Examples") + print("="*60) + + await example_callback_cancellation() + await example_direct_cancellation() + await example_streaming_cancellation() + await example_strategy_reuse() + await example_best_first_cancellation() + await example_production_pattern() + + print("\n" + "="*60) + print("All examples completed!") + print("="*60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/md_v2/core/deep-crawling.md b/docs/md_v2/core/deep-crawling.md index ded7638d..c171cc83 100644 --- a/docs/md_v2/core/deep-crawling.md +++ b/docs/md_v2/core/deep-crawling.md @@ -635,11 +635,172 @@ When `resume_state=None` and `on_state_change=None` (the defaults), there is no --- -## 11. Prefetch Mode for Fast URL Discovery +## 11. Cancellation Support for Deep Crawls + +For production environments like cloud platforms, you often need to stop a running crawl mid-execution—whether the user changed their mind, specified the wrong URL, or wants to control costs. Crawl4AI provides built-in cancellation support for all deep crawl strategies. 
+ +### 11.1 Two Ways to Cancel + +**Option A: Callback-based cancellation** (recommended for external systems) + +Use `should_cancel` to check an external source (Redis, database, API) before each URL: + +```python +from crawl4ai.deep_crawling import BFSDeepCrawlStrategy + +async def check_if_cancelled(): + # Check Redis, database, or any external source + status = await redis.get(f"job:{job_id}:status") + return status == b"cancelled" + +strategy = BFSDeepCrawlStrategy( + max_depth=3, + max_pages=1000, + should_cancel=check_if_cancelled, # Called before each URL +) +``` + +**Option B: Direct cancellation** (for in-process control) + +Call `cancel()` directly on the strategy instance: + +```python +strategy = BFSDeepCrawlStrategy(max_depth=3, max_pages=1000) + +# In another coroutine or thread: +strategy.cancel() # Thread-safe, stops before next URL +``` + +### 11.2 Checking Cancellation Status + +Use the `cancelled` property to check if a crawl was cancelled: + +```python +async with AsyncWebCrawler() as crawler: + results = await crawler.arun(url, config=config) + +if strategy.cancelled: + print(f"Crawl was cancelled after {len(results)} pages") +else: + print(f"Crawl completed with {len(results)} pages") +``` + +### 11.3 State Notifications Include Cancelled Flag + +When using `on_state_change`, the state dictionary includes a `cancelled` field: + +```python +async def handle_state(state: dict): + if state.get("cancelled"): + print("Crawl was cancelled!") + print(f"Crawled {state['pages_crawled']} pages before cancellation") + # Save state for potential resume + await redis.set("crawl_state", json.dumps(state)) + +strategy = BFSDeepCrawlStrategy( + max_depth=3, + should_cancel=check_if_cancelled, + on_state_change=handle_state, +) +``` + +### 11.4 Key Behaviors + +| Scenario | Behavior | +|----------|----------| +| Cancel before first URL | Returns empty results, `cancelled=True` | +| Cancel during crawl | Completes current URL, then stops | +| Callback raises 
exception | Logged as warning, crawl continues (fail-open) | +| Strategy reuse after cancel | Works normally (cancel flag auto-resets) | +| Sync callback function | Supported (auto-detected and handled) | + +### 11.5 Complete Example: Cloud Platform Job Cancellation + +```python +import asyncio +import json +import redis.asyncio as redis +from crawl4ai import AsyncWebCrawler, CrawlerRunConfig +from crawl4ai.deep_crawling import BFSDeepCrawlStrategy + +async def run_cancellable_crawl(job_id: str, start_url: str): + redis_client = redis.Redis(host='localhost', port=6379, db=0) + + # Check external cancellation source + async def check_cancelled(): + status = await redis_client.get(f"job:{job_id}:status") + return status == b"cancelled" + + # Save progress for monitoring and recovery + async def save_progress(state: dict): + await redis_client.set( + f"job:{job_id}:state", + json.dumps(state) + ) + # Update job progress + await redis_client.set( + f"job:{job_id}:pages_crawled", + state["pages_crawled"] + ) + + strategy = BFSDeepCrawlStrategy( + max_depth=3, + max_pages=500, + should_cancel=check_cancelled, + on_state_change=save_progress, + ) + + config = CrawlerRunConfig( + deep_crawl_strategy=strategy, + stream=True, + ) + + results = [] + try: + async with AsyncWebCrawler() as crawler: + async for result in await crawler.arun(start_url, config=config): + results.append(result) + print(f"Crawled: {result.url}") + finally: + # Report final status + if strategy.cancelled: + await redis_client.set(f"job:{job_id}:status", "cancelled") + print(f"Job cancelled after {len(results)} pages") + else: + await redis_client.set(f"job:{job_id}:status", "completed") + print(f"Job completed with {len(results)} pages") + + await redis_client.close() + + return results + +# Usage +# asyncio.run(run_cancellable_crawl("job-123", "https://example.com")) +# +# To cancel from another process: +# redis_client.set("job:job-123:status", "cancelled") +``` + +### 11.6 Supported Strategies + 
+Cancellation works identically across all deep crawl strategies: + +- **BFSDeepCrawlStrategy** - Breadth-first search +- **DFSDeepCrawlStrategy** - Depth-first search +- **BestFirstCrawlingStrategy** - Priority-based crawling + +All strategies support: +- `should_cancel` callback parameter +- `cancel()` method +- `cancelled` property + +--- + +## 12. Prefetch Mode for Fast URL Discovery When you need to quickly discover URLs without full page processing, use **prefetch mode**. This is ideal for two-phase crawling where you first map the site, then selectively process specific pages. -### 11.1 Enabling Prefetch Mode +### 12.1 Enabling Prefetch Mode ```python from crawl4ai import AsyncWebCrawler, CrawlerRunConfig @@ -654,7 +815,7 @@ async with AsyncWebCrawler() as crawler: print(f"Found {len(result.links['external'])} external links") ``` -### 11.2 What Gets Skipped +### 12.2 What Gets Skipped Prefetch mode uses a fast path that bypasses heavy processing: @@ -667,14 +828,14 @@ Prefetch mode uses a fast path that bypasses heavy processing: | Media extraction | ✅ | ❌ Skipped | | LLM extraction | ✅ | ❌ Skipped | -### 11.3 Performance Benefit +### 12.3 Performance Benefit - **Normal mode**: Full pipeline (~2-5 seconds per page) - **Prefetch mode**: HTML + links only (~200-500ms per page) This makes prefetch mode **5-10x faster** for URL discovery. -### 11.4 Two-Phase Crawling Pattern +### 12.4 Two-Phase Crawling Pattern The most common use case is two-phase crawling: @@ -720,7 +881,7 @@ if __name__ == "__main__": print(f"Fully processed {len(results)} pages") ``` -### 11.5 Use Cases +### 12.5 Use Cases - **Site mapping**: Quickly discover all URLs before deciding what to process - **Link validation**: Check which pages exist without heavy processing @@ -729,7 +890,7 @@ if __name__ == "__main__": --- -## 12. Summary & Next Steps +## 13. 
Summary & Next Steps In this **Deep Crawling with Crawl4AI** tutorial, you learned to: @@ -740,6 +901,7 @@ In this **Deep Crawling with Crawl4AI** tutorial, you learned to: - Limit crawls with `max_pages` and `score_threshold` parameters - Build a complete advanced crawler with combined techniques - **Implement crash recovery** with `resume_state` and `on_state_change` for production deployments +- **Cancel running crawls** with the `should_cancel` callback or the `cancel()` method for cloud platform job management - **Use prefetch mode** for fast URL discovery and two-phase crawling With these tools, you can efficiently extract structured data from websites at scale, focusing precisely on the content you need for your specific use case.