feat: add webhook support for /llm/job endpoint

Add comprehensive webhook notification support for the /llm/job endpoint,
following the same pattern as the existing /crawl/job implementation.

Changes:
- Add webhook_config field to LlmJobPayload model (job.py)
- Implement webhook notifications in process_llm_extraction() with 4
  notification points: success, provider validation failure, extraction
  failure, and general exceptions (api.py)
- Store webhook_config in Redis task data for job tracking
- Initialize WebhookDeliveryService with exponential backoff retry logic
Documentation:
- Add Example 6 to WEBHOOK_EXAMPLES.md showing LLM extraction with webhooks
- Update Flask webhook handler to support both crawl and llm_extraction tasks
- Add TypeScript client examples for LLM jobs
- Add comprehensive examples to docker_webhook_example.py with schema support
- Clarify data structure differences between webhook and API responses

Testing:
- Add test_llm_webhook_feature.py with 7 validation tests (all passing)
- Verify pattern consistency with /crawl/job implementation
- Add implementation guide (WEBHOOK_LLM_JOB_IMPLEMENTATION.md)
This commit is contained in:
ntohidi
2025-10-22 13:03:09 +02:00
parent f8606f6865
commit d670dcde0a
6 changed files with 770 additions and 31 deletions

View File

@@ -164,9 +164,55 @@ curl -X POST http://localhost:11235/crawl/job \
The webhook will be sent to the default URL configured in config.yml.
### Example 6: LLM Extraction Job with Webhook
Use webhooks with the LLM extraction endpoint for asynchronous processing.
**Request:**
```bash
curl -X POST http://localhost:11235/llm/job \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/article",
"q": "Extract the article title, author, and publication date",
"schema": "{\"type\": \"object\", \"properties\": {\"title\": {\"type\": \"string\"}, \"author\": {\"type\": \"string\"}, \"date\": {\"type\": \"string\"}}}",
"cache": false,
"provider": "openai/gpt-4o-mini",
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/llm-complete",
"webhook_data_in_payload": true
}
}'
```
**Response:**
```json
{
"task_id": "llm_1698765432_12345"
}
```
**Webhook Payload Received:**
```json
{
"task_id": "llm_1698765432_12345",
"task_type": "llm_extraction",
"status": "completed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"data": {
"extracted_content": {
"title": "Understanding Web Scraping",
"author": "John Doe",
"date": "2025-10-21"
}
}
}
```
## Webhook Handler Example
Here's a simple Python Flask webhook handler:
Here's a simple Python Flask webhook handler that supports both crawl and LLM extraction jobs:
```python
from flask import Flask, request, jsonify
@@ -179,23 +225,39 @@ def handle_crawl_webhook():
payload = request.json
task_id = payload['task_id']
task_type = payload['task_type']
status = payload['status']
if status == 'completed':
# If data not in payload, fetch it
if 'data' not in payload:
response = requests.get(f'http://localhost:11235/crawl/job/{task_id}')
# Determine endpoint based on task type
endpoint = 'crawl' if task_type == 'crawl' else 'llm'
response = requests.get(f'http://localhost:11235/{endpoint}/job/{task_id}')
data = response.json()
else:
data = payload['data']
# Process the crawl data
print(f"Processing crawl results for {task_id}")
# Process based on task type
if task_type == 'crawl':
print(f"Processing crawl results for {task_id}")
# Handle crawl results
results = data.get('results', [])
for result in results:
print(f" - {result.get('url')}: {len(result.get('markdown', ''))} chars")
elif task_type == 'llm_extraction':
print(f"Processing LLM extraction for {task_id}")
# Handle LLM extraction
# Note: Webhook sends 'extracted_content', API returns 'result'
extracted = data.get('extracted_content', data.get('result', {}))
print(f" - Extracted: {extracted}")
# Your business logic here...
elif status == 'failed':
error = payload.get('error', 'Unknown error')
print(f"Crawl job {task_id} failed: {error}")
print(f"{task_type} job {task_id} failed: {error}")
# Handle failure...
return jsonify({"status": "received"}), 200
@@ -227,6 +289,7 @@ The webhook delivery service uses exponential backoff retry logic:
4. **Flexible** - Choose between notification-only or full data delivery
5. **Secure** - Support for custom headers for authentication
6. **Configurable** - Global defaults or per-job configuration
7. **Universal Support** - Works with both `/crawl/job` and `/llm/job` endpoints
## TypeScript Client Example
@@ -244,6 +307,15 @@ interface CrawlJobRequest {
webhook_config?: WebhookConfig;
}
interface LLMJobRequest {
url: string;
q: string;
schema?: string;
cache?: boolean;
provider?: string;
webhook_config?: WebhookConfig;
}
async function createCrawlJob(request: CrawlJobRequest) {
const response = await fetch('http://localhost:11235/crawl/job', {
method: 'POST',
@@ -255,8 +327,19 @@ async function createCrawlJob(request: CrawlJobRequest) {
return task_id;
}
// Usage
const taskId = await createCrawlJob({
async function createLLMJob(request: LLMJobRequest) {
const response = await fetch('http://localhost:11235/llm/job', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(request)
});
const { task_id } = await response.json();
return task_id;
}
// Usage - Crawl Job
const crawlTaskId = await createCrawlJob({
urls: ['https://example.com'],
webhook_config: {
webhook_url: 'https://myapp.com/webhooks/crawl-complete',
@@ -266,6 +349,20 @@ const taskId = await createCrawlJob({
}
}
});
// Usage - LLM Extraction Job
const llmTaskId = await createLLMJob({
url: 'https://example.com/article',
q: 'Extract the main points from this article',
provider: 'openai/gpt-4o-mini',
webhook_config: {
webhook_url: 'https://myapp.com/webhooks/llm-complete',
webhook_data_in_payload: true,
webhook_headers: {
'X-Webhook-Secret': 'my-secret'
}
}
});
```
## Monitoring and Debugging

View File

@@ -116,9 +116,13 @@ async def process_llm_extraction(
instruction: str,
schema: Optional[str] = None,
cache: str = "0",
provider: Optional[str] = None
provider: Optional[str] = None,
webhook_config: Optional[Dict] = None
) -> None:
"""Process LLM extraction in background."""
# Initialize webhook service
webhook_service = WebhookDeliveryService(config)
try:
# Validate provider
is_valid, error_msg = validate_llm_provider(config, provider)
@@ -127,6 +131,16 @@ async def process_llm_extraction(
"status": TaskStatus.FAILED,
"error": error_msg
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="failed",
urls=[url],
webhook_config=webhook_config,
error=error_msg
)
return
api_key = get_llm_api_key(config, provider)
llm_strategy = LLMExtractionStrategy(
@@ -155,17 +169,40 @@ async def process_llm_extraction(
"status": TaskStatus.FAILED,
"error": result.error_message
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="failed",
urls=[url],
webhook_config=webhook_config,
error=result.error_message
)
return
try:
content = json.loads(result.extracted_content)
except json.JSONDecodeError:
content = result.extracted_content
result_data = {"extracted_content": content}
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.COMPLETED,
"result": json.dumps(content)
})
# Send webhook notification on successful completion
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="completed",
urls=[url],
webhook_config=webhook_config,
result=result_data
)
except Exception as e:
logger.error(f"LLM extraction error: {str(e)}", exc_info=True)
await redis.hset(f"task:{task_id}", mapping={
@@ -173,6 +210,16 @@ async def process_llm_extraction(
"error": str(e)
})
# Send webhook notification on failure
await webhook_service.notify_job_completion(
task_id=task_id,
task_type="llm_extraction",
status="failed",
urls=[url],
webhook_config=webhook_config,
error=str(e)
)
async def handle_markdown_request(
url: str,
filter_type: FilterType,
@@ -249,7 +296,8 @@ async def handle_llm_request(
schema: Optional[str] = None,
cache: str = "0",
config: Optional[dict] = None,
provider: Optional[str] = None
provider: Optional[str] = None,
webhook_config: Optional[Dict] = None,
) -> JSONResponse:
"""Handle LLM extraction requests."""
base_url = get_base_url(request)
@@ -280,7 +328,8 @@ async def handle_llm_request(
cache,
base_url,
config,
provider
provider,
webhook_config
)
except Exception as e:
@@ -325,7 +374,8 @@ async def create_new_task(
cache: str,
base_url: str,
config: dict,
provider: Optional[str] = None
provider: Optional[str] = None,
webhook_config: Optional[Dict] = None
) -> JSONResponse:
"""Create and initialize a new task."""
decoded_url = unquote(input_path)
@@ -334,12 +384,18 @@ async def create_new_task(
from datetime import datetime
task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}"
await redis.hset(f"task:{task_id}", mapping={
task_data = {
"status": TaskStatus.PROCESSING,
"created_at": datetime.now().isoformat(),
"url": decoded_url
})
}
# Store webhook config if provided
if webhook_config:
task_data["webhook_config"] = json.dumps(webhook_config)
await redis.hset(f"task:{task_id}", mapping=task_data)
background_tasks.add_task(
process_llm_extraction,
@@ -350,7 +406,8 @@ async def create_new_task(
query,
schema,
cache,
provider
provider,
webhook_config
)
return JSONResponse({

View File

@@ -38,6 +38,7 @@ class LlmJobPayload(BaseModel):
schema: Optional[str] = None
cache: bool = False
provider: Optional[str] = None
webhook_config: Optional[WebhookConfig] = None
class CrawlJobPayload(BaseModel):
@@ -55,6 +56,10 @@ async def llm_job_enqueue(
request: Request,
_td: Dict = Depends(lambda: _token_dep()), # late-bound dep
):
webhook_config = None
if payload.webhook_config:
webhook_config = payload.webhook_config.model_dump(mode='json')
return await handle_llm_request(
_redis,
background_tasks,
@@ -65,6 +70,7 @@ async def llm_job_enqueue(
cache=payload.cache,
config=_config,
provider=payload.provider,
webhook_config=webhook_config,
)
@@ -74,7 +80,7 @@ async def llm_job_status(
task_id: str,
_td: Dict = Depends(lambda: _token_dep())
):
return await handle_task_status(_redis, task_id)
return await handle_task_status(_redis, task_id, base_url=str(request.base_url))
# ---------- CRAWL job -------------------------------------------------------