Files
crawl4ai/docs/examples/deep_crawl_cancellation.py
unclecode 1e2b7fe7e6 Add documentation and example for deep crawl cancellation
- Add Section 11 "Cancellation Support for Deep Crawls" to deep-crawling.md
- Document should_cancel callback, cancel() method, and cancelled property
- Include complete example for cloud platform job cancellation
- Add docs/examples/deep_crawl_cancellation.py with 6 comprehensive examples
- Update summary section to mention cancellation feature
2026-01-22 06:10:54 +00:00

417 lines
12 KiB
Python

"""
Deep Crawl Cancellation Example
This example demonstrates how to implement cancellable deep crawls in Crawl4AI.
Useful for cloud platforms, job management systems, or any scenario where you
need to stop a running crawl mid-execution and retrieve partial results.
Features demonstrated:
1. Callback-based cancellation (check external source like Redis)
2. Direct cancellation via cancel() method
3. Checking cancellation status
4. State tracking with cancelled flag
5. Strategy reuse after cancellation
Requirements:
pip install crawl4ai redis
"""
import asyncio
import json
from typing import Any, Dict, List, Optional

from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.deep_crawling import (
    BestFirstCrawlingStrategy,
    BFSDeepCrawlStrategy,
    DFSDeepCrawlStrategy,
)
# =============================================================================
# Example 1: Basic Cancellation with Callback
# =============================================================================
async def example_callback_cancellation():
    """
    Cancel a crawl after reaching a certain number of pages.

    Simulates checking an external cancellation source: the strategy polls
    ``should_cancel`` before each URL, while ``on_state_change`` keeps the
    local page counter up to date.
    """
    print("\n" + "=" * 60)
    print("Example 1: Callback-based Cancellation")
    print("=" * 60)

    pages_seen = 0
    cancel_threshold = 5

    async def should_cancel():
        # In production this might query Redis, a database, or an API:
        #   job = await redis.get(f"job:{job_id}")
        #   return job.get("status") == "cancelled"
        return pages_seen >= cancel_threshold

    async def on_state_change(state: Dict[str, Any]):
        # Mirror the strategy's progress into the enclosing scope so the
        # cancellation callback can see it.
        nonlocal pages_seen
        pages_seen = state.get("pages_crawled", 0)
        was_cancelled = state.get("cancelled", False)
        print(f" Progress: {pages_seen} pages | Cancelled: {was_cancelled}")

    strategy = BFSDeepCrawlStrategy(
        max_depth=3,
        max_pages=100,  # Would crawl up to 100, but we'll cancel at 5
        should_cancel=should_cancel,
        on_state_change=on_state_change,
    )
    run_config = CrawlerRunConfig(
        deep_crawl_strategy=strategy,
        verbose=False,
    )

    print(f"Starting crawl (will cancel after {cancel_threshold} pages)...")
    async with AsyncWebCrawler() as crawler:
        results = await crawler.arun("https://docs.crawl4ai.com", config=run_config)

    print("\nResults:")
    print(f" - Crawled {len(results)} pages")
    print(f" - Strategy cancelled: {strategy.cancelled}")
    # NOTE: _pages_crawled is a private attribute, shown for illustration only.
    print(f" - Pages crawled counter: {strategy._pages_crawled}")
    return results
# =============================================================================
# Example 2: Direct Cancellation via cancel() Method
# =============================================================================
async def example_direct_cancellation():
    """
    Cancel a crawl directly using the cancel() method.

    Useful when you hold a reference to the strategy instance itself: a
    background task sleeps for three seconds, then flips the cancel flag.
    """
    print("\n" + "=" * 60)
    print("Example 2: Direct Cancellation via cancel()")
    print("=" * 60)

    strategy = BFSDeepCrawlStrategy(max_depth=3, max_pages=100)

    async def cancel_after_delay():
        # Fire once, three seconds in.
        await asyncio.sleep(3)
        print(" Calling strategy.cancel()...")
        strategy.cancel()

    run_config = CrawlerRunConfig(
        deep_crawl_strategy=strategy,
        verbose=False,
    )

    print("Starting crawl (will cancel after 3 seconds)...")
    async with AsyncWebCrawler() as crawler:
        # Run the cancellation timer alongside the crawl.
        timer = asyncio.create_task(cancel_after_delay())
        try:
            results = await crawler.arun("https://docs.crawl4ai.com", config=run_config)
        finally:
            # Tear the timer down whether or not it already fired.
            timer.cancel()
            try:
                await timer
            except asyncio.CancelledError:
                pass

    print("\nResults:")
    print(f" - Crawled {len(results)} pages")
    print(f" - Strategy cancelled: {strategy.cancelled}")
    return results
# =============================================================================
# Example 3: Streaming Mode with Cancellation
# =============================================================================
async def example_streaming_cancellation():
    """
    Cancel a streaming crawl and process partial results as they arrive.

    With ``stream=True`` each result is yielded as soon as it completes,
    so the consumer sees everything crawled before cancellation kicks in.
    """
    print("\n" + "=" * 60)
    print("Example 3: Streaming Mode with Cancellation")
    print("=" * 60)

    received = 0
    stop_after = 3

    async def should_cancel():
        # Stop once enough streamed results have been consumed.
        return received >= stop_after

    strategy = DFSDeepCrawlStrategy(
        max_depth=5,
        max_pages=50,
        should_cancel=should_cancel,
    )
    run_config = CrawlerRunConfig(
        deep_crawl_strategy=strategy,
        stream=True,  # yield results as they complete
        verbose=False,
    )

    print(f"Starting streaming crawl (will cancel after {stop_after} results)...")
    collected = []
    async with AsyncWebCrawler() as crawler:
        stream = await crawler.arun("https://docs.crawl4ai.com", config=run_config)
        async for item in stream:
            received += 1
            collected.append(item)
            print(f" Received result {received}: {item.url[:60]}...")

    print("\nResults:")
    print(f" - Received {len(collected)} results")
    print(f" - Strategy cancelled: {strategy.cancelled}")
    return collected
# =============================================================================
# Example 4: Strategy Reuse After Cancellation
# =============================================================================
async def example_strategy_reuse():
    """
    Demonstrate that a strategy can be reused after cancellation.

    The cancel flag is automatically reset on the next crawl, so the same
    strategy instance serves two back-to-back runs: the first is cancelled
    by the callback, the second completes normally.
    """
    print("\n" + "=" * 60)
    print("Example 4: Strategy Reuse After Cancellation")
    print("=" * 60)

    crawl_number = 0

    async def cancel_first_crawl_only():
        # Reports "cancel" only while the first crawl is in flight.
        return crawl_number == 1

    strategy = BFSDeepCrawlStrategy(
        max_depth=1,
        max_pages=10,
        should_cancel=cancel_first_crawl_only,
    )
    run_config = CrawlerRunConfig(
        deep_crawl_strategy=strategy,
        verbose=False,
    )

    async with AsyncWebCrawler() as crawler:
        crawl_number = 1  # first run: callback cancels immediately
        print("First crawl (will be cancelled)...")
        results1 = await crawler.arun("https://docs.crawl4ai.com", config=run_config)
        print(f" - Results: {len(results1)}, Cancelled: {strategy.cancelled}")

        crawl_number = 2  # second run: callback lets it finish
        print("\nSecond crawl (should complete normally)...")
        results2 = await crawler.arun("https://docs.crawl4ai.com", config=run_config)
        print(f" - Results: {len(results2)}, Cancelled: {strategy.cancelled}")

    print("\nSummary:")
    print(f" - First crawl: {len(results1)} results (cancelled)")
    print(f" - Second crawl: {len(results2)} results (completed)")
# =============================================================================
# Example 5: Best-First Strategy with Cancellation
# =============================================================================
async def example_best_first_cancellation():
    """
    Cancel a Best-First crawl that prioritizes URLs by relevance score.

    Streams scored results and stops once three pages have been reported
    crawled by the strategy's state callback.
    """
    print("\n" + "=" * 60)
    print("Example 5: Best-First Strategy with Cancellation")
    print("=" * 60)

    from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer

    pages_done = 0

    async def should_cancel():
        return pages_done >= 3

    async def track_progress(state: Dict[str, Any]):
        # Keep the local counter in sync with the strategy's state.
        nonlocal pages_done
        pages_done = state.get("pages_crawled", 0)
        print(f" Pages: {pages_done}, Cancelled: {state.get('cancelled', False)}")

    relevance = KeywordRelevanceScorer(
        keywords=["api", "example", "tutorial"],
        weight=0.8,
    )
    strategy = BestFirstCrawlingStrategy(
        max_depth=2,
        max_pages=50,
        url_scorer=relevance,
        should_cancel=should_cancel,
        on_state_change=track_progress,
    )
    run_config = CrawlerRunConfig(
        deep_crawl_strategy=strategy,
        stream=True,
        verbose=False,
    )

    print("Starting Best-First crawl (will cancel after 3 pages)...")
    collected = []
    async with AsyncWebCrawler() as crawler:
        async for item in await crawler.arun("https://docs.crawl4ai.com", config=run_config):
            collected.append(item)
            relevance_score = item.metadata.get("score", 0)
            print(f" Result: {item.url[:50]}... (score: {relevance_score:.2f})")

    print("\nResults:")
    print(f" - Crawled {len(collected)} high-priority pages")
    print(f" - Strategy cancelled: {strategy.cancelled}")
# =============================================================================
# Example 6: Production Pattern - Redis-Based Cancellation (Simulated)
# =============================================================================
async def example_production_pattern():
    """
    Simulate a production pattern where cancellation is checked from Redis.

    Suitable for cloud platforms with job management: an external
    controller flips a status key, and the strategy's ``should_cancel``
    callback observes it on the next poll. Progress is persisted on every
    state change so partial results survive cancellation.

    Fixes over the naive version:
    - ``redis_get`` is annotated ``Optional[str]`` (GET returns None for
      missing keys).
    - The final state read is guarded: ``json.loads`` is only called when
      the state key was actually written (the state callback may never
      fire if the crawl fails before any page completes).
    - ``state.get("pages_crawled", 0)`` instead of a bare key access,
      matching the defensive style used elsewhere in this file.
    """
    print("\n" + "=" * 60)
    print("Example 6: Production Pattern (Simulated Redis)")
    print("=" * 60)

    # Simulate Redis storage with a plain dict.
    redis_storage: Dict[str, str] = {}
    job_id = "crawl-job-12345"

    async def redis_get(key: str) -> Optional[str]:
        # Mirrors redis GET semantics: None when the key is absent.
        return redis_storage.get(key)

    async def redis_set(key: str, value: str):
        redis_storage[key] = value

    # Initialize job status.
    await redis_set(f"{job_id}:status", "running")

    # Cancellation check: polled by the strategy before each URL.
    async def check_cancelled():
        status = await redis_get(f"{job_id}:status")
        return status == "cancelled"

    # Progress tracking: persist strategy state on every change.
    async def save_progress(state: Dict[str, Any]):
        pages = state.get("pages_crawled", 0)
        await redis_set(f"{job_id}:state", json.dumps(state))
        await redis_set(f"{job_id}:pages", str(pages))
        print(f" Saved progress: {pages} pages")

    strategy = BFSDeepCrawlStrategy(
        max_depth=2,
        max_pages=20,
        should_cancel=check_cancelled,
        on_state_change=save_progress,
    )
    config = CrawlerRunConfig(
        deep_crawl_strategy=strategy,
        verbose=False,
    )

    # Simulate external cancellation after 2 seconds.
    async def external_cancel():
        await asyncio.sleep(2)
        print("\n [External] Setting job status to 'cancelled'...")
        await redis_set(f"{job_id}:status", "cancelled")

    print("Starting crawl with simulated Redis job management...")
    async with AsyncWebCrawler() as crawler:
        cancel_task = asyncio.create_task(external_cancel())
        try:
            results = await crawler.arun(
                "https://docs.crawl4ai.com",
                config=config
            )
        finally:
            # Always reap the background task, fired or not.
            cancel_task.cancel()
            try:
                await cancel_task
            except asyncio.CancelledError:
                pass

    # Record the final job status.
    final_status = "cancelled" if strategy.cancelled else "completed"
    await redis_set(f"{job_id}:status", final_status)

    print(f"\nJob completed:")
    print(f" - Final status: {final_status}")
    print(f" - Pages crawled: {await redis_get(f'{job_id}:pages')}")
    print(f" - Results returned: {len(results)}")

    # Show the final state, guarding against the callback never having fired.
    state_json = await redis_get(f"{job_id}:state")
    if state_json is not None:
        final_state = json.loads(state_json)
        print(f" - State saved: {len(final_state.get('visited', []))} URLs visited")
# =============================================================================
# Main
# =============================================================================
async def main():
    """Run every cancellation example in sequence."""
    print("=" * 60)
    print("Deep Crawl Cancellation Examples")
    print("=" * 60)

    # Execute the demos in the order they are defined above.
    demos = (
        example_callback_cancellation,
        example_direct_cancellation,
        example_streaming_cancellation,
        example_strategy_reuse,
        example_best_first_cancellation,
        example_production_pattern,
    )
    for demo in demos:
        await demo()

    print("\n" + "=" * 60)
    print("All examples completed!")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())