diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 10b122dd..c488843b 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -47,7 +47,7 @@ def to_serializable_dict(obj: Any) -> Dict: return obj.isoformat() # Handle lists, tuples, and sets, and basically any iterable - if isinstance(obj, (list, tuple, set)) or hasattr(obj, '__iter__'): + if isinstance(obj, (list, tuple, set)) or hasattr(obj, '__iter__') and not isinstance(obj, dict): return [to_serializable_dict(item) for item in obj] # Handle frozensets, which are not iterable diff --git a/crawl4ai/docker_client copy.py b/crawl4ai/docker_client copy.py new file mode 100644 index 00000000..7c0fce1c --- /dev/null +++ b/crawl4ai/docker_client copy.py @@ -0,0 +1,210 @@ +from typing import List, Optional, Union, AsyncGenerator, Dict, Any +import httpx +import json +from urllib.parse import urljoin + +from .async_configs import BrowserConfig, CrawlerRunConfig +from .models import CrawlResult +from .async_logger import AsyncLogger, LogLevel + + +class Crawl4aiClientError(Exception): + """Base exception for Crawl4ai Docker client errors.""" + pass + + +class ConnectionError(Crawl4aiClientError): + """Raised when connection to the Docker server fails.""" + pass + + +class RequestError(Crawl4aiClientError): + """Raised when the server returns an error response.""" + pass + + +class Crawl4aiDockerClient: + """ + Client for interacting with Crawl4AI Docker server. 
+ + Args: + base_url (str): Base URL of the Crawl4AI Docker server + timeout (float): Default timeout for requests in seconds + verify_ssl (bool): Whether to verify SSL certificates + verbose (bool): Whether to show logging output + log_file (str, optional): Path to log file if file logging is desired + """ + + def __init__( + self, + base_url: str = "http://localhost:8000", + timeout: float = 30.0, + verify_ssl: bool = True, + verbose: bool = True, + log_file: Optional[str] = None + ) -> None: + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self._http_client = httpx.AsyncClient( + timeout=timeout, + verify=verify_ssl, + headers={"Content-Type": "application/json"} + ) + self.logger = AsyncLogger( + log_file=log_file, + log_level=LogLevel.DEBUG, + verbose=verbose + ) + + async def _check_server_connection(self) -> bool: + """Check if server is reachable.""" + try: + self.logger.info("Checking server connection...", tag="INIT") + response = await self._http_client.get(f"{self.base_url}/health") + response.raise_for_status() + self.logger.success(f"Connected to server at {self.base_url}", tag="READY") + return True + except Exception as e: + self.logger.error(f"Failed to connect to server: {str(e)}", tag="ERROR") + return False + + def _prepare_request_data( + self, + urls: List[str], + browser_config: Optional[BrowserConfig] = None, + crawler_config: Optional[CrawlerRunConfig] = None + ) -> Dict[str, Any]: + """Prepare request data from configs using dump methods.""" + self.logger.debug("Preparing request data", tag="INIT") + data = { + "urls": urls, + "browser_config": browser_config.dump() if browser_config else {}, + "crawler_config": crawler_config.dump() if crawler_config else {} + } + self.logger.debug(f"Request data prepared for {len(urls)} URLs", tag="READY") + return data + + async def _make_request( + self, + method: str, + endpoint: str, + **kwargs + ) -> Union[Dict, AsyncGenerator]: + """Make HTTP request to the server with error 
handling.""" + url = urljoin(self.base_url, endpoint) + + try: + self.logger.debug(f"Making {method} request to {endpoint}", tag="FETCH") + response = await self._http_client.request(method, url, **kwargs) + response.raise_for_status() + self.logger.success(f"Request to {endpoint} successful", tag="COMPLETE") + return response + except httpx.TimeoutException as e: + error_msg = f"Request timed out: {str(e)}" + self.logger.error(error_msg, tag="ERROR") + raise ConnectionError(error_msg) + except httpx.RequestError as e: + error_msg = f"Failed to connect to server: {str(e)}" + self.logger.error(error_msg, tag="ERROR") + raise ConnectionError(error_msg) + except httpx.HTTPStatusError as e: + error_detail = "" + try: + error_data = e.response.json() + error_detail = error_data.get('detail', str(e)) + except (json.JSONDecodeError, AttributeError) as json_err: + error_detail = f"{str(e)} (Failed to parse error response: {str(json_err)})" + + error_msg = f"Server returned error {e.response.status_code}: {error_detail}" + self.logger.error(error_msg, tag="ERROR") + raise RequestError(error_msg) + + async def crawl( + self, + urls: List[str], + browser_config: Optional[BrowserConfig] = None, + crawler_config: Optional[CrawlerRunConfig] = None + ) -> Union[CrawlResult, AsyncGenerator[CrawlResult, None]]: + """Execute a crawl operation through the Docker server.""" + # Check server connection first + if not await self._check_server_connection(): + raise ConnectionError("Cannot proceed with crawl - server is not reachable") + + request_data = self._prepare_request_data(urls, browser_config, crawler_config) + is_streaming = crawler_config.stream if crawler_config else False + + self.logger.info( + f"Starting crawl for {len(urls)} URLs {'(streaming)' if is_streaming else ''}", + tag="INIT" + ) + + if is_streaming: + async def result_generator() -> AsyncGenerator[CrawlResult, None]: + try: + async with self._http_client.stream( + "POST", + f"{self.base_url}/crawl", + 
json=request_data, + timeout=None + ) as response: + response.raise_for_status() + async for line in response.aiter_lines(): + if line.strip(): + try: + result_dict = json.loads(line) + if "error" in result_dict: + self.logger.error_status( + url=result_dict.get('url', 'unknown'), + error=result_dict['error'] + ) + continue + + self.logger.url_status( + url=result_dict.get('url', 'unknown'), + success=True, + timing=result_dict.get('timing', 0.0) + ) + yield CrawlResult(**result_dict) + except json.JSONDecodeError as e: + self.logger.error(f"Failed to parse server response: {e}", tag="ERROR") + continue + except httpx.StreamError as e: + error_msg = f"Stream connection error: {str(e)}" + self.logger.error(error_msg, tag="ERROR") + raise ConnectionError(error_msg) + except Exception as e: + error_msg = f"Unexpected error during streaming: {str(e)}" + self.logger.error(error_msg, tag="ERROR") + raise Crawl4aiClientError(error_msg) + + return result_generator() + + response = await self._make_request("POST", "/crawl", json=request_data) + response_data = response.json() + + if not response_data.get("success", False): + error_msg = f"Crawl operation failed: {response_data.get('error', 'Unknown error')}" + self.logger.error(error_msg, tag="ERROR") + raise RequestError(error_msg) + + results = [CrawlResult(**result_dict) for result_dict in response_data.get("results", [])] + self.logger.success(f"Crawl completed successfully with {len(results)} results", tag="COMPLETE") + return results[0] if len(results) == 1 else results + + async def get_schema(self) -> Dict[str, Any]: + """Retrieve the configuration schemas from the server.""" + self.logger.info("Retrieving schema from server", tag="FETCH") + response = await self._make_request("GET", "/schema") + self.logger.success("Schema retrieved successfully", tag="COMPLETE") + return response.json() + + async def close(self) -> None: + """Close the HTTP client session.""" + self.logger.info("Closing client connection", 
tag="COMPLETE") + await self._http_client.aclose() + + async def __aenter__(self) -> "Crawl4aiDockerClient": + return self + + async def __aexit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None: + await self.close() \ No newline at end of file diff --git a/crawl4ai/docker_client.py b/crawl4ai/docker_client.py index 7c0fce1c..f4816eb5 100644 --- a/crawl4ai/docker_client.py +++ b/crawl4ai/docker_client.py @@ -2,6 +2,7 @@ from typing import List, Optional, Union, AsyncGenerator, Dict, Any import httpx import json from urllib.parse import urljoin +import asyncio from .async_configs import BrowserConfig, CrawlerRunConfig from .models import CrawlResult @@ -24,16 +25,7 @@ class RequestError(Crawl4aiClientError): class Crawl4aiDockerClient: - """ - Client for interacting with Crawl4AI Docker server. - - Args: - base_url (str): Base URL of the Crawl4AI Docker server - timeout (float): Default timeout for requests in seconds - verify_ssl (bool): Whether to verify SSL certificates - verbose (bool): Whether to show logging output - log_file (str, optional): Path to log file if file logging is desired - """ + """Client for interacting with Crawl4AI Docker server with token authentication.""" def __init__( self, @@ -42,169 +34,137 @@ class Crawl4aiDockerClient: verify_ssl: bool = True, verbose: bool = True, log_file: Optional[str] = None - ) -> None: + ): self.base_url = base_url.rstrip('/') self.timeout = timeout + self.logger = AsyncLogger(log_file=log_file, log_level=LogLevel.DEBUG, verbose=verbose) self._http_client = httpx.AsyncClient( timeout=timeout, verify=verify_ssl, headers={"Content-Type": "application/json"} ) - self.logger = AsyncLogger( - log_file=log_file, - log_level=LogLevel.DEBUG, - verbose=verbose - ) + self._token: Optional[str] = None - async def _check_server_connection(self) -> bool: - """Check if server is reachable.""" + async def authenticate(self, email: str) -> None: + """Authenticate with the server and 
store the token.""" + url = urljoin(self.base_url, "/token") try: - self.logger.info("Checking server connection...", tag="INIT") - response = await self._http_client.get(f"{self.base_url}/health") + self.logger.info(f"Authenticating with email: {email}", tag="AUTH") + response = await self._http_client.post(url, json={"email": email}) response.raise_for_status() - self.logger.success(f"Connected to server at {self.base_url}", tag="READY") - return True - except Exception as e: - self.logger.error(f"Failed to connect to server: {str(e)}", tag="ERROR") - return False + data = response.json() + self._token = data["access_token"] + self._http_client.headers["Authorization"] = f"Bearer {self._token}" + self.logger.success("Authentication successful", tag="AUTH") + except (httpx.RequestError, httpx.HTTPStatusError) as e: + error_msg = f"Authentication failed: {str(e)}" + self.logger.error(error_msg, tag="ERROR") + raise ConnectionError(error_msg) - def _prepare_request_data( - self, - urls: List[str], - browser_config: Optional[BrowserConfig] = None, - crawler_config: Optional[CrawlerRunConfig] = None - ) -> Dict[str, Any]: - """Prepare request data from configs using dump methods.""" - self.logger.debug("Preparing request data", tag="INIT") - data = { + async def _check_server(self) -> None: + """Check if server is reachable, raising an error if not.""" + try: + await self._http_client.get(urljoin(self.base_url, "/health")) + self.logger.success(f"Connected to {self.base_url}", tag="READY") + except httpx.RequestError as e: + self.logger.error(f"Server unreachable: {str(e)}", tag="ERROR") + raise ConnectionError(f"Cannot connect to server: {str(e)}") + + def _prepare_request(self, urls: List[str], browser_config: Optional[BrowserConfig] = None, + crawler_config: Optional[CrawlerRunConfig] = None) -> Dict[str, Any]: + """Prepare request data from configs.""" + return { "urls": urls, "browser_config": browser_config.dump() if browser_config else {}, "crawler_config": 
crawler_config.dump() if crawler_config else {} } - self.logger.debug(f"Request data prepared for {len(urls)} URLs", tag="READY") - return data - async def _make_request( - self, - method: str, - endpoint: str, - **kwargs - ) -> Union[Dict, AsyncGenerator]: - """Make HTTP request to the server with error handling.""" + async def _request(self, method: str, endpoint: str, **kwargs) -> httpx.Response: + """Make an HTTP request with error handling.""" url = urljoin(self.base_url, endpoint) - try: - self.logger.debug(f"Making {method} request to {endpoint}", tag="FETCH") response = await self._http_client.request(method, url, **kwargs) response.raise_for_status() - self.logger.success(f"Request to {endpoint} successful", tag="COMPLETE") return response except httpx.TimeoutException as e: - error_msg = f"Request timed out: {str(e)}" - self.logger.error(error_msg, tag="ERROR") - raise ConnectionError(error_msg) + raise ConnectionError(f"Request timed out: {str(e)}") except httpx.RequestError as e: - error_msg = f"Failed to connect to server: {str(e)}" - self.logger.error(error_msg, tag="ERROR") - raise ConnectionError(error_msg) + raise ConnectionError(f"Failed to connect: {str(e)}") except httpx.HTTPStatusError as e: - error_detail = "" - try: - error_data = e.response.json() - error_detail = error_data.get('detail', str(e)) - except (json.JSONDecodeError, AttributeError) as json_err: - error_detail = f"{str(e)} (Failed to parse error response: {str(json_err)})" - - error_msg = f"Server returned error {e.response.status_code}: {error_detail}" - self.logger.error(error_msg, tag="ERROR") - raise RequestError(error_msg) + error_msg = (e.response.json().get("detail", str(e)) + if "application/json" in e.response.headers.get("content-type", "") + else str(e)) + raise RequestError(f"Server error {e.response.status_code}: {error_msg}") async def crawl( self, urls: List[str], browser_config: Optional[BrowserConfig] = None, crawler_config: Optional[CrawlerRunConfig] = None - ) 
-> Union[CrawlResult, AsyncGenerator[CrawlResult, None]]: - """Execute a crawl operation through the Docker server.""" - # Check server connection first - if not await self._check_server_connection(): - raise ConnectionError("Cannot proceed with crawl - server is not reachable") - - request_data = self._prepare_request_data(urls, browser_config, crawler_config) - is_streaming = crawler_config.stream if crawler_config else False + ) -> Union[CrawlResult, List[CrawlResult], AsyncGenerator[CrawlResult, None]]: + """Execute a crawl operation.""" + if not self._token: + raise Crawl4aiClientError("Authentication required. Call authenticate() first.") + await self._check_server() - self.logger.info( - f"Starting crawl for {len(urls)} URLs {'(streaming)' if is_streaming else ''}", - tag="INIT" - ) + data = self._prepare_request(urls, browser_config, crawler_config) + is_streaming = crawler_config and crawler_config.stream + + self.logger.info(f"Crawling {len(urls)} URLs {'(streaming)' if is_streaming else ''}", tag="CRAWL") if is_streaming: - async def result_generator() -> AsyncGenerator[CrawlResult, None]: - try: - async with self._http_client.stream( - "POST", - f"{self.base_url}/crawl", - json=request_data, - timeout=None - ) as response: - response.raise_for_status() - async for line in response.aiter_lines(): - if line.strip(): - try: - result_dict = json.loads(line) - if "error" in result_dict: - self.logger.error_status( - url=result_dict.get('url', 'unknown'), - error=result_dict['error'] - ) - continue - - self.logger.url_status( - url=result_dict.get('url', 'unknown'), - success=True, - timing=result_dict.get('timing', 0.0) - ) - yield CrawlResult(**result_dict) - except json.JSONDecodeError as e: - self.logger.error(f"Failed to parse server response: {e}", tag="ERROR") - continue - except httpx.StreamError as e: - error_msg = f"Stream connection error: {str(e)}" - self.logger.error(error_msg, tag="ERROR") - raise ConnectionError(error_msg) - except Exception as 
e: - error_msg = f"Unexpected error during streaming: {str(e)}" - self.logger.error(error_msg, tag="ERROR") - raise Crawl4aiClientError(error_msg) - - return result_generator() + async def stream_results() -> AsyncGenerator[CrawlResult, None]: + async with self._http_client.stream("POST", f"{self.base_url}/crawl/stream", json=data) as response: + response.raise_for_status() + async for line in response.aiter_lines(): + if line.strip(): + result = json.loads(line) + if "error" in result: + self.logger.error_status(url=result.get("url", "unknown"), error=result["error"]) + continue + self.logger.url_status(url=result.get("url", "unknown"), success=True, timing=result.get("timing", 0.0)) + if result.get("status") == "completed": + continue + else: + yield CrawlResult(**result) + return stream_results() - response = await self._make_request("POST", "/crawl", json=request_data) - response_data = response.json() + response = await self._request("POST", "/crawl", json=data) + result_data = response.json() + if not result_data.get("success", False): + raise RequestError(f"Crawl failed: {result_data.get('msg', 'Unknown error')}") - if not response_data.get("success", False): - error_msg = f"Crawl operation failed: {response_data.get('error', 'Unknown error')}" - self.logger.error(error_msg, tag="ERROR") - raise RequestError(error_msg) - - results = [CrawlResult(**result_dict) for result_dict in response_data.get("results", [])] - self.logger.success(f"Crawl completed successfully with {len(results)} results", tag="COMPLETE") + results = [CrawlResult(**r) for r in result_data.get("results", [])] + self.logger.success(f"Crawl completed with {len(results)} results", tag="CRAWL") return results[0] if len(results) == 1 else results async def get_schema(self) -> Dict[str, Any]: - """Retrieve the configuration schemas from the server.""" - self.logger.info("Retrieving schema from server", tag="FETCH") - response = await self._make_request("GET", "/schema") - 
self.logger.success("Schema retrieved successfully", tag="COMPLETE") + """Retrieve configuration schemas.""" + if not self._token: + raise Crawl4aiClientError("Authentication required. Call authenticate() first.") + response = await self._request("GET", "/schema") return response.json() async def close(self) -> None: """Close the HTTP client session.""" - self.logger.info("Closing client connection", tag="COMPLETE") + self.logger.info("Closing client", tag="CLOSE") await self._http_client.aclose() async def __aenter__(self) -> "Crawl4aiDockerClient": return self async def __aexit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None: - await self.close() \ No newline at end of file + await self.close() + + +# Example usage +async def main(): + async with Crawl4aiDockerClient(verbose=True) as client: + await client.authenticate("user@example.com") + result = await client.crawl(["https://example.com"]) + print(result) + schema = await client.get_schema() + print(schema) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/deploy/docker/Dockerfile b/deploy/Dockerfile similarity index 66% rename from deploy/docker/Dockerfile rename to deploy/Dockerfile index 310c43fe..3043bd57 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/Dockerfile @@ -4,7 +4,8 @@ FROM python:3.10-slim ARG APP_HOME=/app ARG GITHUB_REPO=https://github.com/unclecode/crawl4ai.git ARG GITHUB_BRANCH=next -ARG USE_LOCAL=true +ARG USE_LOCAL=False +ARG CONFIG_PATH="" ENV PYTHONFAULTHANDLER=1 \ PYTHONHASHSEED=random \ @@ -88,58 +89,37 @@ fi WORKDIR ${APP_HOME} -RUN echo '#!/bin/bash\n\ -if [ "$USE_LOCAL" = "true" ]; then\n\ - echo "📦 Installing from local source..."\n\ - pip install --no-cache-dir /tmp/project/\n\ -else\n\ - echo "🌐 Installing from GitHub..."\n\ - for i in {1..3}; do \n\ - git clone --branch ${GITHUB_BRANCH} ${GITHUB_REPO} /tmp/crawl4ai && break || \n\ - { echo "Attempt $i/3 failed! Taking a short break... 
☕"; sleep 5; }; \n\ - done\n\ - pip install --no-cache-dir /tmp/crawl4ai\n\ -fi' > /tmp/install.sh && chmod +x /tmp/install.sh +RUN git clone --branch ${GITHUB_BRANCH} ${GITHUB_REPO} /tmp/crawl4ai -COPY . /tmp/project/ +COPY docker/supervisord.conf . +COPY docker/requirements.txt . -COPY deploy/docker/supervisord.conf . - -COPY deploy/docker/requirements.txt . RUN pip install --no-cache-dir -r requirements.txt RUN if [ "$INSTALL_TYPE" = "all" ] ; then \ - pip install --no-cache-dir \ - torch \ - torchvision \ - torchaudio \ - scikit-learn \ - nltk \ - transformers \ - tokenizers && \ - python -m nltk.downloader punkt stopwords ; \ - fi - -RUN if [ "$INSTALL_TYPE" = "all" ] ; then \ - pip install "/tmp/project/[all]" && \ + pip install "/tmp/crawl4ai/[all]" && \ + python -m nltk.downloader punkt stopwords && \ python -m crawl4ai.model_loader ; \ elif [ "$INSTALL_TYPE" = "torch" ] ; then \ - pip install "/tmp/project/[torch]" ; \ + pip install "/tmp/crawl4ai/[torch]" ; \ elif [ "$INSTALL_TYPE" = "transformer" ] ; then \ - pip install "/tmp/project/[transformer]" && \ + pip install "/tmp/crawl4ai/[transformer]" && \ python -m crawl4ai.model_loader ; \ else \ - pip install "/tmp/project" ; \ + pip install "/tmp/crawl4ai" ; \ fi RUN pip install --no-cache-dir --upgrade pip && \ - /tmp/install.sh && \ python -c "import crawl4ai; print('✅ crawl4ai is ready to rock!')" && \ python -c "from playwright.sync_api import sync_playwright; print('✅ Playwright is feeling dramatic!')" RUN playwright install --with-deps chromium -COPY deploy/docker/* ${APP_HOME}/ +COPY docker/* ${APP_HOME}/ +RUN if [ -n "$CONFIG_PATH" ] && [ -f "$CONFIG_PATH" ]; then \ + echo "Using custom config from $CONFIG_PATH" && \ + cp $CONFIG_PATH /app/config.yml; \ +fi HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD bash -c '\ @@ -151,24 +131,7 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ redis-cli ping > /dev/null && \ curl -f 
http://localhost:8000/health || exit 1' -# COPY deploy/docker/docker-entrypoint.sh /usr/local/bin/ -# RUN chmod +x /usr/local/bin/docker-entrypoint.sh - -EXPOSE 6379 - -# ENTRYPOINT ["docker-entrypoint.sh"] - -# CMD service redis-server start && gunicorn \ -# --bind 0.0.0.0:8000 \ -# --workers 4 \ -# --threads 2 \ -# --timeout 120 \ -# --graceful-timeout 30 \ -# --log-level info \ -# --worker-class uvicorn.workers.UvicornWorker \ -# server:app - -# ENTRYPOINT ["docker-entrypoint.sh"] +# EXPOSE 6379 CMD ["supervisord", "-c", "supervisord.conf"] diff --git a/deploy/docker/README.md b/deploy/docker/README.md index 46dd803a..f62e58c4 100644 --- a/deploy/docker/README.md +++ b/deploy/docker/README.md @@ -38,14 +38,13 @@ First, clone the repository and build the Docker image: ```bash # Clone the repository git clone https://github.com/unclecode/crawl4ai.git -cd crawl4ai +cd crawl4ai/deploy # Build the Docker image -docker build -t crawl4ai-server:prod \ - --build-arg PYTHON_VERSION=3.10 \ - --build-arg INSTALL_TYPE=all \ - --build-arg ENABLE_GPU=false \ - deploy/docker/ +docker build --platform=linux/amd64 --no-cache -t crawl4ai . + +# Or build for arm64 +docker build --platform=linux/arm64 --no-cache -t crawl4ai . ``` #### 2. 
Environment Setup @@ -73,7 +72,7 @@ You have several options for running the container: Basic run (no LLM support): ```bash -docker run -d -p 8000:8000 --name crawl4ai crawl4ai-server:prod +docker run -d -p 8000:8000 --name crawl4ai crawl4ai ``` With LLM support: @@ -81,51 +80,16 @@ With LLM support: docker run -d -p 8000:8000 \ --env-file .llm.env \ --name crawl4ai \ - crawl4ai-server:prod + crawl4ai ``` Using host environment variables (Not a good practice, but works for local testing): ```bash docker run -d -p 8000:8000 \ --env-file .llm.env \ - --env-from "$(env)" \ + --env "$(env)" \ --name crawl4ai \ - crawl4ai-server:prod -``` - -### More on Building - -You have several options for building the Docker image based on your needs: - -#### Basic Build -```bash -# Clone the repository -git clone https://github.com/unclecode/crawl4ai.git -cd crawl4ai - -# Simple build with defaults -docker build -t crawl4ai-server:prod deploy/docker/ -``` - -#### Advanced Build Options -```bash -# Build with custom parameters -docker build -t crawl4ai-server:prod \ - --build-arg PYTHON_VERSION=3.10 \ - --build-arg INSTALL_TYPE=all \ - --build-arg ENABLE_GPU=false \ - deploy/docker/ -``` - -#### Platform-Specific Builds -The Dockerfile includes optimizations for different architectures (ARM64 and AMD64). Docker automatically detects your platform, but you can specify it explicitly: - -```bash -# Build for ARM64 -docker build --platform linux/arm64 -t crawl4ai-server:arm64 deploy/docker/ - -# Build for AMD64 -docker build --platform linux/amd64 -t crawl4ai-server:amd64 deploy/docker/ + crawl4ai ``` #### Multi-Platform Build @@ -138,9 +102,9 @@ docker buildx create --use # Build for multiple platforms docker buildx build \ --platform linux/amd64,linux/arm64 \ - -t yourusername/crawl4ai-server:multi \ + -t yourusername/crawl4ai \ --push \ - deploy/docker/ + . ``` > 💡 **Note**: Multi-platform builds require Docker Buildx and need to be pushed to a registry. 
@@ -149,18 +113,18 @@ docker buildx build \ For development, you might want to enable all features: ```bash -docker build -t crawl4ai-server:dev \ +docker build -t crawl4ai \ --build-arg INSTALL_TYPE=all \ --build-arg PYTHON_VERSION=3.10 \ --build-arg ENABLE_GPU=true \ - deploy/docker/ + . ``` #### GPU-Enabled Build If you plan to use GPU acceleration: ```bash -docker build -t crawl4ai-server:gpu \ +docker build -t crawl4ai \ --build-arg ENABLE_GPU=true \ deploy/docker/ ``` @@ -185,31 +149,14 @@ docker build -t crawl4ai-server:gpu \ - Use --platform for specific architecture requirements - Consider buildx for multi-architecture distribution -3. **Development vs Production** - - Use `INSTALL_TYPE=all` for development - - Stick to `default` for production if you don't need extra features - - Enable GPU only if you have compatible hardware - -4. **Performance Optimization** +3. **Performance Optimization** - The image automatically includes platform-specific optimizations - AMD64 gets OpenMP optimizations - ARM64 gets OpenBLAS optimizations ### Docker Hub -> 🚧 Coming soon! The image will be available at `crawl4ai/server`. Stay tuned! - -## Dockerfile Parameters - -Configure your build with these parameters: - -| Parameter | Description | Default | Options | -|-----------|-------------|---------|----------| -| PYTHON_VERSION | Python version to use | 3.10 | 3.8, 3.9, 3.10 | -| INSTALL_TYPE | Installation profile | default | default, all, torch, transformer | -| ENABLE_GPU | Enable GPU support | false | true, false | -| APP_HOME | Application directory | /app | any valid path | -| TARGETARCH | Target architecture | auto-detected | amd64, arm64 | +> 🚧 Coming soon! The image will be available at `crawl4ai`. Stay tuned! ## Using the API @@ -223,14 +170,34 @@ The SDK makes things easier! 
Here's how to use it: from crawl4ai.docker_client import Crawl4aiDockerClient from crawl4ai import BrowserConfig, CrawlerRunConfig -async with Crawl4aiDockerClient() as client: - # The SDK handles serialization for you! - result = await client.crawl( - urls=["https://example.com"], - browser_config=BrowserConfig(headless=True), - crawler_config=CrawlerRunConfig(stream=False) - ) - print(result.markdown) +async def main(): + async with Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=True) as client: + # If JWT is enabled, you can authenticate like this: (more on this later) + # await client.authenticate("test@example.com") + + # Non-streaming crawl + results = await client.crawl( + ["https://example.com", "https://python.org"], + browser_config=BrowserConfig(headless=True), + crawler_config=CrawlerRunConfig() + ) + print(f"Non-streaming results: {results}") + + # Streaming crawl + crawler_config = CrawlerRunConfig(stream=True) + async for result in await client.crawl( + ["https://example.com", "https://python.org"], + browser_config=BrowserConfig(headless=True), + crawler_config=crawler_config + ): + print(f"Streamed result: {result}") + + # Get schema + schema = await client.get_schema() + print(f"Schema: {schema}") + +if __name__ == "__main__": + asyncio.run(main()) ``` `Crawl4aiDockerClient` is an async context manager that handles the connection for you. You can pass in optional parameters for more control: @@ -243,76 +210,49 @@ async with Crawl4aiDockerClient() as client: This client SDK generates a properly structured JSON request for the server's HTTP API. -### Second Approach: Direct API Calls +## Second Approach: Direct API Calls This is super important! The API expects a specific structure that matches our Python classes. Let me show you how it works. 
-#### The Magic of Type Matching +### Understanding Configuration Structure -When you send a request, each configuration object needs a "type" field that matches the exact class name from the library. Here's an example: +Let's dive deep into how configurations work in Crawl4AI. Every configuration object follows a consistent pattern of `type` and `params`. This structure enables complex, nested configurations while maintaining clarity. +#### The Basic Pattern + +Try this in Python to understand the structure: ```python -# First, let's create objects the normal way -from crawl4ai import BrowserConfig, CrawlerRunConfig, PruningContentFilter +from crawl4ai import BrowserConfig -# Create some config objects -browser_config = BrowserConfig(headless=True, viewport={"width": 1200, "height": 800}) -content_filter = PruningContentFilter(threshold=0.48, threshold_type="fixed") - -# Use dump() to see the serialized format -print(browser_config.dump()) +# Create a config and see its structure +config = BrowserConfig(headless=True) +print(config.dump()) ``` -This will output something like: +This outputs: ```json { "type": "BrowserConfig", "params": { - "headless": true, - "viewport": { - "width": 1200, - "height": 800 - } + "headless": true } } ``` +#### Simple vs Complex Values -#### Structuring Your Requests - -1. Basic Request Structure -Every request must include URLs and may include configuration objects: +The structure follows these rules: +- Simple values (strings, numbers, booleans, lists) are passed directly +- Complex values (classes, dictionaries) use the type-params pattern +For example, with dictionaries: ```json { - "urls": ["https://example.com"], - "browser_config": {...}, - "crawler_config": {...} -} -``` - -2. 
Understanding Type-Params Pattern -All complex objects follow this pattern: -```json -{ - "type": "ClassName", - "params": { - "param1": value1, - "param2": value2 - } -} -``` -> 💡 **Note**: Simple types (strings, numbers, booleans) are passed directly without the type-params wrapper. - -3. Browser Configuration -```json -{ - "urls": ["https://example.com"], "browser_config": { "type": "BrowserConfig", "params": { - "headless": true, - "viewport": { + "headless": true, // Simple boolean - direct value + "viewport": { // Complex dictionary - needs type-params "type": "dict", "value": { "width": 1200, @@ -324,22 +264,104 @@ All complex objects follow this pattern: } ``` -4. Simple Crawler Configuration +#### Strategy Pattern and Nesting + +Strategies (like chunking or content filtering) demonstrate why we need this structure. Consider this chunking configuration: + ```json { - "urls": ["https://example.com"], "crawler_config": { "type": "CrawlerRunConfig", "params": { - "word_count_threshold": 200, - "stream": true, - "verbose": true + "chunking_strategy": { + "type": "RegexChunking", // Strategy implementation + "params": { + "patterns": ["\n\n", "\\.\\s+"] + } + } } } } ``` -5. Advanced Crawler Configuration +Here, `chunking_strategy` accepts any chunking implementation. The `type` field tells the system which strategy to use, and `params` configures that specific strategy. + +#### Complex Nested Example + +Let's look at a more complex example with content filtering: + +```json +{ + "crawler_config": { + "type": "CrawlerRunConfig", + "params": { + "markdown_generator": { + "type": "DefaultMarkdownGenerator", + "params": { + "content_filter": { + "type": "PruningContentFilter", + "params": { + "threshold": 0.48, + "threshold_type": "fixed" + } + } + } + } + } + } +} +``` + +This shows how deeply configurations can nest while maintaining a consistent structure. 
+ +#### Quick Grammar Overview +``` +config := { + "type": string, + "params": { + key: simple_value | complex_value + } +} + +simple_value := string | number | boolean | [simple_value] +complex_value := config | dict_value + +dict_value := { + "type": "dict", + "value": object +} +``` + +#### Important Rules 🚨 + +- Always use the type-params pattern for class instances +- Use direct values for primitives (numbers, strings, booleans) +- Wrap dictionaries with {"type": "dict", "value": {...}} +- Arrays/lists are passed directly without type-params +- All parameters are optional unless specifically required + +#### Pro Tip 💡 + +The easiest way to get the correct structure is to: +1. Create configuration objects in Python +2. Use the `dump()` method to see their JSON representation +3. Use that JSON in your API calls + +Example: +```python +from crawl4ai import CrawlerRunConfig, PruningContentFilter + +config = CrawlerRunConfig( + content_filter=PruningContentFilter(threshold=0.48) +) +print(config.dump()) # Use this JSON in your API calls +``` + + +#### More Examples + +**Advanced Crawler Configuration** + ```json { "urls": ["https://example.com"], @@ -365,26 +387,8 @@ All complex objects follow this pattern: } ``` -6. 
Adding Strategies - -**Chunking Strategy**: -```json -{ - "crawler_config": { - "type": "CrawlerRunConfig", - "params": { - "chunking_strategy": { - "type": "RegexChunking", - "params": { - "patterns": ["\n\n", "\\.\\s+"] - } - } - } - } -} -``` - **Extraction Strategy**: + ```json { "crawler_config": { @@ -408,6 +412,7 @@ All complex objects follow this pattern: ``` **LLM Extraction Strategy** + ```json { "crawler_config": { @@ -453,7 +458,8 @@ All complex objects follow this pattern: } ``` -**Deep Crawler Exampler** +**Deep Crawler Example** + ```json { "crawler_config": { @@ -526,15 +532,6 @@ All complex objects follow this pattern: } ``` -**Important Rules**: - -- Always use the type-params pattern for class instances -- Use direct values for primitives (numbers, strings, booleans) -- Wrap dictionaries with {"type": "dict", "value": {...}} -- Arrays/lists are passed directly without type-params -- All parameters are optional unless specifically required - - ### REST API Examples Let's look at some practical examples: @@ -544,39 +541,51 @@ Let's look at some practical examples: ```python import requests +crawl_payload = { + "urls": ["https://example.com"], + "browser_config": {"headless": True}, + "crawler_config": {"stream": False} +} response = requests.post( "http://localhost:8000/crawl", - json={ - "urls": ["https://example.com"], - "browser_config": { - "type": "BrowserConfig", - "params": {"headless": True} - } - } + # headers={"Authorization": f"Bearer {token}"}, # If JWT is enabled, more on this later + json=crawl_payload ) -print(response.json()) +print(response.json()) # Print the response for debugging ``` #### Streaming Results ```python -import requests +async def test_stream_crawl(session, token: str): + """Test the /crawl/stream endpoint with multiple URLs.""" + url = "http://localhost:8000/crawl/stream" + payload = { + "urls": [ + "https://example.com", + "https://example.com/page1", + "https://example.com/page2", + "https://example.com/page3", + 
], + "browser_config": {"headless": True, "viewport": {"width": 1200}}, + "crawler_config": {"stream": True, "cache_mode": "aggressive"} + } -response = requests.post( - "http://localhost:8000/crawl", - json={ - "urls": ["https://example.com"], - "crawler_config": { - "type": "CrawlerRunConfig", - "params": {"stream": True} - } - }, - stream=True -) - -for line in response.iter_lines(): - if line: - print(line.decode()) + headers = {"Authorization": f"Bearer {token}"} if token else {} # Auth header is only needed when JWT is enabled, more on this later + + try: + async with session.post(url, json=payload, headers=headers) as response: + status = response.status + print(f"Status: {status} (Expected: 200)") + assert status == 200, f"Expected 200, got {status}" + + # Read streaming response line-by-line (NDJSON) + async for line in response.content: + if line: + data = json.loads(line.decode('utf-8').strip()) + print(f"Streamed Result: {json.dumps(data, indent=2)}") + except Exception as e: + print(f"Error in streaming crawl test: {str(e)}") ``` ## Metrics & Monitoring @@ -602,184 +611,9 @@ curl http://localhost:8000/health ## Complete Examples -Check out the `examples` folder in our repository for full working examples!
Here's one to get you started: - -```python -import requests -import time -import httpx -import asyncio -from typing import Dict, Any -from crawl4ai import ( - BrowserConfig, CrawlerRunConfig, DefaultMarkdownGenerator, - PruningContentFilter, JsonCssExtractionStrategy, LLMContentFilter, CacheMode -) -from crawl4ai.docker_client import Crawl4aiDockerClient - -class Crawl4AiTester: - def __init__(self, base_url: str = "http://localhost:11235"): - self.base_url = base_url - - def submit_and_wait( - self, request_data: Dict[str, Any], timeout: int = 300 - ) -> Dict[str, Any]: - # Submit crawl job - response = requests.post(f"{self.base_url}/crawl", json=request_data) - task_id = response.json()["task_id"] - print(f"Task ID: {task_id}") - - # Poll for result - start_time = time.time() - while True: - if time.time() - start_time > timeout: - raise TimeoutError( - f"Task {task_id} did not complete within {timeout} seconds" - ) - - result = requests.get(f"{self.base_url}/task/{task_id}") - status = result.json() - - if status["status"] == "failed": - print("Task failed:", status.get("error")) - raise Exception(f"Task failed: {status.get('error')}") - - if status["status"] == "completed": - return status - - time.sleep(2) - -async def test_direct_api(): - """Test direct API endpoints without using the client SDK""" - print("\n=== Testing Direct API Calls ===") - - # Test 1: Basic crawl with content filtering - browser_config = BrowserConfig( - headless=True, - viewport_width=1200, - viewport_height=800 - ) - - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - markdown_generator=DefaultMarkdownGenerator( - content_filter=PruningContentFilter( - threshold=0.48, - threshold_type="fixed", - min_word_threshold=0 - ), - options={"ignore_links": True} - ) - ) - - request_data = { - "urls": ["https://example.com"], - "browser_config": browser_config.dump(), - "crawler_config": crawler_config.dump() - } - - # Make direct API call - async with httpx.AsyncClient() as 
client: - response = await client.post( - "http://localhost:8000/crawl", - json=request_data, - timeout=300 - ) - assert response.status_code == 200 - result = response.json() - print("Basic crawl result:", result["success"]) - - # Test 2: Structured extraction with JSON CSS - schema = { - "baseSelector": "article.post", - "fields": [ - {"name": "title", "selector": "h1", "type": "text"}, - {"name": "content", "selector": ".content", "type": "html"} - ] - } - - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - extraction_strategy=JsonCssExtractionStrategy(schema=schema) - ) - - request_data["crawler_config"] = crawler_config.dump() - - async with httpx.AsyncClient() as client: - response = await client.post( - "http://localhost:8000/crawl", - json=request_data - ) - assert response.status_code == 200 - result = response.json() - print("Structured extraction result:", result["success"]) - - # Test 3: Get schema - # async with httpx.AsyncClient() as client: - # response = await client.get("http://localhost:8000/schema") - # assert response.status_code == 200 - # schemas = response.json() - # print("Retrieved schemas for:", list(schemas.keys())) - -async def test_with_client(): - """Test using the Crawl4AI Docker client SDK""" - print("\n=== Testing Client SDK ===") - - async with Crawl4aiDockerClient(verbose=True) as client: - # Test 1: Basic crawl - browser_config = BrowserConfig(headless=True) - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - markdown_generator=DefaultMarkdownGenerator( - content_filter=PruningContentFilter( - threshold=0.48, - threshold_type="fixed" - ) - ) - ) - - result = await client.crawl( - urls=["https://example.com"], - browser_config=browser_config, - crawler_config=crawler_config - ) - print("Client SDK basic crawl:", result.success) - - # Test 2: LLM extraction with streaming - crawler_config = CrawlerRunConfig( - cache_mode=CacheMode.BYPASS, - markdown_generator=DefaultMarkdownGenerator( - 
content_filter=LLMContentFilter( - provider="openai/gpt-40", - instruction="Extract key technical concepts" - ) - ), - stream=True - ) - - async for result in await client.crawl( - urls=["https://example.com"], - browser_config=browser_config, - crawler_config=crawler_config - ): - print(f"Streaming result for: {result.url}") - - # # Test 3: Get schema - # schemas = await client.get_schema() - # print("Retrieved client schemas for:", list(schemas.keys())) - -async def main(): - """Run all tests""" - # Test direct API - print("Testing direct API calls...") - await test_direct_api() - - # Test client SDK - print("\nTesting client SDK...") - await test_with_client() - -if __name__ == "__main__": - asyncio.run(main()) -``` +Check out the `examples` folder in our repository for full working examples! Here are two to get you started: +[Using Client SDK](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/docker_python_sdk_example.py) +[Using REST API](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/docker_python_rest_api_example.py) ## Server Configuration @@ -811,8 +645,9 @@ rate_limiting: # Security Configuration security: enabled: false # Master toggle for security features + jwt_enabled: true # Enable JWT authentication https_redirect: True # Force HTTPS - trusted_hosts: ["*"] # Allowed hosts (use specific domains in production) + trusted_hosts: ["*"] # Allowed hosts (use specific domains in production) headers: # Security headers x_content_type_options: "nosniff" x_frame_options: "DENY" @@ -842,9 +677,59 @@ observability: endpoint: "/health" # Health check endpoint ``` +### JWT Authentication + +When `security.jwt_enabled` is set to `true` in your config.yml, all endpoints require JWT authentication via bearer tokens. 
Here's how it works: + +#### Getting a Token +```http +POST /token +Content-Type: application/json + +{ + "email": "user@example.com" +} +``` + +The endpoint returns: +```json +{ + "email": "user@example.com", + "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOi...", + "token_type": "bearer" +} +``` + +#### Using the Token +Add the token to your requests: +```bash +curl -H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGci..." http://localhost:8000/crawl +``` + +Using the Python SDK: +```python +from crawl4ai.docker_client import Crawl4aiDockerClient + +async with Crawl4aiDockerClient() as client: + # Authenticate first + await client.authenticate("user@example.com") + + # Now all requests will include the token automatically + result = await client.crawl(urls=["https://example.com"]) +``` + +#### Production Considerations 💡 +The default implementation uses simple email verification. For production use, consider: +- Email verification via OTP/magic links +- OAuth2 integration +- Rate limiting token generation +- Token expiration and refresh mechanisms +- IP-based restrictions + ### Configuration Tips and Best Practices 1. **Production Settings** 🏭 + ```yaml app: reload: False # Disable reload in production @@ -860,6 +745,7 @@ observability: ``` 2. **Development Settings** 🛠️ + ```yaml app: reload: True # Enable hot reloading @@ -870,6 +756,7 @@ observability: ``` 3. **High-Traffic Settings** 🚦 + ```yaml crawler: memory_threshold_percent: 85.0 # More conservative memory limit @@ -880,17 +767,28 @@ observability: ### Customizing Your Configuration #### Method 1: Pre-build Configuration + ```bash # Copy and modify config before building -cp deploy/docker/config.yml custom-config.yml -vim custom-config.yml +cd crawl4ai/deploy +vim custom-config.yml # Or use any editor # Build with custom config -docker build -t crawl4ai-server:prod \ - --build-arg CONFIG_PATH=custom-config.yml . +docker build --platform=linux/amd64 --no-cache -t crawl4ai:latest .
``` -#### Method 2: Runtime Configuration +#### Method 2: Build-time Configuration + +Use a custom config during build: + +```bash +# Build with custom config +docker build --platform=linux/amd64 --no-cache \ + --build-arg CONFIG_PATH=/path/to/custom-config.yml \ + -t crawl4ai:latest . +``` + +#### Method 3: Runtime Configuration ```bash # Mount custom config at runtime docker run -d -p 8000:8000 \ @@ -898,6 +796,9 @@ docker run -d -p 8000:8000 \ crawl4ai-server:prod ``` +> 💡 Note: When using Method 2, `/path/to/custom-config.yml` is relative to the `deploy` directory. +> 💡 Note: When using Method 3, ensure your custom config file contains all required fields, because the container uses it instead of the built-in config. + ### Configuration Recommendations 1. **Security First** 🔒 @@ -921,56 +822,6 @@ docker run -d -p 8000:8000 \ - Increase batch_process timeout for large content - Adjust stream_init timeout based on initial response times -### Configuration Migration - -When upgrading Crawl4AI, follow these steps: - -1. Back up your current config: - ```bash - cp /app/config.yml /app/config.yml.backup - ``` - -2. Use version control: - ```bash - git add config.yml - git commit -m "Save current server configuration" - ``` - -3. Test in staging first: - ```bash - docker run -d -p 8001:8000 \ # Use different port - -v $(pwd)/new-config.yml:/app/config.yml \ - crawl4ai-server:prod - ``` - -### Common Configuration Scenarios - -1. **Basic Development Setup** - ```yaml - security: - enabled: false - logging: - level: "DEBUG" - ``` - -2. **Production API Server** - ```yaml - security: - enabled: true - trusted_hosts: ["api.yourdomain.com"] - rate_limiting: - enabled: true - default_limit: "50/minute" - ``` - -3. **High-Performance Crawler** - ```yaml - crawler: - memory_threshold_percent: 90.0 - timeouts: - batch_process: 600.0 - ``` - ## Getting Help We're here to help you succeed with Crawl4AI!
Here's how to get support: diff --git a/deploy/docker/api.py b/deploy/docker/api.py index b526fb37..df3d5c49 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -1,6 +1,8 @@ -from math import e import os import json +import asyncio +from typing import List, Tuple + import logging from typing import Optional, AsyncGenerator from urllib.parse import unquote @@ -13,7 +15,10 @@ from crawl4ai import ( AsyncWebCrawler, CrawlerRunConfig, LLMExtractionStrategy, - CacheMode + CacheMode, + BrowserConfig, + MemoryAdaptiveDispatcher, + RateLimiter ) from crawl4ai.utils import perform_completion_with_backoff from crawl4ai.content_filter_strategy import ( @@ -334,7 +339,6 @@ def create_task_response(task: dict, task_id: str, base_url: str) -> dict: async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]: """Stream results with heartbeats and completion markers.""" - import asyncio import json from utils import datetime_handler @@ -358,4 +362,81 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) try: await crawler.close() except Exception as e: - logger.error(f"Crawler cleanup error: {e}") \ No newline at end of file + logger.error(f"Crawler cleanup error: {e}") + +async def handle_crawl_request( + urls: List[str], + browser_config: dict, + crawler_config: dict, + config: dict +) -> dict: + """Handle non-streaming crawl requests.""" + try: + browser_config = BrowserConfig.load(browser_config) + crawler_config = CrawlerRunConfig.load(crawler_config) + + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=config["crawler"]["memory_threshold_percent"], + rate_limiter=RateLimiter( + base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) + ) + ) + + async with AsyncWebCrawler(config=browser_config) as crawler: + results = await crawler.arun_many( + urls=urls, + config=crawler_config, + dispatcher=dispatcher + ) + + return { + "success": True, + "results": 
[result.model_dump() for result in results] + } + + except Exception as e: + logger.error(f"Crawl error: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e) + ) + +async def handle_stream_crawl_request( + urls: List[str], + browser_config: dict, + crawler_config: dict, + config: dict +) -> Tuple[AsyncWebCrawler, AsyncGenerator]: + """Handle streaming crawl requests.""" + try: + browser_config = BrowserConfig.load(browser_config) + browser_config.verbose = True + crawler_config = CrawlerRunConfig.load(crawler_config) + crawler_config.scraping_strategy = LXMLWebScrapingStrategy() + + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=config["crawler"]["memory_threshold_percent"], + rate_limiter=RateLimiter( + base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) + ) + ) + + crawler = AsyncWebCrawler(config=browser_config) + await crawler.start() + + results_gen = await crawler.arun_many( + urls=urls, + config=crawler_config, + dispatcher=dispatcher + ) + + return crawler, results_gen + + except Exception as e: + if 'crawler' in locals(): + await crawler.close() + logger.error(f"Stream crawl error: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e) + ) \ No newline at end of file diff --git a/deploy/docker/auth.py b/deploy/docker/auth.py new file mode 100644 index 00000000..8851bd36 --- /dev/null +++ b/deploy/docker/auth.py @@ -0,0 +1,46 @@ +import os +from datetime import datetime, timedelta, timezone +from typing import Dict, Optional +from jwt import JWT, jwk_from_dict +from jwt.utils import get_int_from_datetime +from fastapi import Depends, HTTPException +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from pydantic import EmailStr +from pydantic.main import BaseModel +import base64 + +instance = JWT() +security = HTTPBearer() +SECRET_KEY = os.environ.get("SECRET_KEY", "mysecret") 
+ACCESS_TOKEN_EXPIRE_MINUTES = 60 + +def get_jwk_from_secret(secret: str): + """Convert a secret string into a JWK object.""" + secret_bytes = secret.encode('utf-8') + b64_secret = base64.urlsafe_b64encode(secret_bytes).rstrip(b'=').decode('utf-8') + return jwk_from_dict({"kty": "oct", "k": b64_secret}) + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: + """Create a JWT access token with an expiration.""" + to_encode = data.copy() + expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) + to_encode.update({"exp": get_int_from_datetime(expire)}) + signing_key = get_jwk_from_secret(SECRET_KEY) + return instance.encode(to_encode, signing_key, alg='HS256') + +def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict: + """Verify the JWT token from the Authorization header.""" + token = credentials.credentials + verifying_key = get_jwk_from_secret(SECRET_KEY) + try: + payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256') + return payload + except Exception: + raise HTTPException(status_code=401, detail="Invalid or expired token") + +def get_token_dependency(config: Dict): + """Return the token dependency if JWT is enabled, else None.""" + return verify_token if config.get("security", {}).get("jwt_enabled", False) else None + +class TokenRequest(BaseModel): + email: EmailStr \ No newline at end of file diff --git a/deploy/docker/config.yml b/deploy/docker/config.yml index a9408b45..fc118bf4 100644 --- a/deploy/docker/config.yml +++ b/deploy/docker/config.yml @@ -38,8 +38,9 @@ rate_limiting: # Security Configuration security: - enabled: false - https_redirect: True + enabled: true + jwt_enabled: true + https_redirect: false trusted_hosts: ["*"] headers: x_content_type_options: "nosniff" diff --git a/deploy/docker/server-secure.py b/deploy/docker/server-secure.py deleted file mode 100644 index 25548957..00000000 --- 
a/deploy/docker/server-secure.py +++ /dev/null @@ -1,324 +0,0 @@ -import os -import sys -import time -import base64 -from typing import List, Optional, Dict -from datetime import datetime, timedelta, timezone -from jwt import JWT, jwk_from_dict -from jwt.utils import get_int_from_datetime -from fastapi import FastAPI, HTTPException, Request, status, Depends, Query, Path -from fastapi.responses import StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse -from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials -from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware -from fastapi.middleware.trustedhost import TrustedHostMiddleware -from pydantic import BaseModel, Field -from slowapi import Limiter -from slowapi.util import get_remote_address -from prometheus_fastapi_instrumentator import Instrumentator -from redis import asyncio as aioredis -from pydantic import EmailStr - -sys.path.append(os.path.dirname(os.path.realpath(__file__))) -from utils import FilterType, load_config, setup_logging, verify_email_domain -from api import ( - handle_markdown_request, - handle_llm_qa -) - -__version__ = "0.1.2" - -class CrawlRequest(BaseModel): - urls: List[str] = Field( - min_length=1, - max_length=100, - json_schema_extra={ - "items": {"type": "string", "maxLength": 2000, "pattern": "\\S"} - } - ) - browser_config: Optional[Dict] = Field( - default_factory=dict, - example={"headless": True, "viewport": {"width": 1200}} - ) - crawler_config: Optional[Dict] = Field( - default_factory=dict, - example={"stream": True, "cache_mode": "aggressive"} - ) - -# Load configuration and setup -config = load_config() -setup_logging(config) - -# Initialize Redis -redis = aioredis.from_url(config["redis"].get("uri", "redis://localhost")) - -# Initialize rate limiter -limiter = Limiter( - key_func=get_remote_address, - default_limits=[config["rate_limiting"]["default_limit"]], - storage_uri=config["rate_limiting"]["storage_uri"] -) - -app = FastAPI( - 
title=config["app"]["title"], - version=config["app"]["version"] -) - -# Configure middleware -if config["security"]["enabled"]: - if config["security"]["https_redirect"]: - app.add_middleware(HTTPSRedirectMiddleware) - if config["security"]["trusted_hosts"] and config["security"]["trusted_hosts"] != ["*"]: - app.add_middleware( - TrustedHostMiddleware, - allowed_hosts=config["security"]["trusted_hosts"] - ) - -# Prometheus instrumentation -if config["observability"]["prometheus"]["enabled"]: - Instrumentator().instrument(app).expose(app) - -# ------------------------------- -# JWT Token Authentication Setup -# ------------------------------- - -instance = JWT() - -# Use a secret key for symmetric signing (HS256) -SECRET_KEY = os.environ.get("SECRET_KEY", "mysecret") -ACCESS_TOKEN_EXPIRE_MINUTES = 60 - -# FastAPI security scheme for extracting the Authorization header -security = HTTPBearer() - -def get_jwk_from_secret(secret: str): - """ - Convert a simple secret string into a JWK object. - The secret is base64 URL-safe encoded (without padding) as required. - """ - secret_bytes = secret.encode('utf-8') - b64_secret = base64.urlsafe_b64encode(secret_bytes).rstrip(b'=').decode('utf-8') - return jwk_from_dict({"kty": "oct", "k": b64_secret}) - -def create_access_token(data: dict, expires_delta: timedelta = None): - """ - Create a JWT access token with an expiration. - """ - to_encode = data.copy() - expire = datetime.now(timezone.utc) + (expires_delta if expires_delta else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) - to_encode.update({"exp": get_int_from_datetime(expire)}) - # Convert the secret into a JWK object - signing_key = get_jwk_from_secret(SECRET_KEY) - encoded_jwt = instance.encode(to_encode, signing_key, alg='HS256') - return encoded_jwt - -def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): - """ - Verify the JWT token extracted from the Authorization header. 
- """ - token = credentials.credentials - # Convert the secret into a JWK object for verification - verifying_key = get_jwk_from_secret(SECRET_KEY) - try: - payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256') - return payload - except Exception as e: - raise HTTPException(status_code=401, detail="Invalid or expired token") - - -# ------------------------------- -# Endpoints -# ------------------------------- - -@app.middleware("http") -async def add_security_headers(request: Request, call_next): - response = await call_next(request) - if config["security"]["enabled"]: - response.headers.update(config["security"]["headers"]) - return response - - -class TokenRequest(BaseModel): - email: EmailStr - -@app.post("/token") -async def get_token(request_data: TokenRequest): - """ - Minimal endpoint to generate a JWT token. - In a real-world scenario, you'd validate credentials here. - """ - # token = create_access_token({"sub": "user1"}) - # return {"access_token": token, "token_type": "bearer"} - # Verify that the email domain likely exists (has MX records) - if not verify_email_domain(request_data.email): - raise HTTPException( - status_code=400, - detail="Email domain verification failed. Please use a valid email address." 
- ) - token = create_access_token({"sub": request_data.email}) - return {"email": request_data.email, "access_token": token, "token_type": "bearer"} - -@app.get("/md/{url:path}") -@limiter.limit(config["rate_limiting"]["default_limit"]) -async def get_markdown( - request: Request, - url: str, - f: FilterType = FilterType.FIT, - q: Optional[str] = None, - c: Optional[str] = "0", - token_data: dict = Depends(verify_token) -): - """Get markdown from URL with optional filtering.""" - result = await handle_markdown_request(url, f, q, c, config) - return PlainTextResponse(result) - -@app.get("/llm/{url:path}", description="URL should be without http/https prefix") -async def llm_endpoint( - request: Request, - url: str = Path(..., description="Domain and path without protocol"), - q: Optional[str] = Query(None, description="Question to ask about the page content"), - token_data: dict = Depends(verify_token) -): - """QA endpoint that uses LLM with crawled content as context.""" - if not q: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Query parameter 'q' is required" - ) - - # Ensure URL starts with http/https - if not url.startswith(('http://', 'https://')): - url = 'https://' + url - - try: - answer = await handle_llm_qa(url, q, config) - return JSONResponse({"answer": answer}) - except Exception as e: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -@app.get("/schema") -async def get_schema(): - """Endpoint for client-side validation schema.""" - from crawl4ai import BrowserConfig, CrawlerRunConfig - return { - "browser": BrowserConfig.model_json_schema(), - "crawler": CrawlerRunConfig.model_json_schema() - } - -@app.get(config["observability"]["health_check"]["endpoint"]) -async def health(): - """Health check endpoint.""" - return {"status": "ok", "timestamp": time.time(), "version": __version__} - -@app.get(config["observability"]["prometheus"]["endpoint"]) -async def metrics(): - 
"""Prometheus metrics endpoint.""" - return RedirectResponse(url=config["observability"]["prometheus"]["endpoint"]) - -# ------------------------------- -# Protected Endpoint Example: /crawl -# ------------------------------- -@app.post("/crawl") -@limiter.limit(config["rate_limiting"]["default_limit"]) -async def crawl(request: Request, crawl_request: CrawlRequest, token_data: dict = Depends(verify_token)): - """Handle crawl requests. Protected by JWT authentication.""" - from crawl4ai import ( - AsyncWebCrawler, - BrowserConfig, - CrawlerRunConfig, - MemoryAdaptiveDispatcher, - RateLimiter - ) - import asyncio - import logging - - logger = logging.getLogger(__name__) - crawler = None - - try: - if not crawl_request.urls: - logger.error("Empty URL list received") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="At least one URL required" - ) - - browser_config = BrowserConfig.load(crawl_request.browser_config) - crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config) - - dispatcher = MemoryAdaptiveDispatcher( - memory_threshold_percent=config["crawler"]["memory_threshold_percent"], - rate_limiter=RateLimiter( - base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) - ) - ) - - if crawler_config.stream: - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - results_gen = await asyncio.wait_for( - crawler.arun_many( - urls=crawl_request.urls, - config=crawler_config, - dispatcher=dispatcher - ), - timeout=config["crawler"]["timeouts"]["stream_init"] - ) - - from api import stream_results - return StreamingResponse( - stream_results(crawler, results_gen), - media_type='application/x-ndjson', - headers={ - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Stream-Status': 'active' - } - ) - else: - async with AsyncWebCrawler(config=browser_config) as crawler: - results = await asyncio.wait_for( - crawler.arun_many( - urls=crawl_request.urls, - config=crawler_config, - 
dispatcher=dispatcher - ), - timeout=config["crawler"]["timeouts"]["batch_process"] - ) - return JSONResponse({ - "success": True, - "results": [result.model_dump() for result in results] - }) - - except asyncio.TimeoutError as e: - logger.error(f"Operation timed out: {str(e)}") - raise HTTPException( - status_code=status.HTTP_504_GATEWAY_TIMEOUT, - detail="Processing timeout" - ) - except Exception as e: - logger.error(f"Server error: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Internal server error" - ) - finally: - if crawler: - try: - await crawler.close() - except Exception as e: - logger.error(f"Final crawler cleanup error: {e}") - - -if __name__ == "__main__": - import uvicorn - uvicorn.run( - "server-secure:app", - host=config["app"]["host"], - port=config["app"]["port"], - reload=config["app"]["reload"], - timeout_keep_alive=config["app"]["timeout_keep_alive"] - ) diff --git a/deploy/docker/server-token-free.py b/deploy/docker/server-token-free.py deleted file mode 100644 index 121693bc..00000000 --- a/deploy/docker/server-token-free.py +++ /dev/null @@ -1,267 +0,0 @@ -import os -import sys -import time -from typing import List, Optional - -sys.path.append(os.path.dirname(os.path.realpath(__file__))) - -from redis import asyncio as aioredis -from fastapi import FastAPI, HTTPException, Request, status -from fastapi.responses import StreamingResponse, RedirectResponse -from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware -from fastapi.middleware.trustedhost import TrustedHostMiddleware -from pydantic import BaseModel, Field -from slowapi import Limiter -from slowapi.util import get_remote_address -from prometheus_fastapi_instrumentator import Instrumentator -from fastapi.responses import PlainTextResponse -from fastapi.responses import JSONResponse -from fastapi.background import BackgroundTasks -from typing import Dict -from fastapi import Query, Path -import os - -from 
utils import ( - FilterType, - load_config, - setup_logging -) -from api import ( - handle_markdown_request, - handle_llm_request, - handle_llm_qa -) - -# Load configuration and setup -config = load_config() -setup_logging(config) - -# Initialize Redis -redis = aioredis.from_url(config["redis"].get("uri", "redis://localhost")) - -# Initialize rate limiter -limiter = Limiter( - key_func=get_remote_address, - default_limits=[config["rate_limiting"]["default_limit"]], - storage_uri=config["rate_limiting"]["storage_uri"] -) - -app = FastAPI( - title=config["app"]["title"], - version=config["app"]["version"] -) - -# Configure middleware -if config["security"]["enabled"]: - if config["security"]["https_redirect"]: - app.add_middleware(HTTPSRedirectMiddleware) - if config["security"]["trusted_hosts"] and config["security"]["trusted_hosts"] != ["*"]: - app.add_middleware( - TrustedHostMiddleware, - allowed_hosts=config["security"]["trusted_hosts"] - ) - -# Prometheus instrumentation -if config["observability"]["prometheus"]["enabled"]: - Instrumentator().instrument(app).expose(app) - -class CrawlRequest(BaseModel): - urls: List[str] = Field( - min_length=1, - max_length=100, - json_schema_extra={ - "items": {"type": "string", "maxLength": 2000, "pattern": "\\S"} - } - ) - browser_config: Optional[Dict] = Field( - default_factory=dict, - example={"headless": True, "viewport": {"width": 1200}} - ) - crawler_config: Optional[Dict] = Field( - default_factory=dict, - example={"stream": True, "cache_mode": "aggressive"} - ) - -@app.middleware("http") -async def add_security_headers(request: Request, call_next): - response = await call_next(request) - if config["security"]["enabled"]: - response.headers.update(config["security"]["headers"]) - return response - -@app.get("/md/{url:path}") -@limiter.limit(config["rate_limiting"]["default_limit"]) -async def get_markdown( - request: Request, - url: str, - f: FilterType = FilterType.FIT, - q: Optional[str] = None, - c: Optional[str] 
= "0" -): - """Get markdown from URL with optional filtering.""" - result = await handle_markdown_request(url, f, q, c, config) - return PlainTextResponse(result) - -@app.get("/llm/{url:path}", description="URL should be without http/https prefix") -async def llm_endpoint( - request: Request, - url: str = Path(..., description="Domain and path without protocol"), - q: Optional[str] = Query(None, description="Question to ask about the page content"), -): - """QA endpoint that uses LLM with crawled content as context.""" - if not q: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Query parameter 'q' is required" - ) - - # Ensure URL starts with http/https - if not url.startswith(('http://', 'https://')): - url = 'https://' + url - - try: - answer = await handle_llm_qa(url, q, config) - return JSONResponse({"answer": answer}) - except Exception as e: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -# @app.get("/llm/{input:path}") -# @limiter.limit(config["rate_limiting"]["default_limit"]) -# async def llm_endpoint( -# request: Request, -# background_tasks: BackgroundTasks, -# input: str, -# q: Optional[str] = None, -# s: Optional[str] = None, -# c: Optional[str] = "0" -# ): -# """Handle LLM extraction requests.""" -# return await handle_llm_request( -# redis, background_tasks, request, input, q, s, c, config -# ) - - - -@app.get("/schema") -async def get_schema(): - """Endpoint for client-side validation schema.""" - from crawl4ai import BrowserConfig, CrawlerRunConfig - return { - "browser": BrowserConfig().dump(), - "crawler": CrawlerRunConfig().dump() - } - -@app.get(config["observability"]["health_check"]["endpoint"]) -async def health(): - """Health check endpoint.""" - return {"status": "ok", "timestamp": time.time()} - -@app.get(config["observability"]["prometheus"]["endpoint"]) -async def metrics(): - """Prometheus metrics endpoint.""" - return 
RedirectResponse(url=config["observability"]["prometheus"]["endpoint"]) - -@app.post("/crawl") -@limiter.limit(config["rate_limiting"]["default_limit"]) -async def crawl(request: Request, crawl_request: CrawlRequest): - """Handle crawl requests.""" - from crawl4ai import ( - AsyncWebCrawler, - BrowserConfig, - CrawlerRunConfig, - MemoryAdaptiveDispatcher, - RateLimiter - ) - import asyncio - import logging - - logger = logging.getLogger(__name__) - crawler = None - - try: - if not crawl_request.urls: - logger.error("Empty URL list received") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="At least one URL required" - ) - - browser_config = BrowserConfig.load(crawl_request.browser_config) - crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config) - - dispatcher = MemoryAdaptiveDispatcher( - memory_threshold_percent=config["crawler"]["memory_threshold_percent"], - rate_limiter=RateLimiter( - base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) - ) - ) - - if crawler_config.stream: - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - results_gen = await asyncio.wait_for( - crawler.arun_many( - urls=crawl_request.urls, - config=crawler_config, - dispatcher=dispatcher - ), - timeout=config["crawler"]["timeouts"]["stream_init"] - ) - - from api import stream_results - return StreamingResponse( - stream_results(crawler, results_gen), - media_type='application/x-ndjson', - headers={ - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Stream-Status': 'active' - } - ) - else: - async with AsyncWebCrawler(config=browser_config) as crawler: - results = await asyncio.wait_for( - crawler.arun_many( - urls=crawl_request.urls, - config=crawler_config, - dispatcher=dispatcher - ), - timeout=config["crawler"]["timeouts"]["batch_process"] - ) - return JSONResponse({ - "success": True, - "results": [result.model_dump() for result in results] - }) - - except asyncio.TimeoutError as e: - 
logger.error(f"Operation timed out: {str(e)}") - raise HTTPException( - status_code=status.HTTP_504_GATEWAY_TIMEOUT, - detail="Processing timeout" - ) - except Exception as e: - logger.error(f"Server error: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Internal server error" - ) - finally: - if crawler: - try: - await crawler.close() - except Exception as e: - logger.error(f"Final crawler cleanup error: {e}") - -if __name__ == "__main__": - import uvicorn - uvicorn.run( - "server:app", - host=config["app"]["host"], - port=config["app"]["port"], - reload=config["app"]["reload"], - timeout_keep_alive=config["app"]["timeout_keep_alive"] - ) \ No newline at end of file diff --git a/deploy/docker/server.py b/deploy/docker/server.py index 121693bc..edb55130 100644 --- a/deploy/docker/server.py +++ b/deploy/docker/server.py @@ -1,36 +1,34 @@ import os import sys import time -from typing import List, Optional - -sys.path.append(os.path.dirname(os.path.realpath(__file__))) - -from redis import asyncio as aioredis -from fastapi import FastAPI, HTTPException, Request, status -from fastapi.responses import StreamingResponse, RedirectResponse +from typing import List, Optional, Dict +from fastapi import FastAPI, HTTPException, Request, Query, Path, Depends +from fastapi.responses import StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from pydantic import BaseModel, Field from slowapi import Limiter from slowapi.util import get_remote_address from prometheus_fastapi_instrumentator import Instrumentator -from fastapi.responses import PlainTextResponse -from fastapi.responses import JSONResponse -from fastapi.background import BackgroundTasks -from typing import Dict -from fastapi import Query, Path -import os +from redis import asyncio as aioredis -from utils 
import ( - FilterType, - load_config, - setup_logging -) +sys.path.append(os.path.dirname(os.path.realpath(__file__))) +from utils import FilterType, load_config, setup_logging, verify_email_domain from api import ( handle_markdown_request, - handle_llm_request, - handle_llm_qa + handle_llm_qa, + handle_stream_crawl_request, + handle_crawl_request, + stream_results ) +from auth import create_access_token, get_token_dependency, TokenRequest # Import from auth.py + +__version__ = "0.2.6" + +class CrawlRequest(BaseModel): + urls: List[str] = Field(min_length=1, max_length=100) + browser_config: Optional[Dict] = Field(default_factory=dict) + crawler_config: Optional[Dict] = Field(default_factory=dict) # Load configuration and setup config = load_config() @@ -52,36 +50,24 @@ app = FastAPI( ) # Configure middleware -if config["security"]["enabled"]: - if config["security"]["https_redirect"]: - app.add_middleware(HTTPSRedirectMiddleware) - if config["security"]["trusted_hosts"] and config["security"]["trusted_hosts"] != ["*"]: - app.add_middleware( - TrustedHostMiddleware, - allowed_hosts=config["security"]["trusted_hosts"] - ) +def setup_security_middleware(app, config): + sec_config = config.get("security", {}) + if sec_config.get("enabled", False): + if sec_config.get("https_redirect", False): + app.add_middleware(HTTPSRedirectMiddleware) + if sec_config.get("trusted_hosts", []) != ["*"]: + app.add_middleware(TrustedHostMiddleware, allowed_hosts=sec_config["trusted_hosts"]) + +setup_security_middleware(app, config) # Prometheus instrumentation if config["observability"]["prometheus"]["enabled"]: Instrumentator().instrument(app).expose(app) -class CrawlRequest(BaseModel): - urls: List[str] = Field( - min_length=1, - max_length=100, - json_schema_extra={ - "items": {"type": "string", "maxLength": 2000, "pattern": "\\S"} - } - ) - browser_config: Optional[Dict] = Field( - default_factory=dict, - example={"headless": True, "viewport": {"width": 1200}} - ) - crawler_config: 
Optional[Dict] = Field( - default_factory=dict, - example={"stream": True, "cache_mode": "aggressive"} - ) +# Get token dependency based on config +token_dependency = get_token_dependency(config) +# Middleware for security headers @app.middleware("http") async def add_security_headers(request: Request, call_next): response = await call_next(request) @@ -89,6 +75,15 @@ async def add_security_headers(request: Request, call_next): response.headers.update(config["security"]["headers"]) return response +# Token endpoint (always available, but usage depends on config) +@app.post("/token") +async def get_token(request_data: TokenRequest): + if not verify_email_domain(request_data.email): + raise HTTPException(status_code=400, detail="Invalid email domain") + token = create_access_token({"sub": request_data.email}) + return {"email": request_data.email, "access_token": token, "token_type": "bearer"} + +# Endpoints with conditional auth @app.get("/md/{url:path}") @limiter.limit(config["rate_limiting"]["default_limit"]) async def get_markdown( @@ -96,165 +91,84 @@ async def get_markdown( url: str, f: FilterType = FilterType.FIT, q: Optional[str] = None, - c: Optional[str] = "0" + c: Optional[str] = "0", + token_data: Optional[Dict] = Depends(token_dependency) ): - """Get markdown from URL with optional filtering.""" result = await handle_markdown_request(url, f, q, c, config) return PlainTextResponse(result) @app.get("/llm/{url:path}", description="URL should be without http/https prefix") async def llm_endpoint( request: Request, - url: str = Path(..., description="Domain and path without protocol"), - q: Optional[str] = Query(None, description="Question to ask about the page content"), + url: str = Path(...), + q: Optional[str] = Query(None), + token_data: Optional[Dict] = Depends(token_dependency) ): - """QA endpoint that uses LLM with crawled content as context.""" if not q: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Query parameter 'q' 
is required" - ) - - # Ensure URL starts with http/https + raise HTTPException(status_code=400, detail="Query parameter 'q' is required") if not url.startswith(('http://', 'https://')): url = 'https://' + url - try: answer = await handle_llm_qa(url, q, config) return JSONResponse({"answer": answer}) except Exception as e: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) + raise HTTPException(status_code=500, detail=str(e)) -# @app.get("/llm/{input:path}") -# @limiter.limit(config["rate_limiting"]["default_limit"]) -# async def llm_endpoint( -# request: Request, -# background_tasks: BackgroundTasks, -# input: str, -# q: Optional[str] = None, -# s: Optional[str] = None, -# c: Optional[str] = "0" -# ): -# """Handle LLM extraction requests.""" -# return await handle_llm_request( -# redis, background_tasks, request, input, q, s, c, config -# ) - - - @app.get("/schema") async def get_schema(): - """Endpoint for client-side validation schema.""" from crawl4ai import BrowserConfig, CrawlerRunConfig - return { - "browser": BrowserConfig().dump(), - "crawler": CrawlerRunConfig().dump() - } + return {"browser": BrowserConfig().dump(), "crawler": CrawlerRunConfig().dump()} @app.get(config["observability"]["health_check"]["endpoint"]) async def health(): - """Health check endpoint.""" - return {"status": "ok", "timestamp": time.time()} + return {"status": "ok", "timestamp": time.time(), "version": __version__} @app.get(config["observability"]["prometheus"]["endpoint"]) async def metrics(): - """Prometheus metrics endpoint.""" return RedirectResponse(url=config["observability"]["prometheus"]["endpoint"]) @app.post("/crawl") @limiter.limit(config["rate_limiting"]["default_limit"]) -async def crawl(request: Request, crawl_request: CrawlRequest): - """Handle crawl requests.""" - from crawl4ai import ( - AsyncWebCrawler, - BrowserConfig, - CrawlerRunConfig, - MemoryAdaptiveDispatcher, - RateLimiter +async def crawl( + request: Request, 
+ crawl_request: CrawlRequest, + token_data: Optional[Dict] = Depends(token_dependency) +): + if not crawl_request.urls: + raise HTTPException(status_code=400, detail="At least one URL required") + + results = await handle_crawl_request( + urls=crawl_request.urls, + browser_config=crawl_request.browser_config, + crawler_config=crawl_request.crawler_config, + config=config ) - import asyncio - import logging - logger = logging.getLogger(__name__) - crawler = None + return JSONResponse(results) - try: - if not crawl_request.urls: - logger.error("Empty URL list received") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="At least one URL required" - ) - browser_config = BrowserConfig.load(crawl_request.browser_config) - crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config) +@app.post("/crawl/stream") +@limiter.limit(config["rate_limiting"]["default_limit"]) +async def crawl_stream( + request: Request, + crawl_request: CrawlRequest, + token_data: Optional[Dict] = Depends(token_dependency) +): + if not crawl_request.urls: + raise HTTPException(status_code=400, detail="At least one URL required") - dispatcher = MemoryAdaptiveDispatcher( - memory_threshold_percent=config["crawler"]["memory_threshold_percent"], - rate_limiter=RateLimiter( - base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) - ) - ) + crawler, results_gen = await handle_stream_crawl_request( + urls=crawl_request.urls, + browser_config=crawl_request.browser_config, + crawler_config=crawl_request.crawler_config, + config=config + ) - if crawler_config.stream: - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - results_gen = await asyncio.wait_for( - crawler.arun_many( - urls=crawl_request.urls, - config=crawler_config, - dispatcher=dispatcher - ), - timeout=config["crawler"]["timeouts"]["stream_init"] - ) - - from api import stream_results - return StreamingResponse( - stream_results(crawler, results_gen), - 
media_type='application/x-ndjson', - headers={ - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Stream-Status': 'active' - } - ) - else: - async with AsyncWebCrawler(config=browser_config) as crawler: - results = await asyncio.wait_for( - crawler.arun_many( - urls=crawl_request.urls, - config=crawler_config, - dispatcher=dispatcher - ), - timeout=config["crawler"]["timeouts"]["batch_process"] - ) - return JSONResponse({ - "success": True, - "results": [result.model_dump() for result in results] - }) - - except asyncio.TimeoutError as e: - logger.error(f"Operation timed out: {str(e)}") - raise HTTPException( - status_code=status.HTTP_504_GATEWAY_TIMEOUT, - detail="Processing timeout" - ) - except Exception as e: - logger.error(f"Server error: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Internal server error" - ) - finally: - if crawler: - try: - await crawler.close() - except Exception as e: - logger.error(f"Final crawler cleanup error: {e}") + return StreamingResponse( + stream_results(crawler, results_gen), + media_type='application/x-ndjson', + headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Stream-Status': 'active'} + ) if __name__ == "__main__": import uvicorn diff --git a/deploy/docker/supervisord.conf b/deploy/docker/supervisord.conf index 628f50ba..1274f2c3 100644 --- a/deploy/docker/supervisord.conf +++ b/deploy/docker/supervisord.conf @@ -7,6 +7,6 @@ autorestart=true priority=10 [program:gunicorn] -command=gunicorn --bind 0.0.0.0:8000 --workers 4 --threads 2 --timeout 120 --graceful-timeout 30 --log-level info --worker-class uvicorn.workers.UvicornWorker server:app +command=gunicorn --bind 0.0.0.0:8000 --workers 4 --threads 2 --timeout 300 --graceful-timeout 60 --keep-alive 65 --log-level debug --worker-class uvicorn.workers.UvicornWorker --max-requests 1000 --max-requests-jitter 50 server:app autorestart=true priority=20 \ No newline at end of file 
diff --git a/docs/examples/docker_python_rest_api_example.py b/docs/examples/docker_python_rest_api_example.py new file mode 100644 index 00000000..000d6464 --- /dev/null +++ b/docs/examples/docker_python_rest_api_example.py @@ -0,0 +1,214 @@ +import asyncio +import json +from typing import Optional +from urllib.parse import quote + +async def get_token(session, email: str = "test@example.com") -> str: + """Fetch a JWT token from the /token endpoint.""" + url = "http://localhost:8000/token" + payload = {"email": email} + print(f"\nFetching token from {url} with email: {email}") + try: + async with session.post(url, json=payload) as response: + status = response.status + data = await response.json() + print(f"Token Response Status: {status}") + print(f"Token Response: {json.dumps(data, indent=2)}") + if status == 200: + return data["access_token"] + else: + raise Exception(f"Failed to get token: {data.get('detail', 'Unknown error')}") + except Exception as e: + print(f"Error fetching token: {str(e)}") + raise + +async def test_endpoint( + session, + endpoint: str, + url: str, + token: str, + params: Optional[dict] = None, + expected_status: int = 200 +) -> Optional[dict]: + """Test an endpoint with token and print results.""" + params = params or {} + param_str = "&".join(f"{k}={v}" for k, v in params.items()) + full_url = f"http://localhost:8000/{endpoint}/{quote(url)}" + if param_str: + full_url += f"?{param_str}" + + headers = {"Authorization": f"Bearer {token}"} + print(f"\nTesting: {full_url}") + + try: + async with session.get(full_url, headers=headers) as response: + status = response.status + try: + data = await response.json() + except: + data = await response.text() + + print(f"Status: {status} (Expected: {expected_status})") + if isinstance(data, dict): + print(f"Response: {json.dumps(data, indent=2)}") + else: + print(f"Response: {data[:500]}...") # First 500 chars + assert status == expected_status, f"Expected {expected_status}, got {status}" + return 
data + except Exception as e: + print(f"Error: {str(e)}") + return None + + +async def test_stream_crawl(session, token: str): + """Test the /crawl/stream endpoint with multiple URLs.""" + url = "http://localhost:8000/crawl/stream" + payload = { + "urls": [ + "https://example.com", + "https://example.com/page1", # Replicated example.com with variation + "https://example.com/page2", # Replicated example.com with variation + "https://example.com/page3", # Replicated example.com with variation + # "https://www.python.org", + # "https://news.ycombinator.com/news" + ], + "browser_config": {"headless": True, "viewport": {"width": 1200}}, + "crawler_config": {"stream": True, "cache_mode": "aggressive"} + } + headers = {"Authorization": f"Bearer {token}"} + print(f"\nTesting Streaming Crawl: {url}") + print(f"Payload: {json.dumps(payload, indent=2)}") + + try: + async with session.post(url, json=payload, headers=headers) as response: + status = response.status + print(f"Status: {status} (Expected: 200)") + assert status == 200, f"Expected 200, got {status}" + + # Read streaming response line-by-line (NDJSON) + async for line in response.content: + if line: + data = json.loads(line.decode('utf-8').strip()) + print(f"Streamed Result: {json.dumps(data, indent=2)}") + except Exception as e: + print(f"Error in streaming crawl test: {str(e)}") + +async def run_tests(): + import aiohttp + print("Starting API Tests...") + + # Test URLs + urls = [ + "example.com", + "https://www.python.org", + "https://news.ycombinator.com/news", + "https://github.com/trending" + ] + + async with aiohttp.ClientSession() as session: + token = "test_token" + # If jwt is enabled, authenticate first + # Fetch token once and reuse it + # token = await get_token(session) + # if not token: + # print("Aborting tests due to token failure!") + # return + + print("\n=== Testing Crawl Endpoint ===") + crawl_payload = { + "urls": ["https://example.com"], + "browser_config": {"headless": True}, + 
"crawler_config": {"stream": False} + } + async with session.post( + "http://localhost:8000/crawl", + json=crawl_payload, + headers={"Authorization": f"Bearer {token}"} + ) as response: + status = response.status + data = await response.json() + print(f"\nCrawl Endpoint Status: {status}") + print(f"Crawl Response: {json.dumps(data, indent=2)}") + + + print("\n=== Testing Crawl Stream Endpoint ===") + await test_stream_crawl(session, token) + + print("\n=== Testing Markdown Endpoint ===") + for url in []: #urls: + for filter_type in ["raw", "fit", "bm25", "llm"]: + params = {"f": filter_type} + if filter_type in ["bm25", "llm"]: + params["q"] = "extract main content" + + for cache in ["0", "1"]: + params["c"] = cache + await test_endpoint(session, "md", url, token, params) + await asyncio.sleep(1) # Be nice to the server + + print("\n=== Testing LLM Endpoint ===") + for url in urls: + # Test basic extraction (direct response now) + result = await test_endpoint( + session, + "llm", + url, + token, + {"q": "Extract title and main content"} + ) + + # Test with schema (direct response) + schema = { + "type": "object", + "properties": { + "title": {"type": "string"}, + "content": {"type": "string"}, + "links": {"type": "array", "items": {"type": "string"}} + } + } + result = await test_endpoint( + session, + "llm", + url, + token, + { + "q": "Extract content with links", + "s": json.dumps(schema), + "c": "1" # Test with cache + } + ) + await asyncio.sleep(2) # Be nice to the server + + print("\n=== Testing Error Cases ===") + # Test invalid URL + await test_endpoint( + session, + "md", + "not_a_real_url", + token, + expected_status=500 + ) + + # Test invalid filter type + await test_endpoint( + session, + "md", + "example.com", + token, + {"f": "invalid"}, + expected_status=422 + ) + + # Test LLM without query (should fail per your server logic) + await test_endpoint( + session, + "llm", + "example.com", + token, + expected_status=400 + ) + + print("\nAll tests 
completed!") + +if __name__ == "__main__": + asyncio.run(run_tests()) \ No newline at end of file diff --git a/docs/examples/docker_python_sdk_example.py b/docs/examples/docker_python_sdk_example.py new file mode 100644 index 00000000..72091da0 --- /dev/null +++ b/docs/examples/docker_python_sdk_example.py @@ -0,0 +1,35 @@ +import asyncio +from crawl4ai.docker_client import Crawl4aiDockerClient +from crawl4ai import ( + BrowserConfig, + CrawlerRunConfig +) + +async def main(): + async with Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=True) as client: + # If jwt is enabled, authenticate first + # await client.authenticate("test@example.com") + + # Non-streaming crawl + results = await client.crawl( + ["https://example.com", "https://python.org"], + browser_config=BrowserConfig(headless=True), + crawler_config=CrawlerRunConfig() + ) + print(f"Non-streaming results: {results}") + + # Streaming crawl + crawler_config = CrawlerRunConfig(stream=True) + async for result in await client.crawl( + ["https://example.com", "https://python.org"], + browser_config=BrowserConfig(headless=True), + crawler_config=crawler_config + ): + print(f"Streamed result: {result}") + + # Get schema + schema = await client.get_schema() + print(f"Schema: {schema}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/docker/test_dockerclient.py b/tests/docker/test_dockerclient.py new file mode 100644 index 00000000..cba6c4c9 --- /dev/null +++ b/tests/docker/test_dockerclient.py @@ -0,0 +1,34 @@ +import asyncio +from crawl4ai.docker_client import Crawl4aiDockerClient +from crawl4ai import ( + BrowserConfig, + CrawlerRunConfig +) + +async def main(): + async with Crawl4aiDockerClient(base_url="http://localhost:8000", verbose=True) as client: + await client.authenticate("test@example.com") + + # Non-streaming crawl + results = await client.crawl( + ["https://example.com", "https://python.org"], + 
browser_config=BrowserConfig(headless=True), + crawler_config=CrawlerRunConfig() + ) + print(f"Non-streaming results: {results}") + + # Streaming crawl + crawler_config = CrawlerRunConfig(stream=True) + async for result in await client.crawl( + ["https://example.com", "https://python.org"], + browser_config=BrowserConfig(headless=True), + crawler_config=crawler_config + ): + print(f"Streamed result: {result}") + + # Get schema + schema = await client.get_schema() + print(f"Schema: {schema}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/docker/test_server_token.py b/tests/docker/test_server_token.py new file mode 100644 index 00000000..d8c7df89 --- /dev/null +++ b/tests/docker/test_server_token.py @@ -0,0 +1,212 @@ +import asyncio +import json +from typing import Optional +from urllib.parse import quote + +async def get_token(session, email: str = "test@example.com") -> str: + """Fetch a JWT token from the /token endpoint.""" + url = "http://localhost:8000/token" + payload = {"email": email} + print(f"\nFetching token from {url} with email: {email}") + try: + async with session.post(url, json=payload) as response: + status = response.status + data = await response.json() + print(f"Token Response Status: {status}") + print(f"Token Response: {json.dumps(data, indent=2)}") + if status == 200: + return data["access_token"] + else: + raise Exception(f"Failed to get token: {data.get('detail', 'Unknown error')}") + except Exception as e: + print(f"Error fetching token: {str(e)}") + raise + +async def test_endpoint( + session, + endpoint: str, + url: str, + token: str, + params: Optional[dict] = None, + expected_status: int = 200 +) -> Optional[dict]: + """Test an endpoint with token and print results.""" + params = params or {} + param_str = "&".join(f"{k}={v}" for k, v in params.items()) + full_url = f"http://localhost:8000/{endpoint}/{quote(url)}" + if param_str: + full_url += f"?{param_str}" + + headers = 
{"Authorization": f"Bearer {token}"} + print(f"\nTesting: {full_url}") + + try: + async with session.get(full_url, headers=headers) as response: + status = response.status + try: + data = await response.json() + except: + data = await response.text() + + print(f"Status: {status} (Expected: {expected_status})") + if isinstance(data, dict): + print(f"Response: {json.dumps(data, indent=2)}") + else: + print(f"Response: {data[:500]}...") # First 500 chars + assert status == expected_status, f"Expected {expected_status}, got {status}" + return data + except Exception as e: + print(f"Error: {str(e)}") + return None + + +async def test_stream_crawl(session, token: str): + """Test the /crawl/stream endpoint with multiple URLs.""" + url = "http://localhost:8000/crawl/stream" + payload = { + "urls": [ + "https://example.com", + "https://example.com/page1", # Replicated example.com with variation + "https://example.com/page2", # Replicated example.com with variation + "https://example.com/page3", # Replicated example.com with variation + # "https://www.python.org", + # "https://news.ycombinator.com/news" + ], + "browser_config": {"headless": True, "viewport": {"width": 1200}}, + "crawler_config": {"stream": True, "cache_mode": "aggressive"} + } + headers = {"Authorization": f"Bearer {token}"} + print(f"\nTesting Streaming Crawl: {url}") + print(f"Payload: {json.dumps(payload, indent=2)}") + + try: + async with session.post(url, json=payload, headers=headers) as response: + status = response.status + print(f"Status: {status} (Expected: 200)") + assert status == 200, f"Expected 200, got {status}" + + # Read streaming response line-by-line (NDJSON) + async for line in response.content: + if line: + data = json.loads(line.decode('utf-8').strip()) + print(f"Streamed Result: {json.dumps(data, indent=2)}") + except Exception as e: + print(f"Error in streaming crawl test: {str(e)}") + +async def run_tests(): + import aiohttp + print("Starting API Tests...") + + # Test URLs + urls = [ 
+ "example.com", + "https://www.python.org", + "https://news.ycombinator.com/news", + "https://github.com/trending" + ] + + async with aiohttp.ClientSession() as session: + # Fetch token once and reuse it + token = await get_token(session) + if not token: + print("Aborting tests due to token failure!") + return + + print("\n=== Testing Crawl Endpoint ===") + crawl_payload = { + "urls": ["https://example.com"], + "browser_config": {"headless": True}, + "crawler_config": {"stream": False} + } + async with session.post( + "http://localhost:8000/crawl", + json=crawl_payload, + headers={"Authorization": f"Bearer {token}"} + ) as response: + status = response.status + data = await response.json() + print(f"\nCrawl Endpoint Status: {status}") + print(f"Crawl Response: {json.dumps(data, indent=2)}") + + + print("\n=== Testing Crawl Stream Endpoint ===") + await test_stream_crawl(session, token) + + print("\n=== Testing Markdown Endpoint ===") + for url in []: #urls: + for filter_type in ["raw", "fit", "bm25", "llm"]: + params = {"f": filter_type} + if filter_type in ["bm25", "llm"]: + params["q"] = "extract main content" + + for cache in ["0", "1"]: + params["c"] = cache + await test_endpoint(session, "md", url, token, params) + await asyncio.sleep(1) # Be nice to the server + + print("\n=== Testing LLM Endpoint ===") + for url in urls: + # Test basic extraction (direct response now) + result = await test_endpoint( + session, + "llm", + url, + token, + {"q": "Extract title and main content"} + ) + + # Test with schema (direct response) + schema = { + "type": "object", + "properties": { + "title": {"type": "string"}, + "content": {"type": "string"}, + "links": {"type": "array", "items": {"type": "string"}} + } + } + result = await test_endpoint( + session, + "llm", + url, + token, + { + "q": "Extract content with links", + "s": json.dumps(schema), + "c": "1" # Test with cache + } + ) + await asyncio.sleep(2) # Be nice to the server + + print("\n=== Testing Error Cases ===") 
+ # Test invalid URL + await test_endpoint( + session, + "md", + "not_a_real_url", + token, + expected_status=500 + ) + + # Test invalid filter type + await test_endpoint( + session, + "md", + "example.com", + token, + {"f": "invalid"}, + expected_status=422 + ) + + # Test LLM without query (should fail per your server logic) + await test_endpoint( + session, + "llm", + "example.com", + token, + expected_status=400 + ) + + print("\nAll tests completed!") + +if __name__ == "__main__": + asyncio.run(run_tests()) \ No newline at end of file