From 1630fbdafe7d5c081b44bf50641f053cdf83d767 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Wed, 12 Mar 2025 19:05:24 +0800 Subject: [PATCH 1/7] feat(monitor): add real-time crawler monitoring system with memory management Implements a comprehensive monitoring and visualization system for tracking web crawler operations in real-time. The system includes: - Terminal-based dashboard with rich UI for displaying task statuses - Memory pressure monitoring and adaptive dispatch control - Queue statistics and performance metrics tracking - Detailed task progress visualization - Stress testing framework for memory management This addition helps operators track crawler performance and manage memory usage more effectively. --- .gitignore | 3 + crawl4ai/__init__.py | 5 +- crawl4ai/async_dispatcher.py | 635 +++++++++-------- crawl4ai/components/crawler_monitor.py | 837 +++++++++++++++++++++++ crawl4ai/models.py | 10 + docs/examples/crawler_monitor_example.py | 209 ++++++ tests/memory/test_crawler_monitor.py | 168 +++++ tests/memory/test_dispatcher_stress.py | 410 +++++++++++ 8 files changed, 1956 insertions(+), 321 deletions(-) create mode 100644 crawl4ai/components/crawler_monitor.py create mode 100644 docs/examples/crawler_monitor_example.py create mode 100644 tests/memory/test_crawler_monitor.py create mode 100644 tests/memory/test_dispatcher_stress.py diff --git a/.gitignore b/.gitignore index db833e57..a290ab7d 100644 --- a/.gitignore +++ b/.gitignore @@ -255,3 +255,6 @@ continue_config.json .llm.env .private/ + +CLAUDE_MONITOR.md +CLAUDE.md \ No newline at end of file diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index ff238964..0ab808f3 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ -33,13 +33,12 @@ from .content_filter_strategy import ( LLMContentFilter, RelevantContentFilter, ) -from .models import CrawlResult, MarkdownGenerationResult +from .models import CrawlResult, MarkdownGenerationResult, DisplayMode +from .components.crawler_monitor 
import CrawlerMonitor from .async_dispatcher import ( MemoryAdaptiveDispatcher, SemaphoreDispatcher, RateLimiter, - CrawlerMonitor, - DisplayMode, BaseDispatcher, ) from .docker_client import Crawl4aiDockerClient diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py index b587d011..b97d59a7 100644 --- a/crawl4ai/async_dispatcher.py +++ b/crawl4ai/async_dispatcher.py @@ -4,17 +4,15 @@ from .models import ( CrawlResult, CrawlerTaskResult, CrawlStatus, - DisplayMode, - CrawlStats, DomainState, ) -from rich.live import Live -from rich.table import Table -from rich.console import Console -from rich import box -from datetime import timedelta, datetime +from .components.crawler_monitor import CrawlerMonitor + +from .types import AsyncWebCrawler + from collections.abc import AsyncGenerator + import time import psutil import asyncio @@ -24,8 +22,6 @@ from urllib.parse import urlparse import random from abc import ABC, abstractmethod -from math import inf as infinity - class RateLimiter: def __init__( @@ -87,201 +83,6 @@ class RateLimiter: return True -class CrawlerMonitor: - def __init__( - self, - max_visible_rows: int = 15, - display_mode: DisplayMode = DisplayMode.DETAILED, - ): - self.console = Console() - self.max_visible_rows = max_visible_rows - self.display_mode = display_mode - self.stats: Dict[str, CrawlStats] = {} - self.process = psutil.Process() - self.start_time = time.time() - self.live = Live(self._create_table(), refresh_per_second=2) - - def start(self): - self.live.start() - - def stop(self): - self.live.stop() - - def add_task(self, task_id: str, url: str): - self.stats[task_id] = CrawlStats( - task_id=task_id, url=url, status=CrawlStatus.QUEUED - ) - self.live.update(self._create_table()) - - def update_task(self, task_id: str, **kwargs): - if task_id in self.stats: - for key, value in kwargs.items(): - setattr(self.stats[task_id], key, value) - self.live.update(self._create_table()) - - def _create_aggregated_table(self) -> Table: - 
"""Creates a compact table showing only aggregated statistics""" - table = Table( - box=box.ROUNDED, - title="Crawler Status Overview", - title_style="bold magenta", - header_style="bold blue", - show_lines=True, - ) - - # Calculate statistics - total_tasks = len(self.stats) - queued = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED - ) - in_progress = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS - ) - completed = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED - ) - failed = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED - ) - - # Memory statistics - current_memory = self.process.memory_info().rss / (1024 * 1024) - total_task_memory = sum(stat.memory_usage for stat in self.stats.values()) - peak_memory = max( - (stat.peak_memory for stat in self.stats.values()), default=0.0 - ) - - # Duration - duration = time.time() - self.start_time - - # Create status row - table.add_column("Status", style="bold cyan") - table.add_column("Count", justify="right") - table.add_column("Percentage", justify="right") - - table.add_row("Total Tasks", str(total_tasks), "100%") - table.add_row( - "[yellow]In Queue[/yellow]", - str(queued), - f"{(queued / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - table.add_row( - "[blue]In Progress[/blue]", - str(in_progress), - f"{(in_progress / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - table.add_row( - "[green]Completed[/green]", - str(completed), - f"{(completed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - table.add_row( - "[red]Failed[/red]", - str(failed), - f"{(failed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - - # Add memory information - table.add_section() - table.add_row( - "[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", "" - ) - table.add_row( - "[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", "" 
- ) - table.add_row( - "[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", "" - ) - table.add_row( - "[yellow]Runtime[/yellow]", - str(timedelta(seconds=int(duration))), - "", - ) - - return table - - def _create_detailed_table(self) -> Table: - table = Table( - box=box.ROUNDED, - title="Crawler Performance Monitor", - title_style="bold magenta", - header_style="bold blue", - ) - - # Add columns - table.add_column("Task ID", style="cyan", no_wrap=True) - table.add_column("URL", style="cyan", no_wrap=True) - table.add_column("Status", style="bold") - table.add_column("Memory (MB)", justify="right") - table.add_column("Peak (MB)", justify="right") - table.add_column("Duration", justify="right") - table.add_column("Info", style="italic") - - # Add summary row - total_memory = sum(stat.memory_usage for stat in self.stats.values()) - active_count = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS - ) - completed_count = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED - ) - failed_count = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED - ) - - table.add_row( - "[bold yellow]SUMMARY", - f"Total: {len(self.stats)}", - f"Active: {active_count}", - f"{total_memory:.1f}", - f"{self.process.memory_info().rss / (1024 * 1024):.1f}", - str( - timedelta( - seconds=int(time.time() - self.start_time) - ) - ), - f"✓{completed_count} ✗{failed_count}", - style="bold", - ) - - table.add_section() - - # Add rows for each task - visible_stats = sorted( - self.stats.values(), - key=lambda x: ( - x.status != CrawlStatus.IN_PROGRESS, - x.status != CrawlStatus.QUEUED, - x.end_time or infinity, - ), - )[: self.max_visible_rows] - - for stat in visible_stats: - status_style = { - CrawlStatus.QUEUED: "white", - CrawlStatus.IN_PROGRESS: "yellow", - CrawlStatus.COMPLETED: "green", - CrawlStatus.FAILED: "red", - }[stat.status] - - table.add_row( - stat.task_id[:8], # Show first 8 chars of 
task ID - stat.url[:40] + "..." if len(stat.url) > 40 else stat.url, - f"[{status_style}]{stat.status.value}[/{status_style}]", - f"{stat.memory_usage:.1f}", - f"{stat.peak_memory:.1f}", - stat.duration, - stat.error_message[:40] if stat.error_message else "", - ) - - return table - - def _create_table(self) -> Table: - """Creates the appropriate table based on display mode""" - if self.display_mode == DisplayMode.AGGREGATED: - return self._create_aggregated_table() - return self._create_detailed_table() - class BaseDispatcher(ABC): def __init__( @@ -309,7 +110,7 @@ class BaseDispatcher(ABC): async def run_urls( self, urls: List[str], - crawler: "AsyncWebCrawler", # noqa: F821 + crawler: AsyncWebCrawler, # noqa: F821 config: CrawlerRunConfig, monitor: Optional[CrawlerMonitor] = None, ) -> List[CrawlerTaskResult]: @@ -320,71 +121,144 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): def __init__( self, memory_threshold_percent: float = 90.0, + critical_threshold_percent: float = 95.0, # New critical threshold + recovery_threshold_percent: float = 85.0, # New recovery threshold check_interval: float = 1.0, max_session_permit: int = 20, - memory_wait_timeout: float = 300.0, # 5 minutes default timeout + fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs rate_limiter: Optional[RateLimiter] = None, monitor: Optional[CrawlerMonitor] = None, ): super().__init__(rate_limiter, monitor) self.memory_threshold_percent = memory_threshold_percent + self.critical_threshold_percent = critical_threshold_percent + self.recovery_threshold_percent = recovery_threshold_percent self.check_interval = check_interval self.max_session_permit = max_session_permit - self.memory_wait_timeout = memory_wait_timeout - self.result_queue = asyncio.Queue() # Queue for storing results - + self.fairness_timeout = fairness_timeout + self.result_queue = asyncio.Queue() + self.task_queue = asyncio.PriorityQueue() # Priority queue for better management + 
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode + self.current_memory_percent = 0.0 # Track current memory usage + + async def _memory_monitor_task(self): + """Background task to continuously monitor memory usage and update state""" + while True: + self.current_memory_percent = psutil.virtual_memory().percent + + # Enter memory pressure mode if we cross the threshold + if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent: + self.memory_pressure_mode = True + if self.monitor: + self.monitor.update_memory_status("PRESSURE") + + # Exit memory pressure mode if we go below recovery threshold + elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent: + self.memory_pressure_mode = False + if self.monitor: + self.monitor.update_memory_status("NORMAL") + + # In critical mode, we might need to take more drastic action + if self.current_memory_percent >= self.critical_threshold_percent: + if self.monitor: + self.monitor.update_memory_status("CRITICAL") + # We could implement additional memory-saving measures here + + await asyncio.sleep(self.check_interval) + + def _get_priority_score(self, wait_time: float, retry_count: int) -> float: + """Calculate priority score (lower is higher priority) + - URLs waiting longer than fairness_timeout get higher priority + - More retry attempts decreases priority + """ + if wait_time > self.fairness_timeout: + # High priority for long-waiting URLs + return -wait_time + # Standard priority based on retries + return retry_count + async def crawl_url( self, url: str, config: CrawlerRunConfig, task_id: str, + retry_count: int = 0, ) -> CrawlerTaskResult: start_time = time.time() error_message = "" memory_usage = peak_memory = 0.0 - + + # Get starting memory for accurate measurement + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) + try: if self.monitor: self.monitor.update_task( - 
task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time + task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=start_time, + retry_count=retry_count ) + self.concurrent_sessions += 1 - + if self.rate_limiter: await self.rate_limiter.wait_if_needed(url) - - process = psutil.Process() - start_memory = process.memory_info().rss / (1024 * 1024) + + # Check if we're in critical memory state + if self.current_memory_percent >= self.critical_threshold_percent: + # Requeue this task with increased priority and retry count + enqueue_time = time.time() + priority = self._get_priority_score(enqueue_time - start_time, retry_count + 1) + await self.task_queue.put((priority, (url, task_id, retry_count + 1, enqueue_time))) + + # Update monitoring + if self.monitor: + self.monitor.update_task( + task_id, + status=CrawlStatus.QUEUED, + error_message="Requeued due to critical memory pressure" + ) + + # Return placeholder result with requeued status + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=CrawlResult( + url=url, html="", metadata={"status": "requeued"}, + success=False, error_message="Requeued due to critical memory pressure" + ), + memory_usage=0, + peak_memory=0, + start_time=start_time, + end_time=time.time(), + error_message="Requeued due to critical memory pressure", + retry_count=retry_count + 1 + ) + + # Execute the crawl result = await self.crawler.arun(url, config=config, session_id=task_id) + + # Measure memory usage end_memory = process.memory_info().rss / (1024 * 1024) - memory_usage = peak_memory = end_memory - start_memory - + + # Handle rate limiting if self.rate_limiter and result.status_code: if not self.rate_limiter.update_delay(url, result.status_code): error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" if self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.FAILED) - result = CrawlerTaskResult( - task_id=task_id, - url=url, - result=result, - memory_usage=memory_usage, - 
peak_memory=peak_memory, - start_time=start_time, - end_time=time.time(), - error_message=error_message, - ) - await self.result_queue.put(result) - return result - + + # Update status based on result if not result.success: error_message = result.error_message if self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.FAILED) elif self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED) - + except Exception as e: error_message = str(e) if self.monitor: @@ -392,7 +266,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): result = CrawlResult( url=url, html="", metadata={}, success=False, error_message=str(e) ) - + finally: end_time = time.time() if self.monitor: @@ -402,9 +276,10 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): memory_usage=memory_usage, peak_memory=peak_memory, error_message=error_message, + retry_count=retry_count ) self.concurrent_sessions -= 1 - + return CrawlerTaskResult( task_id=task_id, url=url, @@ -414,116 +289,240 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): start_time=start_time, end_time=end_time, error_message=error_message, + retry_count=retry_count ) - + async def run_urls( self, urls: List[str], - crawler: "AsyncWebCrawler", # noqa: F821 + crawler: AsyncWebCrawler, config: CrawlerRunConfig, ) -> List[CrawlerTaskResult]: self.crawler = crawler - + + # Start the memory monitor task + memory_monitor = asyncio.create_task(self._memory_monitor_task()) + if self.monitor: self.monitor.start() - + + results = [] + try: - pending_tasks = [] - active_tasks = [] - task_queue = [] - - for url in urls: - task_id = str(uuid.uuid4()) - if self.monitor: - self.monitor.add_task(task_id, url) - task_queue.append((url, task_id)) - - while task_queue or active_tasks: - wait_start_time = time.time() - while len(active_tasks) < self.max_session_permit and task_queue: - if psutil.virtual_memory().percent >= self.memory_threshold_percent: - # Check if we've exceeded the timeout - if time.time() - wait_start_time > 
self.memory_wait_timeout: - raise MemoryError( - f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds" - ) - await asyncio.sleep(self.check_interval) - continue - - url, task_id = task_queue.pop(0) - task = asyncio.create_task(self.crawl_url(url, config, task_id)) - active_tasks.append(task) - - if not active_tasks: - await asyncio.sleep(self.check_interval) - continue - - done, pending = await asyncio.wait( - active_tasks, return_when=asyncio.FIRST_COMPLETED - ) - - pending_tasks.extend(done) - active_tasks = list(pending) - - return await asyncio.gather(*pending_tasks) - finally: - if self.monitor: - self.monitor.stop() - - async def run_urls_stream( - self, - urls: List[str], - crawler: "AsyncWebCrawler", # noqa: F821 - config: CrawlerRunConfig, - ) -> AsyncGenerator[CrawlerTaskResult, None]: - self.crawler = crawler - if self.monitor: - self.monitor.start() - - try: - active_tasks = [] - task_queue = [] - completed_count = 0 - total_urls = len(urls) - # Initialize task queue for url in urls: task_id = str(uuid.uuid4()) if self.monitor: self.monitor.add_task(task_id, url) - task_queue.append((url, task_id)) - - while completed_count < total_urls: - # Start new tasks if memory permits - while len(active_tasks) < self.max_session_permit and task_queue: - if psutil.virtual_memory().percent >= self.memory_threshold_percent: - await asyncio.sleep(self.check_interval) - continue - - url, task_id = task_queue.pop(0) - task = asyncio.create_task(self.crawl_url(url, config, task_id)) - active_tasks.append(task) - - if not active_tasks and not task_queue: - break - - # Wait for any task to complete and yield results + # Add to queue with initial priority 0, retry count 0, and current time + await self.task_queue.put((0, (url, task_id, 0, time.time()))) + + active_tasks = [] + + # Process until both queues are empty + while not self.task_queue.empty() or active_tasks: + # If memory pressure is low, start new 
tasks + if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: + try: + # Try to get a task with timeout to avoid blocking indefinitely + priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( + self.task_queue.get(), timeout=0.1 + ) + + # Create and start the task + task = asyncio.create_task( + self.crawl_url(url, config, task_id, retry_count) + ) + active_tasks.append(task) + + # Update waiting time in monitor + if self.monitor: + wait_time = time.time() - enqueue_time + self.monitor.update_task( + task_id, + wait_time=wait_time, + status=CrawlStatus.IN_PROGRESS + ) + + except asyncio.TimeoutError: + # No tasks in queue, that's fine + pass + + # Wait for completion even if queue is starved if active_tasks: done, pending = await asyncio.wait( active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED ) + + # Process completed tasks for completed_task in done: result = await completed_task - completed_count += 1 - yield result + results.append(result) + + # Update active tasks list active_tasks = list(pending) else: - await asyncio.sleep(self.check_interval) + # If no active tasks but still waiting, sleep briefly + await asyncio.sleep(self.check_interval / 2) + + # Update priorities for waiting tasks if needed + await self._update_queue_priorities() + + return results + except Exception as e: + if self.monitor: + self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}") + finally: + # Clean up + memory_monitor.cancel() if self.monitor: self.monitor.stop() - + + async def _update_queue_priorities(self): + """Periodically update priorities of items in the queue to prevent starvation""" + # Skip if queue is empty + if self.task_queue.empty(): + return + + # Use a drain-and-refill approach to update all priorities + temp_items = [] + + # Drain the queue (with a safety timeout to prevent blocking) + try: + drain_start = time.time() + while not self.task_queue.empty() and time.time() - drain_start < 5.0: # 5 second 
safety timeout + try: + # Get item from queue with timeout + priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( + self.task_queue.get(), timeout=0.1 + ) + + # Calculate new priority based on current wait time + current_time = time.time() + wait_time = current_time - enqueue_time + new_priority = self._get_priority_score(wait_time, retry_count) + + # Store with updated priority + temp_items.append((new_priority, (url, task_id, retry_count, enqueue_time))) + + # Update monitoring stats for this task + if self.monitor and task_id in self.monitor.stats: + self.monitor.update_task(task_id, wait_time=wait_time) + + except asyncio.TimeoutError: + # Queue might be empty or very slow + break + except Exception as e: + # If anything goes wrong, make sure we refill the queue with what we've got + self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}") + + # Calculate queue statistics + if temp_items and self.monitor: + total_queued = len(temp_items) + wait_times = [item[1][3] for item in temp_items] + highest_wait_time = time.time() - min(wait_times) if wait_times else 0 + avg_wait_time = sum(time.time() - t for t in wait_times) / len(wait_times) if wait_times else 0 + + # Update queue statistics in monitor + self.monitor.update_queue_statistics( + total_queued=total_queued, + highest_wait_time=highest_wait_time, + avg_wait_time=avg_wait_time + ) + + # Sort by priority (lowest number = highest priority) + temp_items.sort(key=lambda x: x[0]) + + # Refill the queue with updated priorities + for item in temp_items: + await self.task_queue.put(item) + + async def run_urls_stream( + self, + urls: List[str], + crawler: AsyncWebCrawler, + config: CrawlerRunConfig, + ) -> AsyncGenerator[CrawlerTaskResult, None]: + self.crawler = crawler + + # Start the memory monitor task + memory_monitor = asyncio.create_task(self._memory_monitor_task()) + + if self.monitor: + self.monitor.start() + + try: + # Initialize task queue + for url in urls: + task_id = 
str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + # Add to queue with initial priority 0, retry count 0, and current time + await self.task_queue.put((0, (url, task_id, 0, time.time()))) + + active_tasks = [] + completed_count = 0 + total_urls = len(urls) + + while completed_count < total_urls: + # If memory pressure is low, start new tasks + if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: + try: + # Try to get a task with timeout + priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( + self.task_queue.get(), timeout=0.1 + ) + + # Create and start the task + task = asyncio.create_task( + self.crawl_url(url, config, task_id, retry_count) + ) + active_tasks.append(task) + + # Update waiting time in monitor + if self.monitor: + wait_time = time.time() - enqueue_time + self.monitor.update_task( + task_id, + wait_time=wait_time, + status=CrawlStatus.IN_PROGRESS + ) + + except asyncio.TimeoutError: + # No tasks in queue, that's fine + pass + + # Process completed tasks and yield results + if active_tasks: + done, pending = await asyncio.wait( + active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED + ) + + for completed_task in done: + result = await completed_task + + # Only count as completed if it wasn't requeued + if "requeued" not in result.error_message: + completed_count += 1 + yield result + + # Update active tasks list + active_tasks = list(pending) + else: + # If no active tasks but still waiting, sleep briefly + await asyncio.sleep(self.check_interval / 2) + + # Update priorities for waiting tasks if needed + await self._update_queue_priorities() + + finally: + # Clean up + memory_monitor.cancel() + if self.monitor: + self.monitor.stop() + class SemaphoreDispatcher(BaseDispatcher): def __init__( @@ -620,7 +619,7 @@ class SemaphoreDispatcher(BaseDispatcher): async def run_urls( self, - crawler: "AsyncWebCrawler", # noqa: F821 + crawler: AsyncWebCrawler, # noqa: F821 
urls: List[str], config: CrawlerRunConfig, ) -> List[CrawlerTaskResult]: @@ -644,4 +643,4 @@ class SemaphoreDispatcher(BaseDispatcher): return await asyncio.gather(*tasks, return_exceptions=True) finally: if self.monitor: - self.monitor.stop() + self.monitor.stop() \ No newline at end of file diff --git a/crawl4ai/components/crawler_monitor.py b/crawl4ai/components/crawler_monitor.py new file mode 100644 index 00000000..49bf9a15 --- /dev/null +++ b/crawl4ai/components/crawler_monitor.py @@ -0,0 +1,837 @@ +import time +import uuid +import threading +import psutil +from datetime import datetime, timedelta +from typing import Dict, Optional, List +import threading +from rich.console import Console +from rich.layout import Layout +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from rich.live import Live +from rich import box +from ..models import CrawlStatus + +class TerminalUI: + """Terminal user interface for CrawlerMonitor using rich library.""" + + def __init__(self, refresh_rate: float = 1.0, max_width: int = 120): + """ + Initialize the terminal UI. 
+ + Args: + refresh_rate: How often to refresh the UI (in seconds) + max_width: Maximum width of the UI in characters + """ + self.console = Console(width=max_width) + self.layout = Layout() + self.refresh_rate = refresh_rate + self.stop_event = threading.Event() + self.ui_thread = None + self.monitor = None # Will be set by CrawlerMonitor + self.max_width = max_width + + # Setup layout - vertical layout (top to bottom) + self.layout.split( + Layout(name="header", size=3), + Layout(name="pipeline_status", size=10), + Layout(name="task_details", ratio=1), + Layout(name="footer", size=3) # Increased footer size to fit all content + ) + + def start(self, monitor): + """Start the UI thread.""" + self.monitor = monitor + self.stop_event.clear() + self.ui_thread = threading.Thread(target=self._ui_loop) + self.ui_thread.daemon = True + self.ui_thread.start() + + def stop(self): + """Stop the UI thread.""" + if self.ui_thread and self.ui_thread.is_alive(): + self.stop_event.set() + # Only try to join if we're not in the UI thread + # This prevents "cannot join current thread" errors + if threading.current_thread() != self.ui_thread: + self.ui_thread.join(timeout=5.0) + + def _ui_loop(self): + """Main UI rendering loop.""" + import sys + import select + import termios + import tty + + # Setup terminal for non-blocking input + old_settings = termios.tcgetattr(sys.stdin) + try: + tty.setcbreak(sys.stdin.fileno()) + + # Use Live display to render the UI + with Live(self.layout, refresh_per_second=1/self.refresh_rate, screen=True) as live: + self.live = live # Store the live display for updates + + # Main UI loop + while not self.stop_event.is_set(): + self._update_display() + + # Check for key press (non-blocking) + if select.select([sys.stdin], [], [], 0)[0]: + key = sys.stdin.read(1) + # Check for 'q' to quit + if key == 'q': + # Signal stop but don't call monitor.stop() from UI thread + # as it would cause the thread to try to join itself + self.stop_event.set() + 
self.monitor.is_running = False + break + + time.sleep(self.refresh_rate) + + # Just check if the monitor was stopped + if not self.monitor.is_running: + break + finally: + # Restore terminal settings + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) + + def _update_display(self): + """Update the terminal display with current statistics.""" + if not self.monitor: + return + + # Update crawler status panel + self.layout["header"].update(self._create_status_panel()) + + # Update pipeline status panel and task details panel + self.layout["pipeline_status"].update(self._create_pipeline_panel()) + self.layout["task_details"].update(self._create_task_details_panel()) + + # Update footer + self.layout["footer"].update(self._create_footer()) + + def _create_status_panel(self) -> Panel: + """Create the crawler status panel.""" + summary = self.monitor.get_summary() + + # Format memory status with icon + memory_status = self.monitor.get_memory_status() + memory_icon = "🟢" # Default NORMAL + if memory_status == "PRESSURE": + memory_icon = "🟠" + elif memory_status == "CRITICAL": + memory_icon = "🔴" + + # Get current memory usage + current_memory = psutil.Process().memory_info().rss / (1024 * 1024) # MB + memory_percent = (current_memory / psutil.virtual_memory().total) * 100 + + # Format runtime + runtime = self.monitor._format_time(time.time() - self.monitor.start_time if self.monitor.start_time else 0) + + # Create the status text + status_text = Text() + status_text.append(f"Web Crawler Dashboard | Runtime: {runtime} | Memory: {memory_percent:.1f}% {memory_icon}\n") + status_text.append(f"Status: {memory_status} | URLs: {summary['urls_completed']}/{summary['urls_total']} | ") + status_text.append(f"Peak Mem: {summary['peak_memory_percent']:.1f}% at {self.monitor._format_time(summary['peak_memory_time'])}") + + return Panel(status_text, title="Crawler Status", border_style="blue") + + def _create_pipeline_panel(self) -> Panel: + """Create the pipeline status 
panel.""" + summary = self.monitor.get_summary() + queue_stats = self.monitor.get_queue_stats() + + # Create a table for status counts + table = Table(show_header=True, box=None) + table.add_column("Status", style="cyan") + table.add_column("Count", justify="right") + table.add_column("Percentage", justify="right") + table.add_column("Stat", style="cyan") + table.add_column("Value", justify="right") + + # Calculate overall progress + progress = f"{summary['urls_completed']}/{summary['urls_total']}" + progress_percent = f"{summary['completion_percentage']:.1f}%" + + # Add rows for each status + table.add_row( + "Overall Progress", + progress, + progress_percent, + "Est. Completion", + summary.get('estimated_completion_time', "N/A") + ) + + # Add rows for each status + status_counts = summary['status_counts'] + total = summary['urls_total'] or 1 # Avoid division by zero + + # Status rows + table.add_row( + "Completed", + str(status_counts.get(CrawlStatus.COMPLETED.name, 0)), + f"{status_counts.get(CrawlStatus.COMPLETED.name, 0) / total * 100:.1f}%", + "Avg. 
Time/URL", + f"{summary.get('avg_task_duration', 0):.2f}s" + ) + + table.add_row( + "Failed", + str(status_counts.get(CrawlStatus.FAILED.name, 0)), + f"{status_counts.get(CrawlStatus.FAILED.name, 0) / total * 100:.1f}%", + "Concurrent Tasks", + str(status_counts.get(CrawlStatus.IN_PROGRESS.name, 0)) + ) + + table.add_row( + "In Progress", + str(status_counts.get(CrawlStatus.IN_PROGRESS.name, 0)), + f"{status_counts.get(CrawlStatus.IN_PROGRESS.name, 0) / total * 100:.1f}%", + "Queue Size", + str(queue_stats['total_queued']) + ) + + table.add_row( + "Queued", + str(status_counts.get(CrawlStatus.QUEUED.name, 0)), + f"{status_counts.get(CrawlStatus.QUEUED.name, 0) / total * 100:.1f}%", + "Max Wait Time", + f"{queue_stats['highest_wait_time']:.1f}s" + ) + + # Requeued is a special case as it's not a status + requeued_count = summary.get('requeued_count', 0) + table.add_row( + "Requeued", + str(requeued_count), + f"{summary.get('requeue_rate', 0):.1f}%", + "Avg Wait Time", + f"{queue_stats['avg_wait_time']:.1f}s" + ) + + # Add empty row for spacing + table.add_row( + "", + "", + "", + "Requeue Rate", + f"{summary.get('requeue_rate', 0):.1f}%" + ) + + return Panel(table, title="Pipeline Status", border_style="green") + + def _create_task_details_panel(self) -> Panel: + """Create the task details panel.""" + # Create a table for task details + table = Table(show_header=True, expand=True) + table.add_column("Task ID", style="cyan", no_wrap=True, width=10) + table.add_column("URL", style="blue", ratio=3) + table.add_column("Status", style="green", width=15) + table.add_column("Memory", justify="right", width=8) + table.add_column("Peak", justify="right", width=8) + table.add_column("Duration", justify="right", width=10) + + # Get all task stats + task_stats = self.monitor.get_all_task_stats() + + # Add summary row + active_tasks = sum(1 for stats in task_stats.values() + if stats['status'] == CrawlStatus.IN_PROGRESS.name) + + total_memory = sum(stats['memory_usage'] for 
stats in task_stats.values()) + total_peak = sum(stats['peak_memory'] for stats in task_stats.values()) + + # Summary row with separators + table.add_row( + "SUMMARY", + f"Total: {len(task_stats)}", + f"Active: {active_tasks}", + f"{total_memory:.1f}", + f"{total_peak:.1f}", + "N/A" + ) + + # Add a separator + table.add_row("—" * 10, "—" * 20, "—" * 10, "—" * 8, "—" * 8, "—" * 10) + + # Status icons + status_icons = { + CrawlStatus.QUEUED.name: "⏳", + CrawlStatus.IN_PROGRESS.name: "🔄", + CrawlStatus.COMPLETED.name: "✅", + CrawlStatus.FAILED.name: "❌" + } + + # Calculate how many rows we can display based on available space + # We can display more rows now that we have a dedicated panel + display_count = min(len(task_stats), 20) # Display up to 20 tasks + + # Add rows for each task + for task_id, stats in sorted( + list(task_stats.items())[:display_count], + # Sort: 1. IN_PROGRESS first, 2. QUEUED, 3. COMPLETED/FAILED by recency + key=lambda x: ( + 0 if x[1]['status'] == CrawlStatus.IN_PROGRESS.name else + 1 if x[1]['status'] == CrawlStatus.QUEUED.name else + 2, + -1 * (x[1].get('end_time', 0) or 0) # Most recent first + ) + ): + # Truncate task_id and URL for display + short_id = task_id[:8] + url = stats['url'] + if len(url) > 50: # Allow longer URLs in the dedicated panel + url = url[:47] + "..." 
+ + # Format status with icon + status = f"{status_icons.get(stats['status'], '?')} {stats['status']}" + + # Add row + table.add_row( + short_id, + url, + status, + f"{stats['memory_usage']:.1f}", + f"{stats['peak_memory']:.1f}", + stats['duration'] if 'duration' in stats else "0:00" + ) + + return Panel(table, title="Task Details", border_style="yellow") + + def _create_footer(self) -> Panel: + """Create the footer panel.""" + from rich.columns import Columns + from rich.align import Align + + memory_status = self.monitor.get_memory_status() + memory_icon = "🟢" # Default NORMAL + if memory_status == "PRESSURE": + memory_icon = "🟠" + elif memory_status == "CRITICAL": + memory_icon = "🔴" + + # Left section - memory status + left_text = Text() + left_text.append("Memory Status: ", style="bold") + status_style = "green" if memory_status == "NORMAL" else "yellow" if memory_status == "PRESSURE" else "red bold" + left_text.append(f"{memory_icon} {memory_status}", style=status_style) + + # Center section - copyright + center_text = Text("© Crawl4AI 2025 | Made by UncleCode", style="cyan italic") + + # Right section - quit instruction + right_text = Text() + right_text.append("Press ", style="bold") + right_text.append("q", style="white on blue") + right_text.append(" to quit", style="bold") + + # Create columns with the three sections + footer_content = Columns( + [ + Align.left(left_text), + Align.center(center_text), + Align.right(right_text) + ], + expand=True + ) + + # Create a more visible footer panel + return Panel( + footer_content, + border_style="white", + padding=(0, 1) # Add padding for better visibility + ) + + +class CrawlerMonitor: + """ + Comprehensive monitoring and visualization system for tracking web crawler operations in real-time. + Provides a terminal-based dashboard that displays task statuses, memory usage, queue statistics, + and performance metrics. 
+ """ + + def __init__( + self, + urls_total: int = 0, + refresh_rate: float = 1.0, + enable_ui: bool = True, + max_width: int = 120 + ): + """ + Initialize the CrawlerMonitor. + + Args: + urls_total: Total number of URLs to be crawled + refresh_rate: How often to refresh the UI (in seconds) + enable_ui: Whether to display the terminal UI + max_width: Maximum width of the UI in characters + """ + # Core monitoring attributes + self.stats = {} # Task ID -> stats dict + self.memory_status = "NORMAL" + self.start_time = None + self.end_time = None + self.is_running = False + self.queue_stats = { + "total_queued": 0, + "highest_wait_time": 0.0, + "avg_wait_time": 0.0 + } + self.urls_total = urls_total + self.urls_completed = 0 + self.peak_memory_percent = 0.0 + self.peak_memory_time = 0.0 + + # Status counts + self.status_counts = { + CrawlStatus.QUEUED.name: 0, + CrawlStatus.IN_PROGRESS.name: 0, + CrawlStatus.COMPLETED.name: 0, + CrawlStatus.FAILED.name: 0 + } + + # Requeue tracking + self.requeued_count = 0 + + # Thread-safety + self._lock = threading.RLock() + + # Terminal UI + self.enable_ui = enable_ui + self.terminal_ui = TerminalUI( + refresh_rate=refresh_rate, + max_width=max_width + ) if enable_ui else None + + def start(self): + """ + Start the monitoring session. + + - Initializes the start_time + - Sets is_running to True + - Starts the terminal UI if enabled + """ + with self._lock: + self.start_time = time.time() + self.is_running = True + + # Start the terminal UI + if self.enable_ui and self.terminal_ui: + self.terminal_ui.start(self) + + def stop(self): + """ + Stop the monitoring session. 
+ + - Records end_time + - Sets is_running to False + - Stops the terminal UI + - Generates final summary statistics + """ + with self._lock: + self.end_time = time.time() + self.is_running = False + + # Stop the terminal UI + if self.enable_ui and self.terminal_ui: + self.terminal_ui.stop() + + def add_task(self, task_id: str, url: str): + """ + Register a new task with the monitor. + + Args: + task_id: Unique identifier for the task + url: URL being crawled + + The task is initialized with: + - status: QUEUED + - url: The URL to crawl + - enqueue_time: Current time + - memory_usage: 0 + - peak_memory: 0 + - wait_time: 0 + - retry_count: 0 + """ + with self._lock: + self.stats[task_id] = { + "task_id": task_id, + "url": url, + "status": CrawlStatus.QUEUED.name, + "enqueue_time": time.time(), + "start_time": None, + "end_time": None, + "memory_usage": 0.0, + "peak_memory": 0.0, + "error_message": "", + "wait_time": 0.0, + "retry_count": 0, + "duration": "0:00", + "counted_requeue": False + } + + # Update status counts + self.status_counts[CrawlStatus.QUEUED.name] += 1 + + def update_task( + self, + task_id: str, + status: Optional[CrawlStatus] = None, + start_time: Optional[float] = None, + end_time: Optional[float] = None, + memory_usage: Optional[float] = None, + peak_memory: Optional[float] = None, + error_message: Optional[str] = None, + retry_count: Optional[int] = None, + wait_time: Optional[float] = None + ): + """ + Update statistics for a specific task. + + Args: + task_id: Unique identifier for the task + status: New status (QUEUED, IN_PROGRESS, COMPLETED, FAILED) + start_time: When task execution started + end_time: When task execution ended + memory_usage: Current memory usage in MB + peak_memory: Maximum memory usage in MB + error_message: Error description if failed + retry_count: Number of retry attempts + wait_time: Time spent in queue + + Updates task statistics and updates status counts. 
+ If status changes, decrements old status count and + increments new status count. + """ + with self._lock: + # Check if task exists + if task_id not in self.stats: + return + + task_stats = self.stats[task_id] + + # Update status counts if status is changing + old_status = task_stats["status"] + if status and status.name != old_status: + self.status_counts[old_status] -= 1 + self.status_counts[status.name] += 1 + + # Track completion + if status == CrawlStatus.COMPLETED: + self.urls_completed += 1 + + # Track requeues + if old_status in [CrawlStatus.COMPLETED.name, CrawlStatus.FAILED.name] and not task_stats.get("counted_requeue", False): + self.requeued_count += 1 + task_stats["counted_requeue"] = True + + # Update task statistics + if status: + task_stats["status"] = status.name + if start_time is not None: + task_stats["start_time"] = start_time + if end_time is not None: + task_stats["end_time"] = end_time + if memory_usage is not None: + task_stats["memory_usage"] = memory_usage + + # Update peak memory if necessary + current_percent = (memory_usage / psutil.virtual_memory().total) * 100 + if current_percent > self.peak_memory_percent: + self.peak_memory_percent = current_percent + self.peak_memory_time = time.time() + + if peak_memory is not None: + task_stats["peak_memory"] = peak_memory + if error_message is not None: + task_stats["error_message"] = error_message + if retry_count is not None: + task_stats["retry_count"] = retry_count + if wait_time is not None: + task_stats["wait_time"] = wait_time + + # Calculate duration + if task_stats["start_time"]: + end = task_stats["end_time"] or time.time() + duration = end - task_stats["start_time"] + task_stats["duration"] = self._format_time(duration) + + def update_memory_status(self, status: str): + """ + Update the current memory status. + + Args: + status: Memory status (NORMAL, PRESSURE, CRITICAL, or custom) + + Also updates the UI to reflect the new status. 
+ """ + with self._lock: + self.memory_status = status + + def update_queue_statistics( + self, + total_queued: int, + highest_wait_time: float, + avg_wait_time: float + ): + """ + Update statistics related to the task queue. + + Args: + total_queued: Number of tasks currently in queue + highest_wait_time: Longest wait time of any queued task + avg_wait_time: Average wait time across all queued tasks + """ + with self._lock: + self.queue_stats = { + "total_queued": total_queued, + "highest_wait_time": highest_wait_time, + "avg_wait_time": avg_wait_time + } + + def get_task_stats(self, task_id: str) -> Dict: + """ + Get statistics for a specific task. + + Args: + task_id: Unique identifier for the task + + Returns: + Dictionary containing all task statistics + """ + with self._lock: + return self.stats.get(task_id, {}).copy() + + def get_all_task_stats(self) -> Dict[str, Dict]: + """ + Get statistics for all tasks. + + Returns: + Dictionary mapping task_ids to their statistics + """ + with self._lock: + return self.stats.copy() + + def get_memory_status(self) -> str: + """ + Get the current memory status. + + Returns: + Current memory status string + """ + with self._lock: + return self.memory_status + + def get_queue_stats(self) -> Dict: + """ + Get current queue statistics. + + Returns: + Dictionary with queue statistics including: + - total_queued: Number of tasks in queue + - highest_wait_time: Longest wait time + - avg_wait_time: Average wait time + """ + with self._lock: + return self.queue_stats.copy() + + def get_summary(self) -> Dict: + """ + Get a summary of all crawler statistics. 
+ + Returns: + Dictionary containing: + - runtime: Total runtime in seconds + - urls_total: Total URLs to process + - urls_completed: Number of completed URLs + - completion_percentage: Percentage complete + - status_counts: Count of tasks in each status + - memory_status: Current memory status + - peak_memory_percent: Highest memory usage + - peak_memory_time: When peak memory occurred + - avg_task_duration: Average task processing time + - estimated_completion_time: Projected finish time + - requeue_rate: Percentage of tasks requeued + """ + with self._lock: + # Calculate runtime + current_time = time.time() + runtime = current_time - (self.start_time or current_time) + + # Calculate completion percentage + completion_percentage = 0 + if self.urls_total > 0: + completion_percentage = (self.urls_completed / self.urls_total) * 100 + + # Calculate average task duration for completed tasks + completed_tasks = [ + task for task in self.stats.values() + if task["status"] == CrawlStatus.COMPLETED.name and task.get("start_time") and task.get("end_time") + ] + + avg_task_duration = 0 + if completed_tasks: + total_duration = sum(task["end_time"] - task["start_time"] for task in completed_tasks) + avg_task_duration = total_duration / len(completed_tasks) + + # Calculate requeue rate + requeue_rate = 0 + if len(self.stats) > 0: + requeue_rate = (self.requeued_count / len(self.stats)) * 100 + + # Calculate estimated completion time + estimated_completion_time = "N/A" + if avg_task_duration > 0 and self.urls_total > 0 and self.urls_completed > 0: + remaining_tasks = self.urls_total - self.urls_completed + estimated_seconds = remaining_tasks * avg_task_duration + estimated_completion_time = self._format_time(estimated_seconds) + + return { + "runtime": runtime, + "urls_total": self.urls_total, + "urls_completed": self.urls_completed, + "completion_percentage": completion_percentage, + "status_counts": self.status_counts.copy(), + "memory_status": self.memory_status, + 
"peak_memory_percent": self.peak_memory_percent, + "peak_memory_time": self.peak_memory_time, + "avg_task_duration": avg_task_duration, + "estimated_completion_time": estimated_completion_time, + "requeue_rate": requeue_rate, + "requeued_count": self.requeued_count + } + + def render(self): + """ + Render the terminal UI. + + This is the main UI rendering loop that: + 1. Updates all statistics + 2. Formats the display + 3. Renders the ASCII interface + 4. Handles keyboard input + + Note: The actual rendering is handled by the TerminalUI class + which uses the rich library's Live display. + """ + if self.enable_ui and self.terminal_ui: + # Force an update of the UI + if hasattr(self.terminal_ui, '_update_display'): + self.terminal_ui._update_display() + + def _format_time(self, seconds: float) -> str: + """ + Format time in hours:minutes:seconds. + + Args: + seconds: Time in seconds + + Returns: + Formatted time string (e.g., "1:23:45") + """ + delta = timedelta(seconds=int(seconds)) + hours, remainder = divmod(delta.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + if hours > 0: + return f"{hours}:{minutes:02}:{seconds:02}" + else: + return f"{minutes}:{seconds:02}" + + def _calculate_estimated_completion(self) -> str: + """ + Calculate estimated completion time based on current progress. 
+ + Returns: + Formatted time string + """ + summary = self.get_summary() + return summary.get("estimated_completion_time", "N/A") + + +# Example code for testing +if __name__ == "__main__": + # Initialize the monitor + monitor = CrawlerMonitor(urls_total=100) + + # Start monitoring + monitor.start() + + try: + # Simulate some tasks + for i in range(20): + task_id = str(uuid.uuid4()) + url = f"https://example.com/page{i}" + monitor.add_task(task_id, url) + + # Simulate 20% of tasks are already running + if i < 4: + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=time.time() - 30, # Started 30 seconds ago + memory_usage=10.5 + ) + + # Simulate 10% of tasks are completed + if i >= 4 and i < 6: + start_time = time.time() - 60 + end_time = time.time() - 15 + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=start_time, + memory_usage=8.2 + ) + monitor.update_task( + task_id=task_id, + status=CrawlStatus.COMPLETED, + end_time=end_time, + memory_usage=0, + peak_memory=15.7 + ) + + # Simulate 5% of tasks fail + if i >= 6 and i < 7: + start_time = time.time() - 45 + end_time = time.time() - 20 + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=start_time, + memory_usage=12.3 + ) + monitor.update_task( + task_id=task_id, + status=CrawlStatus.FAILED, + end_time=end_time, + memory_usage=0, + peak_memory=18.2, + error_message="Connection timeout" + ) + + # Simulate memory pressure + monitor.update_memory_status("PRESSURE") + + # Simulate queue statistics + monitor.update_queue_statistics( + total_queued=16, # 20 - 4 (in progress) + highest_wait_time=120.5, + avg_wait_time=60.2 + ) + + # Keep the monitor running for a demonstration + print("Crawler Monitor is running. 
Press 'q' to exit.") + while monitor.is_running: + time.sleep(0.1) + + except KeyboardInterrupt: + print("\nExiting crawler monitor...") + finally: + # Stop the monitor + monitor.stop() + print("Crawler monitor exited successfully.") \ No newline at end of file diff --git a/crawl4ai/models.py b/crawl4ai/models.py index 474e679e..a904e385 100644 --- a/crawl4ai/models.py +++ b/crawl4ai/models.py @@ -28,6 +28,12 @@ class CrawlerTaskResult: start_time: Union[datetime, float] end_time: Union[datetime, float] error_message: str = "" + retry_count: int = 0 + wait_time: float = 0.0 + + @property + def success(self) -> bool: + return self.result.success class CrawlStatus(Enum): @@ -67,6 +73,9 @@ class CrawlStats: memory_usage: float = 0.0 peak_memory: float = 0.0 error_message: str = "" + wait_time: float = 0.0 + retry_count: int = 0 + counted_requeue: bool = False @property def duration(self) -> str: @@ -87,6 +96,7 @@ class CrawlStats: duration = end - start return str(timedelta(seconds=int(duration.total_seconds()))) + class DisplayMode(Enum): DETAILED = "DETAILED" AGGREGATED = "AGGREGATED" diff --git a/docs/examples/crawler_monitor_example.py b/docs/examples/crawler_monitor_example.py new file mode 100644 index 00000000..85d80ae6 --- /dev/null +++ b/docs/examples/crawler_monitor_example.py @@ -0,0 +1,209 @@ +""" +CrawlerMonitor Example + +This example demonstrates how to use the CrawlerMonitor component +to visualize and track web crawler operations in real-time. +""" + +import time +import uuid +import random +import threading +from crawl4ai.components.crawler_monitor import CrawlerMonitor +from crawl4ai.models import CrawlStatus + +def simulate_webcrawler_operations(monitor, num_tasks=20): + """ + Simulates a web crawler's operations with multiple tasks and different states. 
+ + Args: + monitor: The CrawlerMonitor instance + num_tasks: Number of tasks to simulate + """ + print(f"Starting simulation with {num_tasks} tasks...") + + # Create and register all tasks first + task_ids = [] + for i in range(num_tasks): + task_id = str(uuid.uuid4()) + url = f"https://example.com/page{i}" + monitor.add_task(task_id, url) + task_ids.append((task_id, url)) + + # Small delay between task creation + time.sleep(0.2) + + # Process tasks with a variety of different behaviors + threads = [] + for i, (task_id, url) in enumerate(task_ids): + # Create a thread for each task + thread = threading.Thread( + target=process_task, + args=(monitor, task_id, url, i) + ) + thread.daemon = True + threads.append(thread) + + # Start threads in batches to simulate concurrent processing + batch_size = 4 # Process 4 tasks at a time + for i in range(0, len(threads), batch_size): + batch = threads[i:i+batch_size] + for thread in batch: + thread.start() + time.sleep(0.5) # Stagger thread start times + + # Wait a bit before starting next batch + time.sleep(random.uniform(1.0, 3.0)) + + # Update queue statistics + update_queue_stats(monitor) + + # Simulate memory pressure changes + active_threads = [t for t in threads if t.is_alive()] + if len(active_threads) > 8: + monitor.update_memory_status("CRITICAL") + elif len(active_threads) > 4: + monitor.update_memory_status("PRESSURE") + else: + monitor.update_memory_status("NORMAL") + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Final updates + update_queue_stats(monitor) + monitor.update_memory_status("NORMAL") + + print("Simulation completed!") + +def process_task(monitor, task_id, url, index): + """Simulate processing of a single task.""" + # Tasks start in queued state (already added) + + # Simulate waiting in queue + wait_time = random.uniform(0.5, 3.0) + time.sleep(wait_time) + + # Start processing - move to IN_PROGRESS + monitor.update_task( + task_id=task_id, + 
status=CrawlStatus.IN_PROGRESS, + start_time=time.time(), + wait_time=wait_time + ) + + # Simulate task processing with memory usage changes + total_process_time = random.uniform(2.0, 10.0) + step_time = total_process_time / 5 # Update in 5 steps + + for step in range(5): + # Simulate increasing then decreasing memory usage + if step < 3: # First 3 steps - increasing + memory_usage = random.uniform(5.0, 20.0) * (step + 1) + else: # Last 2 steps - decreasing + memory_usage = random.uniform(5.0, 20.0) * (5 - step) + + # Update peak memory if this is higher + peak = max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0)) + + monitor.update_task( + task_id=task_id, + memory_usage=memory_usage, + peak_memory=peak + ) + + time.sleep(step_time) + + # Determine final state - 80% success, 20% failure + if index % 5 == 0: # Every 5th task fails + monitor.update_task( + task_id=task_id, + status=CrawlStatus.FAILED, + end_time=time.time(), + memory_usage=0.0, + error_message="Connection timeout" + ) + else: + monitor.update_task( + task_id=task_id, + status=CrawlStatus.COMPLETED, + end_time=time.time(), + memory_usage=0.0 + ) + +def update_queue_stats(monitor): + """Update queue statistics based on current tasks.""" + task_stats = monitor.get_all_task_stats() + + # Count queued tasks + queued_tasks = [ + stats for stats in task_stats.values() + if stats["status"] == CrawlStatus.QUEUED.name + ] + + total_queued = len(queued_tasks) + + if total_queued > 0: + current_time = time.time() + # Calculate wait times + wait_times = [ + current_time - stats.get("enqueue_time", current_time) + for stats in queued_tasks + ] + highest_wait_time = max(wait_times) if wait_times else 0.0 + avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0 + else: + highest_wait_time = 0.0 + avg_wait_time = 0.0 + + # Update monitor + monitor.update_queue_statistics( + total_queued=total_queued, + highest_wait_time=highest_wait_time, + avg_wait_time=avg_wait_time + ) + 
+def main(): + # Initialize the monitor + monitor = CrawlerMonitor( + urls_total=20, # Total URLs to process + refresh_rate=0.5, # Update UI twice per second + enable_ui=True, # Enable terminal UI + max_width=120 # Set maximum width to 120 characters + ) + + # Start the monitor + monitor.start() + + try: + # Run simulation + simulate_webcrawler_operations(monitor) + + # Keep monitor running a bit to see final state + print("Waiting to view final state...") + time.sleep(5) + + except KeyboardInterrupt: + print("\nExample interrupted by user") + finally: + # Stop the monitor + monitor.stop() + print("Example completed!") + + # Print some statistics + summary = monitor.get_summary() + print("\nCrawler Statistics Summary:") + print(f"Total URLs: {summary['urls_total']}") + print(f"Completed: {summary['urls_completed']}") + print(f"Completion percentage: {summary['completion_percentage']:.1f}%") + print(f"Peak memory usage: {summary['peak_memory_percent']:.1f}%") + + # Print task status counts + status_counts = summary['status_counts'] + print("\nTask Status Counts:") + for status, count in status_counts.items(): + print(f" {status}: {count}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/memory/test_crawler_monitor.py b/tests/memory/test_crawler_monitor.py new file mode 100644 index 00000000..89cc08b8 --- /dev/null +++ b/tests/memory/test_crawler_monitor.py @@ -0,0 +1,168 @@ +""" +Test script for the CrawlerMonitor component. +This script simulates a crawler with multiple tasks to demonstrate the real-time monitoring capabilities. 
+""" + +import time +import uuid +import random +import threading +import sys +import os + +# Add the parent directory to the path to import crawl4ai +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from crawl4ai.components.crawler_monitor import CrawlerMonitor +from crawl4ai.models import CrawlStatus + +def simulate_crawler_task(monitor, task_id, url, simulate_failure=False): + """Simulate a crawler task with different states.""" + # Task starts in the QUEUED state + wait_time = random.uniform(0.5, 3.0) + time.sleep(wait_time) + + # Update to IN_PROGRESS state + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=time.time(), + wait_time=wait_time + ) + + # Simulate task running + process_time = random.uniform(1.0, 5.0) + for i in range(int(process_time * 2)): + # Simulate memory usage changes + memory_usage = random.uniform(5.0, 25.0) + monitor.update_task( + task_id=task_id, + memory_usage=memory_usage, + peak_memory=max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0)) + ) + time.sleep(0.5) + + # Update to COMPLETED or FAILED state + if simulate_failure and random.random() < 0.8: # 80% chance of failure if simulate_failure is True + monitor.update_task( + task_id=task_id, + status=CrawlStatus.FAILED, + end_time=time.time(), + error_message="Simulated failure: Connection timeout", + memory_usage=0.0 + ) + else: + monitor.update_task( + task_id=task_id, + status=CrawlStatus.COMPLETED, + end_time=time.time(), + memory_usage=0.0 + ) + +def update_queue_stats(monitor, num_queued_tasks): + """Update queue statistics periodically.""" + while monitor.is_running: + queued_tasks = [ + task for task_id, task in monitor.get_all_task_stats().items() + if task["status"] == CrawlStatus.QUEUED.name + ] + + total_queued = len(queued_tasks) + + if total_queued > 0: + current_time = time.time() + wait_times = [ + current_time - task.get("enqueue_time", current_time) + for task in 
queued_tasks + ] + highest_wait_time = max(wait_times) if wait_times else 0.0 + avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0 + else: + highest_wait_time = 0.0 + avg_wait_time = 0.0 + + monitor.update_queue_statistics( + total_queued=total_queued, + highest_wait_time=highest_wait_time, + avg_wait_time=avg_wait_time + ) + + # Simulate memory pressure based on number of active tasks + active_tasks = len([ + task for task_id, task in monitor.get_all_task_stats().items() + if task["status"] == CrawlStatus.IN_PROGRESS.name + ]) + + if active_tasks > 8: + monitor.update_memory_status("CRITICAL") + elif active_tasks > 4: + monitor.update_memory_status("PRESSURE") + else: + monitor.update_memory_status("NORMAL") + + time.sleep(1.0) + +def test_crawler_monitor(): + """Test the CrawlerMonitor with simulated crawler tasks.""" + # Total number of URLs to crawl + total_urls = 50 + + # Initialize the monitor + monitor = CrawlerMonitor(urls_total=total_urls, refresh_rate=0.5) + + # Start the monitor + monitor.start() + + # Start thread to update queue statistics + queue_stats_thread = threading.Thread(target=update_queue_stats, args=(monitor, total_urls)) + queue_stats_thread.daemon = True + queue_stats_thread.start() + + try: + # Create task threads + threads = [] + for i in range(total_urls): + task_id = str(uuid.uuid4()) + url = f"https://example.com/page{i}" + + # Add task to monitor + monitor.add_task(task_id, url) + + # Determine if this task should simulate failure + simulate_failure = (i % 10 == 0) # Every 10th task + + # Create and start thread for this task + thread = threading.Thread( + target=simulate_crawler_task, + args=(monitor, task_id, url, simulate_failure) + ) + thread.daemon = True + threads.append(thread) + + # Start threads with delay to simulate tasks being added over time + batch_size = 5 + for i in range(0, len(threads), batch_size): + batch = threads[i:i+batch_size] + for thread in batch: + thread.start() + time.sleep(0.5) # 
Small delay between starting threads + + # Wait a bit before starting the next batch + time.sleep(2.0) + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Keep monitor running a bit longer to see the final state + time.sleep(5.0) + + except KeyboardInterrupt: + print("\nTest interrupted by user") + finally: + # Stop the monitor + monitor.stop() + print("\nCrawler monitor test completed") + +if __name__ == "__main__": + test_crawler_monitor() \ No newline at end of file diff --git a/tests/memory/test_dispatcher_stress.py b/tests/memory/test_dispatcher_stress.py new file mode 100644 index 00000000..f81f78f6 --- /dev/null +++ b/tests/memory/test_dispatcher_stress.py @@ -0,0 +1,410 @@ +import asyncio +import time +import psutil +import logging +import random +from typing import List, Dict +import uuid +import sys +import os + +# Import your crawler components +from crawl4ai.models import DisplayMode, CrawlStatus, CrawlResult +from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig, CacheMode +from crawl4ai import AsyncWebCrawler +from crawl4ai import MemoryAdaptiveDispatcher, CrawlerMonitor + +# Global configuration +STREAM = False # Toggle between streaming and non-streaming modes + +# Configure logging to file only (to avoid breaking the rich display) +os.makedirs("logs", exist_ok=True) +file_handler = logging.FileHandler("logs/memory_stress_test.log") +file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) + +# Root logger - only to file, not console +root_logger = logging.getLogger() +root_logger.setLevel(logging.INFO) +root_logger.addHandler(file_handler) + +# Our test logger also writes to file only +logger = logging.getLogger("memory_stress_test") +logger.setLevel(logging.INFO) +logger.addHandler(file_handler) +logger.propagate = False # Don't propagate to root logger + +# Create a memory restrictor to simulate limited memory environment +class MemorySimulator: + def __init__(self, 
target_percent: float = 85.0, aggressive: bool = False): + """Simulates memory pressure by allocating memory""" + self.target_percent = target_percent + self.memory_blocks: List[bytearray] = [] + self.aggressive = aggressive + + def apply_pressure(self, additional_percent: float = 0.0): + """Fill memory until we reach target percentage""" + current_percent = psutil.virtual_memory().percent + target = self.target_percent + additional_percent + + if current_percent >= target: + return # Already at target + + logger.info(f"Current memory: {current_percent}%, target: {target}%") + + # Calculate how much memory we need to allocate + total_memory = psutil.virtual_memory().total + target_usage = (target / 100.0) * total_memory + current_usage = (current_percent / 100.0) * total_memory + bytes_to_allocate = int(target_usage - current_usage) + + if bytes_to_allocate <= 0: + return + + # Allocate in smaller chunks to avoid overallocation + if self.aggressive: + # Use larger chunks for faster allocation in aggressive mode + chunk_size = min(bytes_to_allocate, 200 * 1024 * 1024) # 200MB chunks + else: + chunk_size = min(bytes_to_allocate, 50 * 1024 * 1024) # 50MB chunks + + try: + logger.info(f"Allocating {chunk_size / (1024 * 1024):.1f}MB to reach target memory usage") + self.memory_blocks.append(bytearray(chunk_size)) + time.sleep(0.5) # Give system time to register the allocation + except MemoryError: + logger.warning("Unable to allocate more memory") + + def release_pressure(self, percent: float = None): + """ + Release allocated memory + If percent is specified, release that percentage of blocks + """ + if not self.memory_blocks: + return + + if percent is None: + # Release all + logger.info(f"Releasing all {len(self.memory_blocks)} memory blocks") + self.memory_blocks.clear() + else: + # Release specified percentage + blocks_to_release = int(len(self.memory_blocks) * (percent / 100.0)) + if blocks_to_release > 0: + logger.info(f"Releasing {blocks_to_release} of 
{len(self.memory_blocks)} memory blocks ({percent}%)") + self.memory_blocks = self.memory_blocks[blocks_to_release:] + + def spike_pressure(self, duration: float = 5.0): + """ + Create a temporary spike in memory pressure then release + Useful for forcing requeues + """ + logger.info(f"Creating memory pressure spike for {duration} seconds") + # Save current blocks count + initial_blocks = len(self.memory_blocks) + + # Create spike with extra 5% + self.apply_pressure(additional_percent=5.0) + + # Schedule release after duration + asyncio.create_task(self._delayed_release(duration, initial_blocks)) + + async def _delayed_release(self, delay: float, target_blocks: int): + """Helper for spike_pressure - releases extra blocks after delay""" + await asyncio.sleep(delay) + + # Remove blocks added since spike started + if len(self.memory_blocks) > target_blocks: + logger.info(f"Releasing memory spike ({len(self.memory_blocks) - target_blocks} blocks)") + self.memory_blocks = self.memory_blocks[:target_blocks] + +# Test statistics collector +class TestResults: + def __init__(self): + self.start_time = time.time() + self.completed_urls: List[str] = [] + self.failed_urls: List[str] = [] + self.requeued_count = 0 + self.memory_warnings = 0 + self.max_memory_usage = 0.0 + self.max_queue_size = 0 + self.max_wait_time = 0.0 + self.url_to_attempt: Dict[str, int] = {} # Track retries per URL + + def log_summary(self): + duration = time.time() - self.start_time + logger.info("===== TEST SUMMARY =====") + logger.info(f"Stream mode: {'ON' if STREAM else 'OFF'}") + logger.info(f"Total duration: {duration:.1f} seconds") + logger.info(f"Completed URLs: {len(self.completed_urls)}") + logger.info(f"Failed URLs: {len(self.failed_urls)}") + logger.info(f"Requeue events: {self.requeued_count}") + logger.info(f"Memory warnings: {self.memory_warnings}") + logger.info(f"Max memory usage: {self.max_memory_usage:.1f}%") + logger.info(f"Max queue size: {self.max_queue_size}") + logger.info(f"Max 
wait time: {self.max_wait_time:.1f} seconds") + + # Log URLs with multiple attempts + retried_urls = {url: count for url, count in self.url_to_attempt.items() if count > 1} + if retried_urls: + logger.info(f"URLs with retries: {len(retried_urls)}") + # Log the top 5 most retried + top_retries = sorted(retried_urls.items(), key=lambda x: x[1], reverse=True)[:5] + for url, count in top_retries: + logger.info(f" URL {url[-30:]} had {count} attempts") + + # Write summary to a separate human-readable file + with open("logs/test_summary.txt", "w") as f: + f.write(f"Stream mode: {'ON' if STREAM else 'OFF'}\n") + f.write(f"Total duration: {duration:.1f} seconds\n") + f.write(f"Completed URLs: {len(self.completed_urls)}\n") + f.write(f"Failed URLs: {len(self.failed_urls)}\n") + f.write(f"Requeue events: {self.requeued_count}\n") + f.write(f"Memory warnings: {self.memory_warnings}\n") + f.write(f"Max memory usage: {self.max_memory_usage:.1f}%\n") + f.write(f"Max queue size: {self.max_queue_size}\n") + f.write(f"Max wait time: {self.max_wait_time:.1f} seconds\n") + +# Custom monitor with stats tracking +# Custom monitor that extends CrawlerMonitor with test-specific tracking +class StressTestMonitor(CrawlerMonitor): + def __init__(self, test_results: TestResults, **kwargs): + # Initialize the parent CrawlerMonitor + super().__init__(**kwargs) + self.test_results = test_results + + def update_memory_status(self, status: str): + if status != self.memory_status: + logger.info(f"Memory status changed: {self.memory_status} -> {status}") + if "CRITICAL" in status or "PRESSURE" in status: + self.test_results.memory_warnings += 1 + + # Track peak memory usage in test results + current_memory = psutil.virtual_memory().percent + self.test_results.max_memory_usage = max(self.test_results.max_memory_usage, current_memory) + + # Call parent method to update the dashboard + super().update_memory_status(status) + + def update_queue_statistics(self, total_queued: int, highest_wait_time: 
float, avg_wait_time: float): + # Track queue metrics in test results + self.test_results.max_queue_size = max(self.test_results.max_queue_size, total_queued) + self.test_results.max_wait_time = max(self.test_results.max_wait_time, highest_wait_time) + + # Call parent method to update the dashboard + super().update_queue_statistics(total_queued, highest_wait_time, avg_wait_time) + + def update_task(self, task_id: str, **kwargs): + # Track URL status changes for test results + if task_id in self.stats: + old_status = self.stats[task_id].status + + # If this is a requeue event (requeued due to memory pressure) + if 'error_message' in kwargs and 'requeued' in kwargs['error_message']: + if not hasattr(self.stats[task_id], 'counted_requeue') or not self.stats[task_id].counted_requeue: + self.test_results.requeued_count += 1 + self.stats[task_id].counted_requeue = True + + # Track completion status for test results + if 'status' in kwargs: + new_status = kwargs['status'] + if old_status != new_status: + if new_status == CrawlStatus.COMPLETED: + if task_id not in self.test_results.completed_urls: + self.test_results.completed_urls.append(task_id) + elif new_status == CrawlStatus.FAILED: + if task_id not in self.test_results.failed_urls: + self.test_results.failed_urls.append(task_id) + + # Call parent method to update the dashboard + super().update_task(task_id, **kwargs) + self.live.update(self._create_table()) + +# Generate test URLs - use example.com with unique paths to avoid browser caching +def generate_test_urls(count: int) -> List[str]: + urls = [] + for i in range(count): + # Add random path and query parameters to create unique URLs + path = f"/path/{uuid.uuid4()}" + query = f"?test={i}&random={random.randint(1, 100000)}" + urls.append(f"https://example.com{path}{query}") + return urls + +# Process result callback +async def process_result(result, test_results: TestResults): + # Track attempt counts + if result.url not in test_results.url_to_attempt: + 
test_results.url_to_attempt[result.url] = 1 + else: + test_results.url_to_attempt[result.url] += 1 + + if "requeued" in result.error_message: + test_results.requeued_count += 1 + logger.debug(f"Requeued due to memory pressure: {result.url}") + elif result.success: + test_results.completed_urls.append(result.url) + logger.debug(f"Successfully processed: {result.url}") + else: + test_results.failed_urls.append(result.url) + logger.warning(f"Failed to process: {result.url} - {result.error_message}") + +# Process multiple results (used in non-streaming mode) +async def process_results(results, test_results: TestResults): + for result in results: + await process_result(result, test_results) + +# Main test function for extreme memory pressure simulation +async def run_memory_stress_test( + url_count: int = 100, + target_memory_percent: float = 92.0, # Push to dangerous levels + chunk_size: int = 20, # Larger chunks for more chaos + aggressive: bool = False, + spikes: bool = True +): + test_results = TestResults() + memory_simulator = MemorySimulator(target_percent=target_memory_percent, aggressive=aggressive) + + logger.info(f"Starting stress test with {url_count} URLs in {'STREAM' if STREAM else 'NON-STREAM'} mode") + logger.info(f"Target memory usage: {target_memory_percent}%") + + # First, elevate memory usage to create pressure + logger.info("Creating initial memory pressure...") + memory_simulator.apply_pressure() + + # Create test URLs in chunks to simulate real-world crawling where URLs are discovered + all_urls = generate_test_urls(url_count) + url_chunks = [all_urls[i:i+chunk_size] for i in range(0, len(all_urls), chunk_size)] + + # Set up the crawler components - low memory thresholds to create more requeues + browser_config = BrowserConfig(headless=True, verbose=False) + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + verbose=False, + stream=STREAM # Use the global STREAM variable to set mode + ) + + # Create monitor with reference to test 
results + monitor = StressTestMonitor( + test_results=test_results, + display_mode=DisplayMode.DETAILED, + max_visible_rows=20, + total_urls=url_count # Pass total URLs count + ) + + # Create dispatcher with EXTREME settings - pure survival mode + # These settings are designed to create a memory battleground + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=63.0, # Start throttling at just 60% memory + critical_threshold_percent=70.0, # Start requeuing at 70% - incredibly aggressive + recovery_threshold_percent=55.0, # Only resume normal ops when plenty of memory available + check_interval=0.1, # Check extremely frequently (100ms) + max_session_permit=20 if aggressive else 10, # Double the concurrent sessions - pure chaos + fairness_timeout=10.0, # Extremely low timeout - rapid priority changes + monitor=monitor + ) + + # Set up spike schedule if enabled + if spikes: + spike_intervals = [] + # Create 3-5 random spike times + num_spikes = random.randint(3, 5) + for _ in range(num_spikes): + # Schedule spikes at random chunks + chunk_index = random.randint(1, len(url_chunks) - 1) + spike_intervals.append(chunk_index) + logger.info(f"Scheduled memory spikes at chunks: {spike_intervals}") + + try: + async with AsyncWebCrawler(config=browser_config) as crawler: + # Process URLs in chunks to simulate discovering URLs over time + for chunk_index, url_chunk in enumerate(url_chunks): + logger.info(f"Processing chunk {chunk_index+1}/{len(url_chunks)} ({len(url_chunk)} URLs)") + + # Regular pressure increases + if chunk_index % 2 == 0: + logger.info("Increasing memory pressure...") + memory_simulator.apply_pressure() + + # Memory spike if scheduled for this chunk + if spikes and chunk_index in spike_intervals: + logger.info(f"⚠️ CREATING MASSIVE MEMORY SPIKE at chunk {chunk_index+1} ⚠️") + # Create a nightmare scenario - multiple overlapping spikes + memory_simulator.spike_pressure(duration=10.0) # 10-second spike + + # 50% chance of double-spike (pure evil) 
+ if random.random() < 0.5: + await asyncio.sleep(2.0) # Wait 2 seconds + logger.info("💀 DOUBLE SPIKE - EXTREME MEMORY PRESSURE 💀") + memory_simulator.spike_pressure(duration=8.0) # 8-second overlapping spike + + if STREAM: + # Stream mode - process results as they come in + async for result in dispatcher.run_urls_stream( + urls=url_chunk, + crawler=crawler, + config=run_config + ): + await process_result(result, test_results) + else: + # Non-stream mode - get all results at once + results = await dispatcher.run_urls( + urls=url_chunk, + crawler=crawler, + config=run_config + ) + await process_results(results, test_results) + + # Simulate discovering more URLs while others are still processing + await asyncio.sleep(1) + + # RARELY release pressure - make the system fight for resources + if chunk_index % 5 == 4: # Less frequent releases + release_percent = random.choice([10, 15, 20]) # Smaller, inconsistent releases + logger.info(f"Releasing {release_percent}% of memory blocks - brief respite") + memory_simulator.release_pressure(percent=release_percent) + + except Exception as e: + logger.error(f"Test error: {str(e)}") + raise + finally: + # Release memory pressure + memory_simulator.release_pressure() + # Log final results + test_results.log_summary() + + # Check for success criteria + if len(test_results.completed_urls) + len(test_results.failed_urls) < url_count: + logger.error(f"TEST FAILED: Not all URLs were processed. 
{url_count - len(test_results.completed_urls) - len(test_results.failed_urls)} URLs missing.") + return False + + logger.info("TEST PASSED: All URLs were processed without crashing.") + return True + +# Command-line entry point +if __name__ == "__main__": + # Parse command line arguments + url_count = int(sys.argv[1]) if len(sys.argv) > 1 else 100 + target_memory = float(sys.argv[2]) if len(sys.argv) > 2 else 85.0 + + # Check if stream mode is specified + if len(sys.argv) > 3: + STREAM = sys.argv[3].lower() in ('true', 'yes', '1', 'stream') + + # Check if aggressive mode is specified + aggressive = False + if len(sys.argv) > 4: + aggressive = sys.argv[4].lower() in ('true', 'yes', '1', 'aggressive') + + print(f"Starting test with {url_count} URLs, {target_memory}% memory target") + print(f"Stream mode: {STREAM}, Aggressive: {aggressive}") + print("Logs will be written to the logs directory") + print("Live display starting now...") + + # Run the test + result = asyncio.run(run_memory_stress_test( + url_count=url_count, + target_memory_percent=target_memory, + aggressive=aggressive + )) + + # Exit with status code + sys.exit(0 if result else 1) \ No newline at end of file From dc36997a08c9fa5212b766d034bf34a0d84cefd7 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Wed, 12 Mar 2025 22:40:46 +0800 Subject: [PATCH 2/7] feat(schema): improve HTML preprocessing for schema generation Add new preprocess_html_for_schema utility function to better handle HTML cleaning for schema generation. This replaces the previous optimize_html function in the GoogleSearchCrawler and includes smarter attribute handling and pattern detection. Other changes: - Update default provider to gpt-4o - Add DEFAULT_PROVIDER_API_KEY constant - Make LLMConfig creation more flexible with create_llm_config helper - Add new dependencies: zstandard and msgpack This change improves schema generation reliability while reducing noise in the processed HTML. 
--- crawl4ai/async_configs.py | 3 +- crawl4ai/config.py | 3 +- crawl4ai/crawlers/google_search/crawler.py | 5 +- crawl4ai/extraction_strategy.py | 7 +- crawl4ai/types.py | 8 +- crawl4ai/utils.py | 115 ++++++++++++++++++++- pyproject.toml | 4 +- tests/20241401/test_schema_builder.py | 1 + 8 files changed, 134 insertions(+), 12 deletions(-) diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 937ae4eb..edcb4b4e 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -1,6 +1,7 @@ import os from .config import ( DEFAULT_PROVIDER, + DEFAULT_PROVIDER_API_KEY, MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, PROVIDER_MODELS, @@ -1080,7 +1081,7 @@ class LLMConfig: self.api_token = os.getenv(api_token[4:]) else: self.api_token = PROVIDER_MODELS.get(provider, "no-token") or os.getenv( - "OPENAI_API_KEY" + DEFAULT_PROVIDER_API_KEY ) self.base_url = base_url diff --git a/crawl4ai/config.py b/crawl4ai/config.py index 790ba6d0..866c7dc0 100644 --- a/crawl4ai/config.py +++ b/crawl4ai/config.py @@ -4,7 +4,8 @@ from dotenv import load_dotenv load_dotenv() # Load environment variables from .env file # Default provider, ONLY used when the extraction strategy is LLMExtractionStrategy -DEFAULT_PROVIDER = "openai/gpt-4o-mini" +DEFAULT_PROVIDER = "openai/gpt-4o" +DEFAULT_PROVIDER_API_KEY = "OPENAI_API_KEY" MODEL_REPO_BRANCH = "new-release-0.0.2" # Provider-model dictionary, ONLY used when the extraction strategy is LLMExtractionStrategy PROVIDER_MODELS = { diff --git a/crawl4ai/crawlers/google_search/crawler.py b/crawl4ai/crawlers/google_search/crawler.py index cae5f81d..e1288de1 100644 --- a/crawl4ai/crawlers/google_search/crawler.py +++ b/crawl4ai/crawlers/google_search/crawler.py @@ -1,6 +1,6 @@ from crawl4ai import BrowserConfig, AsyncWebCrawler, CrawlerRunConfig, CacheMode from crawl4ai.hub import BaseCrawler -from crawl4ai.utils import optimize_html, get_home_folder +from crawl4ai.utils import optimize_html, get_home_folder, 
preprocess_html_for_schema from crawl4ai.extraction_strategy import JsonCssExtractionStrategy from pathlib import Path import json @@ -68,7 +68,8 @@ class GoogleSearchCrawler(BaseCrawler): home_dir = get_home_folder() if not schema_cache_path else schema_cache_path os.makedirs(f"{home_dir}/schema", exist_ok=True) - cleaned_html = optimize_html(html, threshold=100) + # cleaned_html = optimize_html(html, threshold=100) + cleaned_html = preprocess_html_for_schema(html) organic_schema = None if os.path.exists(f"{home_dir}/schema/organic_schema.json"): diff --git a/crawl4ai/extraction_strategy.py b/crawl4ai/extraction_strategy.py index 97512bf3..0e0300fb 100644 --- a/crawl4ai/extraction_strategy.py +++ b/crawl4ai/extraction_strategy.py @@ -34,7 +34,7 @@ from .model_loader import ( calculate_batch_size ) -from .types import LLMConfig +from .types import LLMConfig, create_llm_config from functools import partial import numpy as np @@ -757,8 +757,6 @@ class LLMExtractionStrategy(ExtractionStrategy): ####################################################### # New extraction strategies for JSON-based extraction # ####################################################### - - class JsonElementExtractionStrategy(ExtractionStrategy): """ Abstract base class for extracting structured JSON from HTML content. @@ -1049,7 +1047,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy): schema_type: str = "CSS", # or XPATH query: str = None, target_json_example: str = None, - llm_config: 'LLMConfig' = None, + llm_config: 'LLMConfig' = create_llm_config(), provider: str = None, api_token: str = None, **kwargs @@ -1140,7 +1138,6 @@ In this scenario, use your best judgment to generate the schema. Try to maximize except Exception as e: raise Exception(f"Failed to generate schema: {str(e)}") - class JsonCssExtractionStrategy(JsonElementExtractionStrategy): """ Concrete implementation of `JsonElementExtractionStrategy` using CSS selectors. 
diff --git a/crawl4ai/types.py b/crawl4ai/types.py index 2f689e1c..63fd45ba 100644 --- a/crawl4ai/types.py +++ b/crawl4ai/types.py @@ -178,4 +178,10 @@ if TYPE_CHECKING: BestFirstCrawlingStrategy as BestFirstCrawlingStrategyType, DFSDeepCrawlStrategy as DFSDeepCrawlStrategyType, DeepCrawlDecorator as DeepCrawlDecoratorType, - ) \ No newline at end of file + ) + + + +def create_llm_config(*args, **kwargs) -> 'LLMConfigType': + from .async_configs import LLMConfig + return LLMConfig(*args, **kwargs) diff --git a/crawl4ai/utils.py b/crawl4ai/utils.py index 146ce06c..acaf7933 100644 --- a/crawl4ai/utils.py +++ b/crawl4ai/utils.py @@ -26,7 +26,7 @@ import cProfile import pstats from functools import wraps import asyncio - +from lxml import etree, html as lhtml import sqlite3 import hashlib @@ -2617,3 +2617,116 @@ class HeadPeekr: def get_title(head_content: str): title_match = re.search(r'(.*?)', head_content, re.IGNORECASE | re.DOTALL) return title_match.group(1) if title_match else None + +def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_threshold=200, max_size=100000): + """ + Preprocess HTML to reduce size while preserving structure for schema generation. + + Args: + html_content (str): Raw HTML content + text_threshold (int): Maximum length for text nodes before truncation + attr_value_threshold (int): Maximum length for attribute values before truncation + max_size (int): Target maximum size for output HTML + + Returns: + str: Preprocessed HTML content + """ + try: + # Parse HTML with error recovery + parser = etree.HTMLParser(remove_comments=True, remove_blank_text=True) + tree = lhtml.fromstring(html_content, parser=parser) + + # 1. Remove HEAD section (keep only BODY) + head_elements = tree.xpath('//head') + for head in head_elements: + if head.getparent() is not None: + head.getparent().remove(head) + + # 2. 
Define tags to remove completely + tags_to_remove = [ + 'script', 'style', 'noscript', 'iframe', 'canvas', 'svg', + 'video', 'audio', 'source', 'track', 'map', 'area' + ] + + # Remove unwanted elements + for tag in tags_to_remove: + elements = tree.xpath(f'//{tag}') + for element in elements: + if element.getparent() is not None: + element.getparent().remove(element) + + # 3. Process remaining elements to clean attributes and truncate text + for element in tree.iter(): + # Skip if we're at the root level + if element.getparent() is None: + continue + + # Clean non-essential attributes but preserve structural ones + # attribs_to_keep = {'id', 'class', 'name', 'href', 'src', 'type', 'value', 'data-'} + + # This is more aggressive than the previous version + attribs_to_keep = {'id', 'class', 'name', 'type', 'value'} + + # attributes_hates_truncate = ['id', 'class', "data-"] + + # This means, I don't care, if an attribute is too long, truncate it, go and find a better css selector to build a schema + attributes_hates_truncate = [] + + # Process each attribute + for attrib in list(element.attrib.keys()): + # Keep if it's essential or starts with data- + if not (attrib in attribs_to_keep or attrib.startswith('data-')): + element.attrib.pop(attrib) + # Truncate long attribute values except for selectors + elif attrib not in attributes_hates_truncate and len(element.attrib[attrib]) > attr_value_threshold: + element.attrib[attrib] = element.attrib[attrib][:attr_value_threshold] + '...' + + # Truncate text content if it's too long + if element.text and len(element.text.strip()) > text_threshold: + element.text = element.text.strip()[:text_threshold] + '...' + + # Also truncate tail text if present + if element.tail and len(element.tail.strip()) > text_threshold: + element.tail = element.tail.strip()[:text_threshold] + '...' + + # 4. 
Find repeated patterns and keep only a few examples + # This is a simplistic approach - more sophisticated pattern detection could be implemented + pattern_elements = {} + for element in tree.xpath('//*[contains(@class, "")]'): + parent = element.getparent() + if parent is None: + continue + + # Create a signature based on tag and classes + classes = element.get('class', '') + if not classes: + continue + signature = f"{element.tag}.{classes}" + + if signature in pattern_elements: + pattern_elements[signature].append(element) + else: + pattern_elements[signature] = [element] + + # Keep only 3 examples of each repeating pattern + for signature, elements in pattern_elements.items(): + if len(elements) > 3: + # Keep the first 2 and last elements + for element in elements[2:-1]: + if element.getparent() is not None: + element.getparent().remove(element) + + # 5. Convert back to string + result = etree.tostring(tree, encoding='unicode', method='html') + + # If still over the size limit, apply more aggressive truncation + if len(result) > max_size: + return result[:max_size] + "..." 
+ + return result + + except Exception as e: + # Fallback for parsing errors + return html_content[:max_size] if len(html_content) > max_size else html_content + + diff --git a/pyproject.toml b/pyproject.toml index b4fb392f..c3f03bfd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,9 @@ dependencies = [ "pyperclip>=1.8.2", "faust-cchardet>=2.1.19", "aiohttp>=3.11.11", - "humanize>=4.10.0" + "humanize>=4.10.0", + "zstandard>=0.23.0", + "msgpack>=1.1.0" ] classifiers = [ "Development Status :: 4 - Beta", diff --git a/tests/20241401/test_schema_builder.py b/tests/20241401/test_schema_builder.py index 431fb001..46d0e240 100644 --- a/tests/20241401/test_schema_builder.py +++ b/tests/20241401/test_schema_builder.py @@ -10,6 +10,7 @@ import asyncio from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, JsonXPathExtractionStrategy +from crawl4ai.utils import preprocess_html_for_schema, JsonXPathExtractionStrategy import json # Test HTML - A complex job board with companies, departments, and positions From b750542e6d2cecf09a7224ee2c14285fff99b265 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Thu, 13 Mar 2025 22:15:15 +0800 Subject: [PATCH 3/7] feat(crawler): optimize single URL handling and add performance comparison Add special handling for single URL requests in Docker API to use arun() instead of arun_many() Add new example script demonstrating performance differences between sequential and parallel crawling Update cache mode from aggressive to bypass in examples and tests Remove unused dependencies (zstandard, msgpack) BREAKING CHANGE: Changed default cache_mode from aggressive to bypass in examples --- deploy/docker/README.md | 2 +- deploy/docker/api.py | 18 ++++-- docs/examples/arun_vs_arun_many.py | 79 +++++++++++++++++++++++++ docs/examples/docker_python_rest_api.py | 2 +- 
pyproject.toml | 2 - tests/docker/test_server_token.py | 2 +- 6 files changed, 95 insertions(+), 10 deletions(-) create mode 100644 docs/examples/arun_vs_arun_many.py diff --git a/deploy/docker/README.md b/deploy/docker/README.md index c4582031..b4b6e414 100644 --- a/deploy/docker/README.md +++ b/deploy/docker/README.md @@ -554,7 +554,7 @@ async def test_stream_crawl(session, token: str): "https://example.com/page3", ], "browser_config": {"headless": True, "viewport": {"width": 1200}}, - "crawler_config": {"stream": True, "cache_mode": "aggressive"} + "crawler_config": {"stream": True, "cache_mode": "bypass"} } # headers = {"Authorization": f"Bearer {token}"} # If JWT is enabled, more on this later diff --git a/deploy/docker/api.py b/deploy/docker/api.py index cc103905..4c7e17d2 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -388,11 +388,19 @@ async def handle_crawl_request( ) async with AsyncWebCrawler(config=browser_config) as crawler: - results = await crawler.arun_many( - urls=urls, - config=crawler_config, - dispatcher=dispatcher - ) + results = [] + if len(urls) == 1: + results = await crawler.arun( + url=urls[0], + config=crawler_config, + dispatcher=dispatcher + ) + else: + results = await crawler.arun_many( + urls=urls, + config=crawler_config, + dispatcher=dispatcher + ) return { "success": True, diff --git a/docs/examples/arun_vs_arun_many.py b/docs/examples/arun_vs_arun_many.py new file mode 100644 index 00000000..40bc4381 --- /dev/null +++ b/docs/examples/arun_vs_arun_many.py @@ -0,0 +1,79 @@ +import asyncio +import time +from crawl4ai.async_webcrawler import AsyncWebCrawler, CacheMode +from crawl4ai.async_configs import CrawlerRunConfig +from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher, RateLimiter + +VERBOSE = False + +async def crawl_sequential(urls): + config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE) + results = [] + start_time = time.perf_counter() + async with AsyncWebCrawler() as crawler: + 
for url in urls: + result_container = await crawler.arun(url=url, config=config) + results.append(result_container[0]) + total_time = time.perf_counter() - start_time + return total_time, results + +async def crawl_parallel_dispatcher(urls): + config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE) + # Dispatcher with rate limiter enabled (default behavior) + dispatcher = MemoryAdaptiveDispatcher( + rate_limiter=RateLimiter(base_delay=(1.0, 3.0), max_delay=60.0, max_retries=3), + max_session_permit=50, + ) + start_time = time.perf_counter() + async with AsyncWebCrawler() as crawler: + result_container = await crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher) + results = [] + if isinstance(result_container, list): + results = result_container + else: + async for res in result_container: + results.append(res) + total_time = time.perf_counter() - start_time + return total_time, results + +async def crawl_parallel_no_rate_limit(urls): + config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE) + # Dispatcher with no rate limiter and a high session permit to avoid queuing + dispatcher = MemoryAdaptiveDispatcher( + rate_limiter=None, + max_session_permit=len(urls) # allow all URLs concurrently + ) + start_time = time.perf_counter() + async with AsyncWebCrawler() as crawler: + result_container = await crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher) + results = [] + if isinstance(result_container, list): + results = result_container + else: + async for res in result_container: + results.append(res) + total_time = time.perf_counter() - start_time + return total_time, results + +async def main(): + urls = ["https://example.com"] * 100 + print(f"Crawling {len(urls)} URLs sequentially...") + seq_time, seq_results = await crawl_sequential(urls) + print(f"Sequential crawling took: {seq_time:.2f} seconds\n") + + print(f"Crawling {len(urls)} URLs in parallel using arun_many with dispatcher (with rate limit)...") + 
disp_time, disp_results = await crawl_parallel_dispatcher(urls) + print(f"Parallel (dispatcher with rate limiter) took: {disp_time:.2f} seconds\n") + + print(f"Crawling {len(urls)} URLs in parallel using dispatcher with no rate limiter...") + no_rl_time, no_rl_results = await crawl_parallel_no_rate_limit(urls) + print(f"Parallel (dispatcher without rate limiter) took: {no_rl_time:.2f} seconds\n") + + print("Crawl4ai - Crawling Comparison") + print("--------------------------------------------------------") + print(f"Sequential crawling took: {seq_time:.2f} seconds") + print(f"Parallel (dispatcher with rate limiter) took: {disp_time:.2f} seconds") + print(f"Parallel (dispatcher without rate limiter) took: {no_rl_time:.2f} seconds") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/examples/docker_python_rest_api.py b/docs/examples/docker_python_rest_api.py index 000d6464..6650f8d5 100644 --- a/docs/examples/docker_python_rest_api.py +++ b/docs/examples/docker_python_rest_api.py @@ -73,7 +73,7 @@ async def test_stream_crawl(session, token: str): # "https://news.ycombinator.com/news" ], "browser_config": {"headless": True, "viewport": {"width": 1200}}, - "crawler_config": {"stream": True, "cache_mode": "aggressive"} + "crawler_config": {"stream": True, "cache_mode": "bypass"} } headers = {"Authorization": f"Bearer {token}"} print(f"\nTesting Streaming Crawl: {url}") diff --git a/pyproject.toml b/pyproject.toml index c3f03bfd..ad07548d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,8 +43,6 @@ dependencies = [ "faust-cchardet>=2.1.19", "aiohttp>=3.11.11", "humanize>=4.10.0", - "zstandard>=0.23.0", - "msgpack>=1.1.0" ] classifiers = [ "Development Status :: 4 - Beta", diff --git a/tests/docker/test_server_token.py b/tests/docker/test_server_token.py index d8c7df89..220b6ca2 100644 --- a/tests/docker/test_server_token.py +++ b/tests/docker/test_server_token.py @@ -73,7 +73,7 @@ async def test_stream_crawl(session, token: str): # 
"https://news.ycombinator.com/news" ], "browser_config": {"headless": True, "viewport": {"width": 1200}}, - "crawler_config": {"stream": True, "cache_mode": "aggressive"} + "crawler_config": {"stream": True, "cache_mode": "bypass"} } headers = {"Authorization": f"Bearer {token}"} print(f"\nTesting Streaming Crawl: {url}") From 6e3c0483286b2902ab814630ab199449249176f6 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Thu, 13 Mar 2025 22:30:38 +0800 Subject: [PATCH 4/7] feat(api): refactor crawl request handling to streamline single and multiple URL processing --- deploy/docker/api.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/deploy/docker/api.py b/deploy/docker/api.py index 4c7e17d2..305e8a31 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -2,6 +2,7 @@ import os import json import asyncio from typing import List, Tuple +from functools import partial import logging from typing import Optional, AsyncGenerator @@ -389,19 +390,9 @@ async def handle_crawl_request( async with AsyncWebCrawler(config=browser_config) as crawler: results = [] - if len(urls) == 1: - results = await crawler.arun( - url=urls[0], - config=crawler_config, - dispatcher=dispatcher - ) - else: - results = await crawler.arun_many( - urls=urls, - config=crawler_config, - dispatcher=dispatcher - ) - + func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many") + partial_func = partial(func, urls[0] if len(urls) == 1 else urls, config=crawler_config, dispatcher=dispatcher) + results = await partial_func() return { "success": True, "results": [result.model_dump() for result in results] From 7884a98be7fe891963e9c82525b25660ff86c26e Mon Sep 17 00:00:00 2001 From: UncleCode Date: Fri, 14 Mar 2025 14:39:24 +0800 Subject: [PATCH 5/7] feat(crawler): add experimental parameters support and optimize browser handling Add experimental parameters dictionary to CrawlerRunConfig to support beta features Make CSP nonce headers optional via experimental 
config Remove default cookie injection Clean up browser context creation code Improve code formatting in API handler BREAKING CHANGE: Default cookie injection has been removed from page initialization --- crawl4ai/async_configs.py | 14 ++++++++++++++ crawl4ai/async_crawler_strategy.py | 23 +++++++++++++---------- crawl4ai/browser_manager.py | 14 +------------- deploy/docker/api.py | 5 ++++- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index edcb4b4e..0e39b551 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -650,6 +650,12 @@ class CrawlerRunConfig(): user_agent_generator_config (dict or None): Configuration for user agent generation if user_agent_mode is set. Default: None. + # Experimental Parameters + experimental (dict): Dictionary containing experimental parameters that are in beta phase. + This allows passing temporary features that are not yet fully integrated + into the main parameter set. + Default: None. 
+ url: str = None # This is not a compulsory parameter """ @@ -732,6 +738,8 @@ class CrawlerRunConfig(): user_agent_generator_config: dict = {}, # Deep Crawl Parameters deep_crawl_strategy: Optional[DeepCrawlStrategy] = None, + # Experimental Parameters + experimental: Dict[str, Any] = None, ): # TODO: Planning to set properties dynamically based on the __init__ signature self.url = url @@ -845,6 +853,9 @@ class CrawlerRunConfig(): # Deep Crawl Parameters self.deep_crawl_strategy = deep_crawl_strategy + + # Experimental Parameters + self.experimental = experimental or {} def __getattr__(self, name): @@ -953,6 +964,8 @@ class CrawlerRunConfig(): # Deep Crawl Parameters deep_crawl_strategy=kwargs.get("deep_crawl_strategy"), url=kwargs.get("url"), + # Experimental Parameters + experimental=kwargs.get("experimental"), ) # Create a funciton returns dict of the object @@ -1037,6 +1050,7 @@ class CrawlerRunConfig(): "user_agent_generator_config": self.user_agent_generator_config, "deep_crawl_strategy": self.deep_crawl_strategy, "url": self.url, + "experimental": self.experimental, } def clone(self, **kwargs): diff --git a/crawl4ai/async_crawler_strategy.py b/crawl4ai/async_crawler_strategy.py index 960c2d6f..37aa0962 100644 --- a/crawl4ai/async_crawler_strategy.py +++ b/crawl4ai/async_crawler_strategy.py @@ -507,10 +507,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): # Get page for session page, context = await self.browser_manager.get_page(crawlerRunConfig=config) + # await page.goto(URL) + # Add default cookie - await context.add_cookies( - [{"name": "cookiesEnabled", "value": "true", "url": url}] - ) + # await context.add_cookies( + # [{"name": "cookiesEnabled", "value": "true", "url": url}] + # ) # Handle navigator overrides if config.override_navigator or config.simulate_user or config.magic: @@ -562,14 +564,15 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): try: # Generate a unique nonce for this request - nonce = 
hashlib.sha256(os.urandom(32)).hexdigest() + if config.experimental.get("use_csp_nonce", False): + nonce = hashlib.sha256(os.urandom(32)).hexdigest() - # Add CSP headers to the request - await page.set_extra_http_headers( - { - "Content-Security-Policy": f"default-src 'self'; script-src 'self' 'nonce-{nonce}' 'strict-dynamic'" - } - ) + # Add CSP headers to the request + await page.set_extra_http_headers( + { + "Content-Security-Policy": f"default-src 'self'; script-src 'self' 'nonce-{nonce}' 'strict-dynamic'" + } + ) response = await page.goto( url, wait_until=config.wait_until, timeout=config.page_timeout diff --git a/crawl4ai/browser_manager.py b/crawl4ai/browser_manager.py index 38f87d9a..06b36a32 100644 --- a/crawl4ai/browser_manager.py +++ b/crawl4ai/browser_manager.py @@ -443,19 +443,6 @@ class BrowserManager: self.default_context = contexts[0] else: self.default_context = await self.create_browser_context() - # self.default_context = await self.browser.new_context( - # viewport={ - # "width": self.config.viewport_width, - # "height": self.config.viewport_height, - # }, - # storage_state=self.config.storage_state, - # user_agent=self.config.headers.get( - # "User-Agent", self.config.user_agent - # ), - # accept_downloads=self.config.accept_downloads, - # ignore_https_errors=self.config.ignore_https_errors, - # java_script_enabled=self.config.java_script_enabled, - # ) await self.setup_context(self.default_context) else: browser_args = self._build_browser_args() @@ -470,6 +457,7 @@ class BrowserManager: self.default_context = self.browser + def _build_browser_args(self) -> dict: """Build browser launch arguments from config.""" args = [ diff --git a/deploy/docker/api.py b/deploy/docker/api.py index 305e8a31..33802772 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -391,7 +391,10 @@ async def handle_crawl_request( async with AsyncWebCrawler(config=browser_config) as crawler: results = [] func = getattr(crawler, "arun" if len(urls) == 1 else 
"arun_many") - partial_func = partial(func, urls[0] if len(urls) == 1 else urls, config=crawler_config, dispatcher=dispatcher) + partial_func = partial(func, + urls[0] if len(urls) == 1 else urls, + config=crawler_config, + dispatcher=dispatcher) results = await partial_func() return { "success": True, From a31d7b86bebac9a8935671945b011ba2f734e68b Mon Sep 17 00:00:00 2001 From: UncleCode Date: Fri, 14 Mar 2025 15:26:37 +0800 Subject: [PATCH 6/7] feat(changelog): update CHANGELOG for version 0.5.0.post5 with new features, changes, fixes, and breaking changes --- CHANGELOG.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96b1eb0f..61161f92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,39 @@ All notable changes to Crawl4AI will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## Version 0.5.0.post5 (2025-03-14) + +### Added + +- *(crawler)* Add experimental parameters dictionary to CrawlerRunConfig to support beta features +- *(tables)* Add comprehensive table detection and extraction functionality with scoring system +- *(monitor)* Add real-time crawler monitoring system with memory management +- *(content)* Add target_elements parameter for selective content extraction +- *(browser)* Add standalone CDP browser launch capability +- *(schema)* Add preprocess_html_for_schema utility for better HTML cleaning +- *(api)* Add special handling for single URL requests in Docker API + +### Changed + +- *(filters)* Add reverse option to URLPatternFilter for inverting filter logic +- *(browser)* Make CSP nonce headers optional via experimental config +- *(browser)* Remove default cookie injection from page initialization +- *(crawler)* Optimize response handling for single-URL processing +- *(api)* Refactor crawl request handling to streamline processing +- *(config)* Update default provider to gpt-4o +- *(cache)* Change default cache_mode from aggressive to bypass in examples + +### Fixed + +- *(browser)* Clean up browser context creation code +- *(api)* Improve code formatting in API handler + +### Breaking Changes + +- WebScrapingStrategy no longer returns 'scraped_html' in its output dictionary +- Table extraction logic has been modified to better handle thead/tbody structures +- Default cookie injection has been removed from page initialization + ## Version 0.5.0 (2025-03-02) ### Added From a24799918c7c765af0f4b712b8337d1fa1e16f3a Mon Sep 17 00:00:00 2001 From: UncleCode Date: Fri, 14 Mar 2025 21:36:23 +0800 Subject: [PATCH 7/7] feat(llm): add additional LLM configuration parameters Extend LLMConfig class to support more fine-grained control over LLM behavior by adding: - temperature control - max tokens limit - top_p sampling - frequency and presence penalties - stop sequences - number of completions These parameters allow for better 
customization of LLM responses. --- crawl4ai/async_configs.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 0e39b551..fc4c059c 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -1086,6 +1086,13 @@ class LLMConfig: provider: str = DEFAULT_PROVIDER, api_token: Optional[str] = None, base_url: Optional[str] = None, + temprature: Optional[float] = None, + max_tokens: Optional[int] = None, + top_p: Optional[float] = None, + frequency_penalty: Optional[float] = None, + presence_penalty: Optional[float] = None, + stop: Optional[List[str]] = None, + n: Optional[int] = None, ): """Configuaration class for LLM provider and API token.""" self.provider = provider @@ -1098,7 +1105,13 @@ class LLMConfig: DEFAULT_PROVIDER_API_KEY ) self.base_url = base_url - + self.temprature = temprature + self.max_tokens = max_tokens + self.top_p = top_p + self.frequency_penalty = frequency_penalty + self.presence_penalty = presence_penalty + self.stop = stop + self.n = n @staticmethod def from_kwargs(kwargs: dict) -> "LLMConfig": @@ -1106,13 +1119,27 @@ class LLMConfig: provider=kwargs.get("provider", DEFAULT_PROVIDER), api_token=kwargs.get("api_token"), base_url=kwargs.get("base_url"), + temprature=kwargs.get("temprature"), + max_tokens=kwargs.get("max_tokens"), + top_p=kwargs.get("top_p"), + frequency_penalty=kwargs.get("frequency_penalty"), + presence_penalty=kwargs.get("presence_penalty"), + stop=kwargs.get("stop"), + n=kwargs.get("n") ) def to_dict(self): return { "provider": self.provider, "api_token": self.api_token, - "base_url": self.base_url + "base_url": self.base_url, + "temprature": self.temprature, + "max_tokens": self.max_tokens, + "top_p": self.top_p, + "frequency_penalty": self.frequency_penalty, + "presence_penalty": self.presence_penalty, + "stop": self.stop, + "n": self.n } def clone(self, **kwargs):