feat: add webhook notifications for crawl job completion

Implements webhook support for the crawl job API to eliminate polling requirements.

Changes:
- Added WebhookConfig and WebhookPayload schemas to schemas.py
- Created webhook.py with WebhookDeliveryService class
- Integrated webhook notifications in api.py handle_crawl_job
- Updated job.py CrawlJobPayload to accept webhook_config
- Added webhook configuration section to config.yml
- Included comprehensive usage examples in WEBHOOK_EXAMPLES.md

Features:
- Webhook notifications on job completion (success/failure)
- Configurable data inclusion in webhook payload
- Custom webhook headers support
- Global default webhook URL configuration
- Exponential backoff retry logic (5 attempts: 1s, 2s, 4s, 8s, 16s)
- 30-second timeout per webhook call

Usage:
POST /crawl/job with optional webhook_config:
- webhook_url: URL to receive notifications
- webhook_data_in_payload: include full results (default: false)
- webhook_headers: custom headers for authentication

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2025-10-21 16:17:40 +00:00
parent fdbcddbf1a
commit 8a37710313
6 changed files with 517 additions and 5 deletions

View File

@@ -0,0 +1,281 @@
# Webhook Feature Examples
This document provides examples of how to use the webhook feature for crawl jobs in Crawl4AI.
## Overview
The webhook feature allows you to receive notifications when crawl jobs complete, eliminating the need for polling. Webhooks are sent with exponential backoff retry logic to ensure reliable delivery.
## Configuration
### Global Configuration (config.yml)
You can configure default webhook settings in `config.yml`:
```yaml
webhooks:
enabled: true
default_url: null # Optional: default webhook URL for all jobs
data_in_payload: false # Optional: default behavior for including data
retry:
max_attempts: 5
initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff
max_delay_ms: 32000
timeout_ms: 30000 # 30s timeout per webhook call
headers: # Optional: default headers to include
User-Agent: "Crawl4AI-Webhook/1.0"
```
## API Usage Examples
### Example 1: Basic Webhook (Notification Only)
Send a webhook notification without including the crawl data in the payload.
**Request:**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": false
}
}'
```
**Response:**
```json
{
"task_id": "crawl_a1b2c3d4"
}
```
**Webhook Payload Received:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"]
}
```
Your webhook handler should then fetch the results:
```bash
curl http://localhost:11235/crawl/job/crawl_a1b2c3d4
```
### Example 2: Webhook with Data Included
Include the full crawl results in the webhook payload.
**Request:**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": true
}
}'
```
**Webhook Payload Received:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"],
"data": {
"markdown": "...",
"html": "...",
"links": {...},
"metadata": {...}
}
}
```
### Example 3: Webhook with Custom Headers
Include custom headers for authentication or identification.
**Request:**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": false,
"webhook_headers": {
"X-Webhook-Secret": "my-secret-token",
"X-Service-ID": "crawl4ai-production"
}
}
}'
```
The webhook will be sent with these additional headers plus the default headers from config.
### Example 4: Failure Notification
When a crawl job fails, a webhook is sent with error details.
**Webhook Payload on Failure:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "failed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"],
"error": "Connection timeout after 30s"
}
```
### Example 5: Using Global Default Webhook
If you set a `default_url` in config.yml, jobs without webhook_config will use it:
**config.yml:**
```yaml
webhooks:
enabled: true
default_url: "https://myapp.com/webhooks/default"
data_in_payload: false
```
**Request (no webhook_config needed):**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"]
}'
```
The webhook will be sent to the default URL configured in config.yml.
## Webhook Handler Example
Here's a simple Python Flask webhook handler:
```python
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
@app.route('/webhooks/crawl-complete', methods=['POST'])
def handle_crawl_webhook():
payload = request.json
task_id = payload['task_id']
status = payload['status']
if status == 'completed':
# If data not in payload, fetch it
if 'data' not in payload:
response = requests.get(f'http://localhost:11235/crawl/job/{task_id}')
data = response.json()
else:
data = payload['data']
# Process the crawl data
print(f"Processing crawl results for {task_id}")
# Your business logic here...
elif status == 'failed':
error = payload.get('error', 'Unknown error')
print(f"Crawl job {task_id} failed: {error}")
# Handle failure...
return jsonify({"status": "received"}), 200
if __name__ == '__main__':
app.run(port=8080)
```
## Retry Logic
The webhook delivery service uses exponential backoff retry logic:
- **Attempts:** Up to 5 attempts by default
- **Delays:** 1s → 2s → 4s → 8s → 16s
- **Timeout:** 30 seconds per attempt
- **Retry Conditions:**
- Server errors (5xx status codes)
- Network errors
- Timeouts
- **No Retry:**
- Client errors (4xx status codes)
- Successful delivery (2xx status codes)
## Benefits
1. **No Polling Required** - Eliminates constant API calls to check job status
2. **Real-time Notifications** - Immediate notification when jobs complete
3. **Reliable Delivery** - Exponential backoff ensures webhooks are delivered
4. **Flexible** - Choose between notification-only or full data delivery
5. **Secure** - Support for custom headers for authentication
6. **Configurable** - Global defaults or per-job configuration
## TypeScript Client Example
```typescript
interface WebhookConfig {
webhook_url: string;
webhook_data_in_payload?: boolean;
webhook_headers?: Record<string, string>;
}
interface CrawlJobRequest {
urls: string[];
browser_config?: Record<string, any>;
crawler_config?: Record<string, any>;
webhook_config?: WebhookConfig;
}
async function createCrawlJob(request: CrawlJobRequest) {
const response = await fetch('http://localhost:11235/crawl/job', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(request)
});
const { task_id } = await response.json();
return task_id;
}
// Usage
const taskId = await createCrawlJob({
urls: ['https://example.com'],
webhook_config: {
webhook_url: 'https://myapp.com/webhooks/crawl-complete',
webhook_data_in_payload: false,
webhook_headers: {
'X-Webhook-Secret': 'my-secret'
}
}
});
```
## Monitoring and Debugging
Webhook delivery attempts are logged at INFO level:
- Successful deliveries
- Retry attempts with delays
- Final failures after max attempts
Check the application logs for webhook delivery status:
```bash
docker logs crawl4ai-container | grep -i webhook
```

View File

@@ -44,6 +44,7 @@ from utils import (
get_llm_api_key,
validate_llm_provider
)
from webhook import WebhookDeliveryService
import psutil, time
@@ -567,6 +568,7 @@ async def handle_crawl_job(
browser_config: Dict,
crawler_config: Dict,
config: Dict,
webhook_config: Optional[Dict] = None,
) -> Dict:
"""
Fire-and-forget version of handle_crawl_request.
@@ -574,13 +576,24 @@ async def handle_crawl_job(
lets /crawl/job/{task_id} polling fetch the result.
"""
task_id = f"crawl_{uuid4().hex[:8]}"
await redis.hset(f"task:{task_id}", mapping={
# Store task data in Redis
task_data = {
"status": TaskStatus.PROCESSING, # <-- keep enum values consistent
"created_at": datetime.utcnow().isoformat(),
"url": json.dumps(urls), # store list as JSON string
"result": "",
"error": "",
})
}
# Store webhook config if provided
if webhook_config:
task_data["webhook_config"] = json.dumps(webhook_config)
await redis.hset(f"task:{task_id}", mapping=task_data)
# Initialize webhook service
webhook_service = WebhookDeliveryService(config)
async def _runner():
try:
@@ -594,6 +607,17 @@ async def handle_crawl_job(
"status": TaskStatus.COMPLETED,
"result": json.dumps(result),
})
# Send webhook notification on successful completion
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="crawl",
status="completed",
urls=urls,
webhook_config=webhook_config,
result=result
)
await asyncio.sleep(5) # Give Redis time to process the update
except Exception as exc:
await redis.hset(f"task:{task_id}", mapping={
@@ -601,5 +625,15 @@ async def handle_crawl_job(
"error": str(exc),
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="crawl",
status="failed",
urls=urls,
webhook_config=webhook_config,
error=str(exc)
)
background_tasks.add_task(_runner)
return {"task_id": task_id}

View File

@@ -88,4 +88,17 @@ observability:
enabled: True
endpoint: "/metrics"
health_check:
endpoint: "/health"
endpoint: "/health"
# Webhook Configuration
webhooks:
enabled: true
default_url: null # Optional: default webhook URL for all jobs
data_in_payload: false # Optional: default behavior for including data
retry:
max_attempts: 5
initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff
max_delay_ms: 32000
timeout_ms: 30000 # 30s timeout per webhook call
headers: # Optional: default headers to include
User-Agent: "Crawl4AI-Webhook/1.0"

View File

@@ -12,6 +12,7 @@ from api import (
handle_crawl_job,
handle_task_status,
)
from schemas import WebhookConfig
# ------------- dependency placeholders -------------
_redis = None # will be injected from server.py
@@ -43,6 +44,7 @@ class CrawlJobPayload(BaseModel):
urls: list[HttpUrl]
browser_config: Dict = {}
crawler_config: Dict = {}
webhook_config: Optional[WebhookConfig] = None
# ---------- LLM job ---------------------------------------------------------
@@ -82,6 +84,10 @@ async def crawl_job_enqueue(
background_tasks: BackgroundTasks,
_td: Dict = Depends(lambda: _token_dep()),
):
webhook_config = None
if payload.webhook_config:
webhook_config = payload.webhook_config.dict()
return await handle_crawl_job(
_redis,
background_tasks,
@@ -89,6 +95,7 @@ async def crawl_job_enqueue(
payload.browser_config,
payload.crawler_config,
config=_config,
webhook_config=webhook_config,
)

View File

@@ -1,6 +1,6 @@
from typing import List, Optional, Dict
from enum import Enum
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, HttpUrl
from utils import FilterType
@@ -39,4 +39,22 @@ class JSEndpointRequest(BaseModel):
scripts: List[str] = Field(
...,
description="List of separated JavaScript snippets to execute"
)
)
class WebhookConfig(BaseModel):
"""Configuration for webhook notifications."""
webhook_url: HttpUrl
webhook_data_in_payload: bool = False
webhook_headers: Optional[Dict[str, str]] = None
class WebhookPayload(BaseModel):
"""Payload sent to webhook endpoints."""
task_id: str
task_type: str # "crawl", "llm_extraction", etc.
status: str # "completed" or "failed"
timestamp: str # ISO 8601 format
urls: List[str]
error: Optional[str] = None
data: Optional[Dict] = None # Included only if webhook_data_in_payload=True

159
deploy/docker/webhook.py Normal file
View File

@@ -0,0 +1,159 @@
"""
Webhook delivery service for Crawl4AI.
This module provides webhook notification functionality with exponential backoff retry logic.
"""
import asyncio
import httpx
import logging
from typing import Dict, Optional
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class WebhookDeliveryService:
"""Handles webhook delivery with exponential backoff retry logic."""
def __init__(self, config: Dict):
"""
Initialize the webhook delivery service.
Args:
config: Application configuration dictionary containing webhook settings
"""
self.config = config.get("webhooks", {})
self.max_attempts = self.config.get("retry", {}).get("max_attempts", 5)
self.initial_delay = self.config.get("retry", {}).get("initial_delay_ms", 1000) / 1000
self.max_delay = self.config.get("retry", {}).get("max_delay_ms", 32000) / 1000
self.timeout = self.config.get("retry", {}).get("timeout_ms", 30000) / 1000
async def send_webhook(
self,
webhook_url: str,
payload: Dict,
headers: Optional[Dict[str, str]] = None
) -> bool:
"""
Send webhook with exponential backoff retry logic.
Args:
webhook_url: The URL to send the webhook to
payload: The JSON payload to send
headers: Optional custom headers
Returns:
bool: True if delivered successfully, False otherwise
"""
default_headers = self.config.get("headers", {})
merged_headers = {**default_headers, **(headers or {})}
merged_headers["Content-Type"] = "application/json"
async with httpx.AsyncClient(timeout=self.timeout) as client:
for attempt in range(self.max_attempts):
try:
logger.info(
f"Sending webhook (attempt {attempt + 1}/{self.max_attempts}) to {webhook_url}"
)
response = await client.post(
webhook_url,
json=payload,
headers=merged_headers
)
# Success or client error (don't retry client errors)
if response.status_code < 500:
if 200 <= response.status_code < 300:
logger.info(f"Webhook delivered successfully to {webhook_url}")
return True
else:
logger.warning(
f"Webhook rejected with status {response.status_code}: {response.text[:200]}"
)
return False # Client error - don't retry
# Server error - retry with backoff
logger.warning(
f"Webhook failed with status {response.status_code}, will retry"
)
except httpx.TimeoutException as exc:
logger.error(f"Webhook timeout (attempt {attempt + 1}): {exc}")
except httpx.RequestError as exc:
logger.error(f"Webhook request error (attempt {attempt + 1}): {exc}")
except Exception as exc:
logger.error(f"Webhook delivery error (attempt {attempt + 1}): {exc}")
# Calculate exponential backoff delay
if attempt < self.max_attempts - 1:
delay = min(self.initial_delay * (2 ** attempt), self.max_delay)
logger.info(f"Retrying in {delay}s...")
await asyncio.sleep(delay)
logger.error(
f"Webhook delivery failed after {self.max_attempts} attempts to {webhook_url}"
)
return False
async def notify_job_completion(
self,
task_id: str,
task_type: str,
status: str,
urls: list,
webhook_config: Optional[Dict],
result: Optional[Dict] = None,
error: Optional[str] = None
):
"""
Notify webhook of job completion.
Args:
task_id: The task identifier
task_type: Type of task (e.g., "crawl", "llm_extraction")
status: Task status ("completed" or "failed")
urls: List of URLs that were crawled
webhook_config: Webhook configuration from the job request
result: Optional crawl result data
error: Optional error message if failed
"""
# Determine webhook URL
webhook_url = None
data_in_payload = self.config.get("data_in_payload", False)
custom_headers = None
if webhook_config:
webhook_url = webhook_config.get("webhook_url")
data_in_payload = webhook_config.get("webhook_data_in_payload", data_in_payload)
custom_headers = webhook_config.get("webhook_headers")
if not webhook_url:
webhook_url = self.config.get("default_url")
if not webhook_url:
logger.debug("No webhook URL configured, skipping notification")
return
# Check if webhooks are enabled
if not self.config.get("enabled", True):
logger.debug("Webhooks are disabled, skipping notification")
return
# Build payload
payload = {
"task_id": task_id,
"task_type": task_type,
"status": status,
"timestamp": datetime.now(timezone.utc).isoformat(),
"urls": urls
}
if error:
payload["error"] = error
if data_in_payload and result:
payload["data"] = result
# Send webhook (fire and forget - don't block on completion)
await self.send_webhook(webhook_url, payload, custom_headers)