From a58c8000aab067d51db15a871a0c3fe377e73788 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Sun, 20 Apr 2025 20:14:26 +0800 Subject: [PATCH] refactor(server): migrate to pool-based crawler management Replace crawler_manager.py with simpler crawler_pool.py implementation: - Add global page semaphore for hard concurrency cap - Implement browser pool with idle cleanup - Add playground UI for testing and stress testing - Update API handlers to use pooled crawlers - Enhance logging levels and symbols BREAKING CHANGE: Removes CrawlerManager class in favor of simpler pool-based approach --- Dockerfile | 3 + crawl4ai/async_logger.py | 36 + crawl4ai/browser_manager.py | 3 + deploy/docker/api copy.py | 503 ------------- deploy/docker/api.py | 59 +- deploy/docker/config.yml | 56 +- deploy/docker/crawler_manager.py | 556 -------------- deploy/docker/crawler_pool.py | 60 ++ deploy/docker/server.py | 509 +++++-------- deploy/docker/static/playground/index.html | 813 +++++++++++++++++++++ tests/memory/cap_test.py | 34 + tests/memory/test_docker_congif_gen.py | 35 + tests/memory/test_stress_api.py | 12 +- tests/memory/test_stress_api_xs.py | 203 +++++ 14 files changed, 1447 insertions(+), 1435 deletions(-) delete mode 100644 deploy/docker/api copy.py delete mode 100644 deploy/docker/crawler_manager.py create mode 100644 deploy/docker/crawler_pool.py create mode 100644 deploy/docker/static/playground/index.html create mode 100644 tests/memory/cap_test.py create mode 100644 tests/memory/test_docker_congif_gen.py create mode 100644 tests/memory/test_stress_api_xs.py diff --git a/Dockerfile b/Dockerfile index a4ab56df..d32639a5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -162,6 +162,9 @@ RUN crawl4ai-doctor # Copy application code COPY deploy/docker/* ${APP_HOME}/ +# copy the playground + any future static assets +COPY deploy/docker/static ${APP_HOME}/static + # Change ownership of the application directory to the non-root user RUN chown -R appuser:appuser ${APP_HOME} diff --git 
a/crawl4ai/async_logger.py b/crawl4ai/async_logger.py index 273ef53b..541f755a 100644 --- a/crawl4ai/async_logger.py +++ b/crawl4ai/async_logger.py @@ -7,11 +7,18 @@ from datetime import datetime class LogLevel(Enum): + DEFAULT = 0 DEBUG = 1 INFO = 2 SUCCESS = 3 WARNING = 4 ERROR = 5 + CRITICAL = 6 + ALERT = 7 + NOTICE = 8 + EXCEPTION = 9 + FATAL = 10 + @@ -61,6 +68,13 @@ class AsyncLogger(AsyncLoggerBase): "DEBUG": "⋯", "INFO": "ℹ", "WARNING": "⚠", + "SUCCESS": "✔", + "CRITICAL": "‼", + "ALERT": "⚡", + "NOTICE": "ℹ", + "EXCEPTION": "❗", + "FATAL": "☠", + "DEFAULT": "•", } DEFAULT_COLORS = { @@ -69,6 +83,12 @@ class AsyncLogger(AsyncLoggerBase): LogLevel.SUCCESS: Fore.GREEN, LogLevel.WARNING: Fore.YELLOW, LogLevel.ERROR: Fore.RED, + LogLevel.CRITICAL: Fore.RED + Style.BRIGHT, + LogLevel.ALERT: Fore.RED + Style.BRIGHT, + LogLevel.NOTICE: Fore.BLUE, + LogLevel.EXCEPTION: Fore.RED + Style.BRIGHT, + LogLevel.FATAL: Fore.RED + Style.BRIGHT, + LogLevel.DEFAULT: Fore.WHITE, } def __init__( @@ -212,6 +232,22 @@ class AsyncLogger(AsyncLoggerBase): def warning(self, message: str, tag: str = "WARNING", **kwargs): """Log a warning message.""" self._log(LogLevel.WARNING, message, tag, **kwargs) + + def critical(self, message: str, tag: str = "CRITICAL", **kwargs): + """Log a critical message.""" + self._log(LogLevel.ERROR, message, tag, **kwargs) + def exception(self, message: str, tag: str = "EXCEPTION", **kwargs): + """Log an exception message.""" + self._log(LogLevel.ERROR, message, tag, **kwargs) + def fatal(self, message: str, tag: str = "FATAL", **kwargs): + """Log a fatal message.""" + self._log(LogLevel.ERROR, message, tag, **kwargs) + def alert(self, message: str, tag: str = "ALERT", **kwargs): + """Log an alert message.""" + self._log(LogLevel.ERROR, message, tag, **kwargs) + def notice(self, message: str, tag: str = "NOTICE", **kwargs): + """Log a notice message.""" + self._log(LogLevel.INFO, message, tag, **kwargs) def error(self, message: str, tag: str = "ERROR", 
**kwargs): """Log an error message.""" diff --git a/crawl4ai/browser_manager.py b/crawl4ai/browser_manager.py index a338d71d..642fd6c2 100644 --- a/crawl4ai/browser_manager.py +++ b/crawl4ai/browser_manager.py @@ -572,6 +572,9 @@ class BrowserManager: if self.config.extra_args: args.extend(self.config.extra_args) + # Deduplicate args + args = list(dict.fromkeys(args)) + browser_args = {"headless": self.config.headless, "args": args} if self.config.chrome_channel: diff --git a/deploy/docker/api copy.py b/deploy/docker/api copy.py deleted file mode 100644 index 341e23e1..00000000 --- a/deploy/docker/api copy.py +++ /dev/null @@ -1,503 +0,0 @@ -import os -import json -import asyncio -from typing import List, Tuple -from functools import partial - -import logging -from typing import Optional, AsyncGenerator -from urllib.parse import unquote -from fastapi import HTTPException, Request, status -from fastapi.background import BackgroundTasks -from fastapi.responses import JSONResponse -from redis import asyncio as aioredis - -from crawl4ai import ( - AsyncWebCrawler, - CrawlerRunConfig, - LLMExtractionStrategy, - CacheMode, - BrowserConfig, - MemoryAdaptiveDispatcher, - RateLimiter, - LLMConfig -) -from crawl4ai.utils import perform_completion_with_backoff -from crawl4ai.content_filter_strategy import ( - PruningContentFilter, - BM25ContentFilter, - LLMContentFilter -) -from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator -from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy - -from utils import ( - TaskStatus, - FilterType, - get_base_url, - is_task_id, - should_cleanup_task, - decode_redis_hash -) - -import psutil, time - -logger = logging.getLogger(__name__) - -# --- Helper to get memory --- -def _get_memory_mb(): - try: - return psutil.Process().memory_info().rss / (1024 * 1024) - except Exception as e: - logger.warning(f"Could not get memory info: {e}") - return None - - -async def handle_llm_qa( - url: str, - query: str, - 
config: dict -) -> str: - """Process QA using LLM with crawled content as context.""" - try: - # Extract base URL by finding last '?q=' occurrence - last_q_index = url.rfind('?q=') - if last_q_index != -1: - url = url[:last_q_index] - - # Get markdown content - async with AsyncWebCrawler() as crawler: - result = await crawler.arun(url) - if not result.success: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=result.error_message - ) - content = result.markdown.fit_markdown - - # Create prompt and get LLM response - prompt = f"""Use the following content as context to answer the question. - Content: - {content} - - Question: {query} - - Answer:""" - - response = perform_completion_with_backoff( - provider=config["llm"]["provider"], - prompt_with_variables=prompt, - api_token=os.environ.get(config["llm"].get("api_key_env", "")) - ) - - return response.choices[0].message.content - except Exception as e: - logger.error(f"QA processing error: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -async def process_llm_extraction( - redis: aioredis.Redis, - config: dict, - task_id: str, - url: str, - instruction: str, - schema: Optional[str] = None, - cache: str = "0" -) -> None: - """Process LLM extraction in background.""" - try: - # If config['llm'] has api_key then ignore the api_key_env - api_key = "" - if "api_key" in config["llm"]: - api_key = config["llm"]["api_key"] - else: - api_key = os.environ.get(config["llm"].get("api_key_env", None), "") - llm_strategy = LLMExtractionStrategy( - llm_config=LLMConfig( - provider=config["llm"]["provider"], - api_token=api_key - ), - instruction=instruction, - schema=json.loads(schema) if schema else None, - ) - - cache_mode = CacheMode.ENABLED if cache == "1" else CacheMode.WRITE_ONLY - - async with AsyncWebCrawler() as crawler: - result = await crawler.arun( - url=url, - config=CrawlerRunConfig( - 
extraction_strategy=llm_strategy, - scraping_strategy=LXMLWebScrapingStrategy(), - cache_mode=cache_mode - ) - ) - - if not result.success: - await redis.hset(f"task:{task_id}", mapping={ - "status": TaskStatus.FAILED, - "error": result.error_message - }) - return - - try: - content = json.loads(result.extracted_content) - except json.JSONDecodeError: - content = result.extracted_content - await redis.hset(f"task:{task_id}", mapping={ - "status": TaskStatus.COMPLETED, - "result": json.dumps(content) - }) - - except Exception as e: - logger.error(f"LLM extraction error: {str(e)}", exc_info=True) - await redis.hset(f"task:{task_id}", mapping={ - "status": TaskStatus.FAILED, - "error": str(e) - }) - -async def handle_markdown_request( - url: str, - filter_type: FilterType, - query: Optional[str] = None, - cache: str = "0", - config: Optional[dict] = None -) -> str: - """Handle markdown generation requests.""" - try: - decoded_url = unquote(url) - if not decoded_url.startswith(('http://', 'https://')): - decoded_url = 'https://' + decoded_url - - if filter_type == FilterType.RAW: - md_generator = DefaultMarkdownGenerator() - else: - content_filter = { - FilterType.FIT: PruningContentFilter(), - FilterType.BM25: BM25ContentFilter(user_query=query or ""), - FilterType.LLM: LLMContentFilter( - llm_config=LLMConfig( - provider=config["llm"]["provider"], - api_token=os.environ.get(config["llm"].get("api_key_env", None), ""), - ), - instruction=query or "Extract main content" - ) - }[filter_type] - md_generator = DefaultMarkdownGenerator(content_filter=content_filter) - - cache_mode = CacheMode.ENABLED if cache == "1" else CacheMode.WRITE_ONLY - - async with AsyncWebCrawler() as crawler: - result = await crawler.arun( - url=decoded_url, - config=CrawlerRunConfig( - markdown_generator=md_generator, - scraping_strategy=LXMLWebScrapingStrategy(), - cache_mode=cache_mode - ) - ) - - if not result.success: - raise HTTPException( - 
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=result.error_message - ) - - return (result.markdown.raw_markdown - if filter_type == FilterType.RAW - else result.markdown.fit_markdown) - - except Exception as e: - logger.error(f"Markdown error: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -async def handle_llm_request( - redis: aioredis.Redis, - background_tasks: BackgroundTasks, - request: Request, - input_path: str, - query: Optional[str] = None, - schema: Optional[str] = None, - cache: str = "0", - config: Optional[dict] = None -) -> JSONResponse: - """Handle LLM extraction requests.""" - base_url = get_base_url(request) - - try: - if is_task_id(input_path): - return await handle_task_status( - redis, input_path, base_url - ) - - if not query: - return JSONResponse({ - "message": "Please provide an instruction", - "_links": { - "example": { - "href": f"{base_url}/llm/{input_path}?q=Extract+main+content", - "title": "Try this example" - } - } - }) - - return await create_new_task( - redis, - background_tasks, - input_path, - query, - schema, - cache, - base_url, - config - ) - - except Exception as e: - logger.error(f"LLM endpoint error: {str(e)}", exc_info=True) - return JSONResponse({ - "error": str(e), - "_links": { - "retry": {"href": str(request.url)} - } - }, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) - -async def handle_task_status( - redis: aioredis.Redis, - task_id: str, - base_url: str -) -> JSONResponse: - """Handle task status check requests.""" - task = await redis.hgetall(f"task:{task_id}") - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Task not found" - ) - - task = decode_redis_hash(task) - response = create_task_response(task, task_id, base_url) - - if task["status"] in [TaskStatus.COMPLETED, TaskStatus.FAILED]: - if should_cleanup_task(task["created_at"]): - await redis.delete(f"task:{task_id}") - - return 
JSONResponse(response) - -async def create_new_task( - redis: aioredis.Redis, - background_tasks: BackgroundTasks, - input_path: str, - query: str, - schema: Optional[str], - cache: str, - base_url: str, - config: dict -) -> JSONResponse: - """Create and initialize a new task.""" - decoded_url = unquote(input_path) - if not decoded_url.startswith(('http://', 'https://')): - decoded_url = 'https://' + decoded_url - - from datetime import datetime - task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}" - - await redis.hset(f"task:{task_id}", mapping={ - "status": TaskStatus.PROCESSING, - "created_at": datetime.now().isoformat(), - "url": decoded_url - }) - - background_tasks.add_task( - process_llm_extraction, - redis, - config, - task_id, - decoded_url, - query, - schema, - cache - ) - - return JSONResponse({ - "task_id": task_id, - "status": TaskStatus.PROCESSING, - "url": decoded_url, - "_links": { - "self": {"href": f"{base_url}/llm/{task_id}"}, - "status": {"href": f"{base_url}/llm/{task_id}"} - } - }) - -def create_task_response(task: dict, task_id: str, base_url: str) -> dict: - """Create response for task status check.""" - response = { - "task_id": task_id, - "status": task["status"], - "created_at": task["created_at"], - "url": task["url"], - "_links": { - "self": {"href": f"{base_url}/llm/{task_id}"}, - "refresh": {"href": f"{base_url}/llm/{task_id}"} - } - } - - if task["status"] == TaskStatus.COMPLETED: - response["result"] = json.loads(task["result"]) - elif task["status"] == TaskStatus.FAILED: - response["error"] = task["error"] - - return response - -async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]: - """Stream results with heartbeats and completion markers.""" - import json - from utils import datetime_handler - - try: - async for result in results_gen: - try: - server_memory_mb = _get_memory_mb() - result_dict = result.model_dump() - result_dict['server_memory_mb'] = 
server_memory_mb - logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}") - data = json.dumps(result_dict, default=datetime_handler) + "\n" - yield data.encode('utf-8') - except Exception as e: - logger.error(f"Serialization error: {e}") - error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')} - yield (json.dumps(error_response) + "\n").encode('utf-8') - - yield json.dumps({"status": "completed"}).encode('utf-8') - - except asyncio.CancelledError: - logger.warning("Client disconnected during streaming") - finally: - try: - await crawler.close() - except Exception as e: - logger.error(f"Crawler cleanup error: {e}") - -async def handle_crawl_request( - urls: List[str], - browser_config: dict, - crawler_config: dict, - config: dict -) -> dict: - """Handle non-streaming crawl requests.""" - start_mem_mb = _get_memory_mb() # <--- Get memory before - start_time = time.time() - mem_delta_mb = None - peak_mem_mb = start_mem_mb - - try: - browser_config = BrowserConfig.load(browser_config) - crawler_config = CrawlerRunConfig.load(crawler_config) - - dispatcher = MemoryAdaptiveDispatcher( - memory_threshold_percent=config["crawler"]["memory_threshold_percent"], - rate_limiter=RateLimiter( - base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) - ) - ) - - crawler: AsyncWebCrawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - results = [] - func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many") - partial_func = partial(func, - urls[0] if len(urls) == 1 else urls, - config=crawler_config, - dispatcher=dispatcher) - results = await partial_func() - await crawler.close() - - end_mem_mb = _get_memory_mb() # <--- Get memory after - end_time = time.time() - - if start_mem_mb is not None and end_mem_mb is not None: - mem_delta_mb = end_mem_mb - start_mem_mb # <--- Calculate delta - peak_mem_mb = max(peak_mem_mb if peak_mem_mb else 0, end_mem_mb) # <--- Get peak memory - logger.info(f"Memory usage: 
Start: {start_mem_mb} MB, End: {end_mem_mb} MB, Delta: {mem_delta_mb} MB, Peak: {peak_mem_mb} MB") - - return { - "success": True, - "results": [result.model_dump() for result in results], - "server_processing_time_s": end_time - start_time, - "server_memory_delta_mb": mem_delta_mb, - "server_peak_memory_mb": peak_mem_mb - } - - except Exception as e: - logger.error(f"Crawl error: {str(e)}", exc_info=True) - if 'crawler' in locals() and crawler.ready: # Check if crawler was initialized and started - try: - await crawler.close() - except Exception as close_e: - logger.error(f"Error closing crawler during exception handling: {close_e}") - - # Measure memory even on error if possible - end_mem_mb_error = _get_memory_mb() - if start_mem_mb is not None and end_mem_mb_error is not None: - mem_delta_mb = end_mem_mb_error - start_mem_mb - - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=json.dumps({ # Send structured error - "error": str(e), - "server_memory_delta_mb": mem_delta_mb, - "server_peak_memory_mb": max(peak_mem_mb if peak_mem_mb else 0, end_mem_mb_error or 0) - }) - ) - -async def handle_stream_crawl_request( - urls: List[str], - browser_config: dict, - crawler_config: dict, - config: dict -) -> Tuple[AsyncWebCrawler, AsyncGenerator]: - """Handle streaming crawl requests.""" - try: - browser_config = BrowserConfig.load(browser_config) - # browser_config.verbose = True # Set to False or remove for production stress testing - browser_config.verbose = False - crawler_config = CrawlerRunConfig.load(crawler_config) - crawler_config.scraping_strategy = LXMLWebScrapingStrategy() - crawler_config.stream = True - - dispatcher = MemoryAdaptiveDispatcher( - memory_threshold_percent=config["crawler"]["memory_threshold_percent"], - rate_limiter=RateLimiter( - base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) - ) - ) - - crawler = AsyncWebCrawler(config=browser_config) - await crawler.start() - - results_gen = await 
crawler.arun_many( - urls=urls, - config=crawler_config, - dispatcher=dispatcher - ) - - return crawler, results_gen - - except Exception as e: - # Make sure to close crawler if started during an error here - if 'crawler' in locals() and crawler.ready: - try: - await crawler.close() - except Exception as close_e: - logger.error(f"Error closing crawler during stream setup exception: {close_e}") - logger.error(f"Stream crawl error: {str(e)}", exc_info=True) - # Raising HTTPException here will prevent streaming response - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) \ No newline at end of file diff --git a/deploy/docker/api.py b/deploy/docker/api.py index b226682f..130b57d0 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -377,14 +377,14 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) except asyncio.CancelledError: logger.warning("Client disconnected during streaming") - # finally: - # try: - # await crawler.close() - # except Exception as e: - # logger.error(f"Crawler cleanup error: {e}") + finally: + # try: + # await crawler.close() + # except Exception as e: + # logger.error(f"Crawler cleanup error: {e}") + pass async def handle_crawl_request( - crawler: AsyncWebCrawler, urls: List[str], browser_config: dict, crawler_config: dict, @@ -404,24 +404,29 @@ async def handle_crawl_request( memory_threshold_percent=config["crawler"]["memory_threshold_percent"], rate_limiter=RateLimiter( base_delay=tuple(config["crawler"]["rate_limiter"]["base_delay"]) - ) + ) if config["crawler"]["rate_limiter"]["enabled"] else None ) + + from crawler_pool import get_crawler + crawler = await get_crawler(browser_config) # crawler: AsyncWebCrawler = AsyncWebCrawler(config=browser_config) # await crawler.start() + + base_config = config["crawler"]["base_config"] + # Iterate on key-value pairs in global_config then use haseattr to set them + for key, value in base_config.items(): + if 
hasattr(crawler_config, key): + setattr(crawler_config, key, value) + results = [] func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many") partial_func = partial(func, urls[0] if len(urls) == 1 else urls, config=crawler_config, dispatcher=dispatcher) - - # Simulate work being done by the crawler - # logger.debug(f"Request (URLs: {len(urls)}) starting simulated work...") # Add log - # await asyncio.sleep(2) # <--- ADD ARTIFICIAL DELAY (e.g., 0.5 seconds) - # logger.debug(f"Request (URLs: {len(urls)}) finished simulated work.") - results = await partial_func() + # await crawler.close() end_mem_mb = _get_memory_mb() # <--- Get memory after @@ -442,11 +447,12 @@ async def handle_crawl_request( except Exception as e: logger.error(f"Crawl error: {str(e)}", exc_info=True) - # if 'crawler' in locals() and crawler.ready: # Check if crawler was initialized and started - # try: - # await crawler.close() - # except Exception as close_e: - # logger.error(f"Error closing crawler during exception handling: {close_e}") + if 'crawler' in locals() and crawler.ready: # Check if crawler was initialized and started + # try: + # await crawler.close() + # except Exception as close_e: + # logger.error(f"Error closing crawler during exception handling: {close_e}") + logger.error(f"Error closing crawler during exception handling: {e}") # Measure memory even on error if possible end_mem_mb_error = _get_memory_mb() @@ -463,7 +469,6 @@ async def handle_crawl_request( ) async def handle_stream_crawl_request( - crawler: AsyncWebCrawler, urls: List[str], browser_config: dict, crawler_config: dict, @@ -485,6 +490,9 @@ async def handle_stream_crawl_request( ) ) + from crawler_pool import get_crawler + crawler = await get_crawler(browser_config) + # crawler = AsyncWebCrawler(config=browser_config) # await crawler.start() @@ -494,17 +502,16 @@ async def handle_stream_crawl_request( dispatcher=dispatcher ) - # Return the *same* crawler instance and the generator - # The caller
(server.py) manages the crawler lifecycle via the pool context return crawler, results_gen except Exception as e: # Make sure to close crawler if started during an error here - # if 'crawler' in locals() and crawler.ready: - # try: - # await crawler.close() - # except Exception as close_e: - # logger.error(f"Error closing crawler during stream setup exception: {close_e}") + if 'crawler' in locals() and crawler.ready: + # try: + # await crawler.close() + # except Exception as close_e: + # logger.error(f"Error closing crawler during stream setup exception: {close_e}") + logger.error(f"Error closing crawler during stream setup exception: {e}") logger.error(f"Stream crawl error: {str(e)}", exc_info=True) # Raising HTTPException here will prevent streaming response raise HTTPException( diff --git a/deploy/docker/config.yml b/deploy/docker/config.yml index 17848e99..e93343c1 100644 --- a/deploy/docker/config.yml +++ b/deploy/docker/config.yml @@ -5,6 +5,7 @@ app: host: "0.0.0.0" port: 8020 reload: False + workers: 4 timeout_keep_alive: 300 # Default LLM Configuration @@ -48,53 +49,38 @@ security: content_security_policy: "default-src 'self'" strict_transport_security: "max-age=63072000; includeSubDomains" -# Crawler Pool Configuration -crawler_pool: - enabled: true # Set to false to disable the pool - - # --- Option 1: Auto-calculate size --- - auto_calculate_size: true - calculation_params: - mem_headroom_mb: 512 # Memory reserved for OS/other apps - avg_page_mem_mb: 150 # Estimated MB per concurrent "tab"/page in browsers - fd_per_page: 20 # Estimated file descriptors per page - core_multiplier: 4 # Max crawlers per CPU core - min_pool_size: 2 # Minimum number of primary crawlers - max_pool_size: 16 # Maximum number of primary crawlers - - # --- Option 2: Manual size (ignored if auto_calculate_size is true) --- - # pool_size: 8 - - # --- Other Pool Settings --- - backup_pool_size: 1 # Number of backup crawlers - max_wait_time_s: 30.0 # Max seconds a request waits
for a free crawler - throttle_threshold_percent: 70.0 # Start throttling delay above this % usage - throttle_delay_min_s: 0.1 # Min throttle delay - throttle_delay_max_s: 0.5 # Max throttle delay - - # --- Browser Config for Pooled Crawlers --- - browser_config: - # No need for "type": "BrowserConfig" here, just params - headless: true - verbose: false # Keep pool crawlers less verbose in production - # user_agent: "MyPooledCrawler/1.0" # Example - # Add other BrowserConfig params as needed (e.g., proxy, viewport) - # Crawler Configuration crawler: + base_config: + simulate_user: true memory_threshold_percent: 95.0 rate_limiter: + enabled: true base_delay: [1.0, 2.0] timeouts: stream_init: 30.0 # Timeout for stream initialization batch_process: 300.0 # Timeout for batch processing + pool: + max_pages: 40 # ← GLOBAL_SEM permits + idle_ttl_sec: 1800 # ← 30 min janitor cutoff + browser: + kwargs: + headless: true + text_mode: true + extra_args: + # - "--single-process" + - "--no-sandbox" + - "--disable-dev-shm-usage" + - "--disable-gpu" + - "--disable-software-rasterizer" + - "--disable-web-security" + - "--allow-insecure-localhost" + - "--ignore-certificate-errors" # Logging Configuration logging: level: "INFO" format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - file: "logs/app.log" - verbose: true # Observability Configuration observability: @@ -102,4 +88,4 @@ observability: enabled: True endpoint: "/metrics" health_check: - endpoint: "/health" + endpoint: "/health" \ No newline at end of file diff --git a/deploy/docker/crawler_manager.py b/deploy/docker/crawler_manager.py deleted file mode 100644 index b566e2d3..00000000 --- a/deploy/docker/crawler_manager.py +++ /dev/null @@ -1,556 +0,0 @@ -# crawler_manager.py -import asyncio -import time -import uuid -import psutil -import os -import resource # For FD limit -import random -import math -from typing import Optional, Tuple, Any, List, Dict, AsyncGenerator -from pydantic import BaseModel, Field, 
field_validator -from contextlib import asynccontextmanager -import logging - -from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, AsyncLogger -# Assuming api.py handlers are accessible or refactored slightly if needed -# We might need to import the specific handler functions if we call them directly -# from api import handle_crawl_request, handle_stream_crawl_request, _get_memory_mb, stream_results - -# --- Custom Exceptions --- -class PoolTimeoutError(Exception): - """Raised when waiting for a crawler resource times out.""" - pass - -class PoolConfigurationError(Exception): - """Raised for configuration issues.""" - pass - -class NoHealthyCrawlerError(Exception): - """Raised when no healthy crawler is available.""" - pass - - -# --- Configuration Models --- -class CalculationParams(BaseModel): - mem_headroom_mb: int = 512 - avg_page_mem_mb: int = 150 - fd_per_page: int = 20 - core_multiplier: int = 4 - min_pool_size: int = 1 # Min safe pages should be at least 1 - max_pool_size: int = 16 - - # V2 validation for avg_page_mem_mb - @field_validator('avg_page_mem_mb') - @classmethod - def check_avg_page_mem(cls, v: int) -> int: - if v <= 0: - raise ValueError("avg_page_mem_mb must be positive") - return v - - # V2 validation for fd_per_page - @field_validator('fd_per_page') - @classmethod - def check_fd_per_page(cls, v: int) -> int: - if v <= 0: - raise ValueError("fd_per_page must be positive") - return v - -# crawler_manager.py -# ... (imports including BaseModel, Field from pydantic) ... 
-from pydantic import BaseModel, Field, field_validator # <-- Import field_validator - -# --- Configuration Models (Pydantic V2 Syntax) --- -class CalculationParams(BaseModel): - mem_headroom_mb: int = 512 - avg_page_mem_mb: int = 150 - fd_per_page: int = 20 - core_multiplier: int = 4 - min_pool_size: int = 1 # Min safe pages should be at least 1 - max_pool_size: int = 16 - - # V2 validation for avg_page_mem_mb - @field_validator('avg_page_mem_mb') - @classmethod - def check_avg_page_mem(cls, v: int) -> int: - if v <= 0: - raise ValueError("avg_page_mem_mb must be positive") - return v - - # V2 validation for fd_per_page - @field_validator('fd_per_page') - @classmethod - def check_fd_per_page(cls, v: int) -> int: - if v <= 0: - raise ValueError("fd_per_page must be positive") - return v - -class CrawlerManagerConfig(BaseModel): - enabled: bool = True - auto_calculate_size: bool = True - calculation_params: CalculationParams = Field(default_factory=CalculationParams) # Use Field for default_factory - backup_pool_size: int = Field(1, ge=0) # Allow 0 backups - max_wait_time_s: float = 30.0 - throttle_threshold_percent: float = Field(70.0, ge=0, le=100) - throttle_delay_min_s: float = 0.1 - throttle_delay_max_s: float = 0.5 - browser_config: Dict[str, Any] = Field(default_factory=lambda: {"headless": True, "verbose": False}) # Use Field for default_factory - primary_reload_delay_s: float = 60.0 - -# --- Crawler Manager --- -class CrawlerManager: - """Manages shared AsyncWebCrawler instances, concurrency, and failover.""" - - def __init__(self, config: CrawlerManagerConfig, logger = None): - if not config.enabled: - self.logger.warning("CrawlerManager is disabled by configuration.") - # Set defaults to allow server to run, but manager won't function - self.config = config - self._initialized = False, - return - - self.config = config - self._primary_crawler: Optional[AsyncWebCrawler] = None - self._secondary_crawlers: List[AsyncWebCrawler] = [] - 
self._active_crawler_index: int = 0 # 0 for primary, 1+ for secondary index - self._primary_healthy: bool = False - self._secondary_healthy_flags: List[bool] = [] - - self._safe_pages: int = 1 # Default, calculated in initialize - self._semaphore: Optional[asyncio.Semaphore] = None - self._state_lock = asyncio.Lock() # Protects active_crawler, health flags - self._reload_tasks: List[Optional[asyncio.Task]] = [] # Track reload background tasks - - self._initialized = False - self._shutting_down = False - - # Initialize logger if provided - if logger is None: - self.logger = logging.getLogger(__name__) - self.logger.setLevel(logging.INFO) - else: - self.logger = logger - - self.logger.info("CrawlerManager initialized with config.") - self.logger.debug(f"Config: {self.config.model_dump_json(indent=2)}") - - def is_enabled(self) -> bool: - return self.config.enabled and self._initialized - - def _get_system_resources(self) -> Tuple[int, int, int]: - """Gets RAM, CPU cores, and FD limit.""" - total_ram_mb = 0 - cpu_cores = 0 - try: - mem_info = psutil.virtual_memory() - total_ram_mb = mem_info.total // (1024 * 1024) - cpu_cores = psutil.cpu_count(logical=False) or psutil.cpu_count(logical=True) # Prefer physical cores - except Exception as e: - self.logger.warning(f"Could not get RAM/CPU info via psutil: {e}") - total_ram_mb = 2048 # Default fallback - cpu_cores = 2 # Default fallback - - fd_limit = 1024 # Default fallback - try: - soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) - fd_limit = soft_limit # Use the soft limit - except (ImportError, ValueError, OSError, AttributeError) as e: - self.logger.warning(f"Could not get file descriptor limit (common on Windows): {e}. 
Using default: {fd_limit}") - - self.logger.info(f"System Resources: RAM={total_ram_mb}MB, Cores={cpu_cores}, FD Limit={fd_limit}") - return total_ram_mb, cpu_cores, fd_limit - - def _calculate_safe_pages(self) -> int: - """Calculates the safe number of concurrent pages based on resources.""" - if not self.config.auto_calculate_size: - # If auto-calc is off, use max_pool_size as the hard limit - # This isn't ideal based on the prompt, but provides *some* manual override - # A dedicated `manual_safe_pages` might be better. Let's use max_pool_size for now. - self.logger.warning("Auto-calculation disabled. Using max_pool_size as safe_pages limit.") - return self.config.calculation_params.max_pool_size - - params = self.config.calculation_params - total_ram_mb, cpu_cores, fd_limit = self._get_system_resources() - - available_ram_mb = total_ram_mb - params.mem_headroom_mb - if available_ram_mb <= 0: - self.logger.error(f"Not enough RAM ({total_ram_mb}MB) after headroom ({params.mem_headroom_mb}MB). 
Cannot calculate safe pages.") - return params.min_pool_size # Fallback to minimum - - try: - # Calculate limits from each resource - mem_limit = available_ram_mb // params.avg_page_mem_mb if params.avg_page_mem_mb > 0 else float('inf') - fd_limit_pages = fd_limit // params.fd_per_page if params.fd_per_page > 0 else float('inf') - cpu_limit = cpu_cores * params.core_multiplier if cpu_cores > 0 else float('inf') - - # Determine the most constraining limit - calculated_limit = math.floor(min(mem_limit, fd_limit_pages, cpu_limit)) - - except ZeroDivisionError: - self.logger.error("Division by zero in safe_pages calculation (avg_page_mem_mb or fd_per_page is zero).") - calculated_limit = params.min_pool_size # Fallback - - # Clamp the result within min/max bounds - safe_pages = max(params.min_pool_size, min(calculated_limit, params.max_pool_size)) - - self.logger.info(f"Calculated safe pages: MemoryLimit={mem_limit}, FDLimit={fd_limit_pages}, CPULimit={cpu_limit} -> RawCalc={calculated_limit} -> Clamped={safe_pages}") - return safe_pages - - async def _create_and_start_crawler(self, crawler_id: str) -> Optional[AsyncWebCrawler]: - """Creates, starts, and returns a crawler instance.""" - try: - # Create BrowserConfig from the dictionary in manager config - browser_conf = BrowserConfig(**self.config.browser_config) - crawler = AsyncWebCrawler(config=browser_conf) - await crawler.start() - self.logger.info(f"Successfully started crawler instance: {crawler_id}") - return crawler - except Exception as e: - self.logger.error(f"Failed to start crawler instance {crawler_id}: {e}", exc_info=True) - return None - - async def initialize(self): - """Initializes crawlers and semaphore. 
Called at server startup.""" - if not self.config.enabled or self._initialized: - return - - self.logger.info("Initializing CrawlerManager...") - self._safe_pages = self._calculate_safe_pages() - self._semaphore = asyncio.Semaphore(self._safe_pages) - - self._primary_crawler = await self._create_and_start_crawler("Primary") - if self._primary_crawler: - self._primary_healthy = True - else: - self._primary_healthy = False - self.logger.critical("Primary crawler failed to initialize!") - - self._secondary_crawlers = [] - self._secondary_healthy_flags = [] - self._reload_tasks = [None] * (1 + self.config.backup_pool_size) # For primary + backups - - for i in range(self.config.backup_pool_size): - sec_id = f"Secondary-{i+1}" - crawler = await self._create_and_start_crawler(sec_id) - self._secondary_crawlers.append(crawler) # Add even if None - self._secondary_healthy_flags.append(crawler is not None) - if crawler is None: - self.logger.error(f"{sec_id} crawler failed to initialize!") - - # Set initial active crawler (prefer primary) - if self._primary_healthy: - self._active_crawler_index = 0 - self.logger.info("Primary crawler is active.") - else: - # Find the first healthy secondary - found_healthy_backup = False - for i, healthy in enumerate(self._secondary_healthy_flags): - if healthy: - self._active_crawler_index = i + 1 # 1-based index for secondaries - self.logger.warning(f"Primary failed, Secondary-{i+1} is active.") - found_healthy_backup = True - break - if not found_healthy_backup: - self.logger.critical("FATAL: No healthy crawlers available after initialization!") - # Server should probably refuse connections in this state - - self._initialized = True - self.logger.info(f"CrawlerManager initialized. Safe Pages: {self._safe_pages}. Active Crawler Index: {self._active_crawler_index}") - - async def shutdown(self): - """Shuts down all crawler instances. 
Called at server shutdown.""" - if not self._initialized or self._shutting_down: - return - - self._shutting_down = True - self.logger.info("Shutting down CrawlerManager...") - - # Cancel any ongoing reload tasks - for i, task in enumerate(self._reload_tasks): - if task and not task.done(): - try: - task.cancel() - await task # Wait for cancellation - self.logger.info(f"Cancelled reload task for crawler index {i}.") - except asyncio.CancelledError: - self.logger.info(f"Reload task for crawler index {i} was already cancelled.") - except Exception as e: - self.logger.warning(f"Error cancelling reload task for crawler index {i}: {e}") - self._reload_tasks = [] - - - # Close primary - if self._primary_crawler: - try: - self.logger.info("Closing primary crawler...") - await self._primary_crawler.close() - self._primary_crawler = None - except Exception as e: - self.logger.error(f"Error closing primary crawler: {e}", exc_info=True) - - # Close secondaries - for i, crawler in enumerate(self._secondary_crawlers): - if crawler: - try: - self.logger.info(f"Closing secondary crawler {i+1}...") - await crawler.close() - except Exception as e: - self.logger.error(f"Error closing secondary crawler {i+1}: {e}", exc_info=True) - self._secondary_crawlers = [] - - self._initialized = False - self.logger.info("CrawlerManager shut down complete.") - - @asynccontextmanager - async def get_crawler(self) -> AsyncGenerator[AsyncWebCrawler, None]: - """Acquires semaphore, yields active crawler, handles throttling & failover.""" - if not self.is_enabled(): - raise NoHealthyCrawlerError("CrawlerManager is disabled or not initialized.") - - if self._shutting_down: - raise NoHealthyCrawlerError("CrawlerManager is shutting down.") - - active_crawler: Optional[AsyncWebCrawler] = None - acquired = False - request_id = uuid.uuid4() - start_wait = time.time() - - # --- Throttling --- - try: - # Check semaphore value without acquiring - current_usage = self._safe_pages - self._semaphore._value - 
usage_percent = (current_usage / self._safe_pages) * 100 if self._safe_pages > 0 else 0 - - if usage_percent >= self.config.throttle_threshold_percent: - delay = random.uniform(self.config.throttle_delay_min_s, self.config.throttle_delay_max_s) - self.logger.debug(f"Throttling: Usage {usage_percent:.1f}% >= {self.config.throttle_threshold_percent}%. Delaying {delay:.3f}s") - await asyncio.sleep(delay) - except Exception as e: - self.logger.warning(f"Error during throttling check: {e}") # Continue attempt even if throttle check fails - - # --- Acquire Semaphore --- - try: - # self.logger.debug(f"Attempting to acquire semaphore (Available: {self._semaphore._value}/{self._safe_pages}). Wait Timeout: {self.config.max_wait_time_s}s") - - # --- Logging Before Acquire --- - sem_value = self._semaphore._value if self._semaphore else 'N/A' - sem_waiters = len(self._semaphore._waiters) if self._semaphore and self._semaphore._waiters else 0 - self.logger.debug(f"Req {request_id}: Attempting acquire. Available={sem_value}/{self._safe_pages}, Waiters={sem_waiters}, Timeout={self.config.max_wait_time_s}s") - - await asyncio.wait_for( - self._semaphore.acquire(), timeout=self.config.max_wait_time_s - ) - acquired = True - wait_duration = time.time() - start_wait - if wait_duration > 1: - self.logger.warning(f"Semaphore acquired after {wait_duration:.3f}s. (Available: {self._semaphore._value}/{self._safe_pages})") - - self.logger.debug(f"Semaphore acquired successfully after {wait_duration:.3f}s. 
(Available: {self._semaphore._value}/{self._safe_pages})") - - # --- Select Active Crawler (Critical Section) --- - async with self._state_lock: - current_active_index = self._active_crawler_index - is_primary_active = (current_active_index == 0) - - if is_primary_active: - if self._primary_healthy and self._primary_crawler: - active_crawler = self._primary_crawler - else: - # Primary is supposed to be active but isn't healthy - self.logger.warning("Primary crawler unhealthy, attempting immediate failover...") - if not await self._try_failover_sync(): # Try to switch active crawler NOW - raise NoHealthyCrawlerError("Primary unhealthy and no healthy backup available.") - # If failover succeeded, active_crawler_index is updated - current_active_index = self._active_crawler_index - # Fall through to select the new active secondary - - # Check if we need to use a secondary (either initially or after failover) - if current_active_index > 0: - secondary_idx = current_active_index - 1 - if secondary_idx < len(self._secondary_crawlers) and \ - self._secondary_healthy_flags[secondary_idx] and \ - self._secondary_crawlers[secondary_idx]: - active_crawler = self._secondary_crawlers[secondary_idx] - else: - self.logger.error(f"Selected Secondary-{current_active_index} is unhealthy or missing.") - # Attempt failover to *another* secondary if possible? (Adds complexity) - # For now, raise error if the selected one isn't good. 
- raise NoHealthyCrawlerError(f"Selected Secondary-{current_active_index} is unavailable.") - - if active_crawler is None: - # This shouldn't happen if logic above is correct, but safeguard - raise NoHealthyCrawlerError("Failed to select a healthy active crawler.") - - # --- Yield Crawler --- - try: - yield active_crawler - except Exception as crawl_error: - self.logger.error(f"Error during crawl execution using {active_crawler}: {crawl_error}", exc_info=True) - # Determine if this error warrants failover - # For now, let's assume any exception triggers a health check/failover attempt - await self._handle_crawler_failure(active_crawler) - raise # Re-raise the original error for the API handler - - except asyncio.TimeoutError: - self.logger.warning(f"Timeout waiting for semaphore after {self.config.max_wait_time_s}s.") - raise PoolTimeoutError(f"Timed out waiting for available crawler resource after {self.config.max_wait_time_s}s") - except NoHealthyCrawlerError: - # Logged within the selection logic - raise # Re-raise for API handler - except Exception as e: - self.logger.error(f"Unexpected error in get_crawler context manager: {e}", exc_info=True) - raise # Re-raise potentially unknown errors - finally: - if acquired: - self._semaphore.release() - self.logger.debug(f"Semaphore released. (Available: {self._semaphore._value}/{self._safe_pages})") - - - async def _try_failover_sync(self) -> bool: - """Synchronous part of failover logic (must be called under state_lock). 
Finds next healthy secondary.""" - if not self._primary_healthy: # Only failover if primary is already marked down - found_healthy_backup = False - start_idx = (self._active_crawler_index % (self.config.backup_pool_size +1)) # Start check after current - for i in range(self.config.backup_pool_size): - check_idx = (start_idx + i) % self.config.backup_pool_size # Circular check - if self._secondary_healthy_flags[check_idx] and self._secondary_crawlers[check_idx]: - self._active_crawler_index = check_idx + 1 - self.logger.warning(f"Failover successful: Switched active crawler to Secondary-{self._active_crawler_index}") - found_healthy_backup = True - break # Found one - if not found_healthy_backup: - # If primary is down AND no backups are healthy, mark primary as active index (0) but it's still unhealthy - self._active_crawler_index = 0 - self.logger.error("Failover failed: No healthy secondary crawlers available.") - return False - return True - return True # Primary is healthy, no failover needed - - async def _handle_crawler_failure(self, failed_crawler: AsyncWebCrawler): - """Handles marking a crawler as unhealthy and initiating recovery.""" - if self._shutting_down: return # Don't handle failures during shutdown - - async with self._state_lock: - crawler_index = -1 - is_primary = False - - if failed_crawler is self._primary_crawler and self._primary_healthy: - self.logger.warning("Primary crawler reported failure.") - self._primary_healthy = False - is_primary = True - crawler_index = 0 - # Try immediate failover within the lock - await self._try_failover_sync() - # Start reload task if not already running for primary - if self._reload_tasks[0] is None or self._reload_tasks[0].done(): - self.logger.info("Initiating primary crawler reload task.") - self._reload_tasks[0] = asyncio.create_task(self._reload_crawler(0)) - - else: - # Check if it was one of the secondaries - for i, crawler in enumerate(self._secondary_crawlers): - if failed_crawler is crawler and 
self._secondary_healthy_flags[i]: - self.logger.warning(f"Secondary-{i+1} crawler reported failure.") - self._secondary_healthy_flags[i] = False - is_primary = False - crawler_index = i + 1 - # If this *was* the active crawler, trigger failover check - if self._active_crawler_index == crawler_index: - self.logger.warning(f"Active secondary {crawler_index} failed, attempting failover...") - await self._try_failover_sync() - # Start reload task for this secondary - if self._reload_tasks[crawler_index] is None or self._reload_tasks[crawler_index].done(): - self.logger.info(f"Initiating Secondary-{i+1} crawler reload task.") - self._reload_tasks[crawler_index] = asyncio.create_task(self._reload_crawler(crawler_index)) - break # Found the failed secondary - - if crawler_index == -1: - self.logger.debug("Failure reported by an unknown or already unhealthy crawler instance. Ignoring.") - - - async def _reload_crawler(self, crawler_index_to_reload: int): - """Background task to close, recreate, and start a specific crawler.""" - is_primary = (crawler_index_to_reload == 0) - crawler_id = "Primary" if is_primary else f"Secondary-{crawler_index_to_reload}" - original_crawler = self._primary_crawler if is_primary else self._secondary_crawlers[crawler_index_to_reload - 1] - - self.logger.info(f"Starting reload process for {crawler_id}...") - - # 1. Delay before attempting reload (e.g., allow transient issues to clear) - if not is_primary: # Maybe shorter delay for backups? - await asyncio.sleep(self.config.primary_reload_delay_s / 2) - else: - await asyncio.sleep(self.config.primary_reload_delay_s) - - - # 2. Attempt to close the old instance cleanly - if original_crawler: - try: - self.logger.info(f"Attempting to close existing {crawler_id} instance...") - await original_crawler.close() - self.logger.info(f"Successfully closed old {crawler_id} instance.") - except Exception as e: - self.logger.warning(f"Error closing old {crawler_id} instance during reload: {e}") - - # 3. 
Create and start a new instance - self.logger.info(f"Attempting to start new {crawler_id} instance...") - new_crawler = await self._create_and_start_crawler(crawler_id) - - # 4. Update state if successful - async with self._state_lock: - if new_crawler: - self.logger.info(f"Successfully reloaded {crawler_id}. Marking as healthy.") - if is_primary: - self._primary_crawler = new_crawler - self._primary_healthy = True - # Switch back to primary if no other failures occurred - # Check if ANY secondary is currently active - secondary_is_active = self._active_crawler_index > 0 - if not secondary_is_active or not self._secondary_healthy_flags[self._active_crawler_index - 1]: - self.logger.info("Switching active crawler back to primary.") - self._active_crawler_index = 0 - else: # Is secondary - secondary_idx = crawler_index_to_reload - 1 - self._secondary_crawlers[secondary_idx] = new_crawler - self._secondary_healthy_flags[secondary_idx] = True - # Potentially switch back if primary is still down and this was needed? - if not self._primary_healthy and self._active_crawler_index == 0: - self.logger.info(f"Primary still down, activating reloaded Secondary-{crawler_index_to_reload}.") - self._active_crawler_index = crawler_index_to_reload - - else: - self.logger.error(f"Failed to reload {crawler_id}. 
It remains unhealthy.") - # Keep the crawler marked as unhealthy - if is_primary: - self._primary_healthy = False # Ensure it stays false - else: - self._secondary_healthy_flags[crawler_index_to_reload - 1] = False - - - # Clear the reload task reference for this index - self._reload_tasks[crawler_index_to_reload] = None - - - async def get_status(self) -> Dict: - """Returns the current status of the manager.""" - if not self.is_enabled(): - return {"status": "disabled"} - - async with self._state_lock: - active_id = "Primary" if self._active_crawler_index == 0 else f"Secondary-{self._active_crawler_index}" - primary_status = "Healthy" if self._primary_healthy else "Unhealthy" - secondary_statuses = [f"Secondary-{i+1}: {'Healthy' if healthy else 'Unhealthy'}" - for i, healthy in enumerate(self._secondary_healthy_flags)] - semaphore_available = self._semaphore._value if self._semaphore else 'N/A' - semaphore_locked = len(self._semaphore._waiters) if self._semaphore and self._semaphore._waiters else 0 - - return { - "status": "enabled", - "safe_pages": self._safe_pages, - "semaphore_available": semaphore_available, - "semaphore_waiters": semaphore_locked, - "active_crawler": active_id, - "primary_status": primary_status, - "secondary_statuses": secondary_statuses, - "reloading_tasks": [i for i, t in enumerate(self._reload_tasks) if t and not t.done()] - } \ No newline at end of file diff --git a/deploy/docker/crawler_pool.py b/deploy/docker/crawler_pool.py new file mode 100644 index 00000000..d15102e4 --- /dev/null +++ b/deploy/docker/crawler_pool.py @@ -0,0 +1,60 @@ +# crawler_pool.py (new file) +import asyncio, json, hashlib, time, psutil +from contextlib import suppress +from typing import Dict +from crawl4ai import AsyncWebCrawler, BrowserConfig +from typing import Dict +from utils import load_config + +CONFIG = load_config() + +POOL: Dict[str, AsyncWebCrawler] = {} +LAST_USED: Dict[str, float] = {} +LOCK = asyncio.Lock() + +MEM_LIMIT = CONFIG.get("crawler", 
{}).get("memory_threshold_percent", 95.0) # % RAM – refuse new browsers above this +IDLE_TTL = CONFIG.get("crawler", {}).get("pool", {}).get("idle_ttl_sec", 1800) # close if unused for 30 min + +def _sig(cfg: BrowserConfig) -> str: + payload = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",",":")) + return hashlib.sha1(payload.encode()).hexdigest() + +async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler: + try: + sig = _sig(cfg) + async with LOCK: + if sig in POOL: + LAST_USED[sig] = time.time(); + return POOL[sig] + if psutil.virtual_memory().percent >= MEM_LIMIT: + raise MemoryError("RAM pressure – new browser denied") + crawler = AsyncWebCrawler(config=cfg, thread_safe=False) + await crawler.start() + POOL[sig] = crawler; LAST_USED[sig] = time.time() + return crawler + except MemoryError as e: + raise MemoryError(f"RAM pressure – new browser denied: {e}") + except Exception as e: + raise RuntimeError(f"Failed to start browser: {e}") + finally: + if sig in POOL: + LAST_USED[sig] = time.time() + else: + # If we failed to start the browser, we should remove it from the pool + POOL.pop(sig, None) + LAST_USED.pop(sig, None) + # If we failed to start the browser, we should remove it from the pool +async def close_all(): + async with LOCK: + await asyncio.gather(*(c.close() for c in POOL.values()), return_exceptions=True) + POOL.clear(); LAST_USED.clear() + +async def janitor(): + while True: + await asyncio.sleep(60) + now = time.time() + async with LOCK: + for sig, crawler in list(POOL.items()): + if now - LAST_USED[sig] > IDLE_TTL: + with suppress(Exception): await crawler.close() + POOL.pop(sig, None); LAST_USED.pop(sig, None) diff --git a/deploy/docker/server.py b/deploy/docker/server.py index f577348b..ae60ffa2 100644 --- a/deploy/docker/server.py +++ b/deploy/docker/server.py @@ -1,167 +1,200 @@ -# Import from auth.py -from auth import create_access_token, get_token_dependency, TokenRequest -from api import ( - handle_markdown_request, - 
handle_llm_qa, - handle_stream_crawl_request, - handle_crawl_request, - stream_results, - _get_memory_mb -) -from utils import FilterType, load_config, setup_logging, verify_email_domain -import os -import sys -import time -from typing import List, Optional, Dict, AsyncGenerator +# ───────────────────────── server.py ───────────────────────── +""" +Crawl4AI FastAPI entry‑point +• Browser pool + global page cap +• Rate‑limiting, security, metrics +• /crawl, /crawl/stream, /md, /llm endpoints +""" + +# ── stdlib & 3rd‑party imports ─────────────────────────────── +import os, sys, time, asyncio +from typing import List, Optional, Dict from contextlib import asynccontextmanager -from fastapi import FastAPI, HTTPException, Request, Query, Path, Depends, status -from fastapi.responses import StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse +import pathlib + +from fastapi import ( + FastAPI, HTTPException, Request, Path, Query, Depends +) +from fastapi.responses import ( + StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse +) from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware +from fastapi.staticfiles import StaticFiles + +import ast, crawl4ai as _c4 from pydantic import BaseModel, Field from slowapi import Limiter from slowapi.util import get_remote_address from prometheus_fastapi_instrumentator import Instrumentator from redis import asyncio as aioredis -from crawl4ai import ( - BrowserConfig, - CrawlerRunConfig, - AsyncLogger -) - -from crawler_manager import ( - CrawlerManager, - CrawlerManagerConfig, - PoolTimeoutError, - NoHealthyCrawlerError -) - +# ── internal imports (after sys.path append) ───────────────── sys.path.append(os.path.dirname(os.path.realpath(__file__))) +from utils import ( + FilterType, load_config, setup_logging, verify_email_domain +) +from api import ( + handle_markdown_request, handle_llm_qa, + handle_stream_crawl_request, 
handle_crawl_request, + stream_results +) +from auth import create_access_token, get_token_dependency, TokenRequest +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig +from crawler_pool import get_crawler, close_all, janitor -__version__ = "0.2.6" - - -class CrawlRequest(BaseModel): - urls: List[str] = Field(min_length=1, max_length=100) - browser_config: Optional[Dict] = Field(default_factory=dict) - crawler_config: Optional[Dict] = Field(default_factory=dict) - - -# Load configuration and setup +# ────────────────── configuration / logging ────────────────── config = load_config() setup_logging(config) -logger = AsyncLogger( - log_file=config["logging"].get("log_file", "app.log"), - verbose=config["logging"].get("verbose", False), - tag_width=10, -) -# Initialize Redis -redis = aioredis.from_url(config["redis"].get("uri", "redis://localhost")) +__version__ = "0.5.1-d1" -# Initialize rate limiter -limiter = Limiter( - key_func=get_remote_address, - default_limits=[config["rate_limiting"]["default_limit"]], - storage_uri=config["rate_limiting"]["storage_uri"] -) +# ── global page semaphore (hard cap) ───────────────────────── +MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30) +GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES) -# --- Initialize Manager (will be done in lifespan) --- -# Load manager config from the main config -manager_config_dict = config.get("crawler_pool", {}) -# Use Pydantic to parse and validate -manager_config = CrawlerManagerConfig(**manager_config_dict) -crawler_manager = CrawlerManager(config=manager_config, logger=logger) - -# --- FastAPI App and Lifespan --- +# import logging +# page_log = logging.getLogger("page_cap") +# orig_arun = AsyncWebCrawler.arun +# async def capped_arun(self, *a, **kw): +# await GLOBAL_SEM.acquire() # ← take slot +# try: +# in_flight = MAX_PAGES - GLOBAL_SEM._value # used permits +# page_log.info("🕸️ pages_in_flight=%s / %s", in_flight, MAX_PAGES) +# return await orig_arun(self, *a, **kw) +# 
finally: +# GLOBAL_SEM.release() # ← free slot +orig_arun = AsyncWebCrawler.arun +async def capped_arun(self, *a, **kw): + async with GLOBAL_SEM: + return await orig_arun(self, *a, **kw) +AsyncWebCrawler.arun = capped_arun +# ───────────────────── FastAPI lifespan ────────────────────── @asynccontextmanager -async def lifespan(app: FastAPI): - # Startup - logger.info("Starting up the server...") - if manager_config.enabled: - logger.info("Initializing Crawler Manager...") - await crawler_manager.initialize() - app.state.crawler_manager = crawler_manager # Store manager in app state - logger.info("Crawler Manager is enabled.") - else: - logger.warning("Crawler Manager is disabled.") - app.state.crawler_manager = None # Indicate disabled state - - yield # Server runs here - - # Shutdown - logger.info("Shutting down server...") - if app.state.crawler_manager: - logger.info("Shutting down Crawler Manager...") - await app.state.crawler_manager.shutdown() - logger.info("Crawler Manager shut down.") - logger.info("Server shut down.") +async def lifespan(_: FastAPI): + await get_crawler(BrowserConfig( + extra_args=config["crawler"]["browser"].get("extra_args", []), + **config["crawler"]["browser"].get("kwargs", {}), + )) # warm‑up + app.state.janitor = asyncio.create_task(janitor()) # idle GC + yield + app.state.janitor.cancel() + await close_all() +# ───────────────────── FastAPI instance ────────────────────── app = FastAPI( title=config["app"]["title"], version=config["app"]["version"], lifespan=lifespan, ) -# Configure middleware -def setup_security_middleware(app, config): - sec_config = config.get("security", {}) - if sec_config.get("enabled", False): - if sec_config.get("https_redirect", False): - app.add_middleware(HTTPSRedirectMiddleware) - if sec_config.get("trusted_hosts", []) != ["*"]: - app.add_middleware(TrustedHostMiddleware, - allowed_hosts=sec_config["trusted_hosts"]) +# ── static playground ────────────────────────────────────── +STATIC_DIR = 
pathlib.Path(__file__).parent / "static" / "playground" +if not STATIC_DIR.exists(): + raise RuntimeError(f"Playground assets not found at {STATIC_DIR}") +app.mount( + "/playground", + StaticFiles(directory=STATIC_DIR, html=True), + name="play", +) +# Optional nice‑to‑have: opening the root shows the playground +@app.get("/") +async def root(): + return RedirectResponse("/playground") -setup_security_middleware(app, config) +# ─────────────────── infra / middleware ───────────────────── +redis = aioredis.from_url(config["redis"].get("uri", "redis://localhost")) + +limiter = Limiter( + key_func=get_remote_address, + default_limits=[config["rate_limiting"]["default_limit"]], + storage_uri=config["rate_limiting"]["storage_uri"], +) + +def _setup_security(app_: FastAPI): + sec = config["security"] + if not sec["enabled"]: + return + if sec.get("https_redirect"): + app_.add_middleware(HTTPSRedirectMiddleware) + if sec.get("trusted_hosts", []) != ["*"]: + app_.add_middleware( + TrustedHostMiddleware, allowed_hosts=sec["trusted_hosts"] + ) +_setup_security(app) -# Prometheus instrumentation if config["observability"]["prometheus"]["enabled"]: Instrumentator().instrument(app).expose(app) -# Get token dependency based on config -token_dependency = get_token_dependency(config) - -# Middleware for security headers - +token_dep = get_token_dependency(config) @app.middleware("http") async def add_security_headers(request: Request, call_next): - response = await call_next(request) + resp = await call_next(request) if config["security"]["enabled"]: - response.headers.update(config["security"]["headers"]) - return response + resp.headers.update(config["security"]["headers"]) + return resp + +# ───────────────── safe config‑dump helper ───────────────── +ALLOWED_TYPES = { + "CrawlerRunConfig": CrawlerRunConfig, + "BrowserConfig": BrowserConfig, +} + +def _safe_eval_config(expr: str) -> dict: + """ + Accept exactly one top‑level call to CrawlerRunConfig(...) or BrowserConfig(...). 
+ Whatever is inside the parentheses is fine *except* further function calls + (so no __import__('os') stuff). All public names from crawl4ai are available + when we eval. + """ + tree = ast.parse(expr, mode="eval") + + # must be a single call + if not isinstance(tree.body, ast.Call): + raise ValueError("Expression must be a single constructor call") + + call = tree.body + if not (isinstance(call.func, ast.Name) and call.func.id in {"CrawlerRunConfig", "BrowserConfig"}): + raise ValueError("Only CrawlerRunConfig(...) or BrowserConfig(...) are allowed") + + # forbid nested calls to keep the surface tiny + for node in ast.walk(call): + if isinstance(node, ast.Call) and node is not call: + raise ValueError("Nested function calls are not permitted") + + # expose everything that crawl4ai exports, nothing else + safe_env = {name: getattr(_c4, name) for name in dir(_c4) if not name.startswith("_")} + obj = eval(compile(tree, "", "eval"), {"__builtins__": {}}, safe_env) + return obj.dump() -async def get_manager() -> CrawlerManager: - # Ensure manager exists and is enabled before yielding - if not hasattr(app.state, 'crawler_manager') or app.state.crawler_manager is None: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Crawler service is disabled or not initialized" - ) - if not app.state.crawler_manager.is_enabled(): - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Crawler service is currently disabled" - ) - return app.state.crawler_manager - -# Token endpoint (always available, but usage depends on config) +# ───────────────────────── Schemas ─────────────────────────── +class CrawlRequest(BaseModel): + urls: List[str] = Field(min_length=1, max_length=100) + browser_config: Optional[Dict] = Field(default_factory=dict) + crawler_config: Optional[Dict] = Field(default_factory=dict) +class RawCode(BaseModel): + code: str +# ──────────────────────── Endpoints ────────────────────────── 
@app.post("/token") -async def get_token(request_data: TokenRequest): - if not verify_email_domain(request_data.email): - raise HTTPException(status_code=400, detail="Invalid email domain") - token = create_access_token({"sub": request_data.email}) - return {"email": request_data.email, "access_token": token, "token_type": "bearer"} +async def get_token(req: TokenRequest): + if not verify_email_domain(req.email): + raise HTTPException(400, "Invalid email domain") + token = create_access_token({"sub": req.email}) + return {"email": req.email, "access_token": token, "token_type": "bearer"} -# Endpoints with conditional auth +@app.post("/config/dump") +async def config_dump(raw: RawCode): + try: + return JSONResponse(_safe_eval_config(raw.code.strip())) + except Exception as e: + raise HTTPException(400, str(e)) @app.get("/md/{url:path}") @@ -171,230 +204,83 @@ async def get_markdown( url: str, f: FilterType = FilterType.FIT, q: Optional[str] = None, - c: Optional[str] = "0", - token_data: Optional[Dict] = Depends(token_dependency) + c: str = "0", + _td: Dict = Depends(token_dep), ): - result = await handle_markdown_request(url, f, q, c, config) - return PlainTextResponse(result) + md = await handle_markdown_request(url, f, q, c, config) + return PlainTextResponse(md) - -@app.get("/llm/{url:path}", description="URL should be without http/https prefix") +@app.get("/llm/{url:path}") async def llm_endpoint( request: Request, url: str = Path(...), q: Optional[str] = Query(None), - token_data: Optional[Dict] = Depends(token_dependency) + _td: Dict = Depends(token_dep), ): if not q: - raise HTTPException( - status_code=400, detail="Query parameter 'q' is required") - if not url.startswith(('http://', 'https://')): - url = 'https://' + url - try: - answer = await handle_llm_qa(url, q, config) - return JSONResponse({"answer": answer}) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - + raise HTTPException(400, "Query parameter 'q' is required") 
+ if not url.startswith(("http://", "https://")): + url = "https://" + url + answer = await handle_llm_qa(url, q, config) + return JSONResponse({"answer": answer}) @app.get("/schema") async def get_schema(): from crawl4ai import BrowserConfig, CrawlerRunConfig - return {"browser": BrowserConfig().dump(), "crawler": CrawlerRunConfig().dump()} - + return {"browser": BrowserConfig().dump(), + "crawler": CrawlerRunConfig().dump()} @app.get(config["observability"]["health_check"]["endpoint"]) async def health(): return {"status": "ok", "timestamp": time.time(), "version": __version__} - @app.get(config["observability"]["prometheus"]["endpoint"]) async def metrics(): - return RedirectResponse(url=config["observability"]["prometheus"]["endpoint"]) - - -@app.get("/browswers") -# Optional dependency -async def health(manager: Optional[CrawlerManager] = Depends(get_manager, use_cache=False)): - base_status = {"status": "ok", "timestamp": time.time(), - "version": __version__} - if manager: - try: - manager_status = await manager.get_status() - base_status["crawler_manager"] = manager_status - except Exception as e: - base_status["crawler_manager"] = { - "status": "error", "detail": str(e)} - else: - base_status["crawler_manager"] = {"status": "disabled"} - return base_status - + return RedirectResponse(config["observability"]["prometheus"]["endpoint"]) @app.post("/crawl") @limiter.limit(config["rate_limiting"]["default_limit"]) async def crawl( request: Request, crawl_request: CrawlRequest, - manager: CrawlerManager = Depends(get_manager), # Use dependency - token_data: Optional[Dict] = Depends(token_dependency) # Keep auth + _td: Dict = Depends(token_dep), ): if not crawl_request.urls: - raise HTTPException( - status_code=400, detail="At least one URL required") - - try: - # Use the manager's context to get a crawler instance - async with manager.get_crawler() as active_crawler: - # Call the actual handler from api.py, passing the acquired crawler - results_dict = await 
handle_crawl_request( - crawler=active_crawler, # Pass the live crawler instance - urls=crawl_request.urls, - # Pass user-provided configs, these might override pool defaults if needed - # Or the manager/handler could decide how to merge them - browser_config=crawl_request.browser_config or {}, # Ensure dict - crawler_config=crawl_request.crawler_config or {}, # Ensure dict - config=config # Pass the global server config - ) - return JSONResponse(results_dict) - - except PoolTimeoutError as e: - logger.warning(f"Request rejected due to pool timeout: {e}") - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, # Or 429 - detail=f"Crawler resources busy. Please try again later. Timeout: {e}" - ) - except NoHealthyCrawlerError as e: - logger.error(f"Request failed as no healthy crawler available: {e}") - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=f"Crawler service temporarily unavailable: {e}" - ) - except HTTPException: # Re-raise HTTP exceptions from handler - raise - except Exception as e: - logger.error( - f"Unexpected error during batch crawl processing: {e}", exc_info=True) - # Return generic error, details might be logged by handle_crawl_request - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An unexpected error occurred: {e}" - ) - + raise HTTPException(400, "At least one URL required") + res = await handle_crawl_request( + urls=crawl_request.urls, + browser_config=crawl_request.browser_config, + crawler_config=crawl_request.crawler_config, + config=config, + ) + return JSONResponse(res) @app.post("/crawl/stream") @limiter.limit(config["rate_limiting"]["default_limit"]) async def crawl_stream( request: Request, crawl_request: CrawlRequest, - manager: CrawlerManager = Depends(get_manager), - token_data: Optional[Dict] = Depends(token_dependency) + _td: Dict = Depends(token_dep), ): if not crawl_request.urls: - raise HTTPException( - status_code=400, detail="At least 
one URL required") - - try: - # THIS IS A BIT WORK OF ART RATHER THAN ENGINEERING - # Acquire the crawler context from the manager - # IMPORTANT: The context needs to be active for the *duration* of the stream - # This structure might be tricky with FastAPI's StreamingResponse which consumes - # the generator *after* the endpoint function returns. - - # --- Option A: Acquire crawler, pass to handler, handler yields --- - # (Requires handler NOT to be async generator itself, but return one) - # async with manager.get_crawler() as active_crawler: - # # Handler returns the generator - # _, results_gen = await handle_stream_crawl_request( - # crawler=active_crawler, - # urls=crawl_request.urls, - # browser_config=crawl_request.browser_config or {}, - # crawler_config=crawl_request.crawler_config or {}, - # config=config - # ) - # # PROBLEM: `active_crawler` context exits before StreamingResponse uses results_gen - # # This releases the semaphore too early. - - # --- Option B: Pass manager to handler, handler uses context internally --- - # (Requires modifying handle_stream_crawl_request signature/logic) - # This seems cleaner. Let's assume api.py is adapted for this. - # We need a way for the generator yielded by stream_results to know when - # to release the semaphore. 
- - # --- Option C: Create a wrapper generator that handles context --- - async def stream_wrapper(manager: CrawlerManager, crawl_request: CrawlRequest, config: dict) -> AsyncGenerator[bytes, None]: - active_crawler = None - try: - async with manager.get_crawler() as acquired_crawler: - active_crawler = acquired_crawler # Keep reference for cleanup - # Call the handler which returns the raw result generator - _crawler_ref, results_gen = await handle_stream_crawl_request( - crawler=acquired_crawler, - urls=crawl_request.urls, - browser_config=crawl_request.browser_config or {}, - crawler_config=crawl_request.crawler_config or {}, - config=config - ) - # Use the stream_results utility to format and yield - async for data_bytes in stream_results(_crawler_ref, results_gen): - yield data_bytes - except (PoolTimeoutError, NoHealthyCrawlerError) as e: - # Yield a final error message in the stream - error_payload = {"status": "error", "detail": str(e)} - yield (json.dumps(error_payload) + "\n").encode('utf-8') - logger.warning(f"Stream request failed: {e}") - # Re-raise might be better if StreamingResponse handles it? Test needed. - except HTTPException as e: # Catch HTTP exceptions from handler setup - error_payload = {"status": "error", - "detail": e.detail, "status_code": e.status_code} - yield (json.dumps(error_payload) + "\n").encode('utf-8') - logger.warning( - f"Stream request failed with HTTPException: {e.detail}") - except Exception as e: - error_payload = {"status": "error", - "detail": f"Unexpected stream error: {e}"} - yield (json.dumps(error_payload) + "\n").encode('utf-8') - logger.error( - f"Unexpected error during stream processing: {e}", exc_info=True) - # finally: - # Ensure crawler cleanup if stream_results doesn't handle it? - # stream_results *should* call crawler.close(), but only on the - # instance it received. If we pass the *manager* instead, this gets complex. - # Let's stick to passing the acquired_crawler and rely on stream_results. 
- - # Create the generator using the wrapper - streaming_generator = stream_wrapper(manager, crawl_request, config) - - return StreamingResponse( - streaming_generator, # Use the wrapper - media_type='application/x-ndjson', - headers={'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', 'X-Stream-Status': 'active'} - ) - - except (PoolTimeoutError, NoHealthyCrawlerError) as e: - # These might occur if get_crawler fails *before* stream starts - # Or if the wrapper re-raises them. - logger.warning(f"Stream request rejected before starting: {e}") - status_code = status.HTTP_503_SERVICE_UNAVAILABLE # Or 429 for timeout - # Don't raise HTTPException here, let the wrapper yield the error message. - # If we want to return a non-200 initial status, need more complex handling. - # Return an *empty* stream with error headers? Or just let wrapper yield error. - - async def _error_stream(): - error_payload = {"status": "error", "detail": str(e)} - yield (json.dumps(error_payload) + "\n").encode('utf-8') - return StreamingResponse(_error_stream(), status_code=status_code, media_type='application/x-ndjson') - - except HTTPException: # Re-raise HTTP exceptions from setup - raise - except Exception as e: - logger.error( - f"Unexpected error setting up stream crawl: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An unexpected error occurred setting up the stream: {e}" - ) + raise HTTPException(400, "At least one URL required") + crawler, gen = await handle_stream_crawl_request( + urls=crawl_request.urls, + browser_config=crawl_request.browser_config, + crawler_config=crawl_request.crawler_config, + config=config, + ) + return StreamingResponse( + stream_results(crawler, gen), + media_type="application/x-ndjson", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Stream-Status": "active", + }, + ) +# ────────────────────────── cli ────────────────────────────── if __name__ == "__main__": 
import uvicorn uvicorn.run( @@ -402,5 +288,6 @@ if __name__ == "__main__": host=config["app"]["host"], port=config["app"]["port"], reload=config["app"]["reload"], - timeout_keep_alive=config["app"]["timeout_keep_alive"] + timeout_keep_alive=config["app"]["timeout_keep_alive"], ) +# ───────────────────────────────────────────────────────────── diff --git a/deploy/docker/static/playground/index.html b/deploy/docker/static/playground/index.html new file mode 100644 index 00000000..8c2b3fb9 --- /dev/null +++ b/deploy/docker/static/playground/index.html @@ -0,0 +1,813 @@ + + + + + + + Crawl4AI Playground + + + + + + + + + + + + + + + + + + + + +
+

+ 🚀🤖 Crawl4AI Playground + + + + GitHub stars + GitHub forks + + + + + Docs + + + + + + + + @unclecode + +

+ +
+ + +
+
+ + +
+ +
+
+

Request Builder

+ +
+
+ + + +
+ Advanced Config (Python → auto‑JSON) + + +
+ + + + + + + + + + Docs + + + +
+ + +
+
+ +
+ + +
+
+
+ + + + + + +
+
+ + + +
+
+ +
+
+ +
+
{}
+
+ + + + + + +
+
+
+ + + + + + + + \ No newline at end of file diff --git a/tests/memory/cap_test.py b/tests/memory/cap_test.py new file mode 100644 index 00000000..56d7b261 --- /dev/null +++ b/tests/memory/cap_test.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +Hammer /crawl with many concurrent requests to prove GLOBAL_SEM works. +""" + +import asyncio, httpx, json, uuid, argparse + +API = "http://localhost:8020/crawl" +URLS_PER_CALL = 1 # keep it minimal so each arun() == 1 page +CONCURRENT_CALLS = 20 # way above your cap + +payload_template = { + "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + "crawler_config": { + "type": "CrawlerRunConfig", + "params": {"cache_mode": "BYPASS", "verbose": False}, + } +} + +async def one_call(client): + payload = payload_template.copy() + payload["urls"] = [f"https://httpbin.org/anything/{uuid.uuid4()}"] + r = await client.post(API, json=payload) + r.raise_for_status() + return r.json()["server_peak_memory_mb"] + +async def main(): + async with httpx.AsyncClient(timeout=60) as client: + tasks = [asyncio.create_task(one_call(client)) for _ in range(CONCURRENT_CALLS)] + mem_usages = await asyncio.gather(*tasks) + print("Calls finished OK, server peaks reported:", mem_usages) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/memory/test_docker_congif_gen.py b/tests/memory/test_docker_congif_gen.py new file mode 100644 index 00000000..2da26078 --- /dev/null +++ b/tests/memory/test_docker_congif_gen.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +""" +Quick sanity‑check for /config/dump endpoint. 
+ +Usage: + python test_config_dump.py [http://localhost:8020] + +If the server isn’t running, start it first: + uvicorn deploy.docker.server:app --port 8020 +""" + +import sys, json, textwrap, requests + +BASE = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:8020" +URL = f"{BASE.rstrip('/')}/config/dump" + +CASES = [ + # --- CrawlRunConfig variants --- + "CrawlerRunConfig()", + "CrawlerRunConfig(stream=True, cache_mode=CacheMode.BYPASS)", + "CrawlerRunConfig(js_only=True, wait_until='networkidle')", + + # --- BrowserConfig variants --- + "BrowserConfig()", + "BrowserConfig(headless=False, extra_args=['--disable-gpu'])", + "BrowserConfig(browser_mode='builtin', proxy='http://1.2.3.4:8080')", +] + +for code in CASES: + print("\n=== POST:", code) + resp = requests.post(URL, json={"code": code}, timeout=15) + if resp.ok: + print(json.dumps(resp.json(), indent=2)[:400] + "...") + else: + print("ERROR", resp.status_code, resp.text[:200]) diff --git a/tests/memory/test_stress_api.py b/tests/memory/test_stress_api.py index 232964c1..1b4f1a9c 100644 --- a/tests/memory/test_stress_api.py +++ b/tests/memory/test_stress_api.py @@ -24,13 +24,13 @@ from rich.panel import Panel from rich.syntax import Syntax # --- Constants --- -# DEFAULT_API_URL = "http://localhost:11235" # Default port +DEFAULT_API_URL = "http://localhost:11235" # Default port DEFAULT_API_URL = "http://localhost:8020" # Default port -DEFAULT_URL_COUNT = 1000 -DEFAULT_MAX_CONCURRENT_REQUESTS = 5 +DEFAULT_URL_COUNT = 100 +DEFAULT_MAX_CONCURRENT_REQUESTS = 1 DEFAULT_CHUNK_SIZE = 10 DEFAULT_REPORT_PATH = "reports_api" -DEFAULT_STREAM_MODE = False +DEFAULT_STREAM_MODE = True REQUEST_TIMEOUT = 180.0 # Initialize Rich console @@ -77,6 +77,10 @@ class ApiStressTest: self.report_path = pathlib.Path(report_path) self.report_path.mkdir(parents=True, exist_ok=True) self.stream_mode = stream_mode + + # Ignore repo path and set it to current file path + self.repo_path = pathlib.Path(__file__).parent.resolve() + 
self.test_id = time.strftime("%Y%m%d_%H%M%S") self.results_summary = { diff --git a/tests/memory/test_stress_api_xs.py b/tests/memory/test_stress_api_xs.py new file mode 100644 index 00000000..27248883 --- /dev/null +++ b/tests/memory/test_stress_api_xs.py @@ -0,0 +1,203 @@ +"""Lite Crawl4AI API stress‑tester. + +✔ batch or stream mode (single unified path) +✔ global stats + JSON summary +✔ rich table progress +✔ Typer CLI with presets (quick / soak) + +Usage examples: + python api_stress_test.py # uses quick preset + python api_stress_test.py soak # 5 K URLs stress run + python api_stress_test.py --urls 200 --concurrent 10 --chunk 20 +""" + +from __future__ import annotations + +import asyncio, json, time, uuid, pathlib, statistics +from typing import List, Dict, Optional + +import httpx, typer +from rich.console import Console +from rich.table import Table + +# ───────────────────────── defaults / presets ────────────────────────── +PRESETS = { + "quick": dict(urls=1, concurrent=1, chunk=1, stream=False), + "debug": dict(urls=10, concurrent=2, chunk=5, stream=False), + "soak": dict(urls=5000, concurrent=20, chunk=50, stream=True), +} + +API_HEALTH_ENDPOINT = "/health" +REQUEST_TIMEOUT = 180.0 + +console = Console() +app = typer.Typer(add_completion=False, rich_markup_mode="rich") + +# ───────────────────────── helpers ───────────────────────────────────── +async def _check_health(client: httpx.AsyncClient) -> None: + resp = await client.get(API_HEALTH_ENDPOINT, timeout=10) + resp.raise_for_status() + console.print(f"[green]Server healthy — version {resp.json().get('version','?')}[/]") + +async def _iter_results(resp: httpx.Response, stream: bool): + """Yield result dicts from batch JSON or ND‑JSON stream.""" + if stream: + async for line in resp.aiter_lines(): + if not line: + continue + rec = json.loads(line) + if rec.get("status") == "completed": + break + yield rec + else: + data = resp.json() + for rec in data.get("results", []): + yield rec, data # rec + 
whole payload for memory delta/peak + +async def _consume_stream(resp: httpx.Response) -> Dict: + stats = {"success_urls": 0, "failed_urls": 0, "mem_metric": 0.0} + async for line in resp.aiter_lines(): + if not line: + continue + rec = json.loads(line) + if rec.get("status") == "completed": + break + if rec.get("success"): + stats["success_urls"] += 1 + else: + stats["failed_urls"] += 1 + mem = rec.get("server_memory_mb") + if mem is not None: + stats["mem_metric"] = max(stats["mem_metric"], float(mem)) + return stats + +def _consume_batch(body: Dict) -> Dict: + stats = {"success_urls": 0, "failed_urls": 0} + for rec in body.get("results", []): + if rec.get("success"): + stats["success_urls"] += 1 + else: + stats["failed_urls"] += 1 + stats["mem_metric"] = body.get("server_memory_delta_mb") + stats["peak"] = body.get("server_peak_memory_mb") + return stats + +async def _fetch_chunk( + client: httpx.AsyncClient, + urls: List[str], + stream: bool, + semaphore: asyncio.Semaphore, +) -> Dict: + endpoint = "/crawl/stream" if stream else "/crawl" + payload = { + "urls": urls, + "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + "crawler_config": {"type": "CrawlerRunConfig", + "params": {"cache_mode": "BYPASS", "stream": stream}}, + } + + async with semaphore: + start = time.perf_counter() + + if stream: + # ---- streaming request ---- + async with client.stream("POST", endpoint, json=payload) as resp: + resp.raise_for_status() + stats = await _consume_stream(resp) + else: + # ---- batch request ---- + resp = await client.post(endpoint, json=payload) + resp.raise_for_status() + stats = _consume_batch(resp.json()) + + stats["elapsed"] = time.perf_counter() - start + return stats + + +# ───────────────────────── core runner ───────────────────────────────── +async def _run(api: str, urls: int, concurrent: int, chunk: int, stream: bool, report: pathlib.Path): + client = httpx.AsyncClient(base_url=api, timeout=REQUEST_TIMEOUT, 
limits=httpx.Limits(max_connections=concurrent+5)) + await _check_health(client) + + url_list = [f"https://httpbin.org/anything/{uuid.uuid4()}" for _ in range(urls)] + chunks = [url_list[i:i+chunk] for i in range(0, len(url_list), chunk)] + sem = asyncio.Semaphore(concurrent) + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("Batch", style="dim", width=6) + table.add_column("Success/Fail", width=12) + table.add_column("Mem", width=14) + table.add_column("Time (s)") + + agg_success = agg_fail = 0 + deltas, peaks = [], [] + + start = time.perf_counter() + tasks = [asyncio.create_task(_fetch_chunk(client, c, stream, sem)) for c in chunks] + for idx, coro in enumerate(asyncio.as_completed(tasks), 1): + res = await coro + agg_success += res["success_urls"] + agg_fail += res["failed_urls"] + if res["mem_metric"] is not None: + deltas.append(res["mem_metric"]) + if res["peak"] is not None: + peaks.append(res["peak"]) + + mem_txt = f"{res['mem_metric']:.1f}" if res["mem_metric"] is not None else "‑" + if res["peak"] is not None: + mem_txt = f"{res['peak']:.1f}/{mem_txt}" + + table.add_row(str(idx), f"{res['success_urls']}/{res['failed_urls']}", mem_txt, f"{res['elapsed']:.2f}") + + console.print(table) + total_time = time.perf_counter() - start + + summary = { + "urls": urls, + "concurrent": concurrent, + "chunk": chunk, + "stream": stream, + "success_urls": agg_success, + "failed_urls": agg_fail, + "elapsed_sec": round(total_time, 2), + "avg_mem": round(statistics.mean(deltas), 2) if deltas else None, + "max_mem": max(deltas) if deltas else None, + "avg_peak": round(statistics.mean(peaks), 2) if peaks else None, + "max_peak": max(peaks) if peaks else None, + } + console.print("\n[bold green]Done:[/]" , summary) + + report.mkdir(parents=True, exist_ok=True) + path = report / f"api_test_{int(time.time())}.json" + path.write_text(json.dumps(summary, indent=2)) + console.print(f"[green]Summary → {path}") + + await client.aclose() + +# 
───────────────────────── Typer CLI ────────────────────────────────── +@app.command() +def main( + preset: str = typer.Argument("quick", help="quick / debug / soak or custom"), + api_url: str = typer.Option("http://localhost:8020", show_default=True), + urls: int = typer.Option(None, help="Total URLs to crawl"), + concurrent: int = typer.Option(None, help="Concurrent API requests"), + chunk: int = typer.Option(None, help="URLs per request"), + stream: bool = typer.Option(None, help="Use /crawl/stream"), + report: pathlib.Path = typer.Option("reports_api", help="Where to save JSON summary"), +): + """Run a stress test against a running Crawl4AI API server.""" + if preset not in PRESETS and any(v is None for v in (urls, concurrent, chunk, stream)): + console.print(f"[red]Unknown preset '{preset}' and custom params missing[/]") + raise typer.Exit(1) + + cfg = PRESETS.get(preset, {}) + urls = urls or cfg.get("urls") + concurrent = concurrent or cfg.get("concurrent") + chunk = chunk or cfg.get("chunk") + stream = stream if stream is not None else cfg.get("stream", False) + + console.print(f"[cyan]API:[/] {api_url} | URLs: {urls} | Concurrency: {concurrent} | Chunk: {chunk} | Stream: {stream}") + asyncio.run(_run(api_url, urls, concurrent, chunk, stream, report)) + +if __name__ == "__main__": + app()