feat(docker): add Docker deployment configuration and API server

Add Docker deployment setup with FastAPI server implementation for Crawl4AI:
- Create Dockerfile with Python 3.10 and Playwright dependencies
- Implement FastAPI server with streaming and non-streaming endpoints
- Add request/response models and JSON serialization
- Include test script for API verification

Also includes:
- Update .gitignore for Continue development files
- Add project rules in .continuerules
- Clean up async_dispatcher.py formatting
This commit is contained in:
UncleCode
2025-01-31 15:22:21 +08:00
parent f81712eb91
commit ce4f04dad2
9 changed files with 426 additions and 638 deletions

4
.continuerules Normal file
View File

@@ -0,0 +1,4 @@
- This project Crawl4ai, I am very sensitive to any change, make sure to follow my instruction closely.
- Never apply changes I never asked, you are not 100$ free agent to do whatever you want.
- All changes must be relevant to the requests I asked for.
- Do not jumpt tp make changes first share your plane and explain to me what you wanna do.

23
.gitignore vendored
View File

@@ -229,5 +229,24 @@ tree.md
.codeiumignore
todo/
# windsurf rules
.windsurfrules
# Continue development files
.continue/
.continuerc.json
continue.lock
continue_core.log
contextProviders/
continue_workspace/
.continue-cache/
continue_config.json
# Continue temporary files
.continue-temp/
.continue-logs/
.continue-downloads/
# Continue VS Code specific
.vscode-continue/
.vscode-continue-cache/
.prompts/

View File

@@ -25,7 +25,6 @@ import random
from abc import ABC, abstractmethod
class RateLimiter:
def __init__(
self,
@@ -162,22 +161,22 @@ class CrawlerMonitor:
table.add_row(
"[yellow]In Queue[/yellow]",
str(queued),
f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
f"{(queued / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
)
table.add_row(
"[blue]In Progress[/blue]",
str(in_progress),
f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
f"{(in_progress / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
)
table.add_row(
"[green]Completed[/green]",
str(completed),
f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
f"{(completed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
)
table.add_row(
"[red]Failed[/red]",
str(failed),
f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
f"{(failed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
)
# Add memory information
@@ -420,59 +419,59 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
urls: List[str],
crawler: "AsyncWebCrawler", # noqa: F821
config: CrawlerRunConfig,
) -> List[CrawlerTaskResult]:
self.crawler = crawler
) -> List[CrawlerTaskResult]:
self.crawler = crawler
if self.monitor:
self.monitor.start()
if self.monitor:
self.monitor.start()
try:
pending_tasks = []
active_tasks = []
task_queue = []
try:
pending_tasks = []
active_tasks = []
task_queue = []
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
task_queue.append((url, task_id))
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
task_queue.append((url, task_id))
while task_queue or active_tasks:
wait_start_time = time.time()
while len(active_tasks) < self.max_session_permit and task_queue:
if psutil.virtual_memory().percent >= self.memory_threshold_percent:
# Check if we've exceeded the timeout
if time.time() - wait_start_time > self.memory_wait_timeout:
raise MemoryError(
f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
)
await asyncio.sleep(self.check_interval)
continue
url, task_id = task_queue.pop(0)
task = asyncio.create_task(self.crawl_url(url, config, task_id))
active_tasks.append(task)
if not active_tasks:
while task_queue or active_tasks:
wait_start_time = time.time()
while len(active_tasks) < self.max_session_permit and task_queue:
if psutil.virtual_memory().percent >= self.memory_threshold_percent:
# Check if we've exceeded the timeout
if time.time() - wait_start_time > self.memory_wait_timeout:
raise MemoryError(
f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
)
await asyncio.sleep(self.check_interval)
continue
done, pending = await asyncio.wait(
active_tasks, return_when=asyncio.FIRST_COMPLETED
)
url, task_id = task_queue.pop(0)
task = asyncio.create_task(self.crawl_url(url, config, task_id))
active_tasks.append(task)
pending_tasks.extend(done)
active_tasks = list(pending)
if not active_tasks:
await asyncio.sleep(self.check_interval)
continue
return await asyncio.gather(*pending_tasks)
finally:
if self.monitor:
self.monitor.stop()
done, pending = await asyncio.wait(
active_tasks, return_when=asyncio.FIRST_COMPLETED
)
pending_tasks.extend(done)
active_tasks = list(pending)
return await asyncio.gather(*pending_tasks)
finally:
if self.monitor:
self.monitor.stop()
async def run_urls_stream(
self,
urls: List[str],
crawler: "AsyncWebCrawler",
crawler: "AsyncWebCrawler", # noqa: F821
config: CrawlerRunConfig,
) -> AsyncGenerator[CrawlerTaskResult, None]:
self.crawler = crawler
@@ -509,9 +508,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
# Wait for any task to complete and yield results
if active_tasks:
done, pending = await asyncio.wait(
active_tasks,
timeout=0.1,
return_when=asyncio.FIRST_COMPLETED
active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
)
for completed_task in done:
result = await completed_task
@@ -525,6 +522,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
if self.monitor:
self.monitor.stop()
class SemaphoreDispatcher(BaseDispatcher):
def __init__(
self,

View File

@@ -1,588 +0,0 @@
from typing import Dict, Optional, List, Tuple
from .async_configs import CrawlerRunConfig
from .models import (
CrawlResult,
CrawlerTaskResult,
CrawlStatus,
DisplayMode,
CrawlStats,
DomainState,
)
from rich.live import Live
from rich.table import Table
from rich.console import Console
from rich import box
from datetime import datetime, timedelta
import time
import psutil
import asyncio
import uuid
from urllib.parse import urlparse
import random
from abc import ABC, abstractmethod
class RateLimiter:
def __init__(
self,
base_delay: Tuple[float, float] = (1.0, 3.0),
max_delay: float = 60.0,
max_retries: int = 3,
rate_limit_codes: List[int] = None,
):
self.base_delay = base_delay
self.max_delay = max_delay
self.max_retries = max_retries
self.rate_limit_codes = rate_limit_codes or [429, 503]
self.domains: Dict[str, DomainState] = {}
def get_domain(self, url: str) -> str:
return urlparse(url).netloc
async def wait_if_needed(self, url: str) -> None:
domain = self.get_domain(url)
state = self.domains.get(domain)
if not state:
self.domains[domain] = DomainState()
state = self.domains[domain]
now = time.time()
if state.last_request_time:
wait_time = max(0, state.current_delay - (now - state.last_request_time))
if wait_time > 0:
await asyncio.sleep(wait_time)
# Random delay within base range if no current delay
if state.current_delay == 0:
state.current_delay = random.uniform(*self.base_delay)
state.last_request_time = time.time()
def update_delay(self, url: str, status_code: int) -> bool:
domain = self.get_domain(url)
state = self.domains[domain]
if status_code in self.rate_limit_codes:
state.fail_count += 1
if state.fail_count > self.max_retries:
return False
# Exponential backoff with random jitter
state.current_delay = min(
state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
)
else:
# Gradually reduce delay on success
state.current_delay = max(
random.uniform(*self.base_delay), state.current_delay * 0.75
)
state.fail_count = 0
return True
class CrawlerMonitor:
def __init__(
self,
max_visible_rows: int = 15,
display_mode: DisplayMode = DisplayMode.DETAILED,
):
self.console = Console()
self.max_visible_rows = max_visible_rows
self.display_mode = display_mode
self.stats: Dict[str, CrawlStats] = {}
self.process = psutil.Process()
self.start_time = datetime.now()
self.live = Live(self._create_table(), refresh_per_second=2)
def start(self):
self.live.start()
def stop(self):
self.live.stop()
def add_task(self, task_id: str, url: str):
self.stats[task_id] = CrawlStats(
task_id=task_id, url=url, status=CrawlStatus.QUEUED
)
self.live.update(self._create_table())
def update_task(self, task_id: str, **kwargs):
if task_id in self.stats:
for key, value in kwargs.items():
setattr(self.stats[task_id], key, value)
self.live.update(self._create_table())
def _create_aggregated_table(self) -> Table:
"""Creates a compact table showing only aggregated statistics"""
table = Table(
box=box.ROUNDED,
title="Crawler Status Overview",
title_style="bold magenta",
header_style="bold blue",
show_lines=True,
)
# Calculate statistics
total_tasks = len(self.stats)
queued = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED
)
in_progress = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS
)
completed = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED
)
failed = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED
)
# Memory statistics
current_memory = self.process.memory_info().rss / (1024 * 1024)
total_task_memory = sum(stat.memory_usage for stat in self.stats.values())
peak_memory = max(
(stat.peak_memory for stat in self.stats.values()), default=0.0
)
# Duration
duration = datetime.now() - self.start_time
# Create status row
table.add_column("Status", style="bold cyan")
table.add_column("Count", justify="right")
table.add_column("Percentage", justify="right")
table.add_row("Total Tasks", str(total_tasks), "100%")
table.add_row(
"[yellow]In Queue[/yellow]",
str(queued),
f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
)
table.add_row(
"[blue]In Progress[/blue]",
str(in_progress),
f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
)
table.add_row(
"[green]Completed[/green]",
str(completed),
f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
)
table.add_row(
"[red]Failed[/red]",
str(failed),
f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
)
# Add memory information
table.add_section()
table.add_row(
"[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", ""
)
table.add_row(
"[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", ""
)
table.add_row(
"[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", ""
)
table.add_row(
"[yellow]Runtime[/yellow]",
str(timedelta(seconds=int(duration.total_seconds()))),
"",
)
return table
def _create_detailed_table(self) -> Table:
table = Table(
box=box.ROUNDED,
title="Crawler Performance Monitor",
title_style="bold magenta",
header_style="bold blue",
)
# Add columns
table.add_column("Task ID", style="cyan", no_wrap=True)
table.add_column("URL", style="cyan", no_wrap=True)
table.add_column("Status", style="bold")
table.add_column("Memory (MB)", justify="right")
table.add_column("Peak (MB)", justify="right")
table.add_column("Duration", justify="right")
table.add_column("Info", style="italic")
# Add summary row
total_memory = sum(stat.memory_usage for stat in self.stats.values())
active_count = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS
)
completed_count = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED
)
failed_count = sum(
1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED
)
table.add_row(
"[bold yellow]SUMMARY",
f"Total: {len(self.stats)}",
f"Active: {active_count}",
f"{total_memory:.1f}",
f"{self.process.memory_info().rss / (1024 * 1024):.1f}",
str(
timedelta(
seconds=int((datetime.now() - self.start_time).total_seconds())
)
),
f"{completed_count}{failed_count}",
style="bold",
)
table.add_section()
# Add rows for each task
visible_stats = sorted(
self.stats.values(),
key=lambda x: (
x.status != CrawlStatus.IN_PROGRESS,
x.status != CrawlStatus.QUEUED,
x.end_time or datetime.max,
),
)[: self.max_visible_rows]
for stat in visible_stats:
status_style = {
CrawlStatus.QUEUED: "white",
CrawlStatus.IN_PROGRESS: "yellow",
CrawlStatus.COMPLETED: "green",
CrawlStatus.FAILED: "red",
}[stat.status]
table.add_row(
stat.task_id[:8], # Show first 8 chars of task ID
stat.url[:40] + "..." if len(stat.url) > 40 else stat.url,
f"[{status_style}]{stat.status.value}[/{status_style}]",
f"{stat.memory_usage:.1f}",
f"{stat.peak_memory:.1f}",
stat.duration,
stat.error_message[:40] if stat.error_message else "",
)
return table
def _create_table(self) -> Table:
"""Creates the appropriate table based on display mode"""
if self.display_mode == DisplayMode.AGGREGATED:
return self._create_aggregated_table()
return self._create_detailed_table()
class BaseDispatcher(ABC):
def __init__(
self,
rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None,
):
self.crawler = None
self._domain_last_hit: Dict[str, float] = {}
self.concurrent_sessions = 0
self.rate_limiter = rate_limiter
self.monitor = monitor
@abstractmethod
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
task_id: str,
monitor: Optional[CrawlerMonitor] = None,
) -> CrawlerTaskResult:
pass
@abstractmethod
async def run_urls(
self,
urls: List[str],
crawler: "AsyncWebCrawler", # noqa: F821
config: CrawlerRunConfig,
monitor: Optional[CrawlerMonitor] = None,
) -> List[CrawlerTaskResult]:
pass
class MemoryAdaptiveDispatcher(BaseDispatcher):
def __init__(
self,
memory_threshold_percent: float = 90.0,
check_interval: float = 1.0,
max_session_permit: int = 20,
memory_wait_timeout: float = 300.0, # 5 minutes default timeout
rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None,
):
super().__init__(rate_limiter, monitor)
self.memory_threshold_percent = memory_threshold_percent
self.check_interval = check_interval
self.max_session_permit = max_session_permit
self.memory_wait_timeout = memory_wait_timeout
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
task_id: str,
) -> CrawlerTaskResult:
start_time = datetime.now()
error_message = ""
memory_usage = peak_memory = 0.0
try:
if self.monitor:
self.monitor.update_task(
task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time
)
self.concurrent_sessions += 1
if self.rate_limiter:
await self.rate_limiter.wait_if_needed(url)
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
result = await self.crawler.arun(url, config=config, session_id=task_id)
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
if self.rate_limiter and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=datetime.now(),
error_message=error_message,
)
if not result.success:
error_message = result.error_message
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
elif self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
except Exception as e:
error_message = str(e)
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
result = CrawlResult(
url=url, html="", metadata={}, success=False, error_message=str(e)
)
finally:
end_time = datetime.now()
if self.monitor:
self.monitor.update_task(
task_id,
end_time=end_time,
memory_usage=memory_usage,
peak_memory=peak_memory,
error_message=error_message,
)
self.concurrent_sessions -= 1
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=end_time,
error_message=error_message,
)
async def run_urls(
self,
urls: List[str],
crawler: "AsyncWebCrawler", # noqa: F821
config: CrawlerRunConfig,
) -> List[CrawlerTaskResult]:
self.crawler = crawler
if self.monitor:
self.monitor.start()
try:
pending_tasks = []
active_tasks = []
task_queue = []
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
task_queue.append((url, task_id))
while task_queue or active_tasks:
wait_start_time = time.time()
while len(active_tasks) < self.max_session_permit and task_queue:
if psutil.virtual_memory().percent >= self.memory_threshold_percent:
# Check if we've exceeded the timeout
if time.time() - wait_start_time > self.memory_wait_timeout:
raise MemoryError(
f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
)
await asyncio.sleep(self.check_interval)
continue
url, task_id = task_queue.pop(0)
task = asyncio.create_task(self.crawl_url(url, config, task_id))
active_tasks.append(task)
if not active_tasks:
await asyncio.sleep(self.check_interval)
continue
done, pending = await asyncio.wait(
active_tasks, return_when=asyncio.FIRST_COMPLETED
)
pending_tasks.extend(done)
active_tasks = list(pending)
return await asyncio.gather(*pending_tasks)
finally:
if self.monitor:
self.monitor.stop()
class SemaphoreDispatcher(BaseDispatcher):
def __init__(
self,
semaphore_count: int = 5,
max_session_permit: int = 20,
rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None,
):
super().__init__(rate_limiter, monitor)
self.semaphore_count = semaphore_count
self.max_session_permit = max_session_permit
async def crawl_url(
self,
url: str,
config: CrawlerRunConfig,
task_id: str,
semaphore: asyncio.Semaphore = None,
) -> CrawlerTaskResult:
start_time = datetime.now()
error_message = ""
memory_usage = peak_memory = 0.0
try:
if self.monitor:
self.monitor.update_task(
task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time
)
if self.rate_limiter:
await self.rate_limiter.wait_if_needed(url)
async with semaphore:
process = psutil.Process()
start_memory = process.memory_info().rss / (1024 * 1024)
result = await self.crawler.arun(url, config=config, session_id=task_id)
end_memory = process.memory_info().rss / (1024 * 1024)
memory_usage = peak_memory = end_memory - start_memory
if self.rate_limiter and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=datetime.now(),
error_message=error_message,
)
if not result.success:
error_message = result.error_message
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
elif self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
except Exception as e:
error_message = str(e)
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
result = CrawlResult(
url=url, html="", metadata={}, success=False, error_message=str(e)
)
finally:
end_time = datetime.now()
if self.monitor:
self.monitor.update_task(
task_id,
end_time=end_time,
memory_usage=memory_usage,
peak_memory=peak_memory,
error_message=error_message,
)
return CrawlerTaskResult(
task_id=task_id,
url=url,
result=result,
memory_usage=memory_usage,
peak_memory=peak_memory,
start_time=start_time,
end_time=end_time,
error_message=error_message,
)
async def run_urls(
self,
crawler: "AsyncWebCrawler", # noqa: F821
urls: List[str],
config: CrawlerRunConfig,
) -> List[CrawlerTaskResult]:
self.crawler = crawler
if self.monitor:
self.monitor.start()
try:
semaphore = asyncio.Semaphore(self.semaphore_count)
tasks = []
for url in urls:
task_id = str(uuid.uuid4())
if self.monitor:
self.monitor.add_task(task_id, url)
task = asyncio.create_task(
self.crawl_url(url, config, task_id, semaphore)
)
tasks.append(task)
return await asyncio.gather(*tasks, return_exceptions=True)
finally:
if self.monitor:
self.monitor.stop()

18
deploy/docker/Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM python:3.10-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
wget \
&& rm -rf /var/lib/apt/lists/*
# Install Playwright dependencies
RUN playwright install --with-deps chromium
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]

78
deploy/docker/models.py Normal file
View File

@@ -0,0 +1,78 @@
from typing import List, Optional, Any, Dict
from pydantic import BaseModel
from crawl4ai import (
BrowserConfig,
CrawlerRunConfig,
DefaultMarkdownGenerator,
PruningContentFilter,
BM25ContentFilter,
LLMContentFilter,
# Add other strategy classes as needed
)
class StrategyConfig(BaseModel):
"""Base class for strategy configurations"""
type: str
params: Dict[str, Any]
def create_instance(self):
"""Convert config to actual strategy instance"""
strategy_mappings = {
# Markdown Generators
'DefaultMarkdownGenerator': DefaultMarkdownGenerator,
# Content Filters
'PruningContentFilter': PruningContentFilter,
'BM25ContentFilter': BM25ContentFilter,
'LLMContentFilter': LLMContentFilter,
# Add other mappings as needed
# 'CustomStrategy': CustomStrategyClass,
}
strategy_class = strategy_mappings.get(self.type)
if not strategy_class:
raise ValueError(f"Unknown strategy type: {self.type}")
# Handle nested strategy configurations
processed_params = {}
for key, value in self.params.items():
if isinstance(value, dict) and 'type' in value:
# Recursively create nested strategy instances
nested_config = StrategyConfig(type=value['type'], params=value.get('params', {}))
processed_params[key] = nested_config.create_instance()
else:
processed_params[key] = value
return strategy_class(**processed_params)
class CrawlRequest(BaseModel):
urls: List[str]
browser_config: Optional[dict] = None
crawler_config: Optional[dict] = None
def get_configs(self):
"""Enhanced conversion of dicts to config objects"""
browser_config = BrowserConfig.from_kwargs(self.browser_config or {})
crawler_dict = self.crawler_config or {}
# Process strategy configurations
for key, value in crawler_dict.items():
if isinstance(value, dict) and 'type' in value:
# Convert strategy configuration to actual instance
strategy_config = StrategyConfig(
type=value['type'],
params=value.get('params', {})
)
crawler_dict[key] = strategy_config.create_instance()
crawler_config = CrawlerRunConfig.from_kwargs(crawler_dict)
return browser_config, crawler_config
class CrawlResponse(BaseModel):
success: bool
results: List[dict] # Will contain serialized CrawlResults
class Config:
arbitrary_types_allowed = True

View File

@@ -0,0 +1,3 @@
crawl4ai
fastapi
uvicorn

148
deploy/docker/server.py Normal file
View File

@@ -0,0 +1,148 @@
# pyright: ignore
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import json
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from crawl4ai import (
BrowserConfig,
CrawlerRunConfig,
AsyncWebCrawler,
MemoryAdaptiveDispatcher,
RateLimiter,
)
from .models import CrawlRequest, CrawlResponse
class CrawlJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder for crawler results"""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.decode('utf-8', errors='ignore')
if hasattr(obj, 'model_dump'):
return obj.model_dump()
if hasattr(obj, '__dict__'):
return {k: v for k, v in obj.__dict__.items() if not k.startswith('_')}
return str(obj) # Fallback to string representation
def serialize_result(result) -> dict:
"""Safely serialize a crawler result"""
try:
# Convert to dict handling special cases
if hasattr(result, 'model_dump'):
result_dict = result.model_dump()
else:
result_dict = {
k: v for k, v in result.__dict__.items()
if not k.startswith('_')
}
# Remove known non-serializable objects
result_dict.pop('ssl_certificate', None)
result_dict.pop('downloaded_files', None)
return result_dict
except Exception as e:
print(f"Error serializing result: {e}")
return {"error": str(e), "url": getattr(result, 'url', 'unknown')}
app = FastAPI(title="Crawl4AI API")
async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]:
"""Stream results and manage crawler lifecycle"""
try:
async for result in results_gen:
try:
# Handle serialization of result
result_dict = serialize_result(result)
# Remove non-serializable objects
print(f"Streaming result for URL: {result_dict['url']}, Success: {result_dict['success']}")
yield (json.dumps(result_dict, cls=CrawlJSONEncoder) + "\n").encode('utf-8')
except Exception as e:
# Log error but continue streaming
print(f"Error serializing result: {e}")
error_response = {
"error": str(e),
"url": getattr(result, 'url', 'unknown')
}
yield (json.dumps(error_response) + "\n").encode('utf-8')
except asyncio.CancelledError:
# Handle client disconnection gracefully
print("Client disconnected, cleaning up...")
finally:
# Ensure crawler cleanup happens in all cases
try:
await crawler.close()
except Exception as e:
print(f"Error closing crawler: {e}")
@app.post("/crawl")
async def crawl(request: CrawlRequest):
browser_config, crawler_config = request.get_configs()
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=75.0,
rate_limiter=RateLimiter(base_delay=(1.0, 2.0)),
# monitor=CrawlerMonitor(display_mode=DisplayMode.DETAILED)
)
try:
if crawler_config.stream:
# For streaming, manage crawler lifecycle manually
crawler = AsyncWebCrawler(config=browser_config)
await crawler.start()
results_gen = await crawler.arun_many(
urls=request.urls,
config=crawler_config,
dispatcher=dispatcher
)
return StreamingResponse(
stream_results(crawler, results_gen),
media_type='application/x-ndjson'
)
else:
# For non-streaming, use context manager
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls=request.urls,
config=crawler_config,
dispatcher=dispatcher
)
# Handle serialization of results
results_dict = []
for result in results:
try:
result_dict = {
k: v for k, v in (result.model_dump() if hasattr(result, 'model_dump')
else result.__dict__).items()
if not k.startswith('_')
}
result_dict.pop('ssl_certificate', None)
result_dict.pop('downloaded_files', None)
results_dict.append(result_dict)
except Exception as e:
print(f"Error serializing result: {e}")
continue
return CrawlResponse(success=True, results=results_dict)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/schema")
async def get_schema():
"""Return config schemas for client validation"""
return {
"browser": BrowserConfig.model_json_schema(),
"crawler": CrawlerRunConfig.model_json_schema()
}
if __name__ == "__main__":
import uvicorn
# Run in auto reload mode
# WARNING: You must pass the application as an import string to enable 'reload' or 'workers'.
uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True)

108
deploy/docker/test.py Normal file
View File

@@ -0,0 +1,108 @@
import httpx
import asyncio
import json
async def test_regular():
"""Test non-streaming API call"""
async with httpx.AsyncClient() as client:
response = await client.post("http://localhost:8000/crawl", json={
"urls": ["https://example.com"] * 3, # Test with 3 identical URLs
"browser_config": {
"headless": True,
"verbose": False
},
"crawler_config": {
"cache_mode": "BYPASS",
"stream": False
}
})
results = response.json()
print("\nRegular Response:")
print(f"Got {len(results['results'])} results at once")
for result in results['results']:
print(f"URL: {result['url']}, Success: {result['success']}")
async def test_streaming():
"""Test streaming API call"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
"http://localhost:8000/crawl",
json={
"urls": ["https://example.com"] * 3,
"browser_config": {
"headless": True,
"verbose": False
},
"crawler_config": {
"cache_mode": "BYPASS",
"stream": True
}
},
timeout=30.0
)
print("\nStreaming Response:")
async for line in response.aiter_lines():
if line.strip():
try:
result = json.loads(line)
print(f"Received result for URL: {result['url']}, Success: {result['success']}")
except json.JSONDecodeError as e:
print(f"Error decoding response: {e}")
continue
except Exception as e:
print(f"Error during streaming: {e}")
async def test_complex_config():
"""Test API with complex nested configurations"""
async with httpx.AsyncClient() as client:
response = await client.post("http://localhost:8000/crawl",
timeout=30.0, json={
"urls": ["https://en.wikipedia.org/wiki/Apple"],
"browser_config": {
"headless": True,
"verbose": False
},
"crawler_config": {
"cache_mode": "BYPASS",
"excluded_tags": ["nav", "footer", "aside"],
"remove_overlay_elements": True,
"markdown_generator": {
"type": "DefaultMarkdownGenerator",
"params": {
"content_filter": {
"type": "PruningContentFilter",
"params": {
"threshold": 0.48,
"threshold_type": "fixed",
"min_word_threshold": 0
}
},
"options": {"ignore_links": True}
}
}
}
})
result = response.json()
if result['success']:
for r in result['results']:
print(f"Full Markdown Length: {len(r['markdown_v2']['raw_markdown'])}")
print(f"Fit Markdown Length: {len(r['markdown_v2']['fit_markdown'])}")
async def main():
"""Run both tests"""
print("Testing Crawl4AI API...")
# print("\n1. Testing regular (non-streaming) endpoint...")
# await test_regular()
# print("\n2. Testing streaming endpoint...")
# await test_streaming()
print("\n3. Testing complex configuration...")
await test_complex_config()
if __name__ == "__main__":
asyncio.run(main())