diff --git a/README.md b/README.md
index 45f11560..0c5d4091 100644
--- a/README.md
+++ b/README.md
@@ -919,6 +919,36 @@ We envision a future where AI is powered by real human knowledge, ensuring data
For more details, see our [full mission statement](./MISSION.md).
+## Current Sponsors
+
+### 🏢 Enterprise Sponsors & Partners
+
+Our enterprise sponsors and technology partners help scale Crawl4AI to power production-grade data pipelines.
+
+| Company | About | Sponsorship Tier |
+|------|------|----------------------------|
+| | AI-powered Captcha solving service. Supports all major Captcha types, including reCAPTCHA, Cloudflare, and more. | 🥈 Silver |
+| | Helps engineers and buyers find, compare, and source electronic & industrial parts in seconds, with specs, pricing, lead times & alternatives. | 🥇 Gold |
+| | Kidocode is a hybrid technology and entrepreneurship school for kids aged 5–18, offering both online and on-campus education. | 🥇 Gold |
+| | Singapore-based Aleph Null is Asia's leading edtech hub, dedicated to student-centric, AI-driven education, empowering learners with the tools to thrive in a fast-changing world. | 🥇 Gold |
+
+### Individual Sponsors
+
+A heartfelt thanks to our individual supporters! Every contribution helps us keep our open-source mission alive and thriving!
+
+> Want to join them? [Sponsor Crawl4AI →](https://github.com/sponsors/unclecode)
+
## Star History
[](https://star-history.com/#unclecode/crawl4ai&Date)
diff --git a/deploy/docker/README.md b/deploy/docker/README.md
index d35050cc..9206250e 100644
--- a/deploy/docker/README.md
+++ b/deploy/docker/README.md
@@ -12,6 +12,7 @@
- [Python SDK](#python-sdk)
- [Understanding Request Schema](#understanding-request-schema)
- [REST API Examples](#rest-api-examples)
+ - [Asynchronous Jobs with Webhooks](#asynchronous-jobs-with-webhooks)
- [Additional API Endpoints](#additional-api-endpoints)
- [HTML Extraction Endpoint](#html-extraction-endpoint)
- [Screenshot Endpoint](#screenshot-endpoint)
@@ -648,6 +649,146 @@ async def test_stream_crawl(token: str = None): # Made token optional
# asyncio.run(test_stream_crawl())
```
+### Asynchronous Jobs with Webhooks
+
+For long-running crawls or when you want to avoid keeping connections open, use the job queue endpoints. Instead of polling for results, configure a webhook to receive notifications when jobs complete.
+
+#### Why Use Jobs & Webhooks?
+
+- **No Polling Required** - Get notified when crawls complete instead of constantly checking status
+- **Better Resource Usage** - Free up client connections while jobs run in the background
+- **Scalable Architecture** - Ideal for high-volume crawling with TypeScript/Node.js clients or microservices
+- **Reliable Delivery** - Automatic retry with exponential backoff (5 attempts: 1s → 2s → 4s → 8s → 16s)
+
+#### How It Works
+
+1. **Submit Job** → POST to `/crawl/job` with optional `webhook_config`
+2. **Get Task ID** → Receive a `task_id` immediately
+3. **Job Runs** → Crawl executes in the background
+4. **Webhook Fired** → Server POSTs completion notification to your webhook URL
+5. **Fetch Results** → If data wasn't included in webhook, GET `/crawl/job/{task_id}`
+
+#### Quick Example
+
+```bash
+# Submit a crawl job with webhook notification
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "urls": ["https://example.com"],
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+ "webhook_data_in_payload": false
+ }
+ }'
+
+# Response: {"task_id": "crawl_a1b2c3d4"}
+```
+
+**Your webhook receives:**
+```json
+{
+ "task_id": "crawl_a1b2c3d4",
+ "task_type": "crawl",
+ "status": "completed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com"]
+}
+```
+
+Then fetch the results:
+```bash
+curl http://localhost:11235/crawl/job/crawl_a1b2c3d4
+```
+
+#### Include Data in Webhook
+
+Set `webhook_data_in_payload: true` to receive the full crawl results directly in the webhook:
+
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "urls": ["https://example.com"],
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+ "webhook_data_in_payload": true
+ }
+ }'
+```
+
+**Your webhook receives the complete data:**
+```json
+{
+ "task_id": "crawl_a1b2c3d4",
+ "task_type": "crawl",
+ "status": "completed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com"],
+ "data": {
+ "markdown": "...",
+ "html": "...",
+ "links": {...},
+ "metadata": {...}
+ }
+}
+```
+
+#### Webhook Authentication
+
+Add custom headers for authentication:
+
+```json
+{
+ "urls": ["https://example.com"],
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/crawl",
+ "webhook_data_in_payload": false,
+ "webhook_headers": {
+ "X-Webhook-Secret": "your-secret-token",
+ "X-Service-ID": "crawl4ai-prod"
+ }
+ }
+}
+```
+
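+On the receiving side, you can check the secret before trusting the payload. A minimal Flask sketch (the endpoint path and header name simply reuse the values from the example above; adapt them to your own setup):
+
+```python
+from flask import Flask, request, jsonify
+
+app = Flask(__name__)
+EXPECTED_SECRET = "your-secret-token"  # same value as in webhook_headers above
+
+@app.route("/webhooks/crawl", methods=["POST"])
+def crawl_webhook():
+    # Reject requests that don't carry the shared secret
+    if request.headers.get("X-Webhook-Secret") != EXPECTED_SECRET:
+        return jsonify({"error": "invalid secret"}), 401
+    payload = request.json  # task_id, task_type, status, timestamp, urls, ...
+    # ... process the notification ...
+    return jsonify({"status": "received"}), 200
+```
+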
+#### Global Default Webhook
+
+Configure a default webhook URL in `config.yml` for all jobs:
+
+```yaml
+webhooks:
+ enabled: true
+ default_url: "https://myapp.com/webhooks/default"
+ data_in_payload: false
+ retry:
+ max_attempts: 5
+ initial_delay_ms: 1000
+ max_delay_ms: 32000
+ timeout_ms: 30000
+```
+
+Now jobs without `webhook_config` automatically use the default webhook.
+
+#### Job Status Polling (Without Webhooks)
+
+If you prefer polling instead of webhooks, just omit `webhook_config`:
+
+```bash
+# Submit job
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{"urls": ["https://example.com"]}'
+# Response: {"task_id": "crawl_xyz"}
+
+# Poll for status
+curl http://localhost:11235/crawl/job/crawl_xyz
+```
+
+The response includes a `status` field: `"processing"`, `"completed"`, or `"failed"`.
+
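+If you do poll, a minimal Python sketch looks like this (assumes the `requests` package and the default port; adjust for your deployment):
+
+```python
+import time
+import requests
+
+task_id = "crawl_xyz"  # value returned by POST /crawl/job
+while True:
+    job = requests.get(f"http://localhost:11235/crawl/job/{task_id}").json()
+    if job["status"] in ("completed", "failed"):
+        break
+    time.sleep(2)  # still "processing" - wait and try again
+
+print(job["status"])
+```
+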
+> 💡 **Pro tip**: See [WEBHOOK_EXAMPLES.md](./WEBHOOK_EXAMPLES.md) for detailed examples including TypeScript client code, Flask webhook handlers, and failure handling.
+
---
## Metrics & Monitoring
@@ -826,10 +967,11 @@ We're here to help you succeed with Crawl4AI! Here's how to get support:
In this guide, we've covered everything you need to get started with Crawl4AI's Docker deployment:
- Building and running the Docker container
-- Configuring the environment
+- Configuring the environment
- Using the interactive playground for testing
- Making API requests with proper typing
- Using the Python SDK
+- Asynchronous job queues with webhook notifications
- Leveraging specialized endpoints for screenshots, PDFs, and JavaScript execution
- Connecting via the Model Context Protocol (MCP)
- Monitoring your deployment
diff --git a/deploy/docker/WEBHOOK_EXAMPLES.md b/deploy/docker/WEBHOOK_EXAMPLES.md
new file mode 100644
index 00000000..190efb18
--- /dev/null
+++ b/deploy/docker/WEBHOOK_EXAMPLES.md
@@ -0,0 +1,378 @@
+# Webhook Feature Examples
+
+This document provides examples of how to use the webhook feature for crawl jobs in Crawl4AI.
+
+## Overview
+
+The webhook feature allows you to receive notifications when crawl jobs complete, eliminating the need for polling. Webhooks are sent with exponential backoff retry logic to ensure reliable delivery.
+
+## Configuration
+
+### Global Configuration (config.yml)
+
+You can configure default webhook settings in `config.yml`:
+
+```yaml
+webhooks:
+ enabled: true
+ default_url: null # Optional: default webhook URL for all jobs
+ data_in_payload: false # Optional: default behavior for including data
+ retry:
+ max_attempts: 5
+ initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff
+ max_delay_ms: 32000
+ timeout_ms: 30000 # 30s timeout per webhook call
+ headers: # Optional: default headers to include
+ User-Agent: "Crawl4AI-Webhook/1.0"
+```
+
+## API Usage Examples
+
+### Example 1: Basic Webhook (Notification Only)
+
+Send a webhook notification without including the crawl data in the payload.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "urls": ["https://example.com"],
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+ "webhook_data_in_payload": false
+ }
+ }'
+```
+
+**Response:**
+```json
+{
+ "task_id": "crawl_a1b2c3d4"
+}
+```
+
+**Webhook Payload Received:**
+```json
+{
+ "task_id": "crawl_a1b2c3d4",
+ "task_type": "crawl",
+ "status": "completed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com"]
+}
+```
+
+Your webhook handler should then fetch the results:
+```bash
+curl http://localhost:11235/crawl/job/crawl_a1b2c3d4
+```
+
+### Example 2: Webhook with Data Included
+
+Include the full crawl results in the webhook payload.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "urls": ["https://example.com"],
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+ "webhook_data_in_payload": true
+ }
+ }'
+```
+
+**Webhook Payload Received:**
+```json
+{
+ "task_id": "crawl_a1b2c3d4",
+ "task_type": "crawl",
+ "status": "completed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com"],
+ "data": {
+ "markdown": "...",
+ "html": "...",
+ "links": {...},
+ "metadata": {...}
+ }
+}
+```
+
+### Example 3: Webhook with Custom Headers
+
+Include custom headers for authentication or identification.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "urls": ["https://example.com"],
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/crawl-complete",
+ "webhook_data_in_payload": false,
+ "webhook_headers": {
+ "X-Webhook-Secret": "my-secret-token",
+ "X-Service-ID": "crawl4ai-production"
+ }
+ }
+ }'
+```
+
+The webhook will be sent with these additional headers plus the default headers from config.
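+
+The merge order mirrors the delivery service in `deploy/docker/webhook.py`: per-job `webhook_headers` override the defaults from `config.yml`, and `Content-Type` is always forced to `application/json`. A small sketch of the result:
+
+```python
+default_headers = {"User-Agent": "Crawl4AI-Webhook/1.0"}   # defaults from config.yml
+job_headers = {
+    "X-Webhook-Secret": "my-secret-token",                 # from webhook_config
+    "X-Service-ID": "crawl4ai-production",
+}
+merged = {**default_headers, **job_headers, "Content-Type": "application/json"}
+# merged now contains all four headers; per-job values win on any key collision
+```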
+
+### Example 4: Failure Notification
+
+When a crawl job fails, a webhook is sent with error details.
+
+**Webhook Payload on Failure:**
+```json
+{
+ "task_id": "crawl_a1b2c3d4",
+ "task_type": "crawl",
+ "status": "failed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com"],
+ "error": "Connection timeout after 30s"
+}
+```
+
+### Example 5: Using Global Default Webhook
+
+If you set a `default_url` in config.yml, jobs without webhook_config will use it:
+
+**config.yml:**
+```yaml
+webhooks:
+ enabled: true
+ default_url: "https://myapp.com/webhooks/default"
+ data_in_payload: false
+```
+
+**Request (no webhook_config needed):**
+```bash
+curl -X POST http://localhost:11235/crawl/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "urls": ["https://example.com"]
+ }'
+```
+
+The webhook will be sent to the default URL configured in config.yml.
+
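+For reference, the target URL is resolved in this order (a simplified sketch of the logic in `deploy/docker/webhook.py`):
+
+```python
+def resolve_webhook_url(webhook_config, webhooks_cfg):
+    # Per-job webhook_url wins, then the global default_url.
+    url = (webhook_config or {}).get("webhook_url") or webhooks_cfg.get("default_url")
+    # Nothing configured, or webhooks disabled globally -> no notification is sent.
+    if not url or not webhooks_cfg.get("enabled", True):
+        return None
+    return url
+```
+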
+### Example 6: LLM Extraction Job with Webhook
+
+Use webhooks with the LLM extraction endpoint for asynchronous processing.
+
+**Request:**
+```bash
+curl -X POST http://localhost:11235/llm/job \
+ -H "Content-Type: application/json" \
+ -d '{
+ "url": "https://example.com/article",
+ "q": "Extract the article title, author, and publication date",
+ "schema": "{\"type\": \"object\", \"properties\": {\"title\": {\"type\": \"string\"}, \"author\": {\"type\": \"string\"}, \"date\": {\"type\": \"string\"}}}",
+ "cache": false,
+ "provider": "openai/gpt-4o-mini",
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhooks/llm-complete",
+ "webhook_data_in_payload": true
+ }
+ }'
+```
+
+**Response:**
+```json
+{
+ "task_id": "llm_1698765432_12345"
+}
+```
+
+**Webhook Payload Received:**
+```json
+{
+ "task_id": "llm_1698765432_12345",
+ "task_type": "llm_extraction",
+ "status": "completed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com/article"],
+ "data": {
+ "extracted_content": {
+ "title": "Understanding Web Scraping",
+ "author": "John Doe",
+ "date": "2025-10-21"
+ }
+ }
+}
+```
+
+## Webhook Handler Example
+
+Here's a simple Python Flask webhook handler that supports both crawl and LLM extraction jobs:
+
+```python
+from flask import Flask, request, jsonify
+import requests
+
+app = Flask(__name__)
+
+@app.route('/webhooks/crawl-complete', methods=['POST'])
+def handle_crawl_webhook():
+ payload = request.json
+
+ task_id = payload['task_id']
+ task_type = payload['task_type']
+ status = payload['status']
+
+ if status == 'completed':
+ # If data not in payload, fetch it
+ if 'data' not in payload:
+ # Determine endpoint based on task type
+ endpoint = 'crawl' if task_type == 'crawl' else 'llm'
+ response = requests.get(f'http://localhost:11235/{endpoint}/job/{task_id}')
+ data = response.json()
+ else:
+ data = payload['data']
+
+ # Process based on task type
+ if task_type == 'crawl':
+ print(f"Processing crawl results for {task_id}")
+ # Handle crawl results
+ results = data.get('results', [])
+ for result in results:
+ print(f" - {result.get('url')}: {len(result.get('markdown', ''))} chars")
+
+ elif task_type == 'llm_extraction':
+ print(f"Processing LLM extraction for {task_id}")
+ # Handle LLM extraction
+ # Note: Webhook sends 'extracted_content', API returns 'result'
+ extracted = data.get('extracted_content', data.get('result', {}))
+ print(f" - Extracted: {extracted}")
+
+ # Your business logic here...
+
+ elif status == 'failed':
+ error = payload.get('error', 'Unknown error')
+ print(f"{task_type} job {task_id} failed: {error}")
+ # Handle failure...
+
+ return jsonify({"status": "received"}), 200
+
+if __name__ == '__main__':
+ app.run(port=8080)
+```
+
+## Retry Logic
+
+The webhook delivery service uses exponential backoff retry logic (see the sketch after this list):
+
+- **Attempts:** Up to 5 attempts by default
+- **Delays:** 1s → 2s → 4s → 8s → 16s
+- **Timeout:** 30 seconds per attempt
+- **Retry Conditions:**
+ - Server errors (5xx status codes)
+ - Network errors
+ - Timeouts
+- **No Retry:**
+ - Client errors (4xx status codes)
+ - Successful delivery (2xx status codes)
+
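+The delay between consecutive attempts follows a capped exponential schedule. A small sketch of the calculation used by the delivery service:
+
+```python
+initial_delay = 1.0   # initial_delay_ms / 1000
+max_delay = 32.0      # max_delay_ms / 1000
+max_attempts = 5
+
+# Capped exponential back-off between webhook attempts
+delays = [min(initial_delay * 2 ** attempt, max_delay) for attempt in range(max_attempts)]
+print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0]
+```
+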
+## Benefits
+
+1. **No Polling Required** - Eliminates constant API calls to check job status
+2. **Real-time Notifications** - Immediate notification when jobs complete
+3. **Reliable Delivery** - Exponential backoff ensures webhooks are delivered
+4. **Flexible** - Choose between notification-only or full data delivery
+5. **Secure** - Support for custom headers for authentication
+6. **Configurable** - Global defaults or per-job configuration
+7. **Universal Support** - Works with both `/crawl/job` and `/llm/job` endpoints
+
+## TypeScript Client Example
+
+```typescript
+interface WebhookConfig {
+ webhook_url: string;
+ webhook_data_in_payload?: boolean;
+  webhook_headers?: Record<string, string>;
+}
+
+interface CrawlJobRequest {
+ urls: string[];
+  browser_config?: Record<string, any>;
+  crawler_config?: Record<string, any>;
+ webhook_config?: WebhookConfig;
+}
+
+interface LLMJobRequest {
+ url: string;
+ q: string;
+ schema?: string;
+ cache?: boolean;
+ provider?: string;
+ webhook_config?: WebhookConfig;
+}
+
+async function createCrawlJob(request: CrawlJobRequest) {
+ const response = await fetch('http://localhost:11235/crawl/job', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify(request)
+ });
+
+ const { task_id } = await response.json();
+ return task_id;
+}
+
+async function createLLMJob(request: LLMJobRequest) {
+ const response = await fetch('http://localhost:11235/llm/job', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify(request)
+ });
+
+ const { task_id } = await response.json();
+ return task_id;
+}
+
+// Usage - Crawl Job
+const crawlTaskId = await createCrawlJob({
+ urls: ['https://example.com'],
+ webhook_config: {
+ webhook_url: 'https://myapp.com/webhooks/crawl-complete',
+ webhook_data_in_payload: false,
+ webhook_headers: {
+ 'X-Webhook-Secret': 'my-secret'
+ }
+ }
+});
+
+// Usage - LLM Extraction Job
+const llmTaskId = await createLLMJob({
+ url: 'https://example.com/article',
+ q: 'Extract the main points from this article',
+ provider: 'openai/gpt-4o-mini',
+ webhook_config: {
+ webhook_url: 'https://myapp.com/webhooks/llm-complete',
+ webhook_data_in_payload: true,
+ webhook_headers: {
+ 'X-Webhook-Secret': 'my-secret'
+ }
+ }
+});
+```
+
+## Monitoring and Debugging
+
+Webhook delivery attempts are logged at INFO level:
+- Successful deliveries
+- Retry attempts with delays
+- Final failures after max attempts
+
+Check the application logs for webhook delivery status:
+```bash
+docker logs crawl4ai-container | grep -i webhook
+```
diff --git a/deploy/docker/api.py b/deploy/docker/api.py
index d0127e7b..417d6a01 100644
--- a/deploy/docker/api.py
+++ b/deploy/docker/api.py
@@ -46,6 +46,7 @@ from utils import (
get_llm_temperature,
get_llm_base_url
)
+from webhook import WebhookDeliveryService
import psutil, time
@@ -120,10 +121,14 @@ async def process_llm_extraction(
schema: Optional[str] = None,
cache: str = "0",
provider: Optional[str] = None,
+ webhook_config: Optional[Dict] = None,
temperature: Optional[float] = None,
base_url: Optional[str] = None
) -> None:
"""Process LLM extraction in background."""
+ # Initialize webhook service
+ webhook_service = WebhookDeliveryService(config)
+
try:
# Validate provider
is_valid, error_msg = validate_llm_provider(config, provider)
@@ -132,6 +137,16 @@ async def process_llm_extraction(
"status": TaskStatus.FAILED,
"error": error_msg
})
+
+ # Send webhook notification on failure
+ await webhook_service.notify_job_completion(
+ task_id=task_id,
+ task_type="llm_extraction",
+ status="failed",
+ urls=[url],
+ webhook_config=webhook_config,
+ error=error_msg
+ )
return
api_key = get_llm_api_key(config, provider) # Returns None to let litellm handle it
llm_strategy = LLMExtractionStrategy(
@@ -162,17 +177,40 @@ async def process_llm_extraction(
"status": TaskStatus.FAILED,
"error": result.error_message
})
+
+ # Send webhook notification on failure
+ await webhook_service.notify_job_completion(
+ task_id=task_id,
+ task_type="llm_extraction",
+ status="failed",
+ urls=[url],
+ webhook_config=webhook_config,
+ error=result.error_message
+ )
return
try:
content = json.loads(result.extracted_content)
except json.JSONDecodeError:
content = result.extracted_content
+
+ result_data = {"extracted_content": content}
+
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.COMPLETED,
"result": json.dumps(content)
})
+ # Send webhook notification on successful completion
+ await webhook_service.notify_job_completion(
+ task_id=task_id,
+ task_type="llm_extraction",
+ status="completed",
+ urls=[url],
+ webhook_config=webhook_config,
+ result=result_data
+ )
+
except Exception as e:
logger.error(f"LLM extraction error: {str(e)}", exc_info=True)
await redis.hset(f"task:{task_id}", mapping={
@@ -180,6 +218,16 @@ async def process_llm_extraction(
"error": str(e)
})
+ # Send webhook notification on failure
+ await webhook_service.notify_job_completion(
+ task_id=task_id,
+ task_type="llm_extraction",
+ status="failed",
+ urls=[url],
+ webhook_config=webhook_config,
+ error=str(e)
+ )
+
async def handle_markdown_request(
url: str,
filter_type: FilterType,
@@ -261,6 +309,7 @@ async def handle_llm_request(
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None,
+    webhook_config: Optional[Dict] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
) -> JSONResponse:
@@ -294,6 +343,7 @@ async def handle_llm_request(
base_url,
config,
provider,
+ webhook_config,
temperature,
api_base_url
)
@@ -341,6 +391,7 @@ async def create_new_task(
base_url: str,
config: dict,
provider: Optional[str] = None,
+ webhook_config: Optional[Dict] = None,
temperature: Optional[float] = None,
api_base_url: Optional[str] = None
) -> JSONResponse:
@@ -351,12 +402,18 @@ async def create_new_task(
from datetime import datetime
task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}"
-
- await redis.hset(f"task:{task_id}", mapping={
+
+ task_data = {
"status": TaskStatus.PROCESSING,
"created_at": datetime.now().isoformat(),
"url": decoded_url
- })
+ }
+
+ # Store webhook config if provided
+ if webhook_config:
+ task_data["webhook_config"] = json.dumps(webhook_config)
+
+ await redis.hset(f"task:{task_id}", mapping=task_data)
background_tasks.add_task(
process_llm_extraction,
@@ -368,6 +425,7 @@ async def create_new_task(
schema,
cache,
provider,
+ webhook_config,
temperature,
api_base_url
)
@@ -680,6 +738,7 @@ async def handle_crawl_job(
browser_config: Dict,
crawler_config: Dict,
config: Dict,
+ webhook_config: Optional[Dict] = None,
) -> Dict:
"""
Fire-and-forget version of handle_crawl_request.
@@ -687,13 +746,24 @@ async def handle_crawl_job(
lets /crawl/job/{task_id} polling fetch the result.
"""
task_id = f"crawl_{uuid4().hex[:8]}"
- await redis.hset(f"task:{task_id}", mapping={
+
+ # Store task data in Redis
+ task_data = {
"status": TaskStatus.PROCESSING, # <-- keep enum values consistent
"created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
"url": json.dumps(urls), # store list as JSON string
"result": "",
"error": "",
- })
+ }
+
+ # Store webhook config if provided
+ if webhook_config:
+ task_data["webhook_config"] = json.dumps(webhook_config)
+
+ await redis.hset(f"task:{task_id}", mapping=task_data)
+
+ # Initialize webhook service
+ webhook_service = WebhookDeliveryService(config)
async def _runner():
try:
@@ -707,6 +777,17 @@ async def handle_crawl_job(
"status": TaskStatus.COMPLETED,
"result": json.dumps(result),
})
+
+ # Send webhook notification on successful completion
+ await webhook_service.notify_job_completion(
+ task_id=task_id,
+ task_type="crawl",
+ status="completed",
+ urls=urls,
+ webhook_config=webhook_config,
+ result=result
+ )
+
await asyncio.sleep(5) # Give Redis time to process the update
except Exception as exc:
await redis.hset(f"task:{task_id}", mapping={
@@ -714,5 +795,15 @@ async def handle_crawl_job(
"error": str(exc),
})
+ # Send webhook notification on failure
+ await webhook_service.notify_job_completion(
+ task_id=task_id,
+ task_type="crawl",
+ status="failed",
+ urls=urls,
+ webhook_config=webhook_config,
+ error=str(exc)
+ )
+
background_tasks.add_task(_runner)
return {"task_id": task_id}
\ No newline at end of file
diff --git a/deploy/docker/config.yml b/deploy/docker/config.yml
index 35371375..22878b59 100644
--- a/deploy/docker/config.yml
+++ b/deploy/docker/config.yml
@@ -87,4 +87,17 @@ observability:
enabled: True
endpoint: "/metrics"
health_check:
- endpoint: "/health"
\ No newline at end of file
+ endpoint: "/health"
+
+# Webhook Configuration
+webhooks:
+ enabled: true
+ default_url: null # Optional: default webhook URL for all jobs
+ data_in_payload: false # Optional: default behavior for including data
+ retry:
+ max_attempts: 5
+ initial_delay_ms: 1000 # 1s, 2s, 4s, 8s, 16s exponential backoff
+ max_delay_ms: 32000
+ timeout_ms: 30000 # 30s timeout per webhook call
+ headers: # Optional: default headers to include
+ User-Agent: "Crawl4AI-Webhook/1.0"
\ No newline at end of file
diff --git a/deploy/docker/job.py b/deploy/docker/job.py
index 823dd8c8..8fae16cd 100644
--- a/deploy/docker/job.py
+++ b/deploy/docker/job.py
@@ -12,6 +12,7 @@ from api import (
handle_crawl_job,
handle_task_status,
)
+from schemas import WebhookConfig
# ------------- dependency placeholders -------------
_redis = None # will be injected from server.py
@@ -37,6 +38,7 @@ class LlmJobPayload(BaseModel):
schema: Optional[str] = None
cache: bool = False
provider: Optional[str] = None
+ webhook_config: Optional[WebhookConfig] = None
temperature: Optional[float] = None
base_url: Optional[str] = None
@@ -45,6 +47,7 @@ class CrawlJobPayload(BaseModel):
urls: list[HttpUrl]
browser_config: Dict = {}
crawler_config: Dict = {}
+ webhook_config: Optional[WebhookConfig] = None
# ---------- LLM job ---------------------------------------------------------
@@ -55,6 +58,10 @@ async def llm_job_enqueue(
request: Request,
_td: Dict = Depends(lambda: _token_dep()), # late-bound dep
):
+ webhook_config = None
+ if payload.webhook_config:
+ webhook_config = payload.webhook_config.model_dump(mode='json')
+
return await handle_llm_request(
_redis,
background_tasks,
@@ -65,6 +72,7 @@ async def llm_job_enqueue(
cache=payload.cache,
config=_config,
provider=payload.provider,
+ webhook_config=webhook_config,
temperature=payload.temperature,
api_base_url=payload.base_url,
)
@@ -86,6 +94,10 @@ async def crawl_job_enqueue(
background_tasks: BackgroundTasks,
_td: Dict = Depends(lambda: _token_dep()),
):
+ webhook_config = None
+ if payload.webhook_config:
+ webhook_config = payload.webhook_config.model_dump(mode='json')
+
return await handle_crawl_job(
_redis,
background_tasks,
@@ -93,6 +105,7 @@ async def crawl_job_enqueue(
payload.browser_config,
payload.crawler_config,
config=_config,
+ webhook_config=webhook_config,
)
diff --git a/deploy/docker/requirements.txt b/deploy/docker/requirements.txt
index d463c641..b33c081f 100644
--- a/deploy/docker/requirements.txt
+++ b/deploy/docker/requirements.txt
@@ -12,6 +12,6 @@ pydantic>=2.11
rank-bm25==0.2.2
anyio==4.9.0
PyJWT==2.10.1
-mcp>=1.6.0
+mcp>=1.18.0
websockets>=15.0.1
httpx[http2]>=0.27.2
diff --git a/deploy/docker/schemas.py b/deploy/docker/schemas.py
index 792936bb..21d47fc4 100644
--- a/deploy/docker/schemas.py
+++ b/deploy/docker/schemas.py
@@ -1,6 +1,6 @@
from typing import List, Optional, Dict
from enum import Enum
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, HttpUrl
from utils import FilterType
@@ -85,4 +85,22 @@ class JSEndpointRequest(BaseModel):
scripts: List[str] = Field(
...,
description="List of separated JavaScript snippets to execute"
- )
\ No newline at end of file
+ )
+
+
+class WebhookConfig(BaseModel):
+ """Configuration for webhook notifications."""
+ webhook_url: HttpUrl
+ webhook_data_in_payload: bool = False
+ webhook_headers: Optional[Dict[str, str]] = None
+
+
+class WebhookPayload(BaseModel):
+ """Payload sent to webhook endpoints."""
+ task_id: str
+ task_type: str # "crawl", "llm_extraction", etc.
+ status: str # "completed" or "failed"
+ timestamp: str # ISO 8601 format
+ urls: List[str]
+ error: Optional[str] = None
+ data: Optional[Dict] = None # Included only if webhook_data_in_payload=True
\ No newline at end of file
diff --git a/deploy/docker/webhook.py b/deploy/docker/webhook.py
new file mode 100644
index 00000000..ebee9dff
--- /dev/null
+++ b/deploy/docker/webhook.py
@@ -0,0 +1,159 @@
+"""
+Webhook delivery service for Crawl4AI.
+
+This module provides webhook notification functionality with exponential backoff retry logic.
+"""
+import asyncio
+import httpx
+import logging
+from typing import Dict, Optional
+from datetime import datetime, timezone
+
+logger = logging.getLogger(__name__)
+
+
+class WebhookDeliveryService:
+ """Handles webhook delivery with exponential backoff retry logic."""
+
+ def __init__(self, config: Dict):
+ """
+ Initialize the webhook delivery service.
+
+ Args:
+ config: Application configuration dictionary containing webhook settings
+ """
+ self.config = config.get("webhooks", {})
+ self.max_attempts = self.config.get("retry", {}).get("max_attempts", 5)
+ self.initial_delay = self.config.get("retry", {}).get("initial_delay_ms", 1000) / 1000
+ self.max_delay = self.config.get("retry", {}).get("max_delay_ms", 32000) / 1000
+ self.timeout = self.config.get("retry", {}).get("timeout_ms", 30000) / 1000
+
+ async def send_webhook(
+ self,
+ webhook_url: str,
+ payload: Dict,
+ headers: Optional[Dict[str, str]] = None
+ ) -> bool:
+ """
+ Send webhook with exponential backoff retry logic.
+
+ Args:
+ webhook_url: The URL to send the webhook to
+ payload: The JSON payload to send
+ headers: Optional custom headers
+
+ Returns:
+ bool: True if delivered successfully, False otherwise
+ """
+ default_headers = self.config.get("headers", {})
+ merged_headers = {**default_headers, **(headers or {})}
+ merged_headers["Content-Type"] = "application/json"
+
+ async with httpx.AsyncClient(timeout=self.timeout) as client:
+ for attempt in range(self.max_attempts):
+ try:
+ logger.info(
+ f"Sending webhook (attempt {attempt + 1}/{self.max_attempts}) to {webhook_url}"
+ )
+
+ response = await client.post(
+ webhook_url,
+ json=payload,
+ headers=merged_headers
+ )
+
+ # Success or client error (don't retry client errors)
+ if response.status_code < 500:
+ if 200 <= response.status_code < 300:
+ logger.info(f"Webhook delivered successfully to {webhook_url}")
+ return True
+ else:
+ logger.warning(
+ f"Webhook rejected with status {response.status_code}: {response.text[:200]}"
+ )
+ return False # Client error - don't retry
+
+ # Server error - retry with backoff
+ logger.warning(
+ f"Webhook failed with status {response.status_code}, will retry"
+ )
+
+ except httpx.TimeoutException as exc:
+ logger.error(f"Webhook timeout (attempt {attempt + 1}): {exc}")
+ except httpx.RequestError as exc:
+ logger.error(f"Webhook request error (attempt {attempt + 1}): {exc}")
+ except Exception as exc:
+ logger.error(f"Webhook delivery error (attempt {attempt + 1}): {exc}")
+
+ # Calculate exponential backoff delay
+ if attempt < self.max_attempts - 1:
+ delay = min(self.initial_delay * (2 ** attempt), self.max_delay)
+ logger.info(f"Retrying in {delay}s...")
+ await asyncio.sleep(delay)
+
+ logger.error(
+ f"Webhook delivery failed after {self.max_attempts} attempts to {webhook_url}"
+ )
+ return False
+
+ async def notify_job_completion(
+ self,
+ task_id: str,
+ task_type: str,
+ status: str,
+ urls: list,
+ webhook_config: Optional[Dict],
+ result: Optional[Dict] = None,
+ error: Optional[str] = None
+ ):
+ """
+ Notify webhook of job completion.
+
+ Args:
+ task_id: The task identifier
+ task_type: Type of task (e.g., "crawl", "llm_extraction")
+ status: Task status ("completed" or "failed")
+ urls: List of URLs that were crawled
+ webhook_config: Webhook configuration from the job request
+ result: Optional crawl result data
+ error: Optional error message if failed
+ """
+ # Determine webhook URL
+ webhook_url = None
+ data_in_payload = self.config.get("data_in_payload", False)
+ custom_headers = None
+
+ if webhook_config:
+ webhook_url = webhook_config.get("webhook_url")
+ data_in_payload = webhook_config.get("webhook_data_in_payload", data_in_payload)
+ custom_headers = webhook_config.get("webhook_headers")
+
+ if not webhook_url:
+ webhook_url = self.config.get("default_url")
+
+ if not webhook_url:
+ logger.debug("No webhook URL configured, skipping notification")
+ return
+
+ # Check if webhooks are enabled
+ if not self.config.get("enabled", True):
+ logger.debug("Webhooks are disabled, skipping notification")
+ return
+
+ # Build payload
+ payload = {
+ "task_id": task_id,
+ "task_type": task_type,
+ "status": status,
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ "urls": urls
+ }
+
+ if error:
+ payload["error"] = error
+
+ if data_in_payload and result:
+ payload["data"] = result
+
+ # Send webhook (fire and forget - don't block on completion)
+ await self.send_webhook(webhook_url, payload, custom_headers)
diff --git a/docs/examples/docker_webhook_example.py b/docs/examples/docker_webhook_example.py
new file mode 100644
index 00000000..8822e879
--- /dev/null
+++ b/docs/examples/docker_webhook_example.py
@@ -0,0 +1,461 @@
+"""
+Docker Webhook Example for Crawl4AI
+
+This example demonstrates how to use webhooks with the Crawl4AI job queue API.
+Instead of polling for results, webhooks notify your application when jobs complete.
+
+Supports both:
+- /crawl/job - Raw crawling with markdown extraction
+- /llm/job - LLM-powered content extraction
+
+Prerequisites:
+1. Crawl4AI Docker container running on localhost:11235
+2. Flask installed: pip install flask requests
+3. LLM API key configured in .llm.env (for LLM extraction examples)
+
+Usage:
+1. Run this script: python docker_webhook_example.py
+2. The webhook server will start on http://localhost:8080
+3. Jobs will be submitted and webhooks will be received automatically
+"""
+
+import requests
+import json
+import time
+from flask import Flask, request, jsonify
+from threading import Thread
+
+# Configuration
+CRAWL4AI_BASE_URL = "http://localhost:11235"
+WEBHOOK_BASE_URL = "http://localhost:8080" # Your webhook receiver URL
+
+# Initialize Flask app for webhook receiver
+app = Flask(__name__)
+
+# Store received webhook data for demonstration
+received_webhooks = []
+
+
+@app.route('/webhooks/crawl-complete', methods=['POST'])
+def handle_crawl_webhook():
+ """
+ Webhook handler that receives notifications when crawl jobs complete.
+
+ Payload structure:
+ {
+ "task_id": "crawl_abc123",
+ "task_type": "crawl",
+ "status": "completed" or "failed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com"],
+ "error": "error message" (only if failed),
+ "data": {...} (only if webhook_data_in_payload=True)
+ }
+ """
+ payload = request.json
+ print(f"\n{'='*60}")
+ print(f"π¬ Webhook received for task: {payload['task_id']}")
+ print(f" Status: {payload['status']}")
+ print(f" Timestamp: {payload['timestamp']}")
+ print(f" URLs: {payload['urls']}")
+
+ if payload['status'] == 'completed':
+ # If data is in payload, process it directly
+ if 'data' in payload:
+ print(f" β Data included in webhook")
+ data = payload['data']
+ # Process the crawl results here
+ for result in data.get('results', []):
+ print(f" - Crawled: {result.get('url')}")
+ print(f" - Markdown length: {len(result.get('markdown', ''))}")
+ else:
+ # Fetch results from API if not included
+ print(f" π₯ Fetching results from API...")
+ task_id = payload['task_id']
+ result_response = requests.get(f"{CRAWL4AI_BASE_URL}/crawl/job/{task_id}")
+ if result_response.ok:
+ data = result_response.json()
+ print(f" β Results fetched successfully")
+ # Process the crawl results here
+ for result in data['result'].get('results', []):
+ print(f" - Crawled: {result.get('url')}")
+ print(f" - Markdown length: {len(result.get('markdown', ''))}")
+
+ elif payload['status'] == 'failed':
+ print(f" β Job failed: {payload.get('error', 'Unknown error')}")
+
+ print(f"{'='*60}\n")
+
+ # Store webhook for demonstration
+ received_webhooks.append(payload)
+
+ # Return 200 OK to acknowledge receipt
+ return jsonify({"status": "received"}), 200
+
+
+@app.route('/webhooks/llm-complete', methods=['POST'])
+def handle_llm_webhook():
+ """
+ Webhook handler that receives notifications when LLM extraction jobs complete.
+
+ Payload structure:
+ {
+ "task_id": "llm_1698765432_12345",
+ "task_type": "llm_extraction",
+ "status": "completed" or "failed",
+ "timestamp": "2025-10-21T10:30:00.000000+00:00",
+ "urls": ["https://example.com/article"],
+ "error": "error message" (only if failed),
+ "data": {"extracted_content": {...}} (only if webhook_data_in_payload=True)
+ }
+ """
+ payload = request.json
+ print(f"\n{'='*60}")
+ print(f"π€ LLM Webhook received for task: {payload['task_id']}")
+ print(f" Task Type: {payload['task_type']}")
+ print(f" Status: {payload['status']}")
+ print(f" Timestamp: {payload['timestamp']}")
+ print(f" URL: {payload['urls'][0]}")
+
+ if payload['status'] == 'completed':
+ # If data is in payload, process it directly
+ if 'data' in payload:
+ print(f" β Data included in webhook")
+ data = payload['data']
+ # Webhook wraps extracted content in 'extracted_content' field
+ extracted = data.get('extracted_content', {})
+ print(f" - Extracted content:")
+ print(f" {json.dumps(extracted, indent=8)}")
+ else:
+ # Fetch results from API if not included
+ print(f" π₯ Fetching results from API...")
+ task_id = payload['task_id']
+ result_response = requests.get(f"{CRAWL4AI_BASE_URL}/llm/job/{task_id}")
+ if result_response.ok:
+ data = result_response.json()
+ print(f" β Results fetched successfully")
+ # API returns unwrapped content in 'result' field
+ extracted = data['result']
+ print(f" - Extracted content:")
+ print(f" {json.dumps(extracted, indent=8)}")
+
+ elif payload['status'] == 'failed':
+ print(f" β Job failed: {payload.get('error', 'Unknown error')}")
+
+ print(f"{'='*60}\n")
+
+ # Store webhook for demonstration
+ received_webhooks.append(payload)
+
+ # Return 200 OK to acknowledge receipt
+ return jsonify({"status": "received"}), 200
+
+
+def start_webhook_server():
+ """Start the Flask webhook server in a separate thread"""
+ app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)
+
+
+def submit_crawl_job_with_webhook(urls, webhook_url, include_data=False):
+ """
+ Submit a crawl job with webhook notification.
+
+ Args:
+ urls: List of URLs to crawl
+ webhook_url: URL to receive webhook notifications
+ include_data: Whether to include full results in webhook payload
+
+ Returns:
+ task_id: The job's task identifier
+ """
+ payload = {
+ "urls": urls,
+ "browser_config": {"headless": True},
+ "crawler_config": {"cache_mode": "bypass"},
+ "webhook_config": {
+ "webhook_url": webhook_url,
+ "webhook_data_in_payload": include_data,
+ # Optional: Add custom headers for authentication
+ # "webhook_headers": {
+ # "X-Webhook-Secret": "your-secret-token"
+ # }
+ }
+ }
+
+ print(f"\nπ Submitting crawl job...")
+ print(f" URLs: {urls}")
+ print(f" Webhook: {webhook_url}")
+ print(f" Include data: {include_data}")
+
+ response = requests.post(
+ f"{CRAWL4AI_BASE_URL}/crawl/job",
+ json=payload,
+ headers={"Content-Type": "application/json"}
+ )
+
+ if response.ok:
+ data = response.json()
+ task_id = data['task_id']
+ print(f" β Job submitted successfully")
+ print(f" Task ID: {task_id}")
+ return task_id
+ else:
+ print(f" β Failed to submit job: {response.text}")
+ return None
+
+
+def submit_llm_job_with_webhook(url, query, webhook_url, include_data=False, schema=None, provider=None):
+ """
+ Submit an LLM extraction job with webhook notification.
+
+ Args:
+ url: URL to extract content from
+ query: Instruction for the LLM (e.g., "Extract article title and author")
+ webhook_url: URL to receive webhook notifications
+ include_data: Whether to include full results in webhook payload
+ schema: Optional JSON schema for structured extraction
+ provider: Optional LLM provider (e.g., "openai/gpt-4o-mini")
+
+ Returns:
+ task_id: The job's task identifier
+ """
+ payload = {
+ "url": url,
+ "q": query,
+ "cache": False,
+ "webhook_config": {
+ "webhook_url": webhook_url,
+ "webhook_data_in_payload": include_data,
+ # Optional: Add custom headers for authentication
+ # "webhook_headers": {
+ # "X-Webhook-Secret": "your-secret-token"
+ # }
+ }
+ }
+
+ if schema:
+ payload["schema"] = schema
+
+ if provider:
+ payload["provider"] = provider
+
+ print(f"\nπ€ Submitting LLM extraction job...")
+ print(f" URL: {url}")
+ print(f" Query: {query}")
+ print(f" Webhook: {webhook_url}")
+ print(f" Include data: {include_data}")
+ if provider:
+ print(f" Provider: {provider}")
+
+ response = requests.post(
+ f"{CRAWL4AI_BASE_URL}/llm/job",
+ json=payload,
+ headers={"Content-Type": "application/json"}
+ )
+
+ if response.ok:
+ data = response.json()
+ task_id = data['task_id']
+ print(f" β Job submitted successfully")
+ print(f" Task ID: {task_id}")
+ return task_id
+ else:
+ print(f" β Failed to submit job: {response.text}")
+ return None
+
+
+def submit_job_without_webhook(urls):
+ """
+ Submit a job without webhook (traditional polling approach).
+
+ Args:
+ urls: List of URLs to crawl
+
+ Returns:
+ task_id: The job's task identifier
+ """
+ payload = {
+ "urls": urls,
+ "browser_config": {"headless": True},
+ "crawler_config": {"cache_mode": "bypass"}
+ }
+
+ print(f"\nπ Submitting crawl job (without webhook)...")
+ print(f" URLs: {urls}")
+
+ response = requests.post(
+ f"{CRAWL4AI_BASE_URL}/crawl/job",
+ json=payload
+ )
+
+ if response.ok:
+ data = response.json()
+ task_id = data['task_id']
+ print(f" β Job submitted successfully")
+ print(f" Task ID: {task_id}")
+ return task_id
+ else:
+ print(f" β Failed to submit job: {response.text}")
+ return None
+
+
+def poll_job_status(task_id, timeout=60):
+ """
+ Poll for job status (used when webhook is not configured).
+
+ Args:
+ task_id: The job's task identifier
+ timeout: Maximum time to wait in seconds
+ """
+ print(f"\nβ³ Polling for job status...")
+ start_time = time.time()
+
+ while time.time() - start_time < timeout:
+ response = requests.get(f"{CRAWL4AI_BASE_URL}/crawl/job/{task_id}")
+
+ if response.ok:
+ data = response.json()
+ status = data.get('status', 'unknown')
+
+ if status == 'completed':
+ print(f" β Job completed!")
+ return data
+ elif status == 'failed':
+ print(f" β Job failed: {data.get('error', 'Unknown error')}")
+ return data
+ else:
+ print(f" β³ Status: {status}, waiting...")
+ time.sleep(2)
+ else:
+ print(f" β Failed to get status: {response.text}")
+ return None
+
+ print(f" β° Timeout reached")
+ return None
+
+
+def main():
+ """Run the webhook demonstration"""
+
+ # Check if Crawl4AI is running
+ try:
+ health = requests.get(f"{CRAWL4AI_BASE_URL}/health", timeout=5)
+        print(f"✅ Crawl4AI is running: {health.json()}")
+    except Exception:
+        print(f"❌ Cannot connect to Crawl4AI at {CRAWL4AI_BASE_URL}")
+        print("   Please make sure the Docker container is running:")
+        print("   docker run -d -p 11235:11235 --name crawl4ai unclecode/crawl4ai:latest")
+ return
+
+ # Start webhook server in background thread
+ print(f"\nπ Starting webhook server at {WEBHOOK_BASE_URL}...")
+ webhook_thread = Thread(target=start_webhook_server, daemon=True)
+ webhook_thread.start()
+ time.sleep(2) # Give server time to start
+
+ # Example 1: Job with webhook (notification only, fetch data separately)
+ print(f"\n{'='*60}")
+ print("Example 1: Webhook Notification Only")
+ print(f"{'='*60}")
+ task_id_1 = submit_crawl_job_with_webhook(
+ urls=["https://example.com"],
+ webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/crawl-complete",
+ include_data=False
+ )
+
+ # Example 2: Job with webhook (data included in payload)
+ time.sleep(5) # Wait a bit between requests
+ print(f"\n{'='*60}")
+ print("Example 2: Webhook with Full Data")
+ print(f"{'='*60}")
+ task_id_2 = submit_crawl_job_with_webhook(
+ urls=["https://www.python.org"],
+ webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/crawl-complete",
+ include_data=True
+ )
+
+ # Example 3: LLM extraction with webhook (notification only)
+ time.sleep(5) # Wait a bit between requests
+ print(f"\n{'='*60}")
+ print("Example 3: LLM Extraction with Webhook (Notification Only)")
+ print(f"{'='*60}")
+ task_id_3 = submit_llm_job_with_webhook(
+ url="https://www.example.com",
+ query="Extract the main heading and description from this page.",
+ webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/llm-complete",
+ include_data=False,
+ provider="openai/gpt-4o-mini"
+ )
+
+ # Example 4: LLM extraction with webhook (data included + schema)
+ time.sleep(5) # Wait a bit between requests
+ print(f"\n{'='*60}")
+ print("Example 4: LLM Extraction with Schema and Full Data")
+ print(f"{'='*60}")
+
+ # Define a schema for structured extraction
+ schema = json.dumps({
+ "type": "object",
+ "properties": {
+ "title": {"type": "string", "description": "Page title"},
+ "description": {"type": "string", "description": "Page description"}
+ },
+ "required": ["title"]
+ })
+
+ task_id_4 = submit_llm_job_with_webhook(
+ url="https://www.python.org",
+ query="Extract the title and description of this website",
+ webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/llm-complete",
+ include_data=True,
+ schema=schema,
+ provider="openai/gpt-4o-mini"
+ )
+
+ # Example 5: Traditional polling (no webhook)
+ time.sleep(5) # Wait a bit between requests
+ print(f"\n{'='*60}")
+ print("Example 5: Traditional Polling (No Webhook)")
+ print(f"{'='*60}")
+ task_id_5 = submit_job_without_webhook(
+ urls=["https://github.com"]
+ )
+ if task_id_5:
+ result = poll_job_status(task_id_5)
+ if result and result.get('status') == 'completed':
+ print(f" β Results retrieved via polling")
+
+ # Wait for webhooks to arrive
+ print(f"\nβ³ Waiting for webhooks to be received...")
+ time.sleep(30) # Give jobs time to complete and webhooks to arrive (longer for LLM)
+
+ # Summary
+ print(f"\n{'='*60}")
+ print("Summary")
+ print(f"{'='*60}")
+ print(f"Total webhooks received: {len(received_webhooks)}")
+
+ crawl_webhooks = [w for w in received_webhooks if w['task_type'] == 'crawl']
+ llm_webhooks = [w for w in received_webhooks if w['task_type'] == 'llm_extraction']
+
+ print(f"\nπ Breakdown:")
+ print(f" - Crawl webhooks: {len(crawl_webhooks)}")
+ print(f" - LLM extraction webhooks: {len(llm_webhooks)}")
+
+ print(f"\nπ Details:")
+ for i, webhook in enumerate(received_webhooks, 1):
+ task_type = webhook['task_type']
+ icon = "π·οΈ" if task_type == "crawl" else "π€"
+ print(f"{i}. {icon} Task {webhook['task_id']}: {webhook['status']} ({task_type})")
+
+ print(f"\nβ Demo completed!")
+ print(f"\nπ‘ Pro tips:")
+ print(f" - In production, your webhook URL should be publicly accessible")
+ print(f" (e.g., https://myapp.com/webhooks) or use ngrok for testing")
+ print(f" - Both /crawl/job and /llm/job support the same webhook configuration")
+ print(f" - Use webhook_data_in_payload=true to get results directly in the webhook")
+ print(f" - LLM jobs may take longer, adjust timeouts accordingly")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/test_llm_webhook_feature.py b/test_llm_webhook_feature.py
new file mode 100644
index 00000000..98133e82
--- /dev/null
+++ b/test_llm_webhook_feature.py
@@ -0,0 +1,401 @@
+#!/usr/bin/env python3
+"""
+Test script to validate webhook implementation for /llm/job endpoint.
+
+This tests that the /llm/job endpoint now supports webhooks
+following the same pattern as /crawl/job.
+"""
+
+import sys
+import os
+
+# Add deploy/docker to path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'deploy', 'docker'))
+
+def test_llm_job_payload_model():
+ """Test that LlmJobPayload includes webhook_config field"""
+ print("=" * 60)
+ print("TEST 1: LlmJobPayload Model")
+ print("=" * 60)
+
+ try:
+ from job import LlmJobPayload
+ from schemas import WebhookConfig
+ from pydantic import ValidationError
+
+ # Test with webhook_config
+ payload_dict = {
+ "url": "https://example.com",
+ "q": "Extract main content",
+ "schema": None,
+ "cache": False,
+ "provider": None,
+ "webhook_config": {
+ "webhook_url": "https://myapp.com/webhook",
+ "webhook_data_in_payload": True,
+ "webhook_headers": {"X-Secret": "token"}
+ }
+ }
+
+ payload = LlmJobPayload(**payload_dict)
+
+ print(f"β LlmJobPayload accepts webhook_config")
+ print(f" - URL: {payload.url}")
+ print(f" - Query: {payload.q}")
+ print(f" - Webhook URL: {payload.webhook_config.webhook_url}")
+ print(f" - Data in payload: {payload.webhook_config.webhook_data_in_payload}")
+
+ # Test without webhook_config (should be optional)
+ minimal_payload = {
+ "url": "https://example.com",
+ "q": "Extract content"
+ }
+
+ payload2 = LlmJobPayload(**minimal_payload)
+ assert payload2.webhook_config is None, "webhook_config should be optional"
+ print(f"β LlmJobPayload works without webhook_config (optional)")
+
+ return True
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_handle_llm_request_signature():
+ """Test that handle_llm_request accepts webhook_config parameter"""
+ print("\n" + "=" * 60)
+ print("TEST 2: handle_llm_request Function Signature")
+ print("=" * 60)
+
+ try:
+ from api import handle_llm_request
+ import inspect
+
+ sig = inspect.signature(handle_llm_request)
+ params = list(sig.parameters.keys())
+
+ print(f"Function parameters: {params}")
+
+ if 'webhook_config' in params:
+ print(f"β handle_llm_request has webhook_config parameter")
+
+ # Check that it's optional with default None
+ webhook_param = sig.parameters['webhook_config']
+ if webhook_param.default is None or webhook_param.default == inspect.Parameter.empty:
+ print(f"β webhook_config is optional (default: {webhook_param.default})")
+ else:
+ print(f"β οΈ webhook_config default is: {webhook_param.default}")
+
+ return True
+ else:
+ print(f"β handle_llm_request missing webhook_config parameter")
+ return False
+
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_process_llm_extraction_signature():
+ """Test that process_llm_extraction accepts webhook_config parameter"""
+ print("\n" + "=" * 60)
+ print("TEST 3: process_llm_extraction Function Signature")
+ print("=" * 60)
+
+ try:
+ from api import process_llm_extraction
+ import inspect
+
+ sig = inspect.signature(process_llm_extraction)
+ params = list(sig.parameters.keys())
+
+ print(f"Function parameters: {params}")
+
+ if 'webhook_config' in params:
+ print(f"β process_llm_extraction has webhook_config parameter")
+
+ webhook_param = sig.parameters['webhook_config']
+ if webhook_param.default is None or webhook_param.default == inspect.Parameter.empty:
+ print(f"β webhook_config is optional (default: {webhook_param.default})")
+ else:
+ print(f"β οΈ webhook_config default is: {webhook_param.default}")
+
+ return True
+ else:
+ print(f"β process_llm_extraction missing webhook_config parameter")
+ return False
+
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_webhook_integration_in_api():
+ """Test that api.py properly integrates webhook notifications"""
+ print("\n" + "=" * 60)
+ print("TEST 4: Webhook Integration in process_llm_extraction")
+ print("=" * 60)
+
+ try:
+ api_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py')
+
+ with open(api_file, 'r') as f:
+ api_content = f.read()
+
+ # Check for WebhookDeliveryService initialization
+ if 'webhook_service = WebhookDeliveryService(config)' in api_content:
+ print("β process_llm_extraction initializes WebhookDeliveryService")
+ else:
+ print("β Missing WebhookDeliveryService initialization in process_llm_extraction")
+ return False
+
+ # Check for notify_job_completion calls with llm_extraction
+ if 'task_type="llm_extraction"' in api_content:
+ print("β Uses correct task_type='llm_extraction' for notifications")
+ else:
+ print("β Missing task_type='llm_extraction' in webhook notifications")
+ return False
+
+ # Count webhook notification calls (should have at least 3: success + 2 failure paths)
+ notification_count = api_content.count('await webhook_service.notify_job_completion')
+ # Find only in process_llm_extraction function
+ llm_func_start = api_content.find('async def process_llm_extraction')
+ llm_func_end = api_content.find('\nasync def ', llm_func_start + 1)
+ if llm_func_end == -1:
+ llm_func_end = len(api_content)
+
+ llm_func_content = api_content[llm_func_start:llm_func_end]
+ llm_notification_count = llm_func_content.count('await webhook_service.notify_job_completion')
+
+ print(f"β Found {llm_notification_count} webhook notification calls in process_llm_extraction")
+
+ if llm_notification_count >= 3:
+ print(f"β Sufficient notification points (success + failure paths)")
+ else:
+ print(f"β οΈ Expected at least 3 notification calls, found {llm_notification_count}")
+
+ return True
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_job_endpoint_integration():
+ """Test that /llm/job endpoint extracts and passes webhook_config"""
+ print("\n" + "=" * 60)
+ print("TEST 5: /llm/job Endpoint Integration")
+ print("=" * 60)
+
+ try:
+ job_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'job.py')
+
+ with open(job_file, 'r') as f:
+ job_content = f.read()
+
+ # Find the llm_job_enqueue function
+ llm_job_start = job_content.find('async def llm_job_enqueue')
+ llm_job_end = job_content.find('\n\n@router', llm_job_start + 1)
+ if llm_job_end == -1:
+ llm_job_end = job_content.find('\n\nasync def', llm_job_start + 1)
+
+ llm_job_func = job_content[llm_job_start:llm_job_end]
+
+ # Check for webhook_config extraction
+ if 'webhook_config = None' in llm_job_func:
+ print("β llm_job_enqueue initializes webhook_config variable")
+ else:
+ print("β Missing webhook_config initialization")
+ return False
+
+ if 'if payload.webhook_config:' in llm_job_func:
+ print("β llm_job_enqueue checks for payload.webhook_config")
+ else:
+ print("β Missing webhook_config check")
+ return False
+
+ if 'webhook_config = payload.webhook_config.model_dump(mode=\'json\')' in llm_job_func:
+ print("β llm_job_enqueue converts webhook_config to dict")
+ else:
+ print("β Missing webhook_config.model_dump conversion")
+ return False
+
+ if 'webhook_config=webhook_config' in llm_job_func:
+ print("β llm_job_enqueue passes webhook_config to handle_llm_request")
+ else:
+ print("β Missing webhook_config parameter in handle_llm_request call")
+ return False
+
+ return True
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_create_new_task_integration():
+ """Test that create_new_task stores webhook_config in Redis"""
+ print("\n" + "=" * 60)
+ print("TEST 6: create_new_task Webhook Storage")
+ print("=" * 60)
+
+ try:
+ api_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py')
+
+ with open(api_file, 'r') as f:
+ api_content = f.read()
+
+ # Find create_new_task function
+ create_task_start = api_content.find('async def create_new_task')
+ create_task_end = api_content.find('\nasync def ', create_task_start + 1)
+ if create_task_end == -1:
+ create_task_end = len(api_content)
+
+ create_task_func = api_content[create_task_start:create_task_end]
+
+ # Check for webhook_config storage
+ if 'if webhook_config:' in create_task_func:
+ print("β create_new_task checks for webhook_config")
+ else:
+ print("β Missing webhook_config check in create_new_task")
+ return False
+
+ if 'task_data["webhook_config"] = json.dumps(webhook_config)' in create_task_func:
+ print("β create_new_task stores webhook_config in Redis task data")
+ else:
+ print("β Missing webhook_config storage in task_data")
+ return False
+
+ # Check that webhook_config is passed to process_llm_extraction
+ if 'webhook_config' in create_task_func and 'background_tasks.add_task' in create_task_func:
+ print("β create_new_task passes webhook_config to background task")
+ else:
+ print("β οΈ Could not verify webhook_config passed to background task")
+
+ return True
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_pattern_consistency():
+ """Test that /llm/job follows the same pattern as /crawl/job"""
+ print("\n" + "=" * 60)
+ print("TEST 7: Pattern Consistency with /crawl/job")
+ print("=" * 60)
+
+ try:
+ api_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py')
+
+ with open(api_file, 'r') as f:
+ api_content = f.read()
+
+ # Find handle_crawl_job to compare pattern
+ crawl_job_start = api_content.find('async def handle_crawl_job')
+ crawl_job_end = api_content.find('\nasync def ', crawl_job_start + 1)
+ if crawl_job_end == -1:
+ crawl_job_end = len(api_content)
+ crawl_job_func = api_content[crawl_job_start:crawl_job_end]
+
+ # Find process_llm_extraction
+ llm_extract_start = api_content.find('async def process_llm_extraction')
+ llm_extract_end = api_content.find('\nasync def ', llm_extract_start + 1)
+ if llm_extract_end == -1:
+ llm_extract_end = len(api_content)
+ llm_extract_func = api_content[llm_extract_start:llm_extract_end]
+
+ print("Checking pattern consistency...")
+
+ # Both should initialize WebhookDeliveryService
+ crawl_has_service = 'webhook_service = WebhookDeliveryService(config)' in crawl_job_func
+ llm_has_service = 'webhook_service = WebhookDeliveryService(config)' in llm_extract_func
+
+ if crawl_has_service and llm_has_service:
+ print("β Both initialize WebhookDeliveryService")
+ else:
+ print(f"β Service initialization mismatch (crawl: {crawl_has_service}, llm: {llm_has_service})")
+ return False
+
+ # Both should call notify_job_completion on success
+ crawl_notifies_success = 'status="completed"' in crawl_job_func and 'notify_job_completion' in crawl_job_func
+ llm_notifies_success = 'status="completed"' in llm_extract_func and 'notify_job_completion' in llm_extract_func
+
+ if crawl_notifies_success and llm_notifies_success:
+ print("β Both notify on success")
+ else:
+ print(f"β Success notification mismatch (crawl: {crawl_notifies_success}, llm: {llm_notifies_success})")
+ return False
+
+ # Both should call notify_job_completion on failure
+ crawl_notifies_failure = 'status="failed"' in crawl_job_func and 'error=' in crawl_job_func
+ llm_notifies_failure = 'status="failed"' in llm_extract_func and 'error=' in llm_extract_func
+
+ if crawl_notifies_failure and llm_notifies_failure:
+ print("β Both notify on failure")
+ else:
+ print(f"β Failure notification mismatch (crawl: {crawl_notifies_failure}, llm: {llm_notifies_failure})")
+ return False
+
+ print("β /llm/job follows the same pattern as /crawl/job")
+ return True
+
+ except Exception as e:
+ print(f"β Failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def main():
+ """Run all tests"""
+ print("\nπ§ͺ LLM Job Webhook Feature Validation")
+ print("=" * 60)
+ print("Testing that /llm/job now supports webhooks like /crawl/job")
+ print("=" * 60 + "\n")
+
+ results = []
+
+ # Run all tests
+ results.append(("LlmJobPayload Model", test_llm_job_payload_model()))
+ results.append(("handle_llm_request Signature", test_handle_llm_request_signature()))
+ results.append(("process_llm_extraction Signature", test_process_llm_extraction_signature()))
+ results.append(("Webhook Integration", test_webhook_integration_in_api()))
+ results.append(("/llm/job Endpoint", test_job_endpoint_integration()))
+ results.append(("create_new_task Storage", test_create_new_task_integration()))
+ results.append(("Pattern Consistency", test_pattern_consistency()))
+
+ # Print summary
+ print("\n" + "=" * 60)
+ print("TEST SUMMARY")
+ print("=" * 60)
+
+ passed = sum(1 for _, result in results if result)
+ total = len(results)
+
+ for test_name, result in results:
+ status = "β PASS" if result else "β FAIL"
+ print(f"{status} - {test_name}")
+
+ print(f"\n{'=' * 60}")
+ print(f"Results: {passed}/{total} tests passed")
+ print(f"{'=' * 60}")
+
+ if passed == total:
+ print("\nπ All tests passed! /llm/job webhook feature is correctly implemented.")
+ print("\nπ Summary of changes:")
+ print(" 1. LlmJobPayload model includes webhook_config field")
+ print(" 2. /llm/job endpoint extracts and passes webhook_config")
+ print(" 3. handle_llm_request accepts webhook_config parameter")
+ print(" 4. create_new_task stores webhook_config in Redis")
+ print(" 5. process_llm_extraction sends webhook notifications")
+ print(" 6. Follows the same pattern as /crawl/job")
+ return 0
+ else:
+ print(f"\nβ οΈ {total - passed} test(s) failed. Please review the output above.")
+ return 1
+
+if __name__ == "__main__":
+ exit(main())
diff --git a/test_webhook_implementation.py b/test_webhook_implementation.py
new file mode 100644
index 00000000..072db8b3
--- /dev/null
+++ b/test_webhook_implementation.py
@@ -0,0 +1,307 @@
+"""
+Simple test script to validate the webhook implementation without running the full server.
+
+This script tests:
+1. Webhook module imports and syntax
+2. WebhookDeliveryService initialization
+3. Payload construction logic
+4. Configuration parsing
+"""
+
+import sys
+import os
+import json
+from datetime import datetime, timezone
+
+# Add deploy/docker to path to import modules
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'deploy', 'docker'))
+
+def test_imports():
+ """Test that all webhook-related modules can be imported"""
+ print("=" * 60)
+ print("TEST 1: Module Imports")
+ print("=" * 60)
+
+ try:
+ from webhook import WebhookDeliveryService
+ print("β webhook.WebhookDeliveryService imported successfully")
+ except Exception as e:
+ print(f"β Failed to import webhook module: {e}")
+ return False
+
+ try:
+ from schemas import WebhookConfig, WebhookPayload
+ print("β schemas.WebhookConfig imported successfully")
+ print("β schemas.WebhookPayload imported successfully")
+ except Exception as e:
+ print(f"β Failed to import schemas: {e}")
+ return False
+
+ return True
+
+def test_webhook_service_init():
+ """Test WebhookDeliveryService initialization"""
+ print("\n" + "=" * 60)
+ print("TEST 2: WebhookDeliveryService Initialization")
+ print("=" * 60)
+
+ try:
+ from webhook import WebhookDeliveryService
+
+ # Test with default config
+ config = {
+ "webhooks": {
+ "enabled": True,
+ "default_url": None,
+ "data_in_payload": False,
+ "retry": {
+ "max_attempts": 5,
+ "initial_delay_ms": 1000,
+ "max_delay_ms": 32000,
+ "timeout_ms": 30000
+ },
+ "headers": {
+ "User-Agent": "Crawl4AI-Webhook/1.0"
+ }
+ }
+ }
+
+ service = WebhookDeliveryService(config)
+
+ print(f"β Service initialized successfully")
+ print(f" - Max attempts: {service.max_attempts}")
+ print(f" - Initial delay: {service.initial_delay}s")
+ print(f" - Max delay: {service.max_delay}s")
+ print(f" - Timeout: {service.timeout}s")
+
+ # Verify calculations
+ assert service.max_attempts == 5, "Max attempts should be 5"
+ assert service.initial_delay == 1.0, "Initial delay should be 1.0s"
+ assert service.max_delay == 32.0, "Max delay should be 32.0s"
+ assert service.timeout == 30.0, "Timeout should be 30.0s"
+
+ print("β All configuration values correct")
+
+ return True
+ except Exception as e:
+ print(f"β Service initialization failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_webhook_config_model():
+ """Test WebhookConfig Pydantic model"""
+ print("\n" + "=" * 60)
+ print("TEST 3: WebhookConfig Model Validation")
+ print("=" * 60)
+
+ try:
+ from schemas import WebhookConfig
+ from pydantic import ValidationError
+
+ # Test valid config
+ valid_config = {
+ "webhook_url": "https://example.com/webhook",
+ "webhook_data_in_payload": True,
+ "webhook_headers": {"X-Secret": "token123"}
+ }
+
+ config = WebhookConfig(**valid_config)
+ print(f"β Valid config accepted:")
+ print(f" - URL: {config.webhook_url}")
+ print(f" - Data in payload: {config.webhook_data_in_payload}")
+ print(f" - Headers: {config.webhook_headers}")
+
+ # Test minimal config
+ minimal_config = {
+ "webhook_url": "https://example.com/webhook"
+ }
+
+ config2 = WebhookConfig(**minimal_config)
+ print(f"β Minimal config accepted (defaults applied):")
+ print(f" - URL: {config2.webhook_url}")
+ print(f" - Data in payload: {config2.webhook_data_in_payload}")
+ print(f" - Headers: {config2.webhook_headers}")
+
+ # Test invalid URL
+ try:
+ invalid_config = {
+ "webhook_url": "not-a-url"
+ }
+ config3 = WebhookConfig(**invalid_config)
+ print(f"β Invalid URL should have been rejected")
+ return False
+ except ValidationError as e:
+ print(f"β Invalid URL correctly rejected")
+
+ return True
+ except Exception as e:
+ print(f"β Model validation test failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_payload_construction():
+ """Test webhook payload construction logic"""
+ print("\n" + "=" * 60)
+ print("TEST 4: Payload Construction")
+ print("=" * 60)
+
+ try:
+ # Simulate payload construction from notify_job_completion
+ task_id = "crawl_abc123"
+ task_type = "crawl"
+ status = "completed"
+ urls = ["https://example.com"]
+
+ payload = {
+ "task_id": task_id,
+ "task_type": task_type,
+ "status": status,
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ "urls": urls
+ }
+
+ print(f"β Basic payload constructed:")
+ print(json.dumps(payload, indent=2))
+
+ # Test with error
+ error_payload = {
+ "task_id": "crawl_xyz789",
+ "task_type": "crawl",
+ "status": "failed",
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ "urls": ["https://example.com"],
+ "error": "Connection timeout"
+ }
+
+ print(f"\nβ Error payload constructed:")
+ print(json.dumps(error_payload, indent=2))
+
+ # Test with data
+ data_payload = {
+ "task_id": "crawl_def456",
+ "task_type": "crawl",
+ "status": "completed",
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ "urls": ["https://example.com"],
+ "data": {
+ "results": [
+ {"url": "https://example.com", "markdown": "# Example"}
+ ]
+ }
+ }
+
+ print(f"\nβ Data payload constructed:")
+ print(json.dumps(data_payload, indent=2))
+
+ return True
+ except Exception as e:
+ print(f"β Payload construction failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_exponential_backoff():
+ """Test exponential backoff calculation"""
+ print("\n" + "=" * 60)
+ print("TEST 5: Exponential Backoff Calculation")
+ print("=" * 60)
+
+ try:
+ initial_delay = 1.0 # 1 second
+ max_delay = 32.0 # 32 seconds
+
+ print("Backoff delays for 5 attempts:")
+ for attempt in range(5):
+ delay = min(initial_delay * (2 ** attempt), max_delay)
+ print(f" Attempt {attempt + 1}: {delay}s")
+
+ # Verify the sequence: 1s, 2s, 4s, 8s, 16s
+ expected = [1.0, 2.0, 4.0, 8.0, 16.0]
+ actual = [min(initial_delay * (2 ** i), max_delay) for i in range(5)]
+
+ assert actual == expected, f"Expected {expected}, got {actual}"
+ print("β Exponential backoff sequence correct")
+
+ return True
+ except Exception as e:
+ print(f"β Backoff calculation failed: {e}")
+ return False
+
+def test_api_integration():
+ """Test that api.py imports webhook module correctly"""
+ print("\n" + "=" * 60)
+ print("TEST 6: API Integration")
+ print("=" * 60)
+
+ try:
+ # Check if api.py can import webhook module
+ api_path = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py')
+ with open(api_path, 'r') as f:
+ api_content = f.read()
+
+ if 'from webhook import WebhookDeliveryService' in api_content:
+ print("β api.py imports WebhookDeliveryService")
+ else:
+ print("β api.py missing webhook import")
+ return False
+
+ if 'WebhookDeliveryService(config)' in api_content:
+ print("β api.py initializes WebhookDeliveryService")
+ else:
+ print("β api.py doesn't initialize WebhookDeliveryService")
+ return False
+
+ if 'notify_job_completion' in api_content:
+ print("β api.py calls notify_job_completion")
+ else:
+ print("β api.py doesn't call notify_job_completion")
+ return False
+
+ return True
+ except Exception as e:
+ print(f"β API integration check failed: {e}")
+ return False
+
+def main():
+ """Run all tests"""
+ print("\nπ§ͺ Webhook Implementation Validation Tests")
+ print("=" * 60)
+
+ results = []
+
+ # Run tests
+ results.append(("Module Imports", test_imports()))
+ results.append(("Service Initialization", test_webhook_service_init()))
+ results.append(("Config Model", test_webhook_config_model()))
+ results.append(("Payload Construction", test_payload_construction()))
+ results.append(("Exponential Backoff", test_exponential_backoff()))
+ results.append(("API Integration", test_api_integration()))
+
+ # Print summary
+ print("\n" + "=" * 60)
+ print("TEST SUMMARY")
+ print("=" * 60)
+
+ passed = sum(1 for _, result in results if result)
+ total = len(results)
+
+ for test_name, result in results:
+ status = "β PASS" if result else "β FAIL"
+ print(f"{status} - {test_name}")
+
+ print(f"\n{'=' * 60}")
+ print(f"Results: {passed}/{total} tests passed")
+ print(f"{'=' * 60}")
+
+ if passed == total:
+ print("\nπ All tests passed! Webhook implementation is valid.")
+ return 0
+ else:
+ print(f"\nβ οΈ {total - passed} test(s) failed. Please review the output above.")
+ return 1
+
+if __name__ == "__main__":
+ exit(main())
diff --git a/tests/WEBHOOK_TEST_README.md b/tests/WEBHOOK_TEST_README.md
new file mode 100644
index 00000000..4f3c68a0
--- /dev/null
+++ b/tests/WEBHOOK_TEST_README.md
@@ -0,0 +1,251 @@
+# Webhook Feature Test Script
+
+This directory contains a comprehensive test script for the webhook feature implementation.
+
+## Overview
+
+The `test_webhook_feature.sh` script automates the entire process of testing the webhook feature:
+
+1. β Fetches and switches to the webhook feature branch
+2. β Activates the virtual environment
+3. β Installs all required dependencies
+4. β Starts Redis server in background
+5. β Starts Crawl4AI server in background
+6. β Runs webhook integration test
+7. β Verifies job completion via webhook
+8. β Cleans up and returns to original branch
+
+## Prerequisites
+
+- Python 3.10+
+- Virtual environment at `venv/` in the project root (the script creates one if it is missing)
+- Git repository with the webhook feature branch
+- `redis-server` (script will attempt to install if missing)
+- `curl` and `lsof` commands available
+
+## Usage
+
+### Quick Start
+
+From the project root:
+
+```bash
+./tests/test_webhook_feature.sh
+```
+
+Or from the tests directory:
+
+```bash
+cd tests
+./test_webhook_feature.sh
+```
+
+### What the Script Does
+
+#### Step 1: Branch Management
+- Saves your current branch
+- Fetches the webhook feature branch from remote
+- Switches to the webhook feature branch
+
+#### Step 2: Environment Setup
+- Activates your existing virtual environment
+- Installs dependencies from `deploy/docker/requirements.txt`
+- Installs Flask for the webhook receiver
+
+#### Step 3: Service Startup
+- Starts Redis server on port 6379
+- Starts Crawl4AI server on port 11235
+- Waits for server health check to pass
+
+#### Step 4: Webhook Test
+- Creates a webhook receiver on port 8080 (see the sketch below)
+- Submits a crawl job for `https://example.com` with webhook config
+- Waits for webhook notification (60s timeout)
+- Verifies webhook payload contains expected data
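+
+For reference, a minimal sketch of that receiver (assumes Flask; the script writes a fuller version to `/tmp/test_webhook.py`):
+
+```python
+# Minimal webhook receiver sketch (assumes Flask is installed).
+# The test script generates a fuller version of this in /tmp/test_webhook.py.
+from flask import Flask, request, jsonify
+
+app = Flask(__name__)
+
+@app.route("/webhook", methods=["POST"])
+def handle_webhook():
+    payload = request.json  # task_id, task_type, status, timestamp, urls, optional data/error
+    print(f"Task {payload.get('task_id')} finished with status {payload.get('status')}")
+    return jsonify({"status": "received"}), 200
+
+if __name__ == "__main__":
+    app.run(host="0.0.0.0", port=8080)
+```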
+
+#### Step 5: Cleanup
+- Stops webhook receiver
+- Stops Crawl4AI server
+- Stops Redis server
+- Returns to your original branch
+
+## Expected Output
+
+```
+[INFO] Starting webhook feature test script
+[INFO] Project root: /path/to/crawl4ai
+[INFO] Step 1: Fetching PR branch...
+[INFO] Current branch: develop
+[SUCCESS] Branch fetched
+[INFO] Step 2: Switching to branch: claude/implement-webhook-crawl-feature-011CULZY1Jy8N5MUkZqXkRVp
+[SUCCESS] Switched to webhook feature branch
+[INFO] Step 3: Activating virtual environment...
+[SUCCESS] Virtual environment activated
+[INFO] Step 4: Installing server dependencies...
+[SUCCESS] Dependencies installed
+[INFO] Step 5a: Starting Redis...
+[SUCCESS] Redis started (PID: 12345)
+[INFO] Step 5b: Starting server on port 11235...
+[INFO] Server started (PID: 12346)
+[INFO] Waiting for server to be ready...
+[SUCCESS] Server is ready!
+[INFO] Step 6: Creating webhook test script...
+[INFO] Running webhook test...
+
+π Submitting crawl job with webhook...
+β Job submitted successfully, task_id: crawl_abc123
+β³ Waiting for webhook notification...
+
+β Webhook received: {
+ "task_id": "crawl_abc123",
+ "task_type": "crawl",
+ "status": "completed",
+ "timestamp": "2025-10-22T00:00:00.000000+00:00",
+ "urls": ["https://example.com"],
+ "data": { ... }
+}
+
+β Webhook received!
+ Task ID: crawl_abc123
+ Status: completed
+ URLs: ['https://example.com']
+ β Data included in webhook payload
+ π Crawled 1 URL(s)
+ - https://example.com: 1234 chars
+
+π Webhook test PASSED!
+
+[INFO] Step 7: Verifying test results...
+[SUCCESS] β Webhook test PASSED!
+[SUCCESS] All tests completed successfully! π
+[INFO] Cleanup will happen automatically...
+[INFO] Starting cleanup...
+[INFO] Stopping webhook receiver...
+[INFO] Stopping server...
+[INFO] Stopping Redis...
+[INFO] Switching back to branch: develop
+[SUCCESS] Cleanup complete
+```
+
+## Troubleshooting
+
+### Server Failed to Start
+
+If the server fails to start, check the logs:
+
+```bash
+tail -100 /tmp/crawl4ai_server.log
+```
+
+Common issues:
+- Port 11235 already in use: `lsof -ti:11235 | xargs kill -9`
+- Missing dependencies: Check that all packages are installed
+
+### Redis Connection Failed
+
+Check if Redis is running:
+
+```bash
+redis-cli ping
+# Should return: PONG
+```
+
+If not running:
+
+```bash
+redis-server --port 6379 --daemonize yes
+```
+
+### Webhook Not Received
+
+The script has a 60-second timeout for webhook delivery. If the webhook isn't received:
+
+1. Check server logs: `/tmp/crawl4ai_server.log`
+2. Verify webhook receiver is running on port 8080
+3. Check network connectivity between components, or poll the job status directly as shown below
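+
+A minimal polling sketch (uses `requests`; it hits the same `GET /crawl/job/{task_id}` endpoint as the script's own fallback):
+
+```python
+# Fallback: poll the job status when no webhook arrives (sketch, assumes `requests` is installed).
+import time
+import requests
+
+task_id = "crawl_abc123"  # replace with the task_id returned by POST /crawl/job
+for _ in range(10):
+    resp = requests.get(f"http://localhost:11235/crawl/job/{task_id}")
+    if resp.ok and resp.json().get("status") in ("completed", "failed"):
+        print(resp.json())
+        break
+    time.sleep(2)
+```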
+
+### Script Interruption
+
+If the script is interrupted (Ctrl+C), cleanup happens automatically via trap. The script will:
+- Kill all background processes
+- Stop Redis
+- Return to your original branch
+
+To clean up manually if needed:
+
+```bash
+# Kill processes by port
+lsof -ti:11235 | xargs kill -9 # Server
+lsof -ti:8080 | xargs kill -9 # Webhook receiver
+lsof -ti:6379 | xargs kill -9 # Redis
+
+# Return to your branch
+git checkout develop # or your branch name
+```
+
+## Testing Different URLs
+
+To test with a different URL, modify the script or create a custom test:
+
+```python
+payload = {
+ "urls": ["https://your-url-here.com"],
+ "browser_config": {"headless": True},
+ "crawler_config": {"cache_mode": "bypass"},
+ "webhook_config": {
+ "webhook_url": "http://localhost:8080/webhook",
+ "webhook_data_in_payload": True
+ }
+}
+```
+
+## Files Generated
+
+The script creates temporary files:
+
+- `/tmp/crawl4ai_server.log` - Server output logs
+- `/tmp/test_webhook.py` - Webhook test Python script
+
+These files are not cleaned up automatically, so you can review them after the test.
+
+## Exit Codes
+
+- `0` - All tests passed successfully
+- `1` - Test failed (check output for details)
+
+## Safety Features
+
+- β Automatic cleanup on exit, interrupt, or error
+- β Returns to original branch on completion
+- β Kills all background processes
+- β Comprehensive error handling
+- β Colored output for easy reading
+- β Detailed logging at each step
+
+## Notes
+
+- The script uses `set -e` to exit on any command failure
+- All background processes are tracked and cleaned up
+- The script expects a virtual environment at `venv/` in the project root and will create one if it is missing
+- Redis must be available (installed or installable via apt-get/brew)
+
+## Integration with CI/CD
+
+This script can be integrated into CI/CD pipelines:
+
+```yaml
+# Example GitHub Actions
+- name: Test Webhook Feature
+ run: |
+ chmod +x tests/test_webhook_feature.sh
+ ./tests/test_webhook_feature.sh
+```
+
+## Support
+
+If you encounter issues:
+
+1. Check the troubleshooting section above
+2. Review server logs at `/tmp/crawl4ai_server.log`
+3. Ensure all prerequisites are met
+4. Open an issue with the full output of the script
diff --git a/tests/test_webhook_feature.sh b/tests/test_webhook_feature.sh
new file mode 100755
index 00000000..20eab2a1
--- /dev/null
+++ b/tests/test_webhook_feature.sh
@@ -0,0 +1,305 @@
+#!/bin/bash
+
+#############################################################################
+# Webhook Feature Test Script
+#
+# This script tests the webhook feature implementation by:
+# 1. Switching to the webhook feature branch
+# 2. Installing dependencies
+# 3. Starting the server
+# 4. Running webhook tests
+# 5. Cleaning up and returning to original branch
+#
+# Usage: ./test_webhook_feature.sh
+#############################################################################
+
+set -e # Exit on error
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+# Configuration
+BRANCH_NAME="claude/implement-webhook-crawl-feature-011CULZY1Jy8N5MUkZqXkRVp"
+VENV_PATH="venv"
+SERVER_PORT=11235
+WEBHOOK_PORT=8080
+PROJECT_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
+
+# Process IDs tracked for cleanup
+REDIS_PID=""
+SERVER_PID=""
+WEBHOOK_PID=""
+
+#############################################################################
+# Utility Functions
+#############################################################################
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARNING]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+cleanup() {
+ log_info "Starting cleanup..."
+
+ # Kill webhook receiver if running
+ if [ ! -z "$WEBHOOK_PID" ] && kill -0 $WEBHOOK_PID 2>/dev/null; then
+ log_info "Stopping webhook receiver (PID: $WEBHOOK_PID)..."
+ kill $WEBHOOK_PID 2>/dev/null || true
+ fi
+
+ # Kill server if running
+ if [ ! -z "$SERVER_PID" ] && kill -0 $SERVER_PID 2>/dev/null; then
+ log_info "Stopping server (PID: $SERVER_PID)..."
+ kill $SERVER_PID 2>/dev/null || true
+ fi
+
+ # Kill Redis if running
+ if [ ! -z "$REDIS_PID" ] && kill -0 $REDIS_PID 2>/dev/null; then
+ log_info "Stopping Redis (PID: $REDIS_PID)..."
+ kill $REDIS_PID 2>/dev/null || true
+ fi
+
+ # Also kill by port if PIDs didn't work
+ lsof -ti:$SERVER_PORT | xargs kill -9 2>/dev/null || true
+ lsof -ti:$WEBHOOK_PORT | xargs kill -9 2>/dev/null || true
+ lsof -ti:6379 | xargs kill -9 2>/dev/null || true
+
+ # Return to original branch
+ if [ ! -z "$ORIGINAL_BRANCH" ]; then
+ log_info "Switching back to branch: $ORIGINAL_BRANCH"
+ git checkout $ORIGINAL_BRANCH 2>/dev/null || true
+ fi
+
+ log_success "Cleanup complete"
+}
+
+# Set trap to cleanup on exit
+trap cleanup EXIT INT TERM
+
+#############################################################################
+# Main Script
+#############################################################################
+
+log_info "Starting webhook feature test script"
+log_info "Project root: $PROJECT_ROOT"
+
+cd "$PROJECT_ROOT"
+
+# Step 1: Save current branch and fetch PR
+log_info "Step 1: Fetching PR branch..."
+ORIGINAL_BRANCH=$(git rev-parse --abbrev-ref HEAD)
+log_info "Current branch: $ORIGINAL_BRANCH"
+
+git fetch origin $BRANCH_NAME
+log_success "Branch fetched"
+
+# Step 2: Switch to new branch
+log_info "Step 2: Switching to branch: $BRANCH_NAME"
+git checkout $BRANCH_NAME
+log_success "Switched to webhook feature branch"
+
+# Step 3: Activate virtual environment
+log_info "Step 3: Activating virtual environment..."
+if [ ! -d "$VENV_PATH" ]; then
+ log_error "Virtual environment not found at $VENV_PATH"
+ log_info "Creating virtual environment..."
+ python3 -m venv $VENV_PATH
+fi
+
+source $VENV_PATH/bin/activate
+log_success "Virtual environment activated: $(which python)"
+
+# Step 4: Install server dependencies
+log_info "Step 4: Installing server dependencies..."
+pip install -q -r deploy/docker/requirements.txt
+log_success "Dependencies installed"
+
+# Check if Redis is available
+log_info "Checking Redis availability..."
+if ! command -v redis-server &> /dev/null; then
+ log_warning "Redis not found, attempting to install..."
+ if command -v apt-get &> /dev/null; then
+ sudo apt-get update && sudo apt-get install -y redis-server
+ elif command -v brew &> /dev/null; then
+ brew install redis
+ else
+ log_error "Cannot install Redis automatically. Please install Redis manually."
+ exit 1
+ fi
+fi
+
+# Step 5: Start Redis in background
+log_info "Step 5a: Starting Redis..."
+redis-server --port 6379 --daemonize yes
+sleep 2
+REDIS_PID=$(pgrep redis-server)
+log_success "Redis started (PID: $REDIS_PID)"
+
+# Step 5b: Start server in background
+log_info "Step 5b: Starting server on port $SERVER_PORT..."
+cd deploy/docker
+
+# Start server in background
+python3 -m uvicorn server:app --host 0.0.0.0 --port $SERVER_PORT > /tmp/crawl4ai_server.log 2>&1 &
+SERVER_PID=$!
+cd "$PROJECT_ROOT"
+
+log_info "Server started (PID: $SERVER_PID)"
+
+# Wait for server to be ready
+log_info "Waiting for server to be ready..."
+for i in {1..30}; do
+ if curl -s http://localhost:$SERVER_PORT/health > /dev/null 2>&1; then
+ log_success "Server is ready!"
+ break
+ fi
+ if [ $i -eq 30 ]; then
+ log_error "Server failed to start within 30 seconds"
+ log_info "Server logs:"
+ tail -50 /tmp/crawl4ai_server.log
+ exit 1
+ fi
+ echo -n "."
+ sleep 1
+done
+echo ""
+
+# Step 6: Create and run webhook test
+log_info "Step 6: Creating webhook test script..."
+
+cat > /tmp/test_webhook.py << 'PYTHON_SCRIPT'
+import requests
+import json
+import time
+from flask import Flask, request, jsonify
+from threading import Thread, Event
+
+# Configuration
+CRAWL4AI_BASE_URL = "http://localhost:11235"
+WEBHOOK_BASE_URL = "http://localhost:8080"
+
+# Flask app for webhook receiver
+app = Flask(__name__)
+webhook_received = Event()
+webhook_data = {}
+
+@app.route('/webhook', methods=['POST'])
+def handle_webhook():
+ global webhook_data
+ webhook_data = request.json
+ webhook_received.set()
+ print(f"\nβ Webhook received: {json.dumps(webhook_data, indent=2)}")
+ return jsonify({"status": "received"}), 200
+
+def start_webhook_server():
+ app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)
+
+# Start webhook server in background
+webhook_thread = Thread(target=start_webhook_server, daemon=True)
+webhook_thread.start()
+time.sleep(2)
+
+print("π Submitting crawl job with webhook...")
+
+# Submit job with webhook
+payload = {
+ "urls": ["https://example.com"],
+ "browser_config": {"headless": True},
+ "crawler_config": {"cache_mode": "bypass"},
+ "webhook_config": {
+ "webhook_url": f"{WEBHOOK_BASE_URL}/webhook",
+ "webhook_data_in_payload": True
+ }
+}
+
+response = requests.post(
+ f"{CRAWL4AI_BASE_URL}/crawl/job",
+ json=payload,
+ headers={"Content-Type": "application/json"}
+)
+
+if not response.ok:
+ print(f"β Failed to submit job: {response.text}")
+ exit(1)
+
+task_id = response.json()['task_id']
+print(f"β Job submitted successfully, task_id: {task_id}")
+
+# Wait for webhook (with timeout)
+print("β³ Waiting for webhook notification...")
+if webhook_received.wait(timeout=60):
+ print(f"β Webhook received!")
+ print(f" Task ID: {webhook_data.get('task_id')}")
+ print(f" Status: {webhook_data.get('status')}")
+ print(f" URLs: {webhook_data.get('urls')}")
+
+ if webhook_data.get('status') == 'completed':
+ if 'data' in webhook_data:
+ print(f" β Data included in webhook payload")
+ results = webhook_data['data'].get('results', [])
+ if results:
+ print(f" π Crawled {len(results)} URL(s)")
+ for result in results:
+ print(f" - {result.get('url')}: {len(result.get('markdown', ''))} chars")
+ print("\nπ Webhook test PASSED!")
+ exit(0)
+ else:
+ print(f" β Job failed: {webhook_data.get('error')}")
+ exit(1)
+else:
+ print("β Webhook not received within 60 seconds")
+ # Try polling as fallback
+ print("β³ Trying to poll job status...")
+ for i in range(10):
+ status_response = requests.get(f"{CRAWL4AI_BASE_URL}/crawl/job/{task_id}")
+ if status_response.ok:
+ status = status_response.json()
+ print(f" Status: {status.get('status')}")
+ if status.get('status') in ['completed', 'failed']:
+ break
+ time.sleep(2)
+ exit(1)
+PYTHON_SCRIPT
+
+# Install Flask for webhook receiver
+pip install -q flask
+
+# Run the webhook test
+log_info "Running webhook test..."
+python3 /tmp/test_webhook.py &
+WEBHOOK_PID=$!
+
+# Wait for the test to complete; capture its exit code without tripping `set -e`
+TEST_EXIT_CODE=0
+wait $WEBHOOK_PID || TEST_EXIT_CODE=$?
+
+# Step 7: Verify results
+log_info "Step 7: Verifying test results..."
+if [ $TEST_EXIT_CODE -eq 0 ]; then
+ log_success "β Webhook test PASSED!"
+else
+ log_error "β Webhook test FAILED (exit code: $TEST_EXIT_CODE)"
+ log_info "Server logs:"
+ tail -100 /tmp/crawl4ai_server.log
+ exit 1
+fi
+
+# Step 8: Cleanup happens automatically via trap
+log_success "All tests completed successfully! π"
+log_info "Cleanup will happen automatically..."