feat: Add HTTP-only crawling endpoints and related models

- Introduced HTTPCrawlRequest and HTTPCrawlRequestWithHooks models for HTTP-only crawling.
- Implemented /crawl/http and /crawl/http/stream endpoints for fast, lightweight crawling without browser rendering.
- Enhanced server.py to handle HTTP crawl requests and streaming responses.
- Updated utils.py to disable memory wait timeout for testing.
- Expanded API documentation to include new HTTP crawling features.
- Added tests for HTTP crawling endpoints, including error handling and streaming responses.
Author: AHMET YILMAZ
Date: 2025-10-15 17:45:58 +08:00
parent aebf5a3694
commit 674d0741da
8 changed files with 1091 additions and 45 deletions
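
For orientation, a minimal client call to the new endpoint might look like the sketch below. The base URL, port, and absence of an auth token are assumptions for illustration; the payload shape follows the HTTPCrawlRequest model introduced in this commit.

```python
# Minimal sketch: calling the new HTTP-only crawl endpoint.
# Assumes the server is reachable at http://localhost:11235 and requires no token.
import requests

payload = {
    "urls": ["https://example.com"],
    "http_config": {"method": "GET", "headers": {"Accept": "text/html"}},
    "crawler_config": {"word_count_threshold": 10},
}

resp = requests.post("http://localhost:11235/crawl/http", json=payload, timeout=60)
resp.raise_for_status()
for result in resp.json()["results"]:
    print(result["url"], result["success"])
```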

View File

@@ -18,9 +18,11 @@ from crawl4ai import (
BrowserConfig,
CacheMode,
CrawlerRunConfig,
HTTPCrawlerConfig,
LLMConfig,
LLMExtractionStrategy,
MemoryAdaptiveDispatcher,
NoExtractionStrategy,
PlaywrightAdapter,
RateLimiter,
SeedingConfig,
@@ -53,6 +55,7 @@ from crawl4ai.content_filter_strategy import (
)
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy
from crawl4ai.utils import perform_completion_with_backoff
# Import missing utility functions and types
@@ -60,7 +63,7 @@ try:
from utils import (
FilterType, TaskStatus, get_base_url, is_task_id,
get_llm_api_key, get_llm_temperature, get_llm_base_url,
validate_llm_provider, create_chunking_strategy
validate_llm_provider, create_chunking_strategy, decode_redis_hash
)
except ImportError:
# Fallback definitions for development/testing
@@ -94,6 +97,12 @@ except ImportError:
def validate_llm_provider(config, provider):
return True, None
def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
"""Fallback decode_redis_hash function"""
return {k.decode('utf-8') if isinstance(k, bytes) else str(k):
v.decode('utf-8') if isinstance(v, bytes) else str(v)
for k, v in hash_data.items()}
logger = logging.getLogger(__name__)
@@ -682,8 +691,11 @@ async def stream_results(
}
yield (json.dumps(error_response) + "\n").encode("utf-8")
yield json.dumps({"status": "completed"}).encode("utf-8")
yield (json.dumps({"status": "completed"}) + "\n").encode("utf-8")
except Exception as e:
logger.error(f"Streaming error: {e}")
yield (json.dumps({"status": "error", "message": str(e)}) + "\n").encode("utf-8")
except asyncio.CancelledError:
logger.warning("Client disconnected during streaming")
finally:
@@ -748,6 +760,7 @@ async def handle_crawl_request(
# Legacy fallback: create MemoryAdaptiveDispatcher with old config
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=config["crawler"]["memory_threshold_percent"],
memory_wait_timeout=None, # Disable memory timeout for testing
rate_limiter=RateLimiter(
base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"])
)
@@ -965,6 +978,7 @@ async def handle_stream_crawl_request(
# Legacy fallback: create MemoryAdaptiveDispatcher with old config
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=config["crawler"]["memory_threshold_percent"],
memory_wait_timeout=None, # Disable memory timeout for testing
rate_limiter=RateLimiter(
base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"])
),
@@ -1111,3 +1125,333 @@ async def handle_url_discovery(domain, seeding_config):
return urls
except Exception as e:
return []
# ============================================================================
# HTTP Crawling Handlers
# ============================================================================
async def handle_http_crawl_request(
urls: List[str],
http_config: dict,
crawler_config: dict,
config: dict,
hooks_config: Optional[dict] = None,
dispatcher = None,
) -> dict:
"""Handle HTTP-only crawl requests with optional hooks."""
start_mem_mb = _get_memory_mb() # <--- Get memory before
start_time = time.time()
mem_delta_mb = None
peak_mem_mb = start_mem_mb
hook_manager = None
try:
urls = [
("https://" + url)
if not url.startswith(("http://", "https://"))
and not url.startswith(("raw:", "raw://"))
else url
for url in urls
]
# Load HTTP config instead of browser config
http_config = HTTPCrawlerConfig.from_kwargs(http_config)
crawler_config = CrawlerRunConfig.load(crawler_config)
# Create HTTP crawler strategy
http_strategy = AsyncHTTPCrawlerStrategy(browser_config=http_config)
# Use provided dispatcher or fallback to legacy behavior
if dispatcher is None:
# Legacy fallback: create MemoryAdaptiveDispatcher with old config
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=config["crawler"]["memory_threshold_percent"],
memory_wait_timeout=None, # Disable memory timeout for testing
rate_limiter=RateLimiter(
base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"])
)
if config["crawler"]["rate_limiter"]["enabled"]
else None,
)
# Create crawler with HTTP strategy (no browser pooling needed)
crawler = AsyncWebCrawler(crawler_strategy=http_strategy)
await crawler.start()
# Attach hooks if provided
hooks_status = {}
if hooks_config:
from hook_manager import UserHookManager, attach_user_hooks_to_crawler
hook_manager = UserHookManager(timeout=hooks_config.get("timeout", 30))
hooks_status, hook_manager = await attach_user_hooks_to_crawler(
crawler,
hooks_config.get("code", {}),
timeout=hooks_config.get("timeout", 30),
hook_manager=hook_manager,
)
logger.info(f"Hooks attachment status: {hooks_status['status']}")
base_config = config["crawler"]["base_config"]
# Iterate over key-value pairs in base_config and set them on crawler_config if the attribute exists
for key, value in base_config.items():
if hasattr(crawler_config, key):
current_value = getattr(crawler_config, key)
# Only set base config if user didn't provide a value
if current_value is None or current_value == "":
setattr(crawler_config, key, value)
results = []
func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many")
partial_func = partial(
func,
urls[0] if len(urls) == 1 else urls,
config=crawler_config,
dispatcher=dispatcher,
)
results = await partial_func()
# Ensure results is always a list
if not isinstance(results, list):
results = [results]
await crawler.close() # Close HTTP crawler after use
# Process results to handle PDF bytes
processed_results = []
for result in results:
try:
# Check if result has model_dump method (is a proper CrawlResult)
if hasattr(result, "model_dump"):
result_dict = result.model_dump()
elif isinstance(result, dict):
result_dict = result
else:
# Handle unexpected result type
logger.warning(f"Unexpected result type: {type(result)}")
result_dict = {
"url": str(result) if hasattr(result, "__str__") else "unknown",
"success": False,
"error_message": f"Unexpected result type: {type(result).__name__}",
}
# if fit_html is not a string, set it to None to avoid serialization errors
if "fit_html" in result_dict and not (
result_dict["fit_html"] is None
or isinstance(result_dict["fit_html"], str)
):
result_dict["fit_html"] = None
# If PDF exists, encode it to base64
if result_dict.get("pdf") is not None and isinstance(
result_dict.get("pdf"), bytes
):
result_dict["pdf"] = b64encode(result_dict["pdf"]).decode("utf-8")
processed_results.append(result_dict)
except Exception as e:
logger.error(f"Error processing result: {e}")
processed_results.append(
{"url": "unknown", "success": False, "error_message": str(e)}
)
end_mem_mb = _get_memory_mb() # <--- Get memory after
end_time = time.time()
if start_mem_mb is not None and end_mem_mb is not None:
mem_delta_mb = end_mem_mb - start_mem_mb # <--- Calculate delta
peak_mem_mb = max(
peak_mem_mb if peak_mem_mb else 0, end_mem_mb
) # <--- Get peak memory
logger.info(
f"HTTP Memory usage: Start: {start_mem_mb} MB, End: {end_mem_mb} MB, Delta: {mem_delta_mb} MB, Peak: {peak_mem_mb} MB"
)
response = {
"success": True,
"results": processed_results,
"server_processing_time_s": end_time - start_time,
"server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb,
}
# Add hooks information if hooks were used
if hooks_config and hook_manager:
from hook_manager import UserHookManager
if isinstance(hook_manager, UserHookManager):
try:
# Ensure all hook data is JSON serializable
hook_data = {
"status": hooks_status,
"execution_log": hook_manager.execution_log,
"errors": hook_manager.errors,
"summary": hook_manager.get_summary(),
}
# Test that it's serializable
json.dumps(hook_data)
response["hooks"] = hook_data
except (TypeError, ValueError) as e:
logger.error(f"Hook data not JSON serializable: {e}")
response["hooks"] = {
"status": {
"status": "error",
"message": "Hook data serialization failed",
},
"execution_log": [],
"errors": [{"error": str(e)}],
"summary": {},
}
return response
except Exception as e:
logger.error(f"HTTP crawl error: {str(e)}", exc_info=True)
if (
"crawler" in locals() and crawler.ready
): # Check if crawler was initialized and started
try:
await crawler.close()
except Exception as close_e:
logger.error(f"Error closing HTTP crawler during exception handling: {close_e}")
return {
"success": False,
"error": str(e),
"server_processing_time_s": time.time() - start_time,
"server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb,
}
async def handle_http_stream_crawl_request(
urls: List[str],
http_config: dict,
crawler_config: dict,
config: dict,
hooks_config: Optional[dict] = None,
dispatcher = None,
) -> Tuple[AsyncWebCrawler, AsyncGenerator, Optional[dict]]:
"""Handle HTTP-only streaming crawl requests with optional hooks."""
urls = [
("https://" + url)
if not url.startswith(("http://", "https://"))
and not url.startswith(("raw:", "raw://"))
else url
for url in urls
]
# Load HTTP config instead of browser config
http_config = HTTPCrawlerConfig.from_kwargs(http_config)
crawler_config = CrawlerRunConfig.load(crawler_config)
# Create HTTP crawler strategy
http_strategy = AsyncHTTPCrawlerStrategy(browser_config=http_config)
# Use provided dispatcher or fallback to legacy behavior
if dispatcher is None:
# Legacy fallback: create MemoryAdaptiveDispatcher with old config
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=config["crawler"]["memory_threshold_percent"],
memory_wait_timeout=None, # Disable memory timeout for testing
rate_limiter=RateLimiter(
base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"])
)
if config["crawler"]["rate_limiter"]["enabled"]
else None,
)
# Create crawler with HTTP strategy (no browser pooling needed)
crawler = AsyncWebCrawler(crawler_strategy=http_strategy)
await crawler.start()
# Attach hooks if provided
hooks_info = None
if hooks_config:
from hook_manager import UserHookManager, attach_user_hooks_to_crawler
hook_manager = UserHookManager(timeout=hooks_config.get("timeout", 30))
hooks_status, hook_manager = await attach_user_hooks_to_crawler(
crawler,
hooks_config.get("code", {}),
timeout=hooks_config.get("timeout", 30),
hook_manager=hook_manager,
)
logger.info(f"HTTP Hooks attachment status: {hooks_status['status']}")
hooks_info = {
"status": hooks_status,
"execution_log": hook_manager.execution_log,
"errors": hook_manager.errors,
"summary": hook_manager.get_summary(),
}
base_config = config["crawler"]["base_config"]
# Iterate over key-value pairs in base_config and set them on crawler_config if the attribute exists
for key, value in base_config.items():
if hasattr(crawler_config, key):
current_value = getattr(crawler_config, key)
# Only set base config if user didn't provide a value
if current_value is None or current_value == "":
setattr(crawler_config, key, value)
# Create streaming generator
func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many")
partial_func = partial(
func,
urls[0] if len(urls) == 1 else urls,
config=crawler_config,
dispatcher=dispatcher,
)
async def stream_generator():
try:
results = await partial_func()
# Ensure results is always a list
if not isinstance(results, list):
results = [results]
for result in results:
try:
# Check if result has model_dump method (is a proper CrawlResult)
if hasattr(result, "model_dump"):
result_dict = result.model_dump()
elif isinstance(result, dict):
result_dict = result
else:
# Handle unexpected result type
logger.warning(f"Unexpected result type: {type(result)}")
result_dict = {
"url": str(result) if hasattr(result, "__str__") else "unknown",
"success": False,
"error_message": f"Unexpected result type: {type(result).__name__}",
}
# if fit_html is not a string, set it to None to avoid serialization errors
if "fit_html" in result_dict and not (
result_dict["fit_html"] is None
or isinstance(result_dict["fit_html"], str)
):
result_dict["fit_html"] = None
# If PDF exists, encode it to base64
if result_dict.get("pdf") is not None and isinstance(
result_dict.get("pdf"), bytes
):
result_dict["pdf"] = b64encode(result_dict["pdf"]).decode("utf-8")
yield result_dict
except Exception as e:
logger.error(f"Error processing stream result: {e}")
yield {"url": "unknown", "success": False, "error_message": str(e)}
except Exception as e:
logger.error(f"Error in HTTP streaming: {e}")
yield {"url": "unknown", "success": False, "error_message": f"Streaming error: {str(e)}"}
finally:
# Yield completion marker
yield {"status": "completed"}
await crawler.close() # Close HTTP crawler after streaming
return crawler, stream_generator(), hooks_info
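
As a rough illustration, the non-streaming handler above could be driven directly (outside the FastAPI layer) as sketched below. The config dict is an assumed minimal shape inferred from the keys the handler reads, not the server's full configuration.

```python
# Minimal sketch: invoking handle_http_crawl_request directly.
# The config below only contains the keys the handler accesses; values are assumed.
import asyncio

minimal_config = {
    "crawler": {
        "memory_threshold_percent": 95.0,
        "rate_limiter": {"enabled": False, "base_delay": [1.0, 2.0]},
        "base_config": {},
    }
}

async def main():
    response = await handle_http_crawl_request(
        urls=["https://example.com"],
        http_config={"method": "GET"},
        crawler_config={},
        config=minimal_config,
        # dispatcher omitted: the legacy MemoryAdaptiveDispatcher fallback is used
    )
    print(response["success"], len(response.get("results", [])))

asyncio.run(main())
```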

View File

@@ -123,6 +123,34 @@ class CrawlRequestWithHooks(CrawlRequest):
)
class HTTPCrawlRequest(BaseModel):
"""Request model for HTTP-only crawling endpoints."""
urls: List[str] = Field(min_length=1, max_length=100, description="List of URLs to crawl")
http_config: Optional[Dict] = Field(
default_factory=dict,
description="HTTP crawler configuration (method, headers, timeout, etc.)"
)
crawler_config: Optional[Dict] = Field(
default_factory=dict,
description="Crawler run configuration (extraction, filtering, etc.)"
)
# Dispatcher selection (same as browser crawling)
dispatcher: Optional[DispatcherType] = Field(
None,
description="Dispatcher type to use. Defaults to memory_adaptive if not specified."
)
class HTTPCrawlRequestWithHooks(HTTPCrawlRequest):
"""Extended HTTP crawl request with hooks support"""
hooks: Optional[HookConfig] = Field(
default=None, description="Optional user-provided hook functions"
)
class MarkdownRequest(BaseModel):
"""Request body for the /md endpoint."""

View File

@@ -11,7 +11,7 @@ from crawler_pool import get_crawler, close_all, janitor
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, LinkPreviewConfig
from auth import create_access_token, get_token_dependency, TokenRequest
from pydantic import BaseModel
from typing import Optional, List, Dict
from typing import Optional, List, Dict, AsyncGenerator
from fastapi import Request, Depends
from fastapi.responses import FileResponse
import ast
@@ -20,19 +20,30 @@ import base64
import re
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, LinkPreviewConfig
from api import (
handle_markdown_request, handle_llm_qa,
handle_stream_crawl_request, handle_crawl_request,
stream_results
handle_crawl_request,
handle_http_crawl_request,
handle_http_stream_crawl_request,
handle_llm_qa,
handle_markdown_request,
handle_seed,
handle_stream_crawl_request,
handle_url_discovery,
stream_results,
)
from schemas import (
CrawlRequest,
CrawlRequestWithHooks,
MarkdownRequest,
RawCode,
HTMLRequest,
ScreenshotRequest,
PDFRequest,
HTTPCrawlRequest,
HTTPCrawlRequestWithHooks,
JSEndpointRequest,
LinkAnalysisRequest,
MarkdownRequest,
PDFRequest,
RawCode,
ScreenshotRequest,
SeedRequest,
URLDiscoveryRequest,
)
from utils import (
@@ -1569,9 +1580,10 @@ async def crawl(
dispatcher=dispatcher,
)
# check if all of the results are not successful
if all(not result["success"] for result in results["results"]):
if results["results"] and all(not result["success"] for result in results["results"]):
error_message = results['results'][0].get('error_message', 'Unknown error') if results['results'] else 'No results returned'
raise HTTPException(
500, f"Crawl request failed: {results['results'][0]['error_message']}"
500, f"Crawl request failed: {error_message}"
)
return JSONResponse(results)
@@ -1737,8 +1749,223 @@ async def stream_process(crawl_request: CrawlRequestWithHooks):
)
# ============================================================================
# HTTP Crawling Endpoints
# ============================================================================
@app.post("/crawl/http",
summary="Crawl URLs with HTTP-only strategy",
description="Crawl one or more URLs using a fast, lightweight HTTP-only strategy without browser rendering.",
response_description="Crawl results with extracted content, metadata, and media",
tags=["HTTP Crawling"]
)
@limiter.limit(config["rate_limiting"]["default_limit"])
async def crawl_http(
request: Request,
crawl_request: HTTPCrawlRequest | HTTPCrawlRequestWithHooks,
_td: Dict = Depends(token_dep),
):
"""
Crawl one or more URLs using HTTP-only strategy.
This endpoint provides fast, lightweight crawling without browser rendering.
Perfect for static websites, APIs, and content that doesn't require JavaScript execution.
**Request Body:**
```json
{
"urls": ["https://api.example.com/data"],
"http_config": {
"method": "GET",
"headers": {"Accept": "application/json"},
"timeout": 30
},
"crawler_config": {
"word_count_threshold": 10,
"extraction_strategy": "NoExtractionStrategy"
},
"dispatcher": "memory_adaptive"
}
```
**Response:**
```json
{
"success": true,
"results": [
{
"url": "https://api.example.com/data",
"html": "<html>...</html>",
"markdown": "# API Response\\n\\n...",
"success": true,
"status_code": 200,
"metadata": {
"title": "API Data",
"description": "JSON response data"
}
}
],
"server_processing_time_s": 0.85,
"server_memory_delta_mb": 2.1
}
```
**HTTP Config Options:**
- `method`: HTTP method ("GET", "POST", etc.) (default: "GET")
- `headers`: Custom HTTP headers
- `data`: Form data for POST requests
- `json`: JSON data for POST requests
- `follow_redirects`: Whether to follow redirects (default: true)
- `verify_ssl`: Whether to verify SSL certificates (default: true)
**Notes:**
- Significantly faster than browser-based crawling (no browser startup or page rendering)
- No JavaScript execution or browser rendering
- Ideal for APIs, static sites, and sitemaps
- For streaming results, use `/crawl/http/stream`
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
# Prepare hooks config if provided
hooks_config = None
if hasattr(crawl_request, 'hooks') and crawl_request.hooks:
hooks_config = {
"code": crawl_request.hooks.code,
"timeout": crawl_request.hooks.timeout,
}
# Get dispatcher from app state
dispatcher_type = crawl_request.dispatcher.value if crawl_request.dispatcher else app.state.default_dispatcher_type
dispatcher = app.state.dispatchers.get(dispatcher_type)
if not dispatcher:
raise HTTPException(
500,
f"Dispatcher '{dispatcher_type}' not available. Available dispatchers: {list(app.state.dispatchers.keys())}"
)
results = await handle_http_crawl_request(
urls=crawl_request.urls,
http_config=crawl_request.http_config,
crawler_config=crawl_request.crawler_config,
config=config,
hooks_config=hooks_config,
dispatcher=dispatcher,
)
return results
@app.post("/crawl/http/stream",
summary="Crawl URLs with HTTP-only strategy (streaming)",
description="Stream HTTP-only crawl progress in real-time using Server-Sent Events (SSE).",
response_description="Server-Sent Events stream with progress updates and results",
tags=["HTTP Crawling"]
)
@limiter.limit(config["rate_limiting"]["default_limit"])
async def crawl_http_stream(
request: Request,
crawl_request: HTTPCrawlRequestWithHooks,
_td: Dict = Depends(token_dep),
):
"""
Stream HTTP-only crawl results in real-time.
This endpoint returns a newline-delimited JSON (NDJSON) stream, emitting one result
object per line as each URL finishes, for fast HTTP-based crawling operations.
**Request Body:**
Same as `/crawl/http` endpoint.
**Response Stream:**
Newline-delimited JSON (NDJSON): one crawl result object per line, followed by a
completion marker:
```
{"url": "https://api.example.com/data", "success": true, "status_code": 200, ...}
{"status": "completed"}
```
**Benefits:**
- Real-time result streaming for HTTP crawls
- Immediate feedback on each URL
- Lightweight and fast streaming
- Can process results as they arrive
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
return await http_stream_process(crawl_request=crawl_request)
async def http_stream_process(crawl_request: HTTPCrawlRequestWithHooks):
# Prepare hooks config if provided
hooks_config = None
if hasattr(crawl_request, 'hooks') and crawl_request.hooks:
hooks_config = {
"code": crawl_request.hooks.code,
"timeout": crawl_request.hooks.timeout,
}
# Get dispatcher from app state
dispatcher_type = crawl_request.dispatcher.value if crawl_request.dispatcher else app.state.default_dispatcher_type
dispatcher = app.state.dispatchers.get(dispatcher_type)
if not dispatcher:
raise HTTPException(
500,
f"Dispatcher '{dispatcher_type}' not available. Available dispatchers: {list(app.state.dispatchers.keys())}"
)
crawler, gen, hooks_info = await handle_http_stream_crawl_request(
urls=crawl_request.urls,
http_config=crawl_request.http_config,
crawler_config=crawl_request.crawler_config,
config=config,
hooks_config=hooks_config,
dispatcher=dispatcher,
)
# Add hooks info to response headers if available
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Stream-Status": "active",
}
if hooks_info:
import json
headers["X-Hooks-Status"] = json.dumps(hooks_info["status"]["status"])
return StreamingResponse(
stream_http_results(gen),
media_type="application/x-ndjson",
headers=headers,
)
async def stream_http_results(results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]:
"""Stream HTTP results as NDJSON (dicts already)."""
import json
try:
async for result in results_gen:
try:
data = json.dumps(result) + "\n"
yield data.encode("utf-8")
except Exception as e:
error_response = {"error": str(e), "url": "unknown"}
yield (json.dumps(error_response) + "\n").encode("utf-8")
except asyncio.CancelledError:
pass
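
Because the stream is NDJSON rather than a single JSON document, a client reads it line by line. A minimal consumer sketch follows; the base URL and missing auth token are assumptions for illustration.

```python
# Minimal sketch: consuming the NDJSON stream from /crawl/http/stream.
# Assumes the server is reachable at http://localhost:11235 without a token.
import json
import requests

payload = {"urls": ["https://example.com"], "http_config": {}, "crawler_config": {}}

with requests.post(
    "http://localhost:11235/crawl/http/stream", json=payload, stream=True, timeout=120
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines():
        if not line:
            continue
        event = json.loads(line)
        if event.get("status") == "completed":
            break
        print(event.get("url"), event.get("success"))
```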
def chunk_code_functions(code_md: str) -> List[str]:
"""Extract each function/class from markdown code blocks per file."""
pattern = re.compile(
# match "## File: <path>" then a ```py fence, then capture until the closing ```
r"##\s*File:\s*(?P<path>.+?)\s*?\r?\n" # file header

View File

@@ -59,7 +59,7 @@ DISPATCHER_DEFAULTS = {
"check_interval": 1.0,
"max_session_permit": 20,
"fairness_timeout": 600.0,
"memory_wait_timeout": 600.0,
"memory_wait_timeout": None, # Disable memory timeout for testing
},
"semaphore": {
"semaphore_count": 5,