diff --git a/deploy/docker/WEBHOOK_EXAMPLES.md b/deploy/docker/WEBHOOK_EXAMPLES.md index a9a61bb7..190efb18 100644 --- a/deploy/docker/WEBHOOK_EXAMPLES.md +++ b/deploy/docker/WEBHOOK_EXAMPLES.md @@ -164,9 +164,55 @@ curl -X POST http://localhost:11235/crawl/job \ The webhook will be sent to the default URL configured in config.yml. +### Example 6: LLM Extraction Job with Webhook + +Use webhooks with the LLM extraction endpoint for asynchronous processing. + +**Request:** +```bash +curl -X POST http://localhost:11235/llm/job \ + -H "Content-Type: application/json" \ + -d '{ + "url": "https://example.com/article", + "q": "Extract the article title, author, and publication date", + "schema": "{\"type\": \"object\", \"properties\": {\"title\": {\"type\": \"string\"}, \"author\": {\"type\": \"string\"}, \"date\": {\"type\": \"string\"}}}", + "cache": false, + "provider": "openai/gpt-4o-mini", + "webhook_config": { + "webhook_url": "https://myapp.com/webhooks/llm-complete", + "webhook_data_in_payload": true + } + }' +``` + +**Response:** +```json +{ + "task_id": "llm_1698765432_12345" +} +``` + +**Webhook Payload Received:** +```json +{ + "task_id": "llm_1698765432_12345", + "task_type": "llm_extraction", + "status": "completed", + "timestamp": "2025-10-21T10:30:00.000000+00:00", + "urls": ["https://example.com/article"], + "data": { + "extracted_content": { + "title": "Understanding Web Scraping", + "author": "John Doe", + "date": "2025-10-21" + } + } +} +``` + ## Webhook Handler Example -Here's a simple Python Flask webhook handler: +Here's a simple Python Flask webhook handler that supports both crawl and LLM extraction jobs: ```python from flask import Flask, request, jsonify @@ -179,23 +225,39 @@ def handle_crawl_webhook(): payload = request.json task_id = payload['task_id'] + task_type = payload['task_type'] status = payload['status'] if status == 'completed': # If data not in payload, fetch it if 'data' not in payload: - response = 
requests.get(f'http://localhost:11235/crawl/job/{task_id}') + # Determine endpoint based on task type + endpoint = 'crawl' if task_type == 'crawl' else 'llm' + response = requests.get(f'http://localhost:11235/{endpoint}/job/{task_id}') data = response.json() else: data = payload['data'] - # Process the crawl data - print(f"Processing crawl results for {task_id}") + # Process based on task type + if task_type == 'crawl': + print(f"Processing crawl results for {task_id}") + # Handle crawl results + results = data.get('results', []) + for result in results: + print(f" - {result.get('url')}: {len(result.get('markdown', ''))} chars") + + elif task_type == 'llm_extraction': + print(f"Processing LLM extraction for {task_id}") + # Handle LLM extraction + # Note: Webhook sends 'extracted_content', API returns 'result' + extracted = data.get('extracted_content', data.get('result', {})) + print(f" - Extracted: {extracted}") + # Your business logic here... elif status == 'failed': error = payload.get('error', 'Unknown error') - print(f"Crawl job {task_id} failed: {error}") + print(f"{task_type} job {task_id} failed: {error}") # Handle failure... return jsonify({"status": "received"}), 200 @@ -227,6 +289,7 @@ The webhook delivery service uses exponential backoff retry logic: 4. **Flexible** - Choose between notification-only or full data delivery 5. **Secure** - Support for custom headers for authentication 6. **Configurable** - Global defaults or per-job configuration +7. 
**Universal Support** - Works with both `/crawl/job` and `/llm/job` endpoints ## TypeScript Client Example @@ -244,6 +307,15 @@ interface CrawlJobRequest { webhook_config?: WebhookConfig; } +interface LLMJobRequest { + url: string; + q: string; + schema?: string; + cache?: boolean; + provider?: string; + webhook_config?: WebhookConfig; +} + async function createCrawlJob(request: CrawlJobRequest) { const response = await fetch('http://localhost:11235/crawl/job', { method: 'POST', @@ -255,8 +327,19 @@ async function createCrawlJob(request: CrawlJobRequest) { return task_id; } -// Usage -const taskId = await createCrawlJob({ +async function createLLMJob(request: LLMJobRequest) { + const response = await fetch('http://localhost:11235/llm/job', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(request) + }); + + const { task_id } = await response.json(); + return task_id; +} + +// Usage - Crawl Job +const crawlTaskId = await createCrawlJob({ urls: ['https://example.com'], webhook_config: { webhook_url: 'https://myapp.com/webhooks/crawl-complete', @@ -266,6 +349,20 @@ const taskId = await createCrawlJob({ } } }); + +// Usage - LLM Extraction Job +const llmTaskId = await createLLMJob({ + url: 'https://example.com/article', + q: 'Extract the main points from this article', + provider: 'openai/gpt-4o-mini', + webhook_config: { + webhook_url: 'https://myapp.com/webhooks/llm-complete', + webhook_data_in_payload: true, + webhook_headers: { + 'X-Webhook-Secret': 'my-secret' + } + } +}); ``` ## Monitoring and Debugging diff --git a/deploy/docker/api.py b/deploy/docker/api.py index 0d717183..7a037d70 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -116,9 +116,13 @@ async def process_llm_extraction( instruction: str, schema: Optional[str] = None, cache: str = "0", - provider: Optional[str] = None + provider: Optional[str] = None, + webhook_config: Optional[Dict] = None ) -> None: """Process LLM extraction in 
background.""" + # Initialize webhook service + webhook_service = WebhookDeliveryService(config) + try: # Validate provider is_valid, error_msg = validate_llm_provider(config, provider) @@ -127,6 +131,16 @@ async def process_llm_extraction( "status": TaskStatus.FAILED, "error": error_msg }) + + # Send webhook notification on failure + await webhook_service.notify_job_completion( + task_id=task_id, + task_type="llm_extraction", + status="failed", + urls=[url], + webhook_config=webhook_config, + error=error_msg + ) return api_key = get_llm_api_key(config, provider) llm_strategy = LLMExtractionStrategy( @@ -155,17 +169,40 @@ async def process_llm_extraction( "status": TaskStatus.FAILED, "error": result.error_message }) + + # Send webhook notification on failure + await webhook_service.notify_job_completion( + task_id=task_id, + task_type="llm_extraction", + status="failed", + urls=[url], + webhook_config=webhook_config, + error=result.error_message + ) return try: content = json.loads(result.extracted_content) except json.JSONDecodeError: content = result.extracted_content + + result_data = {"extracted_content": content} + await redis.hset(f"task:{task_id}", mapping={ "status": TaskStatus.COMPLETED, "result": json.dumps(content) }) + # Send webhook notification on successful completion + await webhook_service.notify_job_completion( + task_id=task_id, + task_type="llm_extraction", + status="completed", + urls=[url], + webhook_config=webhook_config, + result=result_data + ) + except Exception as e: logger.error(f"LLM extraction error: {str(e)}", exc_info=True) await redis.hset(f"task:{task_id}", mapping={ @@ -173,6 +210,16 @@ async def process_llm_extraction( "error": str(e) }) + # Send webhook notification on failure + await webhook_service.notify_job_completion( + task_id=task_id, + task_type="llm_extraction", + status="failed", + urls=[url], + webhook_config=webhook_config, + error=str(e) + ) + async def handle_markdown_request( url: str, filter_type: FilterType, @@ 
-249,7 +296,8 @@ async def handle_llm_request( schema: Optional[str] = None, cache: str = "0", config: Optional[dict] = None, - provider: Optional[str] = None + provider: Optional[str] = None, + webhook_config: Optional[Dict] = None, ) -> JSONResponse: """Handle LLM extraction requests.""" base_url = get_base_url(request) @@ -280,7 +328,8 @@ async def handle_llm_request( cache, base_url, config, - provider + provider, + webhook_config ) except Exception as e: @@ -325,7 +374,8 @@ async def create_new_task( cache: str, base_url: str, config: dict, - provider: Optional[str] = None + provider: Optional[str] = None, + webhook_config: Optional[Dict] = None ) -> JSONResponse: """Create and initialize a new task.""" decoded_url = unquote(input_path) @@ -334,12 +384,18 @@ async def create_new_task( from datetime import datetime task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}" - - await redis.hset(f"task:{task_id}", mapping={ + + task_data = { "status": TaskStatus.PROCESSING, "created_at": datetime.now().isoformat(), "url": decoded_url - }) + } + + # Store webhook config if provided + if webhook_config: + task_data["webhook_config"] = json.dumps(webhook_config) + + await redis.hset(f"task:{task_id}", mapping=task_data) background_tasks.add_task( process_llm_extraction, @@ -350,7 +406,8 @@ async def create_new_task( query, schema, cache, - provider + provider, + webhook_config ) return JSONResponse({ diff --git a/deploy/docker/job.py b/deploy/docker/job.py index 15289f8d..7388e07c 100644 --- a/deploy/docker/job.py +++ b/deploy/docker/job.py @@ -38,6 +38,7 @@ class LlmJobPayload(BaseModel): schema: Optional[str] = None cache: bool = False provider: Optional[str] = None + webhook_config: Optional[WebhookConfig] = None class CrawlJobPayload(BaseModel): @@ -55,6 +56,10 @@ async def llm_job_enqueue( request: Request, _td: Dict = Depends(lambda: _token_dep()), # late-bound dep ): + webhook_config = None + if payload.webhook_config: + webhook_config = 
payload.webhook_config.model_dump(mode='json') + + return await handle_llm_request( _redis, background_tasks, @@ -65,6 +70,7 @@ async def llm_job_enqueue( cache=payload.cache, config=_config, provider=payload.provider, + webhook_config=webhook_config, ) @@ -74,7 +80,7 @@ async def llm_job_status( task_id: str, _td: Dict = Depends(lambda: _token_dep()) ): - return await handle_task_status(_redis, task_id) + return await handle_task_status(_redis, task_id, base_url=str(request.base_url)) # ---------- CRAWL job ------------------------------------------------------- diff --git a/docs/examples/docker_webhook_example.py b/docs/examples/docker_webhook_example.py index d7ed1d5b..8822e879 100644 --- a/docs/examples/docker_webhook_example.py +++ b/docs/examples/docker_webhook_example.py @@ -2,11 +2,16 @@ Docker Webhook Example for Crawl4AI This example demonstrates how to use webhooks with the Crawl4AI job queue API. -Instead of polling for results, webhooks notify your application when crawls complete. +Instead of polling for results, webhooks notify your application when jobs complete. + +Supports both: +- /crawl/job - Raw crawling with markdown extraction +- /llm/job - LLM-powered content extraction Prerequisites: -1. Crawl4AI Docker container running on localhost:11235 +1. Crawl4AI Docker container running on localhost:11235 2. Flask installed: pip install flask requests +3. LLM API key configured in .llm.env (for LLM extraction examples) Usage: 1. 
Run this script: python docker_webhook_example.py @@ -21,7 +26,7 @@ from flask import Flask, request, jsonify from threading import Thread # Configuration -CRAWL4AI_BASE_URL = "http://localhost:11235" +CRAWL4AI_BASE_URL = "http://localhost:11235" WEBHOOK_BASE_URL = "http://localhost:8080" # Your webhook receiver URL # Initialize Flask app for webhook receiver @@ -88,6 +93,64 @@ def handle_crawl_webhook(): return jsonify({"status": "received"}), 200 +@app.route('/webhooks/llm-complete', methods=['POST']) +def handle_llm_webhook(): + """ + Webhook handler that receives notifications when LLM extraction jobs complete. + + Payload structure: + { + "task_id": "llm_1698765432_12345", + "task_type": "llm_extraction", + "status": "completed" or "failed", + "timestamp": "2025-10-21T10:30:00.000000+00:00", + "urls": ["https://example.com/article"], + "error": "error message" (only if failed), + "data": {"extracted_content": {...}} (only if webhook_data_in_payload=True) + } + """ + payload = request.json + print(f"\n{'='*60}") + print(f"๐Ÿค– LLM Webhook received for task: {payload['task_id']}") + print(f" Task Type: {payload['task_type']}") + print(f" Status: {payload['status']}") + print(f" Timestamp: {payload['timestamp']}") + print(f" URL: {payload['urls'][0]}") + + if payload['status'] == 'completed': + # If data is in payload, process it directly + if 'data' in payload: + print(f" โœ… Data included in webhook") + data = payload['data'] + # Webhook wraps extracted content in 'extracted_content' field + extracted = data.get('extracted_content', {}) + print(f" - Extracted content:") + print(f" {json.dumps(extracted, indent=8)}") + else: + # Fetch results from API if not included + print(f" ๐Ÿ“ฅ Fetching results from API...") + task_id = payload['task_id'] + result_response = requests.get(f"{CRAWL4AI_BASE_URL}/llm/job/{task_id}") + if result_response.ok: + data = result_response.json() + print(f" โœ… Results fetched successfully") + # API returns unwrapped content in 
'result' field + extracted = data['result'] + print(f" - Extracted content:") + print(f" {json.dumps(extracted, indent=8)}") + + elif payload['status'] == 'failed': + print(f" โŒ Job failed: {payload.get('error', 'Unknown error')}") + + print(f"{'='*60}\n") + + # Store webhook for demonstration + received_webhooks.append(payload) + + # Return 200 OK to acknowledge receipt + return jsonify({"status": "received"}), 200 + + def start_webhook_server(): """Start the Flask webhook server in a separate thread""" app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False) @@ -141,6 +204,66 @@ def submit_crawl_job_with_webhook(urls, webhook_url, include_data=False): return None +def submit_llm_job_with_webhook(url, query, webhook_url, include_data=False, schema=None, provider=None): + """ + Submit an LLM extraction job with webhook notification. + + Args: + url: URL to extract content from + query: Instruction for the LLM (e.g., "Extract article title and author") + webhook_url: URL to receive webhook notifications + include_data: Whether to include full results in webhook payload + schema: Optional JSON schema for structured extraction + provider: Optional LLM provider (e.g., "openai/gpt-4o-mini") + + Returns: + task_id: The job's task identifier + """ + payload = { + "url": url, + "q": query, + "cache": False, + "webhook_config": { + "webhook_url": webhook_url, + "webhook_data_in_payload": include_data, + # Optional: Add custom headers for authentication + # "webhook_headers": { + # "X-Webhook-Secret": "your-secret-token" + # } + } + } + + if schema: + payload["schema"] = schema + + if provider: + payload["provider"] = provider + + print(f"\n๐Ÿค– Submitting LLM extraction job...") + print(f" URL: {url}") + print(f" Query: {query}") + print(f" Webhook: {webhook_url}") + print(f" Include data: {include_data}") + if provider: + print(f" Provider: {provider}") + + response = requests.post( + f"{CRAWL4AI_BASE_URL}/llm/job", + json=payload, + headers={"Content-Type": 
"application/json"} + ) + + if response.ok: + data = response.json() + task_id = data['task_id'] + print(f" โœ… Job submitted successfully") + print(f" Task ID: {task_id}") + return task_id + else: + print(f" โŒ Failed to submit job: {response.text}") + return None + + def submit_job_without_webhook(urls): """ Submit a job without webhook (traditional polling approach). @@ -221,7 +344,7 @@ def main(): except: print(f"โŒ Cannot connect to Crawl4AI at {CRAWL4AI_BASE_URL}") print(" Please make sure Docker container is running:") - print(" docker run -d -p 11235:11235 --name crawl4ai unclecode/crawl4ai:latest") + print(" docker run -d -p 11235:11235 --name crawl4ai unclecode/crawl4ai:latest") return # Start webhook server in background thread @@ -251,34 +374,87 @@ def main(): include_data=True ) - # Example 3: Traditional polling (no webhook) + # Example 3: LLM extraction with webhook (notification only) time.sleep(5) # Wait a bit between requests print(f"\n{'='*60}") - print("Example 3: Traditional Polling (No Webhook)") + print("Example 3: LLM Extraction with Webhook (Notification Only)") print(f"{'='*60}") - task_id_3 = submit_job_without_webhook( + task_id_3 = submit_llm_job_with_webhook( + url="https://www.example.com", + query="Extract the main heading and description from this page.", + webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/llm-complete", + include_data=False, + provider="openai/gpt-4o-mini" + ) + + # Example 4: LLM extraction with webhook (data included + schema) + time.sleep(5) # Wait a bit between requests + print(f"\n{'='*60}") + print("Example 4: LLM Extraction with Schema and Full Data") + print(f"{'='*60}") + + # Define a schema for structured extraction + schema = json.dumps({ + "type": "object", + "properties": { + "title": {"type": "string", "description": "Page title"}, + "description": {"type": "string", "description": "Page description"} + }, + "required": ["title"] + }) + + task_id_4 = submit_llm_job_with_webhook( 
url="https://www.python.org", + query="Extract the title and description of this website", + webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/llm-complete", + include_data=True, + schema=schema, + provider="openai/gpt-4o-mini" + ) + + # Example 5: Traditional polling (no webhook) + time.sleep(5) # Wait a bit between requests + print(f"\n{'='*60}") + print("Example 5: Traditional Polling (No Webhook)") + print(f"{'='*60}") + task_id_5 = submit_job_without_webhook( urls=["https://github.com"] ) - if task_id_3: - result = poll_job_status(task_id_3) + if task_id_5: + result = poll_job_status(task_id_5) if result and result.get('status') == 'completed': print(f" โœ… Results retrieved via polling") # Wait for webhooks to arrive print(f"\nโณ Waiting for webhooks to be received...") - time.sleep(20) # Give jobs time to complete and webhooks to arrive + time.sleep(30) # Give jobs time to complete and webhooks to arrive (longer for LLM) # Summary print(f"\n{'='*60}") print("Summary") print(f"{'='*60}") print(f"Total webhooks received: {len(received_webhooks)}") + + crawl_webhooks = [w for w in received_webhooks if w['task_type'] == 'crawl'] + llm_webhooks = [w for w in received_webhooks if w['task_type'] == 'llm_extraction'] + + print(f"\n๐Ÿ“Š Breakdown:") + print(f" - Crawl webhooks: {len(crawl_webhooks)}") + print(f" - LLM extraction webhooks: {len(llm_webhooks)}") + + print(f"\n๐Ÿ“‹ Details:") for i, webhook in enumerate(received_webhooks, 1): - print(f"{i}. Task {webhook['task_id']}: {webhook['status']}") + task_type = webhook['task_type'] + icon = "๐Ÿ•ท๏ธ" if task_type == "crawl" else "๐Ÿค–" + print(f"{i}. 
{icon} Task {webhook['task_id']}: {webhook['status']} ({task_type})") print(f"\nโœ… Demo completed!") - print(f"\n๐Ÿ’ก Pro tip: In production, your webhook URL should be publicly accessible") - print(f" (e.g., https://myapp.com/webhooks/crawl) or use a service like ngrok for testing.") + print(f"\n๐Ÿ’ก Pro tips:") + print(f" - In production, your webhook URL should be publicly accessible") + print(f" (e.g., https://myapp.com/webhooks) or use ngrok for testing") + print(f" - Both /crawl/job and /llm/job support the same webhook configuration") + print(f" - Use webhook_data_in_payload=true to get results directly in the webhook") + print(f" - LLM jobs may take longer, adjust timeouts accordingly") if __name__ == "__main__": diff --git a/test_llm_webhook_feature.py b/test_llm_webhook_feature.py new file mode 100644 index 00000000..98133e82 --- /dev/null +++ b/test_llm_webhook_feature.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 +""" +Test script to validate webhook implementation for /llm/job endpoint. + +This tests that the /llm/job endpoint now supports webhooks +following the same pattern as /crawl/job. 
+""" + +import sys +import os + +# Add deploy/docker to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'deploy', 'docker')) + +def test_llm_job_payload_model(): + """Test that LlmJobPayload includes webhook_config field""" + print("=" * 60) + print("TEST 1: LlmJobPayload Model") + print("=" * 60) + + try: + from job import LlmJobPayload + from schemas import WebhookConfig + from pydantic import ValidationError + + # Test with webhook_config + payload_dict = { + "url": "https://example.com", + "q": "Extract main content", + "schema": None, + "cache": False, + "provider": None, + "webhook_config": { + "webhook_url": "https://myapp.com/webhook", + "webhook_data_in_payload": True, + "webhook_headers": {"X-Secret": "token"} + } + } + + payload = LlmJobPayload(**payload_dict) + + print(f"โœ… LlmJobPayload accepts webhook_config") + print(f" - URL: {payload.url}") + print(f" - Query: {payload.q}") + print(f" - Webhook URL: {payload.webhook_config.webhook_url}") + print(f" - Data in payload: {payload.webhook_config.webhook_data_in_payload}") + + # Test without webhook_config (should be optional) + minimal_payload = { + "url": "https://example.com", + "q": "Extract content" + } + + payload2 = LlmJobPayload(**minimal_payload) + assert payload2.webhook_config is None, "webhook_config should be optional" + print(f"โœ… LlmJobPayload works without webhook_config (optional)") + + return True + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_handle_llm_request_signature(): + """Test that handle_llm_request accepts webhook_config parameter""" + print("\n" + "=" * 60) + print("TEST 2: handle_llm_request Function Signature") + print("=" * 60) + + try: + from api import handle_llm_request + import inspect + + sig = inspect.signature(handle_llm_request) + params = list(sig.parameters.keys()) + + print(f"Function parameters: {params}") + + if 'webhook_config' in params: + print(f"โœ… 
handle_llm_request has webhook_config parameter") + + # Check that it's optional with default None + webhook_param = sig.parameters['webhook_config'] + if webhook_param.default is None or webhook_param.default == inspect.Parameter.empty: + print(f"โœ… webhook_config is optional (default: {webhook_param.default})") + else: + print(f"โš ๏ธ webhook_config default is: {webhook_param.default}") + + return True + else: + print(f"โŒ handle_llm_request missing webhook_config parameter") + return False + + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_process_llm_extraction_signature(): + """Test that process_llm_extraction accepts webhook_config parameter""" + print("\n" + "=" * 60) + print("TEST 3: process_llm_extraction Function Signature") + print("=" * 60) + + try: + from api import process_llm_extraction + import inspect + + sig = inspect.signature(process_llm_extraction) + params = list(sig.parameters.keys()) + + print(f"Function parameters: {params}") + + if 'webhook_config' in params: + print(f"โœ… process_llm_extraction has webhook_config parameter") + + webhook_param = sig.parameters['webhook_config'] + if webhook_param.default is None or webhook_param.default == inspect.Parameter.empty: + print(f"โœ… webhook_config is optional (default: {webhook_param.default})") + else: + print(f"โš ๏ธ webhook_config default is: {webhook_param.default}") + + return True + else: + print(f"โŒ process_llm_extraction missing webhook_config parameter") + return False + + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_webhook_integration_in_api(): + """Test that api.py properly integrates webhook notifications""" + print("\n" + "=" * 60) + print("TEST 4: Webhook Integration in process_llm_extraction") + print("=" * 60) + + try: + api_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py') + + with open(api_file, 
'r') as f: + api_content = f.read() + + # Check for WebhookDeliveryService initialization + if 'webhook_service = WebhookDeliveryService(config)' in api_content: + print("โœ… process_llm_extraction initializes WebhookDeliveryService") + else: + print("โŒ Missing WebhookDeliveryService initialization in process_llm_extraction") + return False + + # Check for notify_job_completion calls with llm_extraction + if 'task_type="llm_extraction"' in api_content: + print("โœ… Uses correct task_type='llm_extraction' for notifications") + else: + print("โŒ Missing task_type='llm_extraction' in webhook notifications") + return False + + # Count webhook notification calls (should have at least 3: success + 2 failure paths) + notification_count = api_content.count('await webhook_service.notify_job_completion') + # Find only in process_llm_extraction function + llm_func_start = api_content.find('async def process_llm_extraction') + llm_func_end = api_content.find('\nasync def ', llm_func_start + 1) + if llm_func_end == -1: + llm_func_end = len(api_content) + + llm_func_content = api_content[llm_func_start:llm_func_end] + llm_notification_count = llm_func_content.count('await webhook_service.notify_job_completion') + + print(f"โœ… Found {llm_notification_count} webhook notification calls in process_llm_extraction") + + if llm_notification_count >= 3: + print(f"โœ… Sufficient notification points (success + failure paths)") + else: + print(f"โš ๏ธ Expected at least 3 notification calls, found {llm_notification_count}") + + return True + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_job_endpoint_integration(): + """Test that /llm/job endpoint extracts and passes webhook_config""" + print("\n" + "=" * 60) + print("TEST 5: /llm/job Endpoint Integration") + print("=" * 60) + + try: + job_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'job.py') + + with open(job_file, 'r') as f: + job_content 
= f.read() + + # Find the llm_job_enqueue function + llm_job_start = job_content.find('async def llm_job_enqueue') + llm_job_end = job_content.find('\n\n@router', llm_job_start + 1) + if llm_job_end == -1: + llm_job_end = job_content.find('\n\nasync def', llm_job_start + 1) + + llm_job_func = job_content[llm_job_start:llm_job_end] + + # Check for webhook_config extraction + if 'webhook_config = None' in llm_job_func: + print("โœ… llm_job_enqueue initializes webhook_config variable") + else: + print("โŒ Missing webhook_config initialization") + return False + + if 'if payload.webhook_config:' in llm_job_func: + print("โœ… llm_job_enqueue checks for payload.webhook_config") + else: + print("โŒ Missing webhook_config check") + return False + + if 'webhook_config = payload.webhook_config.model_dump(mode=\'json\')' in llm_job_func: + print("โœ… llm_job_enqueue converts webhook_config to dict") + else: + print("โŒ Missing webhook_config.model_dump conversion") + return False + + if 'webhook_config=webhook_config' in llm_job_func: + print("โœ… llm_job_enqueue passes webhook_config to handle_llm_request") + else: + print("โŒ Missing webhook_config parameter in handle_llm_request call") + return False + + return True + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_create_new_task_integration(): + """Test that create_new_task stores webhook_config in Redis""" + print("\n" + "=" * 60) + print("TEST 6: create_new_task Webhook Storage") + print("=" * 60) + + try: + api_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py') + + with open(api_file, 'r') as f: + api_content = f.read() + + # Find create_new_task function + create_task_start = api_content.find('async def create_new_task') + create_task_end = api_content.find('\nasync def ', create_task_start + 1) + if create_task_end == -1: + create_task_end = len(api_content) + + create_task_func = 
api_content[create_task_start:create_task_end] + + # Check for webhook_config storage + if 'if webhook_config:' in create_task_func: + print("โœ… create_new_task checks for webhook_config") + else: + print("โŒ Missing webhook_config check in create_new_task") + return False + + if 'task_data["webhook_config"] = json.dumps(webhook_config)' in create_task_func: + print("โœ… create_new_task stores webhook_config in Redis task data") + else: + print("โŒ Missing webhook_config storage in task_data") + return False + + # Check that webhook_config is passed to process_llm_extraction + if 'webhook_config' in create_task_func and 'background_tasks.add_task' in create_task_func: + print("โœ… create_new_task passes webhook_config to background task") + else: + print("โš ๏ธ Could not verify webhook_config passed to background task") + + return True + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_pattern_consistency(): + """Test that /llm/job follows the same pattern as /crawl/job""" + print("\n" + "=" * 60) + print("TEST 7: Pattern Consistency with /crawl/job") + print("=" * 60) + + try: + api_file = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py') + + with open(api_file, 'r') as f: + api_content = f.read() + + # Find handle_crawl_job to compare pattern + crawl_job_start = api_content.find('async def handle_crawl_job') + crawl_job_end = api_content.find('\nasync def ', crawl_job_start + 1) + if crawl_job_end == -1: + crawl_job_end = len(api_content) + crawl_job_func = api_content[crawl_job_start:crawl_job_end] + + # Find process_llm_extraction + llm_extract_start = api_content.find('async def process_llm_extraction') + llm_extract_end = api_content.find('\nasync def ', llm_extract_start + 1) + if llm_extract_end == -1: + llm_extract_end = len(api_content) + llm_extract_func = api_content[llm_extract_start:llm_extract_end] + + print("Checking pattern consistency...") + + # Both 
should initialize WebhookDeliveryService + crawl_has_service = 'webhook_service = WebhookDeliveryService(config)' in crawl_job_func + llm_has_service = 'webhook_service = WebhookDeliveryService(config)' in llm_extract_func + + if crawl_has_service and llm_has_service: + print("โœ… Both initialize WebhookDeliveryService") + else: + print(f"โŒ Service initialization mismatch (crawl: {crawl_has_service}, llm: {llm_has_service})") + return False + + # Both should call notify_job_completion on success + crawl_notifies_success = 'status="completed"' in crawl_job_func and 'notify_job_completion' in crawl_job_func + llm_notifies_success = 'status="completed"' in llm_extract_func and 'notify_job_completion' in llm_extract_func + + if crawl_notifies_success and llm_notifies_success: + print("โœ… Both notify on success") + else: + print(f"โŒ Success notification mismatch (crawl: {crawl_notifies_success}, llm: {llm_notifies_success})") + return False + + # Both should call notify_job_completion on failure + crawl_notifies_failure = 'status="failed"' in crawl_job_func and 'error=' in crawl_job_func + llm_notifies_failure = 'status="failed"' in llm_extract_func and 'error=' in llm_extract_func + + if crawl_notifies_failure and llm_notifies_failure: + print("โœ… Both notify on failure") + else: + print(f"โŒ Failure notification mismatch (crawl: {crawl_notifies_failure}, llm: {llm_notifies_failure})") + return False + + print("โœ… /llm/job follows the same pattern as /crawl/job") + return True + + except Exception as e: + print(f"โŒ Failed: {e}") + import traceback + traceback.print_exc() + return False + +def main(): + """Run all tests""" + print("\n๐Ÿงช LLM Job Webhook Feature Validation") + print("=" * 60) + print("Testing that /llm/job now supports webhooks like /crawl/job") + print("=" * 60 + "\n") + + results = [] + + # Run all tests + results.append(("LlmJobPayload Model", test_llm_job_payload_model())) + results.append(("handle_llm_request Signature", 
test_handle_llm_request_signature())) + results.append(("process_llm_extraction Signature", test_process_llm_extraction_signature())) + results.append(("Webhook Integration", test_webhook_integration_in_api())) + results.append(("/llm/job Endpoint", test_job_endpoint_integration())) + results.append(("create_new_task Storage", test_create_new_task_integration())) + results.append(("Pattern Consistency", test_pattern_consistency())) + + # Print summary + print("\n" + "=" * 60) + print("TEST SUMMARY") + print("=" * 60) + + passed = sum(1 for _, result in results if result) + total = len(results) + + for test_name, result in results: + status = "โœ… PASS" if result else "โŒ FAIL" + print(f"{status} - {test_name}") + + print(f"\n{'=' * 60}") + print(f"Results: {passed}/{total} tests passed") + print(f"{'=' * 60}") + + if passed == total: + print("\n๐ŸŽ‰ All tests passed! /llm/job webhook feature is correctly implemented.") + print("\n๐Ÿ“ Summary of changes:") + print(" 1. LlmJobPayload model includes webhook_config field") + print(" 2. /llm/job endpoint extracts and passes webhook_config") + print(" 3. handle_llm_request accepts webhook_config parameter") + print(" 4. create_new_task stores webhook_config in Redis") + print(" 5. process_llm_extraction sends webhook notifications") + print(" 6. Follows the same pattern as /crawl/job") + return 0 + else: + print(f"\nโš ๏ธ {total - passed} test(s) failed. 
Please review the output above.") + return 1 + +if __name__ == "__main__": + exit(main()) diff --git a/test_webhook_implementation.py b/test_webhook_implementation.py index c7a68e4f..072db8b3 100644 --- a/test_webhook_implementation.py +++ b/test_webhook_implementation.py @@ -14,7 +14,8 @@ import json from datetime import datetime, timezone # Add deploy/docker to path to import modules -sys.path.insert(0, '/home/user/crawl4ai/deploy/docker') +# sys.path.insert(0, '/home/user/crawl4ai/deploy/docker') +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'deploy', 'docker')) def test_imports(): """Test that all webhook-related modules can be imported""" @@ -237,7 +238,8 @@ def test_api_integration(): try: # Check if api.py can import webhook module - with open('/home/user/crawl4ai/deploy/docker/api.py', 'r') as f: + api_path = os.path.join(os.path.dirname(__file__), 'deploy', 'docker', 'api.py') + with open(api_path, 'r') as f: api_content = f.read() if 'from webhook import WebhookDeliveryService' in api_content: