feat(docker): add JWT authentication and improve server architecture

Add JWT token-based authentication to Docker server and client.
Refactor server architecture for better code organization and error handling.
Move Dockerfile to root deploy directory and update configuration.
Add comprehensive documentation and examples.

BREAKING CHANGE: Docker server now requires authentication by default.
Endpoints require JWT tokens when security.jwt_enabled is true in config.
This commit is contained in:
UncleCode
2025-02-18 22:07:13 +08:00
parent 2864015469
commit 392c923980
16 changed files with 1294 additions and 1364 deletions

View File

@@ -2,6 +2,7 @@ from typing import List, Optional, Union, AsyncGenerator, Dict, Any
import httpx
import json
from urllib.parse import urljoin
import asyncio
from .async_configs import BrowserConfig, CrawlerRunConfig
from .models import CrawlResult
@@ -24,16 +25,7 @@ class RequestError(Crawl4aiClientError):
class Crawl4aiDockerClient:
    """Client for interacting with Crawl4AI Docker server with token authentication.

    Obtain a JWT via :meth:`authenticate` before calling :meth:`crawl` or
    :meth:`get_schema`; the token is attached as a Bearer header on all
    subsequent requests.

    Args:
        base_url (str): Base URL of the Crawl4AI Docker server
        timeout (float): Default timeout for requests in seconds
        verify_ssl (bool): Whether to verify SSL certificates
        verbose (bool): Whether to show logging output
        log_file (str, optional): Path to log file if file logging is desired
    """
def __init__(
self,
# NOTE(review): the diff hunk header below elides part of the signature —
# the base_url and timeout parameters (and their defaults) are not visible
# in this view, so the constructor cannot be fully reconstructed here.
@@ -42,169 +34,137 @@ class Crawl4aiDockerClient:
verify_ssl: bool = True,
verbose: bool = True,
log_file: Optional[str] = None
# NOTE(review): both the old annotated terminator (`) -> None:`) and the
# new unannotated one (`):`) survive in this diff residue.
) -> None:
):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
# NOTE(review): diff residue — the single-line AsyncLogger call below is
# the pre-change version; the multi-line AsyncLogger call further down is
# its post-change replacement.
self.logger = AsyncLogger(log_file=log_file, log_level=LogLevel.DEBUG, verbose=verbose)
self._http_client = httpx.AsyncClient(
timeout=timeout,
verify=verify_ssl,
headers={"Content-Type": "application/json"}
)
self.logger = AsyncLogger(
log_file=log_file,
log_level=LogLevel.DEBUG,
verbose=verbose
)
# JWT bearer token; set by authenticate(), checked by crawl()/get_schema().
self._token: Optional[str] = None
async def authenticate(self, email: str) -> None:
    """Authenticate with the server and store the JWT token.

    Posts the email to the server's /token endpoint, stores the returned
    access token, and attaches it as a Bearer header on the shared HTTP
    client so every later request is authenticated.

    Args:
        email: Email address to authenticate with.

    Raises:
        ConnectionError: If the request fails or the server rejects it.
    """
    url = urljoin(self.base_url, "/token")
    try:
        self.logger.info(f"Authenticating with email: {email}", tag="AUTH")
        response = await self._http_client.post(url, json={"email": email})
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._http_client.headers["Authorization"] = f"Bearer {self._token}"
        self.logger.success("Authentication successful", tag="AUTH")
    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        error_msg = f"Authentication failed: {str(e)}"
        self.logger.error(error_msg, tag="ERROR")
        raise ConnectionError(error_msg)
async def _check_server(self) -> None:
    """Check if server is reachable, raising an error if not.

    Issues a GET to the /health endpoint; only transport-level failures
    are treated as fatal here (an HTTP error status is not checked).

    Raises:
        ConnectionError: If the server cannot be reached.
    """
    try:
        await self._http_client.get(urljoin(self.base_url, "/health"))
        self.logger.success(f"Connected to {self.base_url}", tag="READY")
    except httpx.RequestError as e:
        self.logger.error(f"Server unreachable: {str(e)}", tag="ERROR")
        raise ConnectionError(f"Cannot connect to server: {str(e)}")
def _prepare_request(self, urls: List[str], browser_config: Optional[BrowserConfig] = None,
                     crawler_config: Optional[CrawlerRunConfig] = None) -> Dict[str, Any]:
    """Prepare request data from configs.

    Args:
        urls: URLs to crawl.
        browser_config: Optional browser configuration; serialized via dump().
        crawler_config: Optional crawler configuration; serialized via dump().

    Returns:
        JSON-serializable payload for the server's /crawl endpoints; missing
        configs are sent as empty dicts.
    """
    return {
        "urls": urls,
        "browser_config": browser_config.dump() if browser_config else {},
        "crawler_config": crawler_config.dump() if crawler_config else {}
    }
async def _request(self, method: str, endpoint: str, **kwargs) -> httpx.Response:
    """Make an HTTP request with error handling.

    Args:
        method: HTTP method name (e.g. "GET", "POST").
        endpoint: Path joined onto the client's base URL.
        **kwargs: Passed through to httpx's request().

    Returns:
        The successful httpx.Response.

    Raises:
        ConnectionError: On timeout or transport failure.
        RequestError: On an HTTP error status; includes the server's JSON
            "detail" field when the response is JSON.
    """
    url = urljoin(self.base_url, endpoint)
    try:
        self.logger.debug(f"Making {method} request to {endpoint}", tag="FETCH")
        response = await self._http_client.request(method, url, **kwargs)
        response.raise_for_status()
        self.logger.success(f"Request to {endpoint} successful", tag="COMPLETE")
        return response
    except httpx.TimeoutException as e:
        raise ConnectionError(f"Request timed out: {str(e)}")
    except httpx.RequestError as e:
        raise ConnectionError(f"Failed to connect: {str(e)}")
    except httpx.HTTPStatusError as e:
        # Prefer the server-supplied detail message when the body is JSON.
        error_msg = (e.response.json().get("detail", str(e))
                     if "application/json" in e.response.headers.get("content-type", "")
                     else str(e))
        raise RequestError(f"Server error {e.response.status_code}: {error_msg}")
async def crawl(
    self,
    urls: List[str],
    browser_config: Optional[BrowserConfig] = None,
    crawler_config: Optional[CrawlerRunConfig] = None
) -> Union[CrawlResult, List[CrawlResult], AsyncGenerator[CrawlResult, None]]:
    """Execute a crawl operation.

    Requires a prior successful authenticate() call. When
    crawler_config.stream is truthy, returns an async generator that yields
    CrawlResult objects as the server streams newline-delimited JSON from
    /crawl/stream; otherwise performs a single POST to /crawl.

    Args:
        urls: URLs to crawl.
        browser_config: Optional browser configuration.
        crawler_config: Optional crawler configuration.

    Returns:
        A single CrawlResult when exactly one result is returned, a list of
        CrawlResult otherwise, or an async generator in streaming mode.

    Raises:
        Crawl4aiClientError: If not authenticated.
        ConnectionError: If the server is unreachable.
        RequestError: If the server reports a failed crawl.
    """
    if not self._token:
        raise Crawl4aiClientError("Authentication required. Call authenticate() first.")
    await self._check_server()
    data = self._prepare_request(urls, browser_config, crawler_config)
    is_streaming = crawler_config and crawler_config.stream
    self.logger.info(f"Crawling {len(urls)} URLs {'(streaming)' if is_streaming else ''}", tag="CRAWL")
    if is_streaming:
        async def stream_results() -> AsyncGenerator[CrawlResult, None]:
            async with self._http_client.stream("POST", f"{self.base_url}/crawl/stream", json=data) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if line.strip():
                        result = json.loads(line)
                        if "error" in result:
                            self.logger.error_status(url=result.get("url", "unknown"), error=result["error"])
                            continue
                        self.logger.url_status(url=result.get("url", "unknown"), success=True, timing=result.get("timing", 0.0))
                        # "completed" is a terminal status marker, not a result payload.
                        if result.get("status") == "completed":
                            continue
                        else:
                            yield CrawlResult(**result)
        return stream_results()
    response = await self._request("POST", "/crawl", json=data)
    result_data = response.json()
    if not result_data.get("success", False):
        raise RequestError(f"Crawl failed: {result_data.get('msg', 'Unknown error')}")
    results = [CrawlResult(**r) for r in result_data.get("results", [])]
    self.logger.success(f"Crawl completed with {len(results)} results", tag="CRAWL")
    return results[0] if len(results) == 1 else results
async def get_schema(self) -> Dict[str, Any]:
    """Retrieve configuration schemas.

    Returns:
        The JSON schema payload from the server's /schema endpoint.

    Raises:
        Crawl4aiClientError: If not authenticated.
        ConnectionError / RequestError: Propagated from the request helper.
    """
    if not self._token:
        raise Crawl4aiClientError("Authentication required. Call authenticate() first.")
    response = await self._request("GET", "/schema")
    return response.json()
async def close(self) -> None:
    """Close the underlying HTTP client session."""
    self.logger.info("Closing client", tag="CLOSE")
    await self._http_client.aclose()
async def __aenter__(self) -> "Crawl4aiDockerClient":
    """Enter the async context, returning the client itself."""
    return self

async def __aexit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None:
    """Exit the async context, closing the HTTP client exactly once."""
    # Diff residue duplicated this call; it must run only once.
    await self.close()
# Example usage
async def main():
    """Demonstrate authenticate -> crawl -> schema against a local server."""
    async with Crawl4aiDockerClient(verbose=True) as client:
        await client.authenticate("user@example.com")
        result = await client.crawl(["https://example.com"])
        print(result)
        schema = await client.get_schema()
        print(schema)

if __name__ == "__main__":
    asyncio.run(main())