diff --git a/deploy/docker/WEBHOOK_EXAMPLES.md b/deploy/docker/WEBHOOK_EXAMPLES.md
new file mode 100644
index 00000000..a9a61bb7
--- /dev/null
+++ b/deploy/docker/WEBHOOK_EXAMPLES.md
@@ -0,0 +1,281 @@
+# Webhook Feature Examples
+
+This document provides examples of how to use the webhook feature for crawl jobs in Crawl4AI.
+
+## Overview
+
+The webhook feature lets you receive a notification when a crawl job completes, eliminating the need for polling. Deliveries are retried with exponential backoff, so transient failures do not drop notifications.
+
+## Configuration
+
+### Global Configuration (config.yml)
+
+You can configure default webhook settings in `config.yml`:
+
+```yaml
+webhooks:
+  enabled: true
+  default_url: null       # Optional: default webhook URL for all jobs
+  data_in_payload: false  # Optional: default behavior for including data
+  retry:
+    max_attempts: 5
+    initial_delay_ms: 1000  # exponential backoff: 1s, 2s, 4s, 8s, ...
+    max_delay_ms: 32000
+    timeout_ms: 30000       # 30s timeout per webhook call
+  headers:                # Optional: default headers to include
+    User-Agent: "Crawl4AI-Webhook/1.0"
+```
+
+## API Usage Examples
+
+### Example 1: Basic Webhook (Notification Only)
+
+Send a webhook notification without including the crawl data in the payload.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+  -H "Content-Type: application/json" \
+  -d '{
+    "urls": ["https://example.com"],
+    "webhook_config": {
+      "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+      "webhook_data_in_payload": false
+    }
+  }'
+```
+
+**Response:**
+```json
+{
+  "task_id": "crawl_a1b2c3d4"
+}
+```
+
+**Webhook Payload Received:**
+```json
+{
+  "task_id": "crawl_a1b2c3d4",
+  "task_type": "crawl",
+  "status": "completed",
+  "timestamp": "2025-10-21T10:30:00.000000+00:00",
+  "urls": ["https://example.com"]
+}
+```
+
+Your webhook handler should then fetch the results:
+```bash
+curl http://localhost:11235/crawl/job/crawl_a1b2c3d4
+```
+
+### Example 2: Webhook with Data Included
+
+Include the full crawl results in the webhook payload.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+  -H "Content-Type: application/json" \
+  -d '{
+    "urls": ["https://example.com"],
+    "webhook_config": {
+      "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+      "webhook_data_in_payload": true
+    }
+  }'
+```
+
+**Webhook Payload Received:**
+```json
+{
+  "task_id": "crawl_a1b2c3d4",
+  "task_type": "crawl",
+  "status": "completed",
+  "timestamp": "2025-10-21T10:30:00.000000+00:00",
+  "urls": ["https://example.com"],
+  "data": {
+    "markdown": "...",
+    "html": "...",
+    "links": {...},
+    "metadata": {...}
+  }
+}
+```
+
+### Example 3: Webhook with Custom Headers
+
+Include custom headers for authentication or identification.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+  -H "Content-Type: application/json" \
+  -d '{
+    "urls": ["https://example.com"],
+    "webhook_config": {
+      "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+      "webhook_data_in_payload": false,
+      "webhook_headers": {
+        "X-Webhook-Secret": "my-secret-token",
+        "X-Service-ID": "crawl4ai-production"
+      }
+    }
+  }'
+```
+
+The webhook will be sent with these additional headers plus the default headers from config.
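+
+On the receiving side, you will typically validate such a header before trusting the payload. A minimal sketch of a receiver that checks the hypothetical `X-Webhook-Secret` value from the request above (see the full handler example below for payload processing):
+
+```python
+from flask import Flask, request, abort, jsonify
+
+app = Flask(__name__)
+EXPECTED_SECRET = "my-secret-token"  # hypothetical: matches the example request above
+
+@app.route('/webhooks/crawl-complete', methods=['POST'])
+def handle_crawl_webhook():
+    # Reject deliveries that don't present the shared secret header.
+    if request.headers.get('X-Webhook-Secret') != EXPECTED_SECRET:
+        abort(401)
+    # ...process the payload as in the full handler example below...
+    return jsonify({"status": "received"}), 200
+
+if __name__ == '__main__':
+    app.run(port=8080)
+```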
+
+### Example 4: Failure Notification
+
+When a crawl job fails, a webhook is sent with error details.
+
+**Webhook Payload on Failure:**
+```json
+{
+  "task_id": "crawl_a1b2c3d4",
+  "task_type": "crawl",
+  "status": "failed",
+  "timestamp": "2025-10-21T10:30:00.000000+00:00",
+  "urls": ["https://example.com"],
+  "error": "Connection timeout after 30s"
+}
+```
+
+### Example 5: Using Global Default Webhook
+
+If you set a `default_url` in config.yml, jobs without a `webhook_config` will use it:
+
+**config.yml:**
+```yaml
+webhooks:
+  enabled: true
+  default_url: "https://myapp.com/webhooks/default"
+  data_in_payload: false
+```
+
+**Request (no webhook_config needed):**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+  -H "Content-Type: application/json" \
+  -d '{
+    "urls": ["https://example.com"]
+  }'
+```
+
+The webhook will be sent to the default URL configured in config.yml.
+
+## Webhook Handler Example
+
+Here's a simple Python Flask webhook handler:
+
+```python
+from flask import Flask, request, jsonify
+import requests
+
+app = Flask(__name__)
+
+@app.route('/webhooks/crawl-complete', methods=['POST'])
+def handle_crawl_webhook():
+    payload = request.json
+
+    task_id = payload['task_id']
+    status = payload['status']
+
+    if status == 'completed':
+        # If the data is not in the payload, fetch it
+        if 'data' not in payload:
+            response = requests.get(f'http://localhost:11235/crawl/job/{task_id}')
+            data = response.json()
+        else:
+            data = payload['data']
+
+        # Process the crawl data
+        print(f"Processing crawl results for {task_id}")
+        # Your business logic here...
+
+    elif status == 'failed':
+        error = payload.get('error', 'Unknown error')
+        print(f"Crawl job {task_id} failed: {error}")
+        # Handle failure...
+
+    return jsonify({"status": "received"}), 200
+
+if __name__ == '__main__':
+    app.run(port=8080)
+```
+
+## Retry Logic
+
+The webhook delivery service uses exponential backoff retry logic (sketched below):
+
+- **Attempts:** Up to 5 attempts by default
+- **Delays:** 1s → 2s → 4s → 8s between attempts (doubling, capped at 32s)
+- **Timeout:** 30 seconds per attempt
+- **Retry Conditions:**
+  - Server errors (5xx status codes)
+  - Network errors
+  - Timeouts
+- **No Retry:**
+  - Client errors (4xx status codes)
+  - Successful delivery (2xx status codes)
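+
+The schedule is easy to reproduce. A minimal sketch of the delay calculation, assuming the default retry settings shown earlier:
+
+```python
+# Delay doubles per retry, capped at max_delay; there is no wait after the final attempt.
+initial_delay, max_delay, max_attempts = 1.0, 32.0, 5
+
+for attempt in range(max_attempts - 1):
+    delay = min(initial_delay * (2 ** attempt), max_delay)
+    print(f"after attempt {attempt + 1}: wait {delay:.0f}s")
+# -> 1s, 2s, 4s, 8s
+```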
+
+## Benefits
+
+1. **No Polling Required** - Eliminates constant API calls to check job status
+2. **Real-time Notifications** - Immediate notification when jobs complete
+3. **Reliable Delivery** - Exponential backoff retries recover from transient failures
+4. **Flexible** - Choose between notification-only or full data delivery
+5. **Secure** - Support for custom headers for authentication
+6. **Configurable** - Global defaults or per-job configuration
+
+## TypeScript Client Example
+
+```typescript
+interface WebhookConfig {
+  webhook_url: string;
+  webhook_data_in_payload?: boolean;
+  webhook_headers?: Record<string, string>;
+}
+
+interface CrawlJobRequest {
+  urls: string[];
+  browser_config?: Record<string, unknown>;
+  crawler_config?: Record<string, unknown>;
+  webhook_config?: WebhookConfig;
+}
+
+async function createCrawlJob(request: CrawlJobRequest) {
+  const response = await fetch('http://localhost:11235/crawl/job', {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify(request)
+  });
+
+  const { task_id } = await response.json();
+  return task_id;
+}
+
+// Usage
+const taskId = await createCrawlJob({
+  urls: ['https://example.com'],
+  webhook_config: {
+    webhook_url: 'https://myapp.com/webhooks/crawl-complete',
+    webhook_data_in_payload: false,
+    webhook_headers: {
+      'X-Webhook-Secret': 'my-secret'
+    }
+  }
+});
+```
+
+## Monitoring and Debugging
+
+Webhook delivery is logged as it happens:
+- Successful deliveries and retry scheduling (INFO)
+- Rejected deliveries and per-attempt errors (WARNING/ERROR)
+- Final failure after all attempts are exhausted (ERROR)
+
+Check the application logs for webhook delivery status:
+```bash
+docker logs crawl4ai-container | grep -i webhook
+```
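+
+For a quick end-to-end check, you can run the Flask handler above locally and point a job at it. A minimal sketch, assuming the server container can reach your machine as `host.docker.internal` (host names and ports here are placeholders):
+
+```python
+import requests
+
+# Enqueue a job whose webhook targets the local handler on port 8080.
+resp = requests.post(
+    "http://localhost:11235/crawl/job",
+    json={
+        "urls": ["https://example.com"],
+        "webhook_config": {
+            # Receiver address as seen from inside the server container.
+            "webhook_url": "http://host.docker.internal:8080/webhooks/crawl-complete",
+            "webhook_data_in_payload": True,
+        },
+    },
+)
+print(resp.json())  # {"task_id": "crawl_..."}
+```
+
+Watch both the handler output and the container logs to confirm delivery.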
""" task_id = f"crawl_{uuid4().hex[:8]}" - await redis.hset(f"task:{task_id}", mapping={ + + # Store task data in Redis + task_data = { "status": TaskStatus.PROCESSING, # <-- keep enum values consistent "created_at": datetime.utcnow().isoformat(), "url": json.dumps(urls), # store list as JSON string "result": "", "error": "", - }) + } + + # Store webhook config if provided + if webhook_config: + task_data["webhook_config"] = json.dumps(webhook_config) + + await redis.hset(f"task:{task_id}", mapping=task_data) + + # Initialize webhook service + webhook_service = WebhookDeliveryService(config) async def _runner(): try: @@ -594,6 +607,17 @@ async def handle_crawl_job( "status": TaskStatus.COMPLETED, "result": json.dumps(result), }) + + # Send webhook notification on successful completion + await webhook_service.notify_job_completion( + task_id=task_id, + task_type="crawl", + status="completed", + urls=urls, + webhook_config=webhook_config, + result=result + ) + await asyncio.sleep(5) # Give Redis time to process the update except Exception as exc: await redis.hset(f"task:{task_id}", mapping={ @@ -601,5 +625,15 @@ async def handle_crawl_job( "error": str(exc), }) + # Send webhook notification on failure + await webhook_service.notify_job_completion( + task_id=task_id, + task_type="crawl", + status="failed", + urls=urls, + webhook_config=webhook_config, + error=str(exc) + ) + background_tasks.add_task(_runner) return {"task_id": task_id} \ No newline at end of file diff --git a/deploy/docker/config.yml b/deploy/docker/config.yml index c81badc4..9445cd76 100644 --- a/deploy/docker/config.yml +++ b/deploy/docker/config.yml @@ -88,4 +88,17 @@ observability: enabled: True endpoint: "/metrics" health_check: - endpoint: "/health" \ No newline at end of file + endpoint: "/health" + +# Webhook Configuration +webhooks: + enabled: true + default_url: null # Optional: default webhook URL for all jobs + data_in_payload: false # Optional: default behavior for including data + retry: + max_attempts: 5 + initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff + max_delay_ms: 32000 + timeout_ms: 30000 # 30s timeout per webhook call + headers: # Optional: default headers to include + User-Agent: "Crawl4AI-Webhook/1.0" \ No newline at end of file diff --git a/deploy/docker/job.py b/deploy/docker/job.py index 10d83fdd..b92288c9 100644 --- a/deploy/docker/job.py +++ b/deploy/docker/job.py @@ -12,6 +12,7 @@ from api import ( handle_crawl_job, handle_task_status, ) +from schemas import WebhookConfig # ------------- dependency placeholders ------------- _redis = None # will be injected from server.py @@ -43,6 +44,7 @@ class CrawlJobPayload(BaseModel): urls: list[HttpUrl] browser_config: Dict = {} crawler_config: Dict = {} + webhook_config: Optional[WebhookConfig] = None # ---------- LL​M job --------------------------------------------------------- @@ -82,6 +84,10 @@ async def crawl_job_enqueue( background_tasks: BackgroundTasks, _td: Dict = Depends(lambda: _token_dep()), ): + webhook_config = None + if payload.webhook_config: + webhook_config = payload.webhook_config.dict() + return await handle_crawl_job( _redis, background_tasks, @@ -89,6 +95,7 @@ async def crawl_job_enqueue( payload.browser_config, payload.crawler_config, config=_config, + webhook_config=webhook_config, ) diff --git a/deploy/docker/schemas.py b/deploy/docker/schemas.py index 96196633..72f4f016 100644 --- a/deploy/docker/schemas.py +++ b/deploy/docker/schemas.py @@ -1,6 +1,6 @@ from typing import List, Optional, Dict from enum import Enum 
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, HttpUrl
 
 from utils import FilterType
 
@@ -39,4 +39,22 @@ class JSEndpointRequest(BaseModel):
     scripts: List[str] = Field(
         ...,
         description="List of separated JavaScript snippets to execute"
-    )
\ No newline at end of file
+    )
+
+
+class WebhookConfig(BaseModel):
+    """Configuration for webhook notifications."""
+    webhook_url: HttpUrl
+    webhook_data_in_payload: bool = False
+    webhook_headers: Optional[Dict[str, str]] = None
+
+
+class WebhookPayload(BaseModel):
+    """Payload sent to webhook endpoints."""
+    task_id: str
+    task_type: str  # "crawl", "llm_extraction", etc.
+    status: str  # "completed" or "failed"
+    timestamp: str  # ISO 8601 format
+    urls: List[str]
+    error: Optional[str] = None
+    data: Optional[Dict] = None  # Included only if webhook_data_in_payload=True
\ No newline at end of file
diff --git a/deploy/docker/webhook.py b/deploy/docker/webhook.py
new file mode 100644
index 00000000..ebee9dff
--- /dev/null
+++ b/deploy/docker/webhook.py
@@ -0,0 +1,159 @@
+"""
+Webhook delivery service for Crawl4AI.
+
+This module provides webhook notification functionality with exponential backoff retry logic.
+"""
+import asyncio
+import httpx
+import logging
+from typing import Dict, Optional
+from datetime import datetime, timezone
+
+logger = logging.getLogger(__name__)
+
+
+class WebhookDeliveryService:
+    """Handles webhook delivery with exponential backoff retry logic."""
+
+    def __init__(self, config: Dict):
+        """
+        Initialize the webhook delivery service.
+
+        Args:
+            config: Application configuration dictionary containing webhook settings
+        """
+        self.config = config.get("webhooks", {})
+        self.max_attempts = self.config.get("retry", {}).get("max_attempts", 5)
+        self.initial_delay = self.config.get("retry", {}).get("initial_delay_ms", 1000) / 1000
+        self.max_delay = self.config.get("retry", {}).get("max_delay_ms", 32000) / 1000
+        self.timeout = self.config.get("retry", {}).get("timeout_ms", 30000) / 1000
+
+    async def send_webhook(
+        self,
+        webhook_url: str,
+        payload: Dict,
+        headers: Optional[Dict[str, str]] = None
+    ) -> bool:
+        """
+        Send webhook with exponential backoff retry logic.
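+
+        Retries on 5xx responses, network errors, and timeouts; 4xx responses
+        are treated as permanent failures and are not retried.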
+
+        Args:
+            webhook_url: The URL to send the webhook to
+            payload: The JSON payload to send
+            headers: Optional custom headers
+
+        Returns:
+            bool: True if delivered successfully, False otherwise
+        """
+        default_headers = self.config.get("headers", {})
+        merged_headers = {**default_headers, **(headers or {})}
+        merged_headers["Content-Type"] = "application/json"
+
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            for attempt in range(self.max_attempts):
+                try:
+                    logger.info(
+                        f"Sending webhook (attempt {attempt + 1}/{self.max_attempts}) to {webhook_url}"
+                    )
+
+                    response = await client.post(
+                        webhook_url,
+                        json=payload,
+                        headers=merged_headers
+                    )
+
+                    # Success or client error (don't retry client errors)
+                    if response.status_code < 500:
+                        if 200 <= response.status_code < 300:
+                            logger.info(f"Webhook delivered successfully to {webhook_url}")
+                            return True
+                        else:
+                            logger.warning(
+                                f"Webhook rejected with status {response.status_code}: {response.text[:200]}"
+                            )
+                            return False  # Client error - don't retry
+
+                    # Server error - retry with backoff
+                    logger.warning(
+                        f"Webhook failed with status {response.status_code}, will retry"
+                    )
+
+                except httpx.TimeoutException as exc:
+                    logger.error(f"Webhook timeout (attempt {attempt + 1}): {exc}")
+                except httpx.RequestError as exc:
+                    logger.error(f"Webhook request error (attempt {attempt + 1}): {exc}")
+                except Exception as exc:
+                    logger.error(f"Webhook delivery error (attempt {attempt + 1}): {exc}")
+
+                # Calculate exponential backoff delay
+                if attempt < self.max_attempts - 1:
+                    delay = min(self.initial_delay * (2 ** attempt), self.max_delay)
+                    logger.info(f"Retrying in {delay}s...")
+                    await asyncio.sleep(delay)
+
+        logger.error(
+            f"Webhook delivery failed after {self.max_attempts} attempts to {webhook_url}"
+        )
+        return False
+
+    async def notify_job_completion(
+        self,
+        task_id: str,
+        task_type: str,
+        status: str,
+        urls: list,
+        webhook_config: Optional[Dict],
+        result: Optional[Dict] = None,
+        error: Optional[str] = None
+    ):
+        """
+        Notify webhook of job completion.
+
+        Args:
+            task_id: The task identifier
+            task_type: Type of task (e.g., "crawl", "llm_extraction")
+            status: Task status ("completed" or "failed")
+            urls: List of URLs that were crawled
+            webhook_config: Webhook configuration from the job request
+            result: Optional crawl result data
+            error: Optional error message if failed
+        """
+        # Determine webhook URL
+        webhook_url = None
+        data_in_payload = self.config.get("data_in_payload", False)
+        custom_headers = None
+
+        if webhook_config:
+            webhook_url = webhook_config.get("webhook_url")
+            data_in_payload = webhook_config.get("webhook_data_in_payload", data_in_payload)
+            custom_headers = webhook_config.get("webhook_headers")
+
+        if not webhook_url:
+            webhook_url = self.config.get("default_url")
+
+        if not webhook_url:
+            logger.debug("No webhook URL configured, skipping notification")
+            return
+
+        # Check if webhooks are enabled
+        if not self.config.get("enabled", True):
+            logger.debug("Webhooks are disabled, skipping notification")
+            return
+
+        # Build payload
+        payload = {
+            "task_id": task_id,
+            "task_type": task_type,
+            "status": status,
+            "timestamp": datetime.now(timezone.utc).isoformat(),
+            "urls": urls
+        }
+
+        if error:
+            payload["error"] = error
+
+        if data_in_payload and result:
+            payload["data"] = result
+
+        # Deliver the webhook; this awaits the full retry cycle before returning
+        await self.send_webhook(webhook_url, payload, custom_headers)