Merge branch 'implement-webhook-crawl-feature-011CULZY1Jy8N5MUkZqXkRVp' into develop

This commit is contained in:
ntohidi
2025-10-22 13:12:25 +02:00
14 changed files with 2579 additions and 10 deletions

View File

@@ -12,6 +12,7 @@
- [Python SDK](#python-sdk)
- [Understanding Request Schema](#understanding-request-schema)
- [REST API Examples](#rest-api-examples)
- [Asynchronous Jobs with Webhooks](#asynchronous-jobs-with-webhooks)
- [Additional API Endpoints](#additional-api-endpoints)
- [HTML Extraction Endpoint](#html-extraction-endpoint)
- [Screenshot Endpoint](#screenshot-endpoint)
@@ -648,6 +649,146 @@ async def test_stream_crawl(token: str = None): # Made token optional
# asyncio.run(test_stream_crawl())
```
### Asynchronous Jobs with Webhooks
For long-running crawls or when you want to avoid keeping connections open, use the job queue endpoints. Instead of polling for results, configure a webhook to receive notifications when jobs complete.
#### Why Use Jobs & Webhooks?
- **No Polling Required** - Get notified when crawls complete instead of constantly checking status
- **Better Resource Usage** - Free up client connections while jobs run in the background
- **Scalable Architecture** - Ideal for high-volume crawling with TypeScript/Node.js clients or microservices
- **Reliable Delivery** - Automatic retry with exponential backoff (5 attempts: 1s → 2s → 4s → 8s → 16s)
#### How It Works
1. **Submit Job** → POST to `/crawl/job` with optional `webhook_config`
2. **Get Task ID** → Receive a `task_id` immediately
3. **Job Runs** → Crawl executes in the background
4. **Webhook Fired** → Server POSTs completion notification to your webhook URL
5. **Fetch Results** → If data wasn't included in webhook, GET `/crawl/job/{task_id}`
#### Quick Example
```bash
# Submit a crawl job with webhook notification
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": false
}
}'
# Response: {"task_id": "crawl_a1b2c3d4"}
```
**Your webhook receives:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"]
}
```
Then fetch the results:
```bash
curl http://localhost:11235/crawl/job/crawl_a1b2c3d4
```
#### Include Data in Webhook
Set `webhook_data_in_payload: true` to receive the full crawl results directly in the webhook:
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": true
}
}'
```
**Your webhook receives the complete data:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"],
"data": {
"markdown": "...",
"html": "...",
"links": {...},
"metadata": {...}
}
}
```
#### Webhook Authentication
Add custom headers for authentication:
```json
{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl",
"webhook_data_in_payload": false,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token",
"X-Service-ID": "crawl4ai-prod"
}
}
}
```
#### Global Default Webhook
Configure a default webhook URL in `config.yml` for all jobs:
```yaml
webhooks:
enabled: true
default_url: "https://myapp.com/webhooks/default"
data_in_payload: false
retry:
max_attempts: 5
initial_delay_ms: 1000
max_delay_ms: 32000
timeout_ms: 30000
```
Now jobs without `webhook_config` automatically use the default webhook.
#### Job Status Polling (Without Webhooks)
If you prefer polling instead of webhooks, just omit `webhook_config`:
```bash
# Submit job
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{"urls": ["https://example.com"]}'
# Response: {"task_id": "crawl_xyz"}
# Poll for status
curl http://localhost:11235/crawl/job/crawl_xyz
```
The response includes `status` field: `"processing"`, `"completed"`, or `"failed"`.
> 💡 **Pro tip**: See [WEBHOOK_EXAMPLES.md](./WEBHOOK_EXAMPLES.md) for detailed examples including TypeScript client code, Flask webhook handlers, and failure handling.
---
## Metrics & Monitoring
@@ -826,10 +967,11 @@ We're here to help you succeed with Crawl4AI! Here's how to get support:
In this guide, we've covered everything you need to get started with Crawl4AI's Docker deployment:
- Building and running the Docker container
- Configuring the environment
- Configuring the environment
- Using the interactive playground for testing
- Making API requests with proper typing
- Using the Python SDK
- Asynchronous job queues with webhook notifications
- Leveraging specialized endpoints for screenshots, PDFs, and JavaScript execution
- Connecting via the Model Context Protocol (MCP)
- Monitoring your deployment

View File

@@ -0,0 +1,378 @@
# Webhook Feature Examples
This document provides examples of how to use the webhook feature for crawl jobs in Crawl4AI.
## Overview
The webhook feature allows you to receive notifications when crawl jobs complete, eliminating the need for polling. Webhooks are sent with exponential backoff retry logic to ensure reliable delivery.
## Configuration
### Global Configuration (config.yml)
You can configure default webhook settings in `config.yml`:
```yaml
webhooks:
enabled: true
default_url: null # Optional: default webhook URL for all jobs
data_in_payload: false # Optional: default behavior for including data
retry:
max_attempts: 5
initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff
max_delay_ms: 32000
timeout_ms: 30000 # 30s timeout per webhook call
headers: # Optional: default headers to include
User-Agent: "Crawl4AI-Webhook/1.0"
```
## API Usage Examples
### Example 1: Basic Webhook (Notification Only)
Send a webhook notification without including the crawl data in the payload.
**Request:**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": false
}
}'
```
**Response:**
```json
{
"task_id": "crawl_a1b2c3d4"
}
```
**Webhook Payload Received:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"]
}
```
Your webhook handler should then fetch the results:
```bash
curl http://localhost:11235/crawl/job/crawl_a1b2c3d4
```
### Example 2: Webhook with Data Included
Include the full crawl results in the webhook payload.
**Request:**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": true
}
}'
```
**Webhook Payload Received:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"],
"data": {
"markdown": "...",
"html": "...",
"links": {...},
"metadata": {...}
}
}
```
### Example 3: Webhook with Custom Headers
Include custom headers for authentication or identification.
**Request:**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"],
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/crawl-complete",
"webhook_data_in_payload": false,
"webhook_headers": {
"X-Webhook-Secret": "my-secret-token",
"X-Service-ID": "crawl4ai-production"
}
}
}'
```
The webhook will be sent with these additional headers plus the default headers from config.
### Example 4: Failure Notification
When a crawl job fails, a webhook is sent with error details.
**Webhook Payload on Failure:**
```json
{
"task_id": "crawl_a1b2c3d4",
"task_type": "crawl",
"status": "failed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"],
"error": "Connection timeout after 30s"
}
```
### Example 5: Using Global Default Webhook
If you set a `default_url` in config.yml, jobs without webhook_config will use it:
**config.yml:**
```yaml
webhooks:
enabled: true
default_url: "https://myapp.com/webhooks/default"
data_in_payload: false
```
**Request (no webhook_config needed):**
```bash
curl -X POST http://localhost:11235/crawl/job \
-H "Content-Type: application/json" \
-d '{
"urls": ["https://example.com"]
}'
```
The webhook will be sent to the default URL configured in config.yml.
### Example 6: LLM Extraction Job with Webhook
Use webhooks with the LLM extraction endpoint for asynchronous processing.
**Request:**
```bash
curl -X POST http://localhost:11235/llm/job \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/article",
"q": "Extract the article title, author, and publication date",
"schema": "{\"type\": \"object\", \"properties\": {\"title\": {\"type\": \"string\"}, \"author\": {\"type\": \"string\"}, \"date\": {\"type\": \"string\"}}}",
"cache": false,
"provider": "openai/gpt-4o-mini",
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/llm-complete",
"webhook_data_in_payload": true
}
}'
```
**Response:**
```json
{
"task_id": "llm_1698765432_12345"
}
```
**Webhook Payload Received:**
```json
{
"task_id": "llm_1698765432_12345",
"task_type": "llm_extraction",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"data": {
"extracted_content": {
"title": "Understanding Web Scraping",
"author": "John Doe",
"date": "2025-10-21"
}
}
}
```
## Webhook Handler Example
Here's a simple Python Flask webhook handler that supports both crawl and LLM extraction jobs:
```python
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
@app.route('/webhooks/crawl-complete', methods=['POST'])
def handle_crawl_webhook():
payload = request.json
task_id = payload['task_id']
task_type = payload['task_type']
status = payload['status']
if status == 'completed':
# If data not in payload, fetch it
if 'data' not in payload:
# Determine endpoint based on task type
endpoint = 'crawl' if task_type == 'crawl' else 'llm'
response = requests.get(f'http://localhost:11235/{endpoint}/job/{task_id}')
data = response.json()
else:
data = payload['data']
# Process based on task type
if task_type == 'crawl':
print(f"Processing crawl results for {task_id}")
# Handle crawl results
results = data.get('results', [])
for result in results:
print(f" - {result.get('url')}: {len(result.get('markdown', ''))} chars")
elif task_type == 'llm_extraction':
print(f"Processing LLM extraction for {task_id}")
# Handle LLM extraction
# Note: Webhook sends 'extracted_content', API returns 'result'
extracted = data.get('extracted_content', data.get('result', {}))
print(f" - Extracted: {extracted}")
# Your business logic here...
elif status == 'failed':
error = payload.get('error', 'Unknown error')
print(f"{task_type} job {task_id} failed: {error}")
# Handle failure...
return jsonify({"status": "received"}), 200
if __name__ == '__main__':
app.run(port=8080)
```
## Retry Logic
The webhook delivery service uses exponential backoff retry logic:
- **Attempts:** Up to 5 attempts by default
- **Delays:** 1s → 2s → 4s → 8s → 16s
- **Timeout:** 30 seconds per attempt
- **Retry Conditions:**
- Server errors (5xx status codes)
- Network errors
- Timeouts
- **No Retry:**
- Client errors (4xx status codes)
- Successful delivery (2xx status codes)
## Benefits
1. **No Polling Required** - Eliminates constant API calls to check job status
2. **Real-time Notifications** - Immediate notification when jobs complete
3. **Reliable Delivery** - Exponential backoff ensures webhooks are delivered
4. **Flexible** - Choose between notification-only or full data delivery
5. **Secure** - Support for custom headers for authentication
6. **Configurable** - Global defaults or per-job configuration
7. **Universal Support** - Works with both `/crawl/job` and `/llm/job` endpoints
## TypeScript Client Example
```typescript
interface WebhookConfig {
webhook_url: string;
webhook_data_in_payload?: boolean;
webhook_headers?: Record<string, string>;
}
interface CrawlJobRequest {
urls: string[];
browser_config?: Record<string, any>;
crawler_config?: Record<string, any>;
webhook_config?: WebhookConfig;
}
interface LLMJobRequest {
url: string;
q: string;
schema?: string;
cache?: boolean;
provider?: string;
webhook_config?: WebhookConfig;
}
async function createCrawlJob(request: CrawlJobRequest) {
const response = await fetch('http://localhost:11235/crawl/job', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(request)
});
const { task_id } = await response.json();
return task_id;
}
async function createLLMJob(request: LLMJobRequest) {
const response = await fetch('http://localhost:11235/llm/job', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(request)
});
const { task_id } = await response.json();
return task_id;
}
// Usage - Crawl Job
const crawlTaskId = await createCrawlJob({
urls: ['https://example.com'],
webhook_config: {
webhook_url: 'https://myapp.com/webhooks/crawl-complete',
webhook_data_in_payload: false,
webhook_headers: {
'X-Webhook-Secret': 'my-secret'
}
}
});
// Usage - LLM Extraction Job
const llmTaskId = await createLLMJob({
url: 'https://example.com/article',
q: 'Extract the main points from this article',
provider: 'openai/gpt-4o-mini',
webhook_config: {
webhook_url: 'https://myapp.com/webhooks/llm-complete',
webhook_data_in_payload: true,
webhook_headers: {
'X-Webhook-Secret': 'my-secret'
}
}
});
```
## Monitoring and Debugging
Webhook delivery attempts are logged at INFO level:
- Successful deliveries
- Retry attempts with delays
- Final failures after max attempts
Check the application logs for webhook delivery status:
```bash
docker logs crawl4ai-container | grep -i webhook
```

View File

@@ -46,6 +46,7 @@ from utils import (
get_llm_temperature,
get_llm_base_url
)
from webhook import WebhookDeliveryService
import psutil, time
@@ -120,10 +121,14 @@ async def process_llm_extraction(
schema: Optional[str] = None,
cache: str = "0",
provider: Optional[str] = None,
webhook_config: Optional[Dict] = None,
temperature: Optional[float] = None,
base_url: Optional[str] = None
) -> None:
"""Process LLM extraction in background."""
# Initialize webhook service
webhook_service = WebhookDeliveryService(config)
try:
# Validate provider
is_valid, error_msg = validate_llm_provider(config, provider)
@@ -132,6 +137,16 @@ async def process_llm_extraction(
"status": TaskStatus.FAILED,
"error": error_msg
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="failed",
urls=[url],
webhook_config=webhook_config,
error=error_msg
)
return
api_key = get_llm_api_key(config, provider) # Returns None to let litellm handle it
llm_strategy = LLMExtractionStrategy(
@@ -162,17 +177,40 @@ async def process_llm_extraction(
"status": TaskStatus.FAILED,
"error": result.error_message
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="failed",
urls=[url],
webhook_config=webhook_config,
error=result.error_message
)
return
try:
content = json.loads(result.extracted_content)
except json.JSONDecodeError:
content = result.extracted_content
result_data = {"extracted_content": content}
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.COMPLETED,
"result": json.dumps(content)
})
# Send webhook notification on successful completion
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="completed",
urls=[url],
webhook_config=webhook_config,
result=result_data
)
except Exception as e:
logger.error(f"LLM extraction error: {str(e)}", exc_info=True)
await redis.hset(f"task:{task_id}", mapping={
@@ -180,6 +218,16 @@ async def process_llm_extraction(
"error": str(e)
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="failed",
urls=[url],
webhook_config=webhook_config,
error=str(e)
)
async def handle_markdown_request(
url: str,
filter_type: FilterType,
@@ -261,6 +309,7 @@ async def handle_llm_request(
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None,
webhook_config: Optional[Dict] = None,,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
) -> JSONResponse:
@@ -294,6 +343,7 @@ async def handle_llm_request(
base_url,
config,
provider,
webhook_config,
temperature,
api_base_url
)
@@ -341,6 +391,7 @@ async def create_new_task(
base_url: str,
config: dict,
provider: Optional[str] = None,
webhook_config: Optional[Dict] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
) -> JSONResponse:
@@ -351,12 +402,18 @@ async def create_new_task(
from datetime import datetime
task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}"
await redis.hset(f"task:{task_id}", mapping={
task_data = {
"status": TaskStatus.PROCESSING,
"created_at": datetime.now().isoformat(),
"url": decoded_url
})
}
# Store webhook config if provided
if webhook_config:
task_data["webhook_config"] = json.dumps(webhook_config)
await redis.hset(f"task:{task_id}", mapping=task_data)
background_tasks.add_task(
process_llm_extraction,
@@ -368,6 +425,7 @@ async def create_new_task(
schema,
cache,
provider,
webhook_config,
temperature,
api_base_url
)
@@ -680,6 +738,7 @@ async def handle_crawl_job(
browser_config: Dict,
crawler_config: Dict,
config: Dict,
webhook_config: Optional[Dict] = None,
) -> Dict:
"""
Fire-and-forget version of handle_crawl_request.
@@ -687,13 +746,24 @@ async def handle_crawl_job(
lets /crawl/job/{task_id} polling fetch the result.
"""
task_id = f"crawl_{uuid4().hex[:8]}"
await redis.hset(f"task:{task_id}", mapping={
# Store task data in Redis
task_data = {
"status": TaskStatus.PROCESSING, # <-- keep enum values consistent
"created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
"url": json.dumps(urls), # store list as JSON string
"result": "",
"error": "",
})
}
# Store webhook config if provided
if webhook_config:
task_data["webhook_config"] = json.dumps(webhook_config)
await redis.hset(f"task:{task_id}", mapping=task_data)
# Initialize webhook service
webhook_service = WebhookDeliveryService(config)
async def _runner():
try:
@@ -707,6 +777,17 @@ async def handle_crawl_job(
"status": TaskStatus.COMPLETED,
"result": json.dumps(result),
})
# Send webhook notification on successful completion
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="crawl",
status="completed",
urls=urls,
webhook_config=webhook_config,
result=result
)
await asyncio.sleep(5) # Give Redis time to process the update
except Exception as exc:
await redis.hset(f"task:{task_id}", mapping={
@@ -714,5 +795,15 @@ async def handle_crawl_job(
"error": str(exc),
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="crawl",
status="failed",
urls=urls,
webhook_config=webhook_config,
error=str(exc)
)
background_tasks.add_task(_runner)
return {"task_id": task_id}

View File

@@ -87,4 +87,17 @@ observability:
enabled: True
endpoint: "/metrics"
health_check:
endpoint: "/health"
endpoint: "/health"
# Webhook Configuration
webhooks:
enabled: true
default_url: null # Optional: default webhook URL for all jobs
data_in_payload: false # Optional: default behavior for including data
retry:
max_attempts: 5
initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff
max_delay_ms: 32000
timeout_ms: 30000 # 30s timeout per webhook call
headers: # Optional: default headers to include
User-Agent: "Crawl4AI-Webhook/1.0"

View File

@@ -12,6 +12,7 @@ from api import (
handle_crawl_job,
handle_task_status,
)
from schemas import WebhookConfig
# ------------- dependency placeholders -------------
_redis = None # will be injected from server.py
@@ -37,6 +38,7 @@ class LlmJobPayload(BaseModel):
schema: Optional[str] = None
cache: bool = False
provider: Optional[str] = None
webhook_config: Optional[WebhookConfig] = None
temperature: Optional[float] = None
base_url: Optional[str] = None
@@ -45,6 +47,7 @@ class CrawlJobPayload(BaseModel):
urls: list[HttpUrl]
browser_config: Dict = {}
crawler_config: Dict = {}
webhook_config: Optional[WebhookConfig] = None
# ---------- LLM job ---------------------------------------------------------
@@ -55,6 +58,10 @@ async def llm_job_enqueue(
request: Request,
_td: Dict = Depends(lambda: _token_dep()), # late-bound dep
):
webhook_config = None
if payload.webhook_config:
webhook_config = payload.webhook_config.model_dump(mode='json')
return await handle_llm_request(
_redis,
background_tasks,
@@ -65,6 +72,7 @@ async def llm_job_enqueue(
cache=payload.cache,
config=_config,
provider=payload.provider,
webhook_config=webhook_config,
temperature=payload.temperature,
api_base_url=payload.base_url,
)
@@ -86,6 +94,10 @@ async def crawl_job_enqueue(
background_tasks: BackgroundTasks,
_td: Dict = Depends(lambda: _token_dep()),
):
webhook_config = None
if payload.webhook_config:
webhook_config = payload.webhook_config.model_dump(mode='json')
return await handle_crawl_job(
_redis,
background_tasks,
@@ -93,6 +105,7 @@ async def crawl_job_enqueue(
payload.browser_config,
payload.crawler_config,
config=_config,
webhook_config=webhook_config,
)

View File

@@ -12,6 +12,6 @@ pydantic>=2.11
rank-bm25==0.2.2
anyio==4.9.0
PyJWT==2.10.1
mcp>=1.6.0
mcp>=1.18.0
websockets>=15.0.1
httpx[http2]>=0.27.2

View File

@@ -1,6 +1,6 @@
from typing import List, Optional, Dict
from enum import Enum
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, HttpUrl
from utils import FilterType
@@ -85,4 +85,22 @@ class JSEndpointRequest(BaseModel):
scripts: List[str] = Field(
...,
description="List of separated JavaScript snippets to execute"
)
)
class WebhookConfig(BaseModel):
"""Configuration for webhook notifications."""
webhook_url: HttpUrl
webhook_data_in_payload: bool = False
webhook_headers: Optional[Dict[str, str]] = None
class WebhookPayload(BaseModel):
"""Payload sent to webhook endpoints."""
task_id: str
task_type: str # "crawl", "llm_extraction", etc.
status: str # "completed" or "failed"
timestamp: str # ISO 8601 format
urls: List[str]
error: Optional[str] = None
data: Optional[Dict] = None # Included only if webhook_data_in_payload=True

159
deploy/docker/webhook.py Normal file
View File

@@ -0,0 +1,159 @@
"""
Webhook delivery service for Crawl4AI.
This module provides webhook notification functionality with exponential backoff retry logic.
"""
import asyncio
import httpx
import logging
from typing import Dict, Optional
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class WebhookDeliveryService:
"""Handles webhook delivery with exponential backoff retry logic."""
def __init__(self, config: Dict):
"""
Initialize the webhook delivery service.
Args:
config: Application configuration dictionary containing webhook settings
"""
self.config = config.get("webhooks", {})
self.max_attempts = self.config.get("retry", {}).get("max_attempts", 5)
self.initial_delay = self.config.get("retry", {}).get("initial_delay_ms", 1000) / 1000
self.max_delay = self.config.get("retry", {}).get("max_delay_ms", 32000) / 1000
self.timeout = self.config.get("retry", {}).get("timeout_ms", 30000) / 1000
async def send_webhook(
self,
webhook_url: str,
payload: Dict,
headers: Optional[Dict[str, str]] = None
) -> bool:
"""
Send webhook with exponential backoff retry logic.
Args:
webhook_url: The URL to send the webhook to
payload: The JSON payload to send
headers: Optional custom headers
Returns:
bool: True if delivered successfully, False otherwise
"""
default_headers = self.config.get("headers", {})
merged_headers = {**default_headers, **(headers or {})}
merged_headers["Content-Type"] = "application/json"
async with httpx.AsyncClient(timeout=self.timeout) as client:
for attempt in range(self.max_attempts):
try:
logger.info(
f"Sending webhook (attempt {attempt + 1}/{self.max_attempts}) to {webhook_url}"
)
response = await client.post(
webhook_url,
json=payload,
headers=merged_headers
)
# Success or client error (don't retry client errors)
if response.status_code < 500:
if 200 <= response.status_code < 300:
logger.info(f"Webhook delivered successfully to {webhook_url}")
return True
else:
logger.warning(
f"Webhook rejected with status {response.status_code}: {response.text[:200]}"
)
return False # Client error - don't retry
# Server error - retry with backoff
logger.warning(
f"Webhook failed with status {response.status_code}, will retry"
)
except httpx.TimeoutException as exc:
logger.error(f"Webhook timeout (attempt {attempt + 1}): {exc}")
except httpx.RequestError as exc:
logger.error(f"Webhook request error (attempt {attempt + 1}): {exc}")
except Exception as exc:
logger.error(f"Webhook delivery error (attempt {attempt + 1}): {exc}")
# Calculate exponential backoff delay
if attempt < self.max_attempts - 1:
delay = min(self.initial_delay * (2 ** attempt), self.max_delay)
logger.info(f"Retrying in {delay}s...")
await asyncio.sleep(delay)
logger.error(
f"Webhook delivery failed after {self.max_attempts} attempts to {webhook_url}"
)
return False
async def notify_job_completion(
self,
task_id: str,
task_type: str,
status: str,
urls: list,
webhook_config: Optional[Dict],
result: Optional[Dict] = None,
error: Optional[str] = None
):
"""
Notify webhook of job completion.
Args:
task_id: The task identifier
task_type: Type of task (e.g., "crawl", "llm_extraction")
status: Task status ("completed" or "failed")
urls: List of URLs that were crawled
webhook_config: Webhook configuration from the job request
result: Optional crawl result data
error: Optional error message if failed
"""
# Determine webhook URL
webhook_url = None
data_in_payload = self.config.get("data_in_payload", False)
custom_headers = None
if webhook_config:
webhook_url = webhook_config.get("webhook_url")
data_in_payload = webhook_config.get("webhook_data_in_payload", data_in_payload)
custom_headers = webhook_config.get("webhook_headers")
if not webhook_url:
webhook_url = self.config.get("default_url")
if not webhook_url:
logger.debug("No webhook URL configured, skipping notification")
return
# Check if webhooks are enabled
if not self.config.get("enabled", True):
logger.debug("Webhooks are disabled, skipping notification")
return
# Build payload
payload = {
"task_id": task_id,
"task_type": task_type,
"status": status,
"timestamp": datetime.now(timezone.utc).isoformat(),
"urls": urls
}
if error:
payload["error"] = error
if data_in_payload and result:
payload["data"] = result
# Send webhook (fire and forget - don't block on completion)
await self.send_webhook(webhook_url, payload, custom_headers)