diff --git a/.continuerules b/.continuerules
new file mode 100644
index 00000000..d16f5b48
--- /dev/null
+++ b/.continuerules
@@ -0,0 +1,4 @@
+- This project is Crawl4AI. I am very sensitive to any change, so follow my instructions closely.
+- Never apply changes I did not ask for; you are not a 100% free agent that can do whatever it wants.
+- All changes must be relevant to the requests I make.
+- Do not jump straight to making changes; first share your plan and explain what you want to do.
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 302892e4..5d39e6e9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -229,5 +229,24 @@ tree.md
 .codeiumignore
 todo/
 
-# windsurf rules
-.windsurfrules
+# Continue development files
+.continue/
+.continuerc.json
+continue.lock
+continue_core.log
+contextProviders/
+continue_workspace/
+.continue-cache/
+continue_config.json
+
+# Continue temporary files
+.continue-temp/
+.continue-logs/
+.continue-downloads/
+
+# Continue VS Code specific
+.vscode-continue/
+.vscode-continue-cache/
+
+.prompts/
+
diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py
index ed40b8b4..3f7fbe39 100644
--- a/crawl4ai/async_dispatcher.py
+++ b/crawl4ai/async_dispatcher.py
@@ -25,7 +25,6 @@
 import random
 from abc import ABC, abstractmethod
 
-
 class RateLimiter:
     def __init__(
         self,
@@ -162,22 +161,22 @@ class CrawlerMonitor:
         table.add_row(
             "[yellow]In Queue[/yellow]",
             str(queued),
-            f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
+            f"{(queued / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
         )
         table.add_row(
             "[blue]In Progress[/blue]",
             str(in_progress),
-            f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
+            f"{(in_progress / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
         )
         table.add_row(
             "[green]Completed[/green]",
             str(completed),
-            f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
+            f"{(completed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
         )
         table.add_row(
             "[red]Failed[/red]",
             str(failed),
-            f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
+            f"{(failed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
         )
 
         # Add memory information
@@ -420,59 +419,59 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
         urls: List[str],
         crawler: "AsyncWebCrawler",  # noqa: F821
         config: CrawlerRunConfig,
-        ) -> List[CrawlerTaskResult]:
-            self.crawler = crawler
+    ) -> List[CrawlerTaskResult]:
+        self.crawler = crawler
 
-            if self.monitor:
-                self.monitor.start()
+        if self.monitor:
+            self.monitor.start()
 
-            try:
-                pending_tasks = []
-                active_tasks = []
-                task_queue = []
+        try:
+            pending_tasks = []
+            active_tasks = []
+            task_queue = []
 
-                for url in urls:
-                    task_id = str(uuid.uuid4())
-                    if self.monitor:
-                        self.monitor.add_task(task_id, url)
-                    task_queue.append((url, task_id))
+            for url in urls:
+                task_id = str(uuid.uuid4())
+                if self.monitor:
+                    self.monitor.add_task(task_id, url)
+                task_queue.append((url, task_id))
 
-                while task_queue or active_tasks:
-                    wait_start_time = time.time()
-                    while len(active_tasks) < self.max_session_permit and task_queue:
-                        if psutil.virtual_memory().percent >= self.memory_threshold_percent:
-                            # Check if we've exceeded the timeout
-                            if time.time() - wait_start_time > self.memory_wait_timeout:
-                                raise MemoryError(
-                                    f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
-                                )
-                            await asyncio.sleep(self.check_interval)
-                            continue
-
-                        url, task_id = task_queue.pop(0)
-                        task = asyncio.create_task(self.crawl_url(url, config, task_id))
-                        active_tasks.append(task)
-
-                    if not active_tasks:
+            while task_queue or active_tasks:
+                wait_start_time = time.time()
+                while len(active_tasks) < self.max_session_permit and task_queue:
+                    if psutil.virtual_memory().percent >= self.memory_threshold_percent:
+                        # Check if we've exceeded the timeout
+                        if time.time() - wait_start_time > self.memory_wait_timeout:
+                            raise MemoryError(
+                                f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
+                            )
                         await asyncio.sleep(self.check_interval)
                         continue
-                        done, pending = await asyncio.wait(
-                            active_tasks, return_when=asyncio.FIRST_COMPLETED
-                        )
+                    url, task_id = task_queue.pop(0)
+                    task = asyncio.create_task(self.crawl_url(url, config, task_id))
+                    active_tasks.append(task)
 
-                    pending_tasks.extend(done)
-                    active_tasks = list(pending)
+                if not active_tasks:
+                    await asyncio.sleep(self.check_interval)
+                    continue
 
-                    return await asyncio.gather(*pending_tasks)
-                finally:
-                    if self.monitor:
-                        self.monitor.stop()
+                done, pending = await asyncio.wait(
+                    active_tasks, return_when=asyncio.FIRST_COMPLETED
+                )
+
+                pending_tasks.extend(done)
+                active_tasks = list(pending)
+
+            return await asyncio.gather(*pending_tasks)
+        finally:
+            if self.monitor:
+                self.monitor.stop()
 
     async def run_urls_stream(
         self,
         urls: List[str],
-        crawler: "AsyncWebCrawler",
+        crawler: "AsyncWebCrawler",  # noqa: F821
        config: CrawlerRunConfig,
     ) -> AsyncGenerator[CrawlerTaskResult, None]:
         self.crawler = crawler
@@ -509,9 +508,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
             # Wait for any task to complete and yield results
             if active_tasks:
                 done, pending = await asyncio.wait(
-                    active_tasks,
-                    timeout=0.1,
-                    return_when=asyncio.FIRST_COMPLETED
+                    active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
                 )
                 for completed_task in done:
                     result = await completed_task
@@ -525,6 +522,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
         if self.monitor:
             self.monitor.stop()
 
+
 class SemaphoreDispatcher(BaseDispatcher):
     def __init__(
         self,
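A minimal usage sketch of the dispatcher touched above (an assumption-laden example, not part of the diff: it presumes crawl4ai's public API as exercised by deploy/docker/server.py below, i.e. that arun_many() accepts a dispatcher and returns result objects with url/success attributes):

    import asyncio

    from crawl4ai import (
        AsyncWebCrawler,
        CrawlerRunConfig,
        MemoryAdaptiveDispatcher,
        RateLimiter,
    )

    async def main():
        # Hold back new tasks above 75% system RAM; raise MemoryError if the
        # pressure lasts longer than memory_wait_timeout seconds.
        dispatcher = MemoryAdaptiveDispatcher(
            memory_threshold_percent=75.0,
            memory_wait_timeout=300.0,
            rate_limiter=RateLimiter(base_delay=(1.0, 2.0)),
        )
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun_many(
                urls=["https://example.com"],
                config=CrawlerRunConfig(),
                dispatcher=dispatcher,
            )
            for result in results:
                print(result.url, result.success)

    asyncio.run(main())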
diff --git a/crawl4ai/async_dispatcher_.py b/crawl4ai/async_dispatcher_.py
deleted file mode 100644
index 64578bf6..00000000
--- a/crawl4ai/async_dispatcher_.py
+++ /dev/null
@@ -1,588 +0,0 @@
-from typing import Dict, Optional, List, Tuple
-from .async_configs import CrawlerRunConfig
-from .models import (
-    CrawlResult,
-    CrawlerTaskResult,
-    CrawlStatus,
-    DisplayMode,
-    CrawlStats,
-    DomainState,
-)
-
-from rich.live import Live
-from rich.table import Table
-from rich.console import Console
-from rich import box
-from datetime import datetime, timedelta
-
-import time
-import psutil
-import asyncio
-import uuid
-
-from urllib.parse import urlparse
-import random
-from abc import ABC, abstractmethod
-
-
-class RateLimiter:
-    def __init__(
-        self,
-        base_delay: Tuple[float, float] = (1.0, 3.0),
-        max_delay: float = 60.0,
-        max_retries: int = 3,
-        rate_limit_codes: List[int] = None,
-    ):
-        self.base_delay = base_delay
-        self.max_delay = max_delay
-        self.max_retries = max_retries
-        self.rate_limit_codes = rate_limit_codes or [429, 503]
-        self.domains: Dict[str, DomainState] = {}
-
-    def get_domain(self, url: str) -> str:
-        return urlparse(url).netloc
-
-    async def wait_if_needed(self, url: str) -> None:
-        domain = self.get_domain(url)
-        state = self.domains.get(domain)
-
-        if not state:
-            self.domains[domain] = DomainState()
-            state = self.domains[domain]
-
-        now = time.time()
-        if state.last_request_time:
-            wait_time = max(0, state.current_delay - (now - state.last_request_time))
-            if wait_time > 0:
-                await asyncio.sleep(wait_time)
-
-        # Random delay within base range if no current delay
-        if state.current_delay == 0:
-            state.current_delay = random.uniform(*self.base_delay)
-
-        state.last_request_time = time.time()
-
-    def update_delay(self, url: str, status_code: int) -> bool:
-        domain = self.get_domain(url)
-        state = self.domains[domain]
-
-        if status_code in self.rate_limit_codes:
-            state.fail_count += 1
-            if state.fail_count > self.max_retries:
-                return False
-
-            # Exponential backoff with random jitter
-            state.current_delay = min(
-                state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
-            )
-        else:
-            # Gradually reduce delay on success
-            state.current_delay = max(
-                random.uniform(*self.base_delay), state.current_delay * 0.75
-            )
-            state.fail_count = 0
-
-        return True
-
-
-class CrawlerMonitor:
-    def __init__(
-        self,
-        max_visible_rows: int = 15,
-        display_mode: DisplayMode = DisplayMode.DETAILED,
-    ):
-        self.console = Console()
-        self.max_visible_rows = max_visible_rows
-        self.display_mode = display_mode
-        self.stats: Dict[str, CrawlStats] = {}
-        self.process = psutil.Process()
-        self.start_time = datetime.now()
-        self.live = Live(self._create_table(), refresh_per_second=2)
-
-    def start(self):
-        self.live.start()
-
-    def stop(self):
-        self.live.stop()
-
-    def add_task(self, task_id: str, url: str):
-        self.stats[task_id] = CrawlStats(
-            task_id=task_id, url=url, status=CrawlStatus.QUEUED
-        )
-        self.live.update(self._create_table())
-
-    def update_task(self, task_id: str, **kwargs):
-        if task_id in self.stats:
-            for key, value in kwargs.items():
-                setattr(self.stats[task_id], key, value)
-            self.live.update(self._create_table())
-
-    def _create_aggregated_table(self) -> Table:
-        """Creates a compact table showing only aggregated statistics"""
-        table = Table(
-            box=box.ROUNDED,
-            title="Crawler Status Overview",
-            title_style="bold magenta",
-            header_style="bold blue",
-            show_lines=True,
-        )
-
-        # Calculate statistics
-        total_tasks = len(self.stats)
-        queued = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED
-        )
-        in_progress = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS
-        )
-        completed = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED
-        )
-        failed = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED
-        )
-
-        # Memory statistics
-        current_memory = self.process.memory_info().rss / (1024 * 1024)
-        total_task_memory = sum(stat.memory_usage for stat in self.stats.values())
-        peak_memory = max(
-            (stat.peak_memory for stat in self.stats.values()), default=0.0
-        )
-
-        # Duration
-        duration = datetime.now() - self.start_time
-
-        # Create status row
-        table.add_column("Status", style="bold cyan")
-        table.add_column("Count", justify="right")
-        table.add_column("Percentage", justify="right")
-
-        table.add_row("Total Tasks", str(total_tasks), "100%")
-        table.add_row(
-            "[yellow]In Queue[/yellow]",
-            str(queued),
-            f"{(queued/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
-        )
-        table.add_row(
-            "[blue]In Progress[/blue]",
-            str(in_progress),
-            f"{(in_progress/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
-        )
-        table.add_row(
-            "[green]Completed[/green]",
-            str(completed),
-            f"{(completed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
-        )
-        table.add_row(
-            "[red]Failed[/red]",
-            str(failed),
-            f"{(failed/total_tasks*100):.1f}%" if total_tasks > 0 else "0%",
-        )
-
-        # Add memory information
-        table.add_section()
-        table.add_row(
-            "[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", ""
-        )
-        table.add_row(
-            "[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", ""
-        )
-        table.add_row(
-            "[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", ""
-        )
-        table.add_row(
-            "[yellow]Runtime[/yellow]",
-            str(timedelta(seconds=int(duration.total_seconds()))),
-            "",
-        )
-
-        return table
-
-    def _create_detailed_table(self) -> Table:
-        table = Table(
-            box=box.ROUNDED,
-            title="Crawler Performance Monitor",
-            title_style="bold magenta",
-            header_style="bold blue",
-        )
-
-        # Add columns
-        table.add_column("Task ID", style="cyan", no_wrap=True)
-        table.add_column("URL", style="cyan", no_wrap=True)
-        table.add_column("Status", style="bold")
-        table.add_column("Memory (MB)", justify="right")
-        table.add_column("Peak (MB)", justify="right")
-        table.add_column("Duration", justify="right")
-        table.add_column("Info", style="italic")
-
-        # Add summary row
-        total_memory = sum(stat.memory_usage for stat in self.stats.values())
-        active_count = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS
-        )
-        completed_count = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED
-        )
-        failed_count = sum(
-            1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED
-        )
-
-        table.add_row(
-            "[bold yellow]SUMMARY",
-            f"Total: {len(self.stats)}",
-            f"Active: {active_count}",
-            f"{total_memory:.1f}",
-            f"{self.process.memory_info().rss / (1024 * 1024):.1f}",
-            str(
-                timedelta(
-                    seconds=int((datetime.now() - self.start_time).total_seconds())
-                )
-            ),
-            f"✓{completed_count} ✗{failed_count}",
-            style="bold",
-        )
-
-        table.add_section()
-
-        # Add rows for each task
-        visible_stats = sorted(
-            self.stats.values(),
-            key=lambda x: (
-                x.status != CrawlStatus.IN_PROGRESS,
-                x.status != CrawlStatus.QUEUED,
-                x.end_time or datetime.max,
-            ),
-        )[: self.max_visible_rows]
-
-        for stat in visible_stats:
-            status_style = {
-                CrawlStatus.QUEUED: "white",
-                CrawlStatus.IN_PROGRESS: "yellow",
-                CrawlStatus.COMPLETED: "green",
-                CrawlStatus.FAILED: "red",
-            }[stat.status]
-
-            table.add_row(
-                stat.task_id[:8],  # Show first 8 chars of task ID
-                stat.url[:40] + "..." if len(stat.url) > 40 else stat.url,
-                f"[{status_style}]{stat.status.value}[/{status_style}]",
-                f"{stat.memory_usage:.1f}",
-                f"{stat.peak_memory:.1f}",
-                stat.duration,
-                stat.error_message[:40] if stat.error_message else "",
-            )
-
-        return table
-
-    def _create_table(self) -> Table:
-        """Creates the appropriate table based on display mode"""
-        if self.display_mode == DisplayMode.AGGREGATED:
-            return self._create_aggregated_table()
-        return self._create_detailed_table()
-
-
-class BaseDispatcher(ABC):
-    def __init__(
-        self,
-        rate_limiter: Optional[RateLimiter] = None,
-        monitor: Optional[CrawlerMonitor] = None,
-    ):
-        self.crawler = None
-        self._domain_last_hit: Dict[str, float] = {}
-        self.concurrent_sessions = 0
-        self.rate_limiter = rate_limiter
-        self.monitor = monitor
-
-    @abstractmethod
-    async def crawl_url(
-        self,
-        url: str,
-        config: CrawlerRunConfig,
-        task_id: str,
-        monitor: Optional[CrawlerMonitor] = None,
-    ) -> CrawlerTaskResult:
-        pass
-
-    @abstractmethod
-    async def run_urls(
-        self,
-        urls: List[str],
-        crawler: "AsyncWebCrawler",  # noqa: F821
-        config: CrawlerRunConfig,
-        monitor: Optional[CrawlerMonitor] = None,
-    ) -> List[CrawlerTaskResult]:
-        pass
-
-
-class MemoryAdaptiveDispatcher(BaseDispatcher):
-    def __init__(
-        self,
-        memory_threshold_percent: float = 90.0,
-        check_interval: float = 1.0,
-        max_session_permit: int = 20,
-        memory_wait_timeout: float = 300.0,  # 5 minutes default timeout
-        rate_limiter: Optional[RateLimiter] = None,
-        monitor: Optional[CrawlerMonitor] = None,
-    ):
-        super().__init__(rate_limiter, monitor)
-        self.memory_threshold_percent = memory_threshold_percent
-        self.check_interval = check_interval
-        self.max_session_permit = max_session_permit
-        self.memory_wait_timeout = memory_wait_timeout
-
-    async def crawl_url(
-        self,
-        url: str,
-        config: CrawlerRunConfig,
-        task_id: str,
-    ) -> CrawlerTaskResult:
-        start_time = datetime.now()
-        error_message = ""
-        memory_usage = peak_memory = 0.0
-
-        try:
-            if self.monitor:
-                self.monitor.update_task(
-                    task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time
-                )
-            self.concurrent_sessions += 1
-
-            if self.rate_limiter:
-                await self.rate_limiter.wait_if_needed(url)
-
-            process = psutil.Process()
-            start_memory = process.memory_info().rss / (1024 * 1024)
-            result = await self.crawler.arun(url, config=config, session_id=task_id)
-            end_memory = process.memory_info().rss / (1024 * 1024)
-
-            memory_usage = peak_memory = end_memory - start_memory
-
-            if self.rate_limiter and result.status_code:
-                if not self.rate_limiter.update_delay(url, result.status_code):
-                    error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
-                    if self.monitor:
-                        self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-                    return CrawlerTaskResult(
-                        task_id=task_id,
-                        url=url,
-                        result=result,
-                        memory_usage=memory_usage,
-                        peak_memory=peak_memory,
-                        start_time=start_time,
-                        end_time=datetime.now(),
-                        error_message=error_message,
-                    )
-
-            if not result.success:
-                error_message = result.error_message
-                if self.monitor:
-                    self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-            elif self.monitor:
-                self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
-
-        except Exception as e:
-            error_message = str(e)
-            if self.monitor:
-                self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-            result = CrawlResult(
-                url=url, html="", metadata={}, success=False, error_message=str(e)
-            )
-
-        finally:
-            end_time = datetime.now()
-            if self.monitor:
-                self.monitor.update_task(
-                    task_id,
-                    end_time=end_time,
-                    memory_usage=memory_usage,
-                    peak_memory=peak_memory,
-                    error_message=error_message,
-                )
-            self.concurrent_sessions -= 1
-
-        return CrawlerTaskResult(
-            task_id=task_id,
-            url=url,
-            result=result,
-            memory_usage=memory_usage,
-            peak_memory=peak_memory,
-            start_time=start_time,
-            end_time=end_time,
-            error_message=error_message,
-        )
-
-    async def run_urls(
-        self,
-        urls: List[str],
-        crawler: "AsyncWebCrawler",  # noqa: F821
-        config: CrawlerRunConfig,
-    ) -> List[CrawlerTaskResult]:
-        self.crawler = crawler
-
-        if self.monitor:
-            self.monitor.start()
-
-        try:
-            pending_tasks = []
-            active_tasks = []
-            task_queue = []
-
-            for url in urls:
-                task_id = str(uuid.uuid4())
-                if self.monitor:
-                    self.monitor.add_task(task_id, url)
-                task_queue.append((url, task_id))
-
-            while task_queue or active_tasks:
-                wait_start_time = time.time()
-                while len(active_tasks) < self.max_session_permit and task_queue:
-                    if psutil.virtual_memory().percent >= self.memory_threshold_percent:
-                        # Check if we've exceeded the timeout
-                        if time.time() - wait_start_time > self.memory_wait_timeout:
-                            raise MemoryError(
-                                f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
-                            )
-                        await asyncio.sleep(self.check_interval)
-                        continue
-
-                    url, task_id = task_queue.pop(0)
-                    task = asyncio.create_task(self.crawl_url(url, config, task_id))
-                    active_tasks.append(task)
-
-                if not active_tasks:
-                    await asyncio.sleep(self.check_interval)
-                    continue
-
-                done, pending = await asyncio.wait(
-                    active_tasks, return_when=asyncio.FIRST_COMPLETED
-                )
-
-                pending_tasks.extend(done)
-                active_tasks = list(pending)
-
-            return await asyncio.gather(*pending_tasks)
-        finally:
-            if self.monitor:
-                self.monitor.stop()
-
-
-class SemaphoreDispatcher(BaseDispatcher):
-    def __init__(
-        self,
-        semaphore_count: int = 5,
-        max_session_permit: int = 20,
-        rate_limiter: Optional[RateLimiter] = None,
-        monitor: Optional[CrawlerMonitor] = None,
-    ):
-        super().__init__(rate_limiter, monitor)
-        self.semaphore_count = semaphore_count
-        self.max_session_permit = max_session_permit
-
-    async def crawl_url(
-        self,
-        url: str,
-        config: CrawlerRunConfig,
-        task_id: str,
-        semaphore: asyncio.Semaphore = None,
-    ) -> CrawlerTaskResult:
-        start_time = datetime.now()
-        error_message = ""
-        memory_usage = peak_memory = 0.0
-
-        try:
-            if self.monitor:
-                self.monitor.update_task(
-                    task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time
-                )
-
-            if self.rate_limiter:
-                await self.rate_limiter.wait_if_needed(url)
-
-            async with semaphore:
-                process = psutil.Process()
-                start_memory = process.memory_info().rss / (1024 * 1024)
-                result = await self.crawler.arun(url, config=config, session_id=task_id)
-                end_memory = process.memory_info().rss / (1024 * 1024)
-
-                memory_usage = peak_memory = end_memory - start_memory
-
-                if self.rate_limiter and result.status_code:
-                    if not self.rate_limiter.update_delay(url, result.status_code):
-                        error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
-                        if self.monitor:
-                            self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-                        return CrawlerTaskResult(
-                            task_id=task_id,
-                            url=url,
-                            result=result,
-                            memory_usage=memory_usage,
-                            peak_memory=peak_memory,
-                            start_time=start_time,
-                            end_time=datetime.now(),
-                            error_message=error_message,
-                        )
-
-                if not result.success:
-                    error_message = result.error_message
-                    if self.monitor:
-                        self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-                elif self.monitor:
-                    self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
-
-        except Exception as e:
-            error_message = str(e)
-            if self.monitor:
-                self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-            result = CrawlResult(
-                url=url, html="", metadata={}, success=False, error_message=str(e)
-            )
-
-        finally:
-            end_time = datetime.now()
-            if self.monitor:
-                self.monitor.update_task(
-                    task_id,
-                    end_time=end_time,
-                    memory_usage=memory_usage,
-                    peak_memory=peak_memory,
-                    error_message=error_message,
-                )
-
-            return CrawlerTaskResult(
-                task_id=task_id,
-                url=url,
-                result=result,
-                memory_usage=memory_usage,
-                peak_memory=peak_memory,
-                start_time=start_time,
-                end_time=end_time,
-                error_message=error_message,
-            )
-
-    async def run_urls(
-        self,
-        crawler: "AsyncWebCrawler",  # noqa: F821
-        urls: List[str],
-        config: CrawlerRunConfig,
-    ) -> List[CrawlerTaskResult]:
-        self.crawler = crawler
-        if self.monitor:
-            self.monitor.start()
-
-        try:
-            semaphore = asyncio.Semaphore(self.semaphore_count)
-            tasks = []
-
-            for url in urls:
-                task_id = str(uuid.uuid4())
-                if self.monitor:
-                    self.monitor.add_task(task_id, url)
-                task = asyncio.create_task(
-                    self.crawl_url(url, config, task_id, semaphore)
-                )
-                tasks.append(task)
-
-            return await asyncio.gather(*tasks, return_exceptions=True)
-        finally:
-            if self.monitor:
-                self.monitor.stop()
diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile
new file mode 100644
index 00000000..864b9f27
--- /dev/null
+++ b/deploy/docker/Dockerfile
@@ -0,0 +1,18 @@
+FROM python:3.10-slim
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    build-essential \
+    wget \
+    && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install -r requirements.txt
+
+# Install Playwright browsers (the playwright CLI only exists after pip install)
+RUN playwright install --with-deps chromium
+
+COPY . .
+
+CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
diff --git a/deploy/docker/models.py b/deploy/docker/models.py
new file mode 100644
index 00000000..ecf113de
--- /dev/null
+++ b/deploy/docker/models.py
@@ -0,0 +1,78 @@
+from typing import List, Optional, Any, Dict
+from pydantic import BaseModel
+from crawl4ai import (
+    BrowserConfig,
+    CrawlerRunConfig,
+    DefaultMarkdownGenerator,
+    PruningContentFilter,
+    BM25ContentFilter,
+    LLMContentFilter,
+    # Add other strategy classes as needed
+)
+
+class StrategyConfig(BaseModel):
+    """Base class for strategy configurations"""
+    type: str
+    params: Dict[str, Any]
+
+    def create_instance(self):
+        """Convert config to actual strategy instance"""
+        strategy_mappings = {
+            # Markdown Generators
+            'DefaultMarkdownGenerator': DefaultMarkdownGenerator,
+
+            # Content Filters
+            'PruningContentFilter': PruningContentFilter,
+            'BM25ContentFilter': BM25ContentFilter,
+            'LLMContentFilter': LLMContentFilter,
+
+            # Add other mappings as needed
+            # 'CustomStrategy': CustomStrategyClass,
+        }
+
+        strategy_class = strategy_mappings.get(self.type)
+        if not strategy_class:
+            raise ValueError(f"Unknown strategy type: {self.type}")
+
+        # Handle nested strategy configurations
+        processed_params = {}
+        for key, value in self.params.items():
+            if isinstance(value, dict) and 'type' in value:
+                # Recursively create nested strategy instances
+                nested_config = StrategyConfig(type=value['type'], params=value.get('params', {}))
+                processed_params[key] = nested_config.create_instance()
+            else:
+                processed_params[key] = value
+
+        return strategy_class(**processed_params)
+
+class CrawlRequest(BaseModel):
+    urls: List[str]
+    browser_config: Optional[dict] = None
+    crawler_config: Optional[dict] = None
+
+    def get_configs(self):
+        """Enhanced conversion of dicts to config objects"""
+        browser_config = BrowserConfig.from_kwargs(self.browser_config or {})
+
+        crawler_dict = self.crawler_config or {}
+
+        # Process strategy configurations
+        for key, value in crawler_dict.items():
+            if isinstance(value, dict) and 'type' in value:
+                # Convert strategy configuration to actual instance
+                strategy_config = StrategyConfig(
+                    type=value['type'],
+                    params=value.get('params', {})
+                )
+                crawler_dict[key] = strategy_config.create_instance()
+
+        crawler_config = CrawlerRunConfig.from_kwargs(crawler_dict)
+        return browser_config, crawler_config
+
+class CrawlResponse(BaseModel):
+    success: bool
+    results: List[dict]  # Will contain serialized CrawlResults
+
+    class Config:
+        arbitrary_types_allowed = True
\ No newline at end of file
diff --git a/deploy/docker/requirements.txt b/deploy/docker/requirements.txt
new file mode 100644
index 00000000..9636247d
--- /dev/null
+++ b/deploy/docker/requirements.txt
@@ -0,0 +1,3 @@
+crawl4ai
+fastapi
+uvicorn
\ No newline at end of file
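A short sketch of how StrategyConfig resolves nested strategy dicts — the same payload shape that deploy/docker/test.py later posts as "markdown_generator". The import path is an assumption (it presumes deploy/docker is on sys.path); everything else mirrors the code above:

    from models import StrategyConfig  # hypothetical import; assumes deploy/docker on sys.path

    cfg = StrategyConfig(
        type="DefaultMarkdownGenerator",
        params={
            "content_filter": {
                "type": "PruningContentFilter",
                "params": {"threshold": 0.48, "threshold_type": "fixed"},
            },
            "options": {"ignore_links": True},
        },
    )
    # create_instance() recurses into the "content_filter" dict because it
    # carries a "type" key, builds a PruningContentFilter, and passes it as a
    # keyword argument to DefaultMarkdownGenerator.
    generator = cfg.create_instance()
    print(type(generator).__name__)  # DefaultMarkdownGenerator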
diff --git a/deploy/docker/server.py b/deploy/docker/server.py
new file mode 100644
index 00000000..fa7a8443
--- /dev/null
+++ b/deploy/docker/server.py
@@ -0,0 +1,148 @@
+# pyright: ignore
+from fastapi import FastAPI, HTTPException
+from fastapi.responses import StreamingResponse
+import json
+import asyncio
+from typing import AsyncGenerator
+from datetime import datetime
+from crawl4ai import (
+    BrowserConfig,
+    CrawlerRunConfig,
+    AsyncWebCrawler,
+    MemoryAdaptiveDispatcher,
+    RateLimiter,
+)
+from models import CrawlRequest, CrawlResponse  # absolute import: the Dockerfile runs "uvicorn server:app" as a top-level module
+
+class CrawlJSONEncoder(json.JSONEncoder):
+    """Custom JSON encoder for crawler results"""
+    def default(self, obj):
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+        if isinstance(obj, bytes):
+            return obj.decode('utf-8', errors='ignore')
+        if hasattr(obj, 'model_dump'):
+            return obj.model_dump()
+        if hasattr(obj, '__dict__'):
+            return {k: v for k, v in obj.__dict__.items() if not k.startswith('_')}
+        return str(obj)  # Fallback to string representation
+
+def serialize_result(result) -> dict:
+    """Safely serialize a crawler result"""
+    try:
+        # Convert to dict handling special cases
+        if hasattr(result, 'model_dump'):
+            result_dict = result.model_dump()
+        else:
+            result_dict = {
+                k: v for k, v in result.__dict__.items()
+                if not k.startswith('_')
+            }
+
+        # Remove known non-serializable objects
+        result_dict.pop('ssl_certificate', None)
+        result_dict.pop('downloaded_files', None)
+
+        return result_dict
+    except Exception as e:
+        print(f"Error serializing result: {e}")
+        return {"error": str(e), "url": getattr(result, 'url', 'unknown')}
+
+app = FastAPI(title="Crawl4AI API")
+
+async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]:
+    """Stream results and manage crawler lifecycle"""
+    try:
+        async for result in results_gen:
+            try:
+                # serialize_result() already drops non-serializable fields
+                result_dict = serialize_result(result)
+                print(f"Streaming result for URL: {result_dict['url']}, Success: {result_dict['success']}")
+                yield (json.dumps(result_dict, cls=CrawlJSONEncoder) + "\n").encode('utf-8')
+            except Exception as e:
+                # Log error but continue streaming
+                print(f"Error serializing result: {e}")
+                error_response = {
+                    "error": str(e),
+                    "url": getattr(result, 'url', 'unknown')
+                }
+                yield (json.dumps(error_response) + "\n").encode('utf-8')
+    except asyncio.CancelledError:
+        # Handle client disconnection gracefully
+        print("Client disconnected, cleaning up...")
+    finally:
+        # Ensure crawler cleanup happens in all cases
+        try:
+            await crawler.close()
+        except Exception as e:
+            print(f"Error closing crawler: {e}")
+
+@app.post("/crawl")
+async def crawl(request: CrawlRequest):
+    browser_config, crawler_config = request.get_configs()
+
+    dispatcher = MemoryAdaptiveDispatcher(
+        memory_threshold_percent=75.0,
+        rate_limiter=RateLimiter(base_delay=(1.0, 2.0)),
+        # monitor=CrawlerMonitor(display_mode=DisplayMode.DETAILED)
+    )
+
+    try:
+        if crawler_config.stream:
+            # For streaming, manage crawler lifecycle manually
+            crawler = AsyncWebCrawler(config=browser_config)
+            await crawler.start()
+
+            results_gen = await crawler.arun_many(
+                urls=request.urls,
+                config=crawler_config,
+                dispatcher=dispatcher
+            )
+
+            return StreamingResponse(
+                stream_results(crawler, results_gen),
+                media_type='application/x-ndjson'
+            )
+        else:
+            # For non-streaming, use context manager
+            async with AsyncWebCrawler(config=browser_config) as crawler:
+                results = await crawler.arun_many(
+                    urls=request.urls,
+                    config=crawler_config,
+                    dispatcher=dispatcher
+                )
+                # Handle serialization of results
+                results_dict = []
+                for result in results:
+                    try:
+                        result_dict = {
+                            k: v for k, v in (result.model_dump() if hasattr(result, 'model_dump')
+                                              else result.__dict__).items()
+                            if not k.startswith('_')
+                        }
+                        result_dict.pop('ssl_certificate', None)
+                        result_dict.pop('downloaded_files', None)
+                        results_dict.append(result_dict)
+                    except Exception as e:
+                        print(f"Error serializing result: {e}")
+                        continue
+
+                return CrawlResponse(success=True, results=results_dict)
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@app.get("/schema")
+async def get_schema():
+    """Return config schemas for client validation"""
+    return {
+        "browser": BrowserConfig.model_json_schema(),
+        "crawler": CrawlerRunConfig.model_json_schema()
+    }
+
+
+if __name__ == "__main__":
+    import uvicorn
+    # Run in auto reload mode
+    # WARNING: You must pass the application as an import string to enable 'reload' or 'workers'.
+    uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True)
\ No newline at end of file
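A small client-side sketch for the /schema endpoint above: fetch the schemas and flag unknown crawler_config keys before POSTing. The "properties" key is an assumption about the standard pydantic model_json_schema() output shape:

    import httpx

    def unknown_crawler_keys(crawler_config: dict, base_url: str = "http://localhost:8000") -> list:
        # Pull the JSON Schemas the server exposes and compare against our keys.
        schemas = httpx.get(f"{base_url}/schema").json()
        known = set(schemas["crawler"].get("properties", {}))
        return [key for key in crawler_config if key not in known]

    print(unknown_crawler_keys({"cache_mode": "BYPASS", "stream": True}))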
diff --git a/deploy/docker/test.py b/deploy/docker/test.py
new file mode 100644
index 00000000..c0b27ea9
--- /dev/null
+++ b/deploy/docker/test.py
@@ -0,0 +1,108 @@
+import httpx
+import asyncio
+import json
+
+async def test_regular():
+    """Test non-streaming API call"""
+    async with httpx.AsyncClient() as client:
+        response = await client.post("http://localhost:8000/crawl", json={
+            "urls": ["https://example.com"] * 3,  # Test with 3 identical URLs
+            "browser_config": {
+                "headless": True,
+                "verbose": False
+            },
+            "crawler_config": {
+                "cache_mode": "BYPASS",
+                "stream": False
+            }
+        })
+        results = response.json()
+        print("\nRegular Response:")
+        print(f"Got {len(results['results'])} results at once")
+        for result in results['results']:
+            print(f"URL: {result['url']}, Success: {result['success']}")
+
+async def test_streaming():
+    """Test streaming API call"""
+    async with httpx.AsyncClient() as client:
+        try:
+            response = await client.post(
+                "http://localhost:8000/crawl",
+                json={
+                    "urls": ["https://example.com"] * 3,
+                    "browser_config": {
+                        "headless": True,
+                        "verbose": False
+                    },
+                    "crawler_config": {
+                        "cache_mode": "BYPASS",
+                        "stream": True
+                    }
+                },
+                timeout=30.0
+            )
+
+            print("\nStreaming Response:")
+            async for line in response.aiter_lines():
+                if line.strip():
+                    try:
+                        result = json.loads(line)
+                        print(f"Received result for URL: {result['url']}, Success: {result['success']}")
+                    except json.JSONDecodeError as e:
+                        print(f"Error decoding response: {e}")
+                        continue
+        except Exception as e:
+            print(f"Error during streaming: {e}")
+
+async def test_complex_config():
+    """Test API with complex nested configurations"""
+    async with httpx.AsyncClient() as client:
+        response = await client.post("http://localhost:8000/crawl",
+            timeout=30.0, json={
+            "urls": ["https://en.wikipedia.org/wiki/Apple"],
+            "browser_config": {
+                "headless": True,
+                "verbose": False
+            },
+            "crawler_config": {
+                "cache_mode": "BYPASS",
+                "excluded_tags": ["nav", "footer", "aside"],
+                "remove_overlay_elements": True,
+                "markdown_generator": {
+                    "type": "DefaultMarkdownGenerator",
+                    "params": {
+                        "content_filter": {
+                            "type": "PruningContentFilter",
+                            "params": {
+                                "threshold": 0.48,
+                                "threshold_type": "fixed",
+                                "min_word_threshold": 0
+                            }
+                        },
+                        "options": {"ignore_links": True}
+                    }
+                }
+            }
+        })
+
+        result = response.json()
+        if result['success']:
+            for r in result['results']:
+                print(f"Full Markdown Length: {len(r['markdown_v2']['raw_markdown'])}")
+                print(f"Fit Markdown Length: {len(r['markdown_v2']['fit_markdown'])}")
+
+async def main():
+    """Run the API tests (the first two are currently commented out)"""
+    print("Testing Crawl4AI API...")
+
+    # print("\n1. Testing regular (non-streaming) endpoint...")
+    # await test_regular()
+
+    # print("\n2. Testing streaming endpoint...")
+    # await test_streaming()
+
+    print("\n3. Testing complex configuration...")
+    await test_complex_config()
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
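Note that test_streaming() above issues a plain POST, so httpx buffers the whole NDJSON body before aiter_lines() yields anything. For genuinely incremental consumption, httpx's stream() context manager can be used instead — a sketch under the same endpoint and payload assumptions as the tests above:

    import asyncio
    import json

    import httpx

    async def stream_crawl():
        payload = {
            "urls": ["https://example.com"] * 3,
            "crawler_config": {"cache_mode": "BYPASS", "stream": True},
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            # client.stream() keeps the response open and yields lines as the
            # server flushes them, instead of reading the full body first.
            async with client.stream(
                "POST", "http://localhost:8000/crawl", json=payload
            ) as response:
                async for line in response.aiter_lines():
                    if line.strip():
                        result = json.loads(line)
                        print(result.get("url"), result.get("success"))

    asyncio.run(stream_crawl())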