feat(adaptive-crawling): implement adaptive crawling endpoints and integrate with server

AHMET YILMAZ
2025-10-01 15:53:56 +08:00
parent a62cfeebd9
commit 1a8e0236af
5 changed files with 215 additions and 59 deletions

@@ -0,0 +1,120 @@
import sys
import uuid
from typing import Any, Dict

from fastapi import APIRouter, BackgroundTasks, HTTPException
from schemas import AdaptiveConfigPayload, AdaptiveCrawlRequest, AdaptiveJobStatus

from crawl4ai import AsyncWebCrawler
from crawl4ai.adaptive_crawler import AdaptiveConfig, AdaptiveCrawler
from crawl4ai.utils import get_error_context

# --- In-memory storage for job statuses. For production, use Redis or a database. ---
ADAPTIVE_JOBS: Dict[str, Dict[str, Any]] = {}
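
# A minimal Redis-backed sketch of the same store, for the production case the
# comment above mentions (assumes the redis-py async client and JSON-serializable
# job payloads; not wired into the endpoints below):
#
#   import json
#   import redis.asyncio as redis
#
#   r = redis.Redis()  # assumed default localhost:6379
#
#   async def save_job(task_id: str, job: dict) -> None:
#       await r.set(f"adaptive:job:{task_id}", json.dumps(job))
#
#   async def load_job(task_id: str) -> dict | None:
#       raw = await r.get(f"adaptive:job:{task_id}")
#       return json.loads(raw) if raw else None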

# --- APIRouter for Adaptive Crawling Endpoints ---
router = APIRouter(
    prefix="/adaptive/digest",
    tags=["Adaptive Crawling"],
)


# --- Background Worker Function ---
async def run_adaptive_digest(task_id: str, request: AdaptiveCrawlRequest):
    """The actual async worker that performs the adaptive crawl."""
    try:
        # Update job status to RUNNING
        ADAPTIVE_JOBS[task_id]["status"] = "RUNNING"

        # Create AdaptiveConfig from payload or use default
        if request.config:
            adaptive_config = AdaptiveConfig(**request.config.model_dump())
        else:
            adaptive_config = AdaptiveConfig()
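
        # For reference, an illustrative request body exercising the optional
        # config block handled above (the field names inside "config" are
        # assumptions, not a guaranteed schema):
        #   {"start_url": "...", "query": "...",
        #    "config": {"confidence_threshold": 0.8, "max_pages": 20}}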

        # The adaptive crawler needs an instance of the web crawler
        async with AsyncWebCrawler() as crawler:
            adaptive_crawler = AdaptiveCrawler(crawler, config=adaptive_config)

            # This is the long-running operation
            final_state = await adaptive_crawler.digest(
                start_url=request.start_url, query=request.query
            )

            # Process the final state into a clean result
            result_data = {
                "confidence": final_state.metrics.get("confidence", 0.0),
                "is_sufficient": adaptive_crawler.is_sufficient,
                "coverage_stats": adaptive_crawler.coverage_stats,
                "relevant_content": adaptive_crawler.get_relevant_content(top_k=5),
            }

            # Update job with the final result
            ADAPTIVE_JOBS[task_id].update(
                {
                    "status": "COMPLETED",
                    "result": result_data,
                    "metrics": final_state.metrics,
                }
            )
    except Exception as e:
        # On failure, update the job with an error message
        error_context = get_error_context(sys.exc_info())
        error_message = f"Adaptive crawl failed: {str(e)}\nContext: {error_context}"
        ADAPTIVE_JOBS[task_id].update({"status": "FAILED", "error": error_message})


# --- API Endpoints ---
@router.post("/job", response_model=AdaptiveJobStatus, status_code=202)
async def submit_adaptive_digest_job(
    request: AdaptiveCrawlRequest,
    background_tasks: BackgroundTasks,
):
    """
    Submit a new adaptive crawling job.

    This endpoint starts a long-running adaptive crawl in the background and
    immediately returns a task ID for polling the job's status.
    """
    print("Received adaptive crawl request:", request)
    task_id = str(uuid.uuid4())

    # Initialize the job in our in-memory store
    ADAPTIVE_JOBS[task_id] = {
        "task_id": task_id,
        "status": "PENDING",
        "metrics": None,
        "result": None,
        "error": None,
    }

    # Add the long-running task to the background
    background_tasks.add_task(run_adaptive_digest, task_id, request)
    return ADAPTIVE_JOBS[task_id]


@router.get("/job/{task_id}", response_model=AdaptiveJobStatus)
async def get_adaptive_digest_status(task_id: str):
    """
    Get the status and result of an adaptive crawling job.

    Poll this endpoint with the `task_id` returned from the submission
    endpoint until the status is 'COMPLETED' or 'FAILED'.
    """
    job = ADAPTIVE_JOBS.get(task_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # If a worker has published intermediate state under "live_state", surface
    # its metrics. Note that run_adaptive_digest does not set this key yet, so
    # the check is purely defensive.
    if job["status"] == "RUNNING" and job.get("live_state"):
        job["metrics"] = job["live_state"].metrics
    return job
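
For reference, a minimal polling client for these two endpoints might look like
the sketch below. The base URL, the requests dependency, and the example
start_url/query values are assumptions; the request and response field names
follow AdaptiveCrawlRequest and AdaptiveJobStatus above.

import time

import requests

BASE = "http://localhost:8000"  # assumed server address

# Submit a job; the endpoint returns 202 with the initial job record.
resp = requests.post(
    f"{BASE}/adaptive/digest/job",
    json={"start_url": "https://example.com", "query": "example query"},
)
task_id = resp.json()["task_id"]

# Poll until the background worker finishes one way or the other.
while True:
    job = requests.get(f"{BASE}/adaptive/digest/job/{task_id}").json()
    if job["status"] in ("COMPLETED", "FAILED"):
        break
    time.sleep(2)  # arbitrary poll interval

print(job.get("result") or job.get("error"))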


@@ -0,0 +1,135 @@
from typing import Optional

from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from schemas import C4AScriptPayload

# C4A-Script language support from the crawl4ai library
from crawl4ai.script import (
    CompilationResult,
    ValidationResult,
    compile as c4a_compile,
    validate as c4a_validate,
)

# --- APIRouter for C4A Script Endpoints ---
router = APIRouter(
    prefix="/c4a",
    tags=["C4A Scripts"],
)


# --- API Endpoints ---
@router.post(
    "/validate", response_model=ValidationResult, summary="Validate a C4A-Script"
)
async def validate_c4a_script_endpoint(payload: C4AScriptPayload):
    """
    Validates the syntax of a C4A-Script without compiling it.

    Returns a `ValidationResult` object indicating whether the script is
    valid and providing detailed error information if it's not.
    """
    # The validate function is designed not to raise exceptions
    validation_result = c4a_validate(payload.script)
    return validation_result


@router.post(
    "/compile", response_model=CompilationResult, summary="Compile a C4A-Script"
)
async def compile_c4a_script_endpoint(payload: C4AScriptPayload):
    """
    Compiles a C4A-Script into executable JavaScript.

    If successful, returns the compiled JavaScript code. If there are syntax
    errors, it returns a detailed error report.
    """
    # The compile function also returns a result object instead of raising
    compilation_result = c4a_compile(payload.script)
    if not compilation_result.success:
        # Raise a 400 for failed compilations so clients can clearly treat
        # them as bad requests
        raise HTTPException(
            status_code=400,
            detail=compilation_result.to_dict(),  # FastAPI will serialize this
        )
    return compilation_result


@router.post(
    "/compile-file",
    response_model=CompilationResult,
    summary="Compile a C4A-Script from file or string",
)
async def compile_c4a_script_file_endpoint(
    file: Optional[UploadFile] = File(None), script: Optional[str] = Form(None)
):
    """
    Compiles a C4A-Script into executable JavaScript from either an uploaded
    file or string content.

    Accepts exactly one of:
    - A file upload containing the C4A-Script
    - A string containing the C4A-Script content

    Requests providing neither, or both, are rejected with a 400.

    If successful, returns the compiled JavaScript code. If there are syntax
    errors, it returns a detailed error report.
    """
    script_content = None

    # Validate that at least one input is provided
    if not file and not script:
        raise HTTPException(
            status_code=400,
            detail={"error": "Either 'file' or 'script' parameter must be provided"},
        )

    # Reject ambiguous requests that supply both inputs
    if file and script:
        raise HTTPException(
            status_code=400,
            detail={"error": "Please provide either 'file' or 'script', not both"},
        )

    # Handle file upload
    if file:
        try:
            file_content = await file.read()
            script_content = file_content.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise HTTPException(
                status_code=400,
                detail={"error": "File must be a valid UTF-8 text file"},
            ) from exc
        except Exception as e:
            raise HTTPException(
                status_code=400, detail={"error": f"Error reading file: {str(e)}"}
            ) from e
    # Handle string content
    elif script:
        script_content = script

    # Compile the script content
    compilation_result = c4a_compile(script_content)
    if not compilation_result.success:
        # Raise a 400 for failed compilations, mirroring /compile
        raise HTTPException(
            status_code=400,
            detail=compilation_result.to_dict(),  # FastAPI will serialize this
        )
    return compilation_result
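
A minimal client sketch for the three C4A endpoints follows. The base URL, the
requests dependency, the one-line example script, and the script.c4a filename
are assumptions; the "script" and "file" parameter names come from the handlers
above.

import requests

BASE = "http://localhost:8000"  # assumed server address
SCRIPT = "GO https://example.com"  # hypothetical one-line C4A-Script

# Validation always returns 200 with a ValidationResult body.
validation = requests.post(f"{BASE}/c4a/validate", json={"script": SCRIPT}).json()

# Compilation returns 200 with the result, or 400 with the error report.
resp = requests.post(f"{BASE}/c4a/compile", json={"script": SCRIPT})
result = resp.json() if resp.status_code == 200 else resp.json()["detail"]

# File variant: multipart upload, mutually exclusive with the form field.
with open("script.c4a", "rb") as fh:
    resp = requests.post(f"{BASE}/c4a/compile-file", files={"file": fh})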