diff --git a/.gitignore b/.gitignore index db833e57..a290ab7d 100644 --- a/.gitignore +++ b/.gitignore @@ -255,3 +255,6 @@ continue_config.json .llm.env .private/ + +CLAUDE_MONITOR.md +CLAUDE.md \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 96b1eb0f..61161f92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,39 @@ All notable changes to Crawl4AI will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Version 0.5.0.post5 (2025-03-14) + +### Added + +- *(crawler)* Add experimental parameters dictionary to CrawlerRunConfig to support beta features +- *(tables)* Add comprehensive table detection and extraction functionality with scoring system +- *(monitor)* Add real-time crawler monitoring system with memory management +- *(content)* Add target_elements parameter for selective content extraction +- *(browser)* Add standalone CDP browser launch capability +- *(schema)* Add preprocess_html_for_schema utility for better HTML cleaning +- *(api)* Add special handling for single URL requests in Docker API + +### Changed + +- *(filters)* Add reverse option to URLPatternFilter for inverting filter logic +- *(browser)* Make CSP nonce headers optional via experimental config +- *(browser)* Remove default cookie injection from page initialization +- *(crawler)* Optimize response handling for single-URL processing +- *(api)* Refactor crawl request handling to streamline processing +- *(config)* Update default provider to gpt-4o +- *(cache)* Change default cache_mode from aggressive to bypass in examples + +### Fixed + +- *(browser)* Clean up browser context creation code +- *(api)* Improve code formatting in API handler + +### Breaking Changes + +- WebScrapingStrategy no longer returns 'scraped_html' in its output dictionary +- Table extraction logic has been modified to better handle thead/tbody structures +- Default cookie injection has been removed from page initialization + ## Version 0.5.0 (2025-03-02) ### Added diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index ff238964..0ab808f3 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ -33,13 +33,12 @@ from .content_filter_strategy import ( LLMContentFilter, RelevantContentFilter, ) -from .models import CrawlResult, MarkdownGenerationResult +from .models import CrawlResult, MarkdownGenerationResult, DisplayMode +from .components.crawler_monitor import CrawlerMonitor from .async_dispatcher import ( MemoryAdaptiveDispatcher, SemaphoreDispatcher, RateLimiter, - CrawlerMonitor, - DisplayMode, BaseDispatcher, ) from .docker_client import Crawl4aiDockerClient diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 937ae4eb..fc4c059c 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -1,6 +1,7 @@ import os from .config import ( DEFAULT_PROVIDER, + DEFAULT_PROVIDER_API_KEY, MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, PROVIDER_MODELS, @@ -649,6 +650,12 @@ class CrawlerRunConfig(): user_agent_generator_config (dict or None): Configuration for user agent generation if user_agent_mode is set. Default: None. + # Experimental Parameters + experimental (dict): Dictionary containing experimental parameters that are in beta phase. + This allows passing temporary features that are not yet fully integrated + into the main parameter set. + Default: None. + url: str = None # This is not a compulsory parameter """ @@ -731,6 +738,8 @@ class CrawlerRunConfig(): user_agent_generator_config: dict = {}, # Deep Crawl Parameters deep_crawl_strategy: Optional[DeepCrawlStrategy] = None, + # Experimental Parameters + experimental: Dict[str, Any] = None, ): # TODO: Planning to set properties dynamically based on the __init__ signature self.url = url @@ -844,6 +853,9 @@ class CrawlerRunConfig(): # Deep Crawl Parameters self.deep_crawl_strategy = deep_crawl_strategy + + # Experimental Parameters + self.experimental = experimental or {} def __getattr__(self, name): @@ -952,6 +964,8 @@ class CrawlerRunConfig(): # Deep Crawl Parameters deep_crawl_strategy=kwargs.get("deep_crawl_strategy"), url=kwargs.get("url"), + # Experimental Parameters + experimental=kwargs.get("experimental"), ) # Create a funciton returns dict of the object @@ -1036,6 +1050,7 @@ class CrawlerRunConfig(): "user_agent_generator_config": self.user_agent_generator_config, "deep_crawl_strategy": self.deep_crawl_strategy, "url": self.url, + "experimental": self.experimental, } def clone(self, **kwargs): @@ -1071,6 +1086,13 @@ class LLMConfig: provider: str = DEFAULT_PROVIDER, api_token: Optional[str] = None, base_url: Optional[str] = None, + temprature: Optional[float] = None, + max_tokens: Optional[int] = None, + top_p: Optional[float] = None, + frequency_penalty: Optional[float] = None, + presence_penalty: Optional[float] = None, + stop: Optional[List[str]] = None, + n: Optional[int] = None, ): """Configuaration class for LLM provider and API token.""" self.provider = provider @@ -1080,10 +1102,16 @@ class LLMConfig: self.api_token = os.getenv(api_token[4:]) else: self.api_token = PROVIDER_MODELS.get(provider, "no-token") or os.getenv( - "OPENAI_API_KEY" + DEFAULT_PROVIDER_API_KEY ) self.base_url = base_url - + self.temprature = temprature + self.max_tokens = max_tokens + self.top_p = top_p + self.frequency_penalty = frequency_penalty + self.presence_penalty = presence_penalty + self.stop = stop + self.n = n @staticmethod def from_kwargs(kwargs: dict) -> "LLMConfig": @@ -1091,13 +1119,27 @@ class LLMConfig: provider=kwargs.get("provider", DEFAULT_PROVIDER), api_token=kwargs.get("api_token"), base_url=kwargs.get("base_url"), + temprature=kwargs.get("temprature"), + max_tokens=kwargs.get("max_tokens"), + top_p=kwargs.get("top_p"), + frequency_penalty=kwargs.get("frequency_penalty"), + presence_penalty=kwargs.get("presence_penalty"), + stop=kwargs.get("stop"), + n=kwargs.get("n") ) def to_dict(self): return { "provider": self.provider, "api_token": self.api_token, - "base_url": self.base_url + "base_url": self.base_url, + "temprature": self.temprature, + "max_tokens": self.max_tokens, + "top_p": self.top_p, + "frequency_penalty": self.frequency_penalty, + "presence_penalty": self.presence_penalty, + "stop": self.stop, + "n": self.n } def clone(self, **kwargs): diff --git a/crawl4ai/async_crawler_strategy.py b/crawl4ai/async_crawler_strategy.py index 960c2d6f..37aa0962 100644 --- a/crawl4ai/async_crawler_strategy.py +++ b/crawl4ai/async_crawler_strategy.py @@ -507,10 +507,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): # Get page for session page, context = await self.browser_manager.get_page(crawlerRunConfig=config) + # await page.goto(URL) + # Add default cookie - await context.add_cookies( - [{"name": "cookiesEnabled", "value": "true", "url": url}] - ) + # await context.add_cookies( + # [{"name": "cookiesEnabled", "value": "true", "url": url}] + # ) # Handle navigator overrides if config.override_navigator or config.simulate_user or config.magic: @@ -562,14 +564,15 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): try: # Generate a unique nonce for this request - nonce = hashlib.sha256(os.urandom(32)).hexdigest() + if config.experimental.get("use_csp_nonce", False): + nonce = hashlib.sha256(os.urandom(32)).hexdigest() - # Add CSP headers to the request - await page.set_extra_http_headers( - { - "Content-Security-Policy": f"default-src 'self'; script-src 'self' 'nonce-{nonce}' 'strict-dynamic'" - } - ) + # Add CSP headers to the request + await page.set_extra_http_headers( + { + "Content-Security-Policy": f"default-src 'self'; script-src 'self' 'nonce-{nonce}' 'strict-dynamic'" + } + ) response = await page.goto( url, wait_until=config.wait_until, timeout=config.page_timeout diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py index b587d011..b97d59a7 100644 --- a/crawl4ai/async_dispatcher.py +++ b/crawl4ai/async_dispatcher.py @@ -4,17 +4,15 @@ from .models import ( CrawlResult, CrawlerTaskResult, CrawlStatus, - DisplayMode, - CrawlStats, DomainState, ) -from rich.live import Live -from rich.table import Table -from rich.console import Console -from rich import box -from datetime import timedelta, datetime +from .components.crawler_monitor import CrawlerMonitor + +from .types import AsyncWebCrawler + from collections.abc import AsyncGenerator + import time import psutil import asyncio @@ -24,8 +22,6 @@ from urllib.parse import urlparse import random from abc import ABC, abstractmethod -from math import inf as infinity - class RateLimiter: def __init__( @@ -87,201 +83,6 @@ class RateLimiter: return True -class CrawlerMonitor: - def __init__( - self, - max_visible_rows: int = 15, - display_mode: DisplayMode = DisplayMode.DETAILED, - ): - self.console = Console() - self.max_visible_rows = max_visible_rows - self.display_mode = display_mode - self.stats: Dict[str, CrawlStats] = {} - self.process = psutil.Process() - self.start_time = time.time() - self.live = Live(self._create_table(), refresh_per_second=2) - - def start(self): - self.live.start() - - def stop(self): - self.live.stop() - - def add_task(self, task_id: str, url: str): - self.stats[task_id] = CrawlStats( - task_id=task_id, url=url, status=CrawlStatus.QUEUED - ) - self.live.update(self._create_table()) - - def update_task(self, task_id: str, **kwargs): - if task_id in self.stats: - for key, value in kwargs.items(): - setattr(self.stats[task_id], key, value) - self.live.update(self._create_table()) - - def _create_aggregated_table(self) -> Table: - """Creates a compact table showing only aggregated statistics""" - table = Table( - box=box.ROUNDED, - title="Crawler Status Overview", - title_style="bold magenta", - header_style="bold blue", - show_lines=True, - ) - - # Calculate statistics - total_tasks = len(self.stats) - queued = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED - ) - in_progress = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS - ) - completed = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED - ) - failed = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED - ) - - # Memory statistics - current_memory = self.process.memory_info().rss / (1024 * 1024) - total_task_memory = sum(stat.memory_usage for stat in self.stats.values()) - peak_memory = max( - (stat.peak_memory for stat in self.stats.values()), default=0.0 - ) - - # Duration - duration = time.time() - self.start_time - - # Create status row - table.add_column("Status", style="bold cyan") - table.add_column("Count", justify="right") - table.add_column("Percentage", justify="right") - - table.add_row("Total Tasks", str(total_tasks), "100%") - table.add_row( - "[yellow]In Queue[/yellow]", - str(queued), - f"{(queued / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - table.add_row( - "[blue]In Progress[/blue]", - str(in_progress), - f"{(in_progress / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - table.add_row( - "[green]Completed[/green]", - str(completed), - f"{(completed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - table.add_row( - "[red]Failed[/red]", - str(failed), - f"{(failed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%", - ) - - # Add memory information - table.add_section() - table.add_row( - "[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", "" - ) - table.add_row( - "[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", "" - ) - table.add_row( - "[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", "" - ) - table.add_row( - "[yellow]Runtime[/yellow]", - str(timedelta(seconds=int(duration))), - "", - ) - - return table - - def _create_detailed_table(self) -> Table: - table = Table( - box=box.ROUNDED, - title="Crawler Performance Monitor", - title_style="bold magenta", - header_style="bold blue", - ) - - # Add columns - table.add_column("Task ID", style="cyan", no_wrap=True) - table.add_column("URL", style="cyan", no_wrap=True) - table.add_column("Status", style="bold") - table.add_column("Memory (MB)", justify="right") - table.add_column("Peak (MB)", justify="right") - table.add_column("Duration", justify="right") - table.add_column("Info", style="italic") - - # Add summary row - total_memory = sum(stat.memory_usage for stat in self.stats.values()) - active_count = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS - ) - completed_count = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED - ) - failed_count = sum( - 1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED - ) - - table.add_row( - "[bold yellow]SUMMARY", - f"Total: {len(self.stats)}", - f"Active: {active_count}", - f"{total_memory:.1f}", - f"{self.process.memory_info().rss / (1024 * 1024):.1f}", - str( - timedelta( - seconds=int(time.time() - self.start_time) - ) - ), - f"✓{completed_count} ✗{failed_count}", - style="bold", - ) - - table.add_section() - - # Add rows for each task - visible_stats = sorted( - self.stats.values(), - key=lambda x: ( - x.status != CrawlStatus.IN_PROGRESS, - x.status != CrawlStatus.QUEUED, - x.end_time or infinity, - ), - )[: self.max_visible_rows] - - for stat in visible_stats: - status_style = { - CrawlStatus.QUEUED: "white", - CrawlStatus.IN_PROGRESS: "yellow", - CrawlStatus.COMPLETED: "green", - CrawlStatus.FAILED: "red", - }[stat.status] - - table.add_row( - stat.task_id[:8], # Show first 8 chars of task ID - stat.url[:40] + "..." if len(stat.url) > 40 else stat.url, - f"[{status_style}]{stat.status.value}[/{status_style}]", - f"{stat.memory_usage:.1f}", - f"{stat.peak_memory:.1f}", - stat.duration, - stat.error_message[:40] if stat.error_message else "", - ) - - return table - - def _create_table(self) -> Table: - """Creates the appropriate table based on display mode""" - if self.display_mode == DisplayMode.AGGREGATED: - return self._create_aggregated_table() - return self._create_detailed_table() - class BaseDispatcher(ABC): def __init__( @@ -309,7 +110,7 @@ class BaseDispatcher(ABC): async def run_urls( self, urls: List[str], - crawler: "AsyncWebCrawler", # noqa: F821 + crawler: AsyncWebCrawler, # noqa: F821 config: CrawlerRunConfig, monitor: Optional[CrawlerMonitor] = None, ) -> List[CrawlerTaskResult]: @@ -320,71 +121,144 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): def __init__( self, memory_threshold_percent: float = 90.0, + critical_threshold_percent: float = 95.0, # New critical threshold + recovery_threshold_percent: float = 85.0, # New recovery threshold check_interval: float = 1.0, max_session_permit: int = 20, - memory_wait_timeout: float = 300.0, # 5 minutes default timeout + fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs rate_limiter: Optional[RateLimiter] = None, monitor: Optional[CrawlerMonitor] = None, ): super().__init__(rate_limiter, monitor) self.memory_threshold_percent = memory_threshold_percent + self.critical_threshold_percent = critical_threshold_percent + self.recovery_threshold_percent = recovery_threshold_percent self.check_interval = check_interval self.max_session_permit = max_session_permit - self.memory_wait_timeout = memory_wait_timeout - self.result_queue = asyncio.Queue() # Queue for storing results - + self.fairness_timeout = fairness_timeout + self.result_queue = asyncio.Queue() + self.task_queue = asyncio.PriorityQueue() # Priority queue for better management + self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode + self.current_memory_percent = 0.0 # Track current memory usage + + async def _memory_monitor_task(self): + """Background task to continuously monitor memory usage and update state""" + while True: + self.current_memory_percent = psutil.virtual_memory().percent + + # Enter memory pressure mode if we cross the threshold + if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent: + self.memory_pressure_mode = True + if self.monitor: + self.monitor.update_memory_status("PRESSURE") + + # Exit memory pressure mode if we go below recovery threshold + elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent: + self.memory_pressure_mode = False + if self.monitor: + self.monitor.update_memory_status("NORMAL") + + # In critical mode, we might need to take more drastic action + if self.current_memory_percent >= self.critical_threshold_percent: + if self.monitor: + self.monitor.update_memory_status("CRITICAL") + # We could implement additional memory-saving measures here + + await asyncio.sleep(self.check_interval) + + def _get_priority_score(self, wait_time: float, retry_count: int) -> float: + """Calculate priority score (lower is higher priority) + - URLs waiting longer than fairness_timeout get higher priority + - More retry attempts decreases priority + """ + if wait_time > self.fairness_timeout: + # High priority for long-waiting URLs + return -wait_time + # Standard priority based on retries + return retry_count + async def crawl_url( self, url: str, config: CrawlerRunConfig, task_id: str, + retry_count: int = 0, ) -> CrawlerTaskResult: start_time = time.time() error_message = "" memory_usage = peak_memory = 0.0 - + + # Get starting memory for accurate measurement + process = psutil.Process() + start_memory = process.memory_info().rss / (1024 * 1024) + try: if self.monitor: self.monitor.update_task( - task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time + task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=start_time, + retry_count=retry_count ) + self.concurrent_sessions += 1 - + if self.rate_limiter: await self.rate_limiter.wait_if_needed(url) - - process = psutil.Process() - start_memory = process.memory_info().rss / (1024 * 1024) + + # Check if we're in critical memory state + if self.current_memory_percent >= self.critical_threshold_percent: + # Requeue this task with increased priority and retry count + enqueue_time = time.time() + priority = self._get_priority_score(enqueue_time - start_time, retry_count + 1) + await self.task_queue.put((priority, (url, task_id, retry_count + 1, enqueue_time))) + + # Update monitoring + if self.monitor: + self.monitor.update_task( + task_id, + status=CrawlStatus.QUEUED, + error_message="Requeued due to critical memory pressure" + ) + + # Return placeholder result with requeued status + return CrawlerTaskResult( + task_id=task_id, + url=url, + result=CrawlResult( + url=url, html="", metadata={"status": "requeued"}, + success=False, error_message="Requeued due to critical memory pressure" + ), + memory_usage=0, + peak_memory=0, + start_time=start_time, + end_time=time.time(), + error_message="Requeued due to critical memory pressure", + retry_count=retry_count + 1 + ) + + # Execute the crawl result = await self.crawler.arun(url, config=config, session_id=task_id) + + # Measure memory usage end_memory = process.memory_info().rss / (1024 * 1024) - memory_usage = peak_memory = end_memory - start_memory - + + # Handle rate limiting if self.rate_limiter and result.status_code: if not self.rate_limiter.update_delay(url, result.status_code): error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}" if self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.FAILED) - result = CrawlerTaskResult( - task_id=task_id, - url=url, - result=result, - memory_usage=memory_usage, - peak_memory=peak_memory, - start_time=start_time, - end_time=time.time(), - error_message=error_message, - ) - await self.result_queue.put(result) - return result - + + # Update status based on result if not result.success: error_message = result.error_message if self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.FAILED) elif self.monitor: self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED) - + except Exception as e: error_message = str(e) if self.monitor: @@ -392,7 +266,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): result = CrawlResult( url=url, html="", metadata={}, success=False, error_message=str(e) ) - + finally: end_time = time.time() if self.monitor: @@ -402,9 +276,10 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): memory_usage=memory_usage, peak_memory=peak_memory, error_message=error_message, + retry_count=retry_count ) self.concurrent_sessions -= 1 - + return CrawlerTaskResult( task_id=task_id, url=url, @@ -414,116 +289,240 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): start_time=start_time, end_time=end_time, error_message=error_message, + retry_count=retry_count ) - + async def run_urls( self, urls: List[str], - crawler: "AsyncWebCrawler", # noqa: F821 + crawler: AsyncWebCrawler, config: CrawlerRunConfig, ) -> List[CrawlerTaskResult]: self.crawler = crawler - + + # Start the memory monitor task + memory_monitor = asyncio.create_task(self._memory_monitor_task()) + if self.monitor: self.monitor.start() - + + results = [] + try: - pending_tasks = [] - active_tasks = [] - task_queue = [] - - for url in urls: - task_id = str(uuid.uuid4()) - if self.monitor: - self.monitor.add_task(task_id, url) - task_queue.append((url, task_id)) - - while task_queue or active_tasks: - wait_start_time = time.time() - while len(active_tasks) < self.max_session_permit and task_queue: - if psutil.virtual_memory().percent >= self.memory_threshold_percent: - # Check if we've exceeded the timeout - if time.time() - wait_start_time > self.memory_wait_timeout: - raise MemoryError( - f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds" - ) - await asyncio.sleep(self.check_interval) - continue - - url, task_id = task_queue.pop(0) - task = asyncio.create_task(self.crawl_url(url, config, task_id)) - active_tasks.append(task) - - if not active_tasks: - await asyncio.sleep(self.check_interval) - continue - - done, pending = await asyncio.wait( - active_tasks, return_when=asyncio.FIRST_COMPLETED - ) - - pending_tasks.extend(done) - active_tasks = list(pending) - - return await asyncio.gather(*pending_tasks) - finally: - if self.monitor: - self.monitor.stop() - - async def run_urls_stream( - self, - urls: List[str], - crawler: "AsyncWebCrawler", # noqa: F821 - config: CrawlerRunConfig, - ) -> AsyncGenerator[CrawlerTaskResult, None]: - self.crawler = crawler - if self.monitor: - self.monitor.start() - - try: - active_tasks = [] - task_queue = [] - completed_count = 0 - total_urls = len(urls) - # Initialize task queue for url in urls: task_id = str(uuid.uuid4()) if self.monitor: self.monitor.add_task(task_id, url) - task_queue.append((url, task_id)) - - while completed_count < total_urls: - # Start new tasks if memory permits - while len(active_tasks) < self.max_session_permit and task_queue: - if psutil.virtual_memory().percent >= self.memory_threshold_percent: - await asyncio.sleep(self.check_interval) - continue - - url, task_id = task_queue.pop(0) - task = asyncio.create_task(self.crawl_url(url, config, task_id)) - active_tasks.append(task) - - if not active_tasks and not task_queue: - break - - # Wait for any task to complete and yield results + # Add to queue with initial priority 0, retry count 0, and current time + await self.task_queue.put((0, (url, task_id, 0, time.time()))) + + active_tasks = [] + + # Process until both queues are empty + while not self.task_queue.empty() or active_tasks: + # If memory pressure is low, start new tasks + if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: + try: + # Try to get a task with timeout to avoid blocking indefinitely + priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( + self.task_queue.get(), timeout=0.1 + ) + + # Create and start the task + task = asyncio.create_task( + self.crawl_url(url, config, task_id, retry_count) + ) + active_tasks.append(task) + + # Update waiting time in monitor + if self.monitor: + wait_time = time.time() - enqueue_time + self.monitor.update_task( + task_id, + wait_time=wait_time, + status=CrawlStatus.IN_PROGRESS + ) + + except asyncio.TimeoutError: + # No tasks in queue, that's fine + pass + + # Wait for completion even if queue is starved if active_tasks: done, pending = await asyncio.wait( active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED ) + + # Process completed tasks for completed_task in done: result = await completed_task - completed_count += 1 - yield result + results.append(result) + + # Update active tasks list active_tasks = list(pending) else: - await asyncio.sleep(self.check_interval) + # If no active tasks but still waiting, sleep briefly + await asyncio.sleep(self.check_interval / 2) + + # Update priorities for waiting tasks if needed + await self._update_queue_priorities() + + return results + except Exception as e: + if self.monitor: + self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}") + finally: + # Clean up + memory_monitor.cancel() if self.monitor: self.monitor.stop() - + + async def _update_queue_priorities(self): + """Periodically update priorities of items in the queue to prevent starvation""" + # Skip if queue is empty + if self.task_queue.empty(): + return + + # Use a drain-and-refill approach to update all priorities + temp_items = [] + + # Drain the queue (with a safety timeout to prevent blocking) + try: + drain_start = time.time() + while not self.task_queue.empty() and time.time() - drain_start < 5.0: # 5 second safety timeout + try: + # Get item from queue with timeout + priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( + self.task_queue.get(), timeout=0.1 + ) + + # Calculate new priority based on current wait time + current_time = time.time() + wait_time = current_time - enqueue_time + new_priority = self._get_priority_score(wait_time, retry_count) + + # Store with updated priority + temp_items.append((new_priority, (url, task_id, retry_count, enqueue_time))) + + # Update monitoring stats for this task + if self.monitor and task_id in self.monitor.stats: + self.monitor.update_task(task_id, wait_time=wait_time) + + except asyncio.TimeoutError: + # Queue might be empty or very slow + break + except Exception as e: + # If anything goes wrong, make sure we refill the queue with what we've got + self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}") + + # Calculate queue statistics + if temp_items and self.monitor: + total_queued = len(temp_items) + wait_times = [item[1][3] for item in temp_items] + highest_wait_time = time.time() - min(wait_times) if wait_times else 0 + avg_wait_time = sum(time.time() - t for t in wait_times) / len(wait_times) if wait_times else 0 + + # Update queue statistics in monitor + self.monitor.update_queue_statistics( + total_queued=total_queued, + highest_wait_time=highest_wait_time, + avg_wait_time=avg_wait_time + ) + + # Sort by priority (lowest number = highest priority) + temp_items.sort(key=lambda x: x[0]) + + # Refill the queue with updated priorities + for item in temp_items: + await self.task_queue.put(item) + + async def run_urls_stream( + self, + urls: List[str], + crawler: AsyncWebCrawler, + config: CrawlerRunConfig, + ) -> AsyncGenerator[CrawlerTaskResult, None]: + self.crawler = crawler + + # Start the memory monitor task + memory_monitor = asyncio.create_task(self._memory_monitor_task()) + + if self.monitor: + self.monitor.start() + + try: + # Initialize task queue + for url in urls: + task_id = str(uuid.uuid4()) + if self.monitor: + self.monitor.add_task(task_id, url) + # Add to queue with initial priority 0, retry count 0, and current time + await self.task_queue.put((0, (url, task_id, 0, time.time()))) + + active_tasks = [] + completed_count = 0 + total_urls = len(urls) + + while completed_count < total_urls: + # If memory pressure is low, start new tasks + if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: + try: + # Try to get a task with timeout + priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( + self.task_queue.get(), timeout=0.1 + ) + + # Create and start the task + task = asyncio.create_task( + self.crawl_url(url, config, task_id, retry_count) + ) + active_tasks.append(task) + + # Update waiting time in monitor + if self.monitor: + wait_time = time.time() - enqueue_time + self.monitor.update_task( + task_id, + wait_time=wait_time, + status=CrawlStatus.IN_PROGRESS + ) + + except asyncio.TimeoutError: + # No tasks in queue, that's fine + pass + + # Process completed tasks and yield results + if active_tasks: + done, pending = await asyncio.wait( + active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED + ) + + for completed_task in done: + result = await completed_task + + # Only count as completed if it wasn't requeued + if "requeued" not in result.error_message: + completed_count += 1 + yield result + + # Update active tasks list + active_tasks = list(pending) + else: + # If no active tasks but still waiting, sleep briefly + await asyncio.sleep(self.check_interval / 2) + + # Update priorities for waiting tasks if needed + await self._update_queue_priorities() + + finally: + # Clean up + memory_monitor.cancel() + if self.monitor: + self.monitor.stop() + class SemaphoreDispatcher(BaseDispatcher): def __init__( @@ -620,7 +619,7 @@ class SemaphoreDispatcher(BaseDispatcher): async def run_urls( self, - crawler: "AsyncWebCrawler", # noqa: F821 + crawler: AsyncWebCrawler, # noqa: F821 urls: List[str], config: CrawlerRunConfig, ) -> List[CrawlerTaskResult]: @@ -644,4 +643,4 @@ class SemaphoreDispatcher(BaseDispatcher): return await asyncio.gather(*tasks, return_exceptions=True) finally: if self.monitor: - self.monitor.stop() + self.monitor.stop() \ No newline at end of file diff --git a/crawl4ai/browser_manager.py b/crawl4ai/browser_manager.py index 38f87d9a..06b36a32 100644 --- a/crawl4ai/browser_manager.py +++ b/crawl4ai/browser_manager.py @@ -443,19 +443,6 @@ class BrowserManager: self.default_context = contexts[0] else: self.default_context = await self.create_browser_context() - # self.default_context = await self.browser.new_context( - # viewport={ - # "width": self.config.viewport_width, - # "height": self.config.viewport_height, - # }, - # storage_state=self.config.storage_state, - # user_agent=self.config.headers.get( - # "User-Agent", self.config.user_agent - # ), - # accept_downloads=self.config.accept_downloads, - # ignore_https_errors=self.config.ignore_https_errors, - # java_script_enabled=self.config.java_script_enabled, - # ) await self.setup_context(self.default_context) else: browser_args = self._build_browser_args() @@ -470,6 +457,7 @@ class BrowserManager: self.default_context = self.browser + def _build_browser_args(self) -> dict: """Build browser launch arguments from config.""" args = [ diff --git a/crawl4ai/components/crawler_monitor.py b/crawl4ai/components/crawler_monitor.py new file mode 100644 index 00000000..49bf9a15 --- /dev/null +++ b/crawl4ai/components/crawler_monitor.py @@ -0,0 +1,837 @@ +import time +import uuid +import threading +import psutil +from datetime import datetime, timedelta +from typing import Dict, Optional, List +import threading +from rich.console import Console +from rich.layout import Layout +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from rich.live import Live +from rich import box +from ..models import CrawlStatus + +class TerminalUI: + """Terminal user interface for CrawlerMonitor using rich library.""" + + def __init__(self, refresh_rate: float = 1.0, max_width: int = 120): + """ + Initialize the terminal UI. + + Args: + refresh_rate: How often to refresh the UI (in seconds) + max_width: Maximum width of the UI in characters + """ + self.console = Console(width=max_width) + self.layout = Layout() + self.refresh_rate = refresh_rate + self.stop_event = threading.Event() + self.ui_thread = None + self.monitor = None # Will be set by CrawlerMonitor + self.max_width = max_width + + # Setup layout - vertical layout (top to bottom) + self.layout.split( + Layout(name="header", size=3), + Layout(name="pipeline_status", size=10), + Layout(name="task_details", ratio=1), + Layout(name="footer", size=3) # Increased footer size to fit all content + ) + + def start(self, monitor): + """Start the UI thread.""" + self.monitor = monitor + self.stop_event.clear() + self.ui_thread = threading.Thread(target=self._ui_loop) + self.ui_thread.daemon = True + self.ui_thread.start() + + def stop(self): + """Stop the UI thread.""" + if self.ui_thread and self.ui_thread.is_alive(): + self.stop_event.set() + # Only try to join if we're not in the UI thread + # This prevents "cannot join current thread" errors + if threading.current_thread() != self.ui_thread: + self.ui_thread.join(timeout=5.0) + + def _ui_loop(self): + """Main UI rendering loop.""" + import sys + import select + import termios + import tty + + # Setup terminal for non-blocking input + old_settings = termios.tcgetattr(sys.stdin) + try: + tty.setcbreak(sys.stdin.fileno()) + + # Use Live display to render the UI + with Live(self.layout, refresh_per_second=1/self.refresh_rate, screen=True) as live: + self.live = live # Store the live display for updates + + # Main UI loop + while not self.stop_event.is_set(): + self._update_display() + + # Check for key press (non-blocking) + if select.select([sys.stdin], [], [], 0)[0]: + key = sys.stdin.read(1) + # Check for 'q' to quit + if key == 'q': + # Signal stop but don't call monitor.stop() from UI thread + # as it would cause the thread to try to join itself + self.stop_event.set() + self.monitor.is_running = False + break + + time.sleep(self.refresh_rate) + + # Just check if the monitor was stopped + if not self.monitor.is_running: + break + finally: + # Restore terminal settings + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) + + def _update_display(self): + """Update the terminal display with current statistics.""" + if not self.monitor: + return + + # Update crawler status panel + self.layout["header"].update(self._create_status_panel()) + + # Update pipeline status panel and task details panel + self.layout["pipeline_status"].update(self._create_pipeline_panel()) + self.layout["task_details"].update(self._create_task_details_panel()) + + # Update footer + self.layout["footer"].update(self._create_footer()) + + def _create_status_panel(self) -> Panel: + """Create the crawler status panel.""" + summary = self.monitor.get_summary() + + # Format memory status with icon + memory_status = self.monitor.get_memory_status() + memory_icon = "🟢" # Default NORMAL + if memory_status == "PRESSURE": + memory_icon = "🟠" + elif memory_status == "CRITICAL": + memory_icon = "🔴" + + # Get current memory usage + current_memory = psutil.Process().memory_info().rss / (1024 * 1024) # MB + memory_percent = (current_memory / psutil.virtual_memory().total) * 100 + + # Format runtime + runtime = self.monitor._format_time(time.time() - self.monitor.start_time if self.monitor.start_time else 0) + + # Create the status text + status_text = Text() + status_text.append(f"Web Crawler Dashboard | Runtime: {runtime} | Memory: {memory_percent:.1f}% {memory_icon}\n") + status_text.append(f"Status: {memory_status} | URLs: {summary['urls_completed']}/{summary['urls_total']} | ") + status_text.append(f"Peak Mem: {summary['peak_memory_percent']:.1f}% at {self.monitor._format_time(summary['peak_memory_time'])}") + + return Panel(status_text, title="Crawler Status", border_style="blue") + + def _create_pipeline_panel(self) -> Panel: + """Create the pipeline status panel.""" + summary = self.monitor.get_summary() + queue_stats = self.monitor.get_queue_stats() + + # Create a table for status counts + table = Table(show_header=True, box=None) + table.add_column("Status", style="cyan") + table.add_column("Count", justify="right") + table.add_column("Percentage", justify="right") + table.add_column("Stat", style="cyan") + table.add_column("Value", justify="right") + + # Calculate overall progress + progress = f"{summary['urls_completed']}/{summary['urls_total']}" + progress_percent = f"{summary['completion_percentage']:.1f}%" + + # Add rows for each status + table.add_row( + "Overall Progress", + progress, + progress_percent, + "Est. Completion", + summary.get('estimated_completion_time', "N/A") + ) + + # Add rows for each status + status_counts = summary['status_counts'] + total = summary['urls_total'] or 1 # Avoid division by zero + + # Status rows + table.add_row( + "Completed", + str(status_counts.get(CrawlStatus.COMPLETED.name, 0)), + f"{status_counts.get(CrawlStatus.COMPLETED.name, 0) / total * 100:.1f}%", + "Avg. Time/URL", + f"{summary.get('avg_task_duration', 0):.2f}s" + ) + + table.add_row( + "Failed", + str(status_counts.get(CrawlStatus.FAILED.name, 0)), + f"{status_counts.get(CrawlStatus.FAILED.name, 0) / total * 100:.1f}%", + "Concurrent Tasks", + str(status_counts.get(CrawlStatus.IN_PROGRESS.name, 0)) + ) + + table.add_row( + "In Progress", + str(status_counts.get(CrawlStatus.IN_PROGRESS.name, 0)), + f"{status_counts.get(CrawlStatus.IN_PROGRESS.name, 0) / total * 100:.1f}%", + "Queue Size", + str(queue_stats['total_queued']) + ) + + table.add_row( + "Queued", + str(status_counts.get(CrawlStatus.QUEUED.name, 0)), + f"{status_counts.get(CrawlStatus.QUEUED.name, 0) / total * 100:.1f}%", + "Max Wait Time", + f"{queue_stats['highest_wait_time']:.1f}s" + ) + + # Requeued is a special case as it's not a status + requeued_count = summary.get('requeued_count', 0) + table.add_row( + "Requeued", + str(requeued_count), + f"{summary.get('requeue_rate', 0):.1f}%", + "Avg Wait Time", + f"{queue_stats['avg_wait_time']:.1f}s" + ) + + # Add empty row for spacing + table.add_row( + "", + "", + "", + "Requeue Rate", + f"{summary.get('requeue_rate', 0):.1f}%" + ) + + return Panel(table, title="Pipeline Status", border_style="green") + + def _create_task_details_panel(self) -> Panel: + """Create the task details panel.""" + # Create a table for task details + table = Table(show_header=True, expand=True) + table.add_column("Task ID", style="cyan", no_wrap=True, width=10) + table.add_column("URL", style="blue", ratio=3) + table.add_column("Status", style="green", width=15) + table.add_column("Memory", justify="right", width=8) + table.add_column("Peak", justify="right", width=8) + table.add_column("Duration", justify="right", width=10) + + # Get all task stats + task_stats = self.monitor.get_all_task_stats() + + # Add summary row + active_tasks = sum(1 for stats in task_stats.values() + if stats['status'] == CrawlStatus.IN_PROGRESS.name) + + total_memory = sum(stats['memory_usage'] for stats in task_stats.values()) + total_peak = sum(stats['peak_memory'] for stats in task_stats.values()) + + # Summary row with separators + table.add_row( + "SUMMARY", + f"Total: {len(task_stats)}", + f"Active: {active_tasks}", + f"{total_memory:.1f}", + f"{total_peak:.1f}", + "N/A" + ) + + # Add a separator + table.add_row("—" * 10, "—" * 20, "—" * 10, "—" * 8, "—" * 8, "—" * 10) + + # Status icons + status_icons = { + CrawlStatus.QUEUED.name: "⏳", + CrawlStatus.IN_PROGRESS.name: "🔄", + CrawlStatus.COMPLETED.name: "✅", + CrawlStatus.FAILED.name: "❌" + } + + # Calculate how many rows we can display based on available space + # We can display more rows now that we have a dedicated panel + display_count = min(len(task_stats), 20) # Display up to 20 tasks + + # Add rows for each task + for task_id, stats in sorted( + list(task_stats.items())[:display_count], + # Sort: 1. IN_PROGRESS first, 2. QUEUED, 3. COMPLETED/FAILED by recency + key=lambda x: ( + 0 if x[1]['status'] == CrawlStatus.IN_PROGRESS.name else + 1 if x[1]['status'] == CrawlStatus.QUEUED.name else + 2, + -1 * (x[1].get('end_time', 0) or 0) # Most recent first + ) + ): + # Truncate task_id and URL for display + short_id = task_id[:8] + url = stats['url'] + if len(url) > 50: # Allow longer URLs in the dedicated panel + url = url[:47] + "..." + + # Format status with icon + status = f"{status_icons.get(stats['status'], '?')} {stats['status']}" + + # Add row + table.add_row( + short_id, + url, + status, + f"{stats['memory_usage']:.1f}", + f"{stats['peak_memory']:.1f}", + stats['duration'] if 'duration' in stats else "0:00" + ) + + return Panel(table, title="Task Details", border_style="yellow") + + def _create_footer(self) -> Panel: + """Create the footer panel.""" + from rich.columns import Columns + from rich.align import Align + + memory_status = self.monitor.get_memory_status() + memory_icon = "🟢" # Default NORMAL + if memory_status == "PRESSURE": + memory_icon = "🟠" + elif memory_status == "CRITICAL": + memory_icon = "🔴" + + # Left section - memory status + left_text = Text() + left_text.append("Memory Status: ", style="bold") + status_style = "green" if memory_status == "NORMAL" else "yellow" if memory_status == "PRESSURE" else "red bold" + left_text.append(f"{memory_icon} {memory_status}", style=status_style) + + # Center section - copyright + center_text = Text("© Crawl4AI 2025 | Made by UnclecCode", style="cyan italic") + + # Right section - quit instruction + right_text = Text() + right_text.append("Press ", style="bold") + right_text.append("q", style="white on blue") + right_text.append(" to quit", style="bold") + + # Create columns with the three sections + footer_content = Columns( + [ + Align.left(left_text), + Align.center(center_text), + Align.right(right_text) + ], + expand=True + ) + + # Create a more visible footer panel + return Panel( + footer_content, + border_style="white", + padding=(0, 1) # Add padding for better visibility + ) + + +class CrawlerMonitor: + """ + Comprehensive monitoring and visualization system for tracking web crawler operations in real-time. + Provides a terminal-based dashboard that displays task statuses, memory usage, queue statistics, + and performance metrics. + """ + + def __init__( + self, + urls_total: int = 0, + refresh_rate: float = 1.0, + enable_ui: bool = True, + max_width: int = 120 + ): + """ + Initialize the CrawlerMonitor. + + Args: + urls_total: Total number of URLs to be crawled + refresh_rate: How often to refresh the UI (in seconds) + enable_ui: Whether to display the terminal UI + max_width: Maximum width of the UI in characters + """ + # Core monitoring attributes + self.stats = {} # Task ID -> stats dict + self.memory_status = "NORMAL" + self.start_time = None + self.end_time = None + self.is_running = False + self.queue_stats = { + "total_queued": 0, + "highest_wait_time": 0.0, + "avg_wait_time": 0.0 + } + self.urls_total = urls_total + self.urls_completed = 0 + self.peak_memory_percent = 0.0 + self.peak_memory_time = 0.0 + + # Status counts + self.status_counts = { + CrawlStatus.QUEUED.name: 0, + CrawlStatus.IN_PROGRESS.name: 0, + CrawlStatus.COMPLETED.name: 0, + CrawlStatus.FAILED.name: 0 + } + + # Requeue tracking + self.requeued_count = 0 + + # Thread-safety + self._lock = threading.RLock() + + # Terminal UI + self.enable_ui = enable_ui + self.terminal_ui = TerminalUI( + refresh_rate=refresh_rate, + max_width=max_width + ) if enable_ui else None + + def start(self): + """ + Start the monitoring session. + + - Initializes the start_time + - Sets is_running to True + - Starts the terminal UI if enabled + """ + with self._lock: + self.start_time = time.time() + self.is_running = True + + # Start the terminal UI + if self.enable_ui and self.terminal_ui: + self.terminal_ui.start(self) + + def stop(self): + """ + Stop the monitoring session. + + - Records end_time + - Sets is_running to False + - Stops the terminal UI + - Generates final summary statistics + """ + with self._lock: + self.end_time = time.time() + self.is_running = False + + # Stop the terminal UI + if self.enable_ui and self.terminal_ui: + self.terminal_ui.stop() + + def add_task(self, task_id: str, url: str): + """ + Register a new task with the monitor. + + Args: + task_id: Unique identifier for the task + url: URL being crawled + + The task is initialized with: + - status: QUEUED + - url: The URL to crawl + - enqueue_time: Current time + - memory_usage: 0 + - peak_memory: 0 + - wait_time: 0 + - retry_count: 0 + """ + with self._lock: + self.stats[task_id] = { + "task_id": task_id, + "url": url, + "status": CrawlStatus.QUEUED.name, + "enqueue_time": time.time(), + "start_time": None, + "end_time": None, + "memory_usage": 0.0, + "peak_memory": 0.0, + "error_message": "", + "wait_time": 0.0, + "retry_count": 0, + "duration": "0:00", + "counted_requeue": False + } + + # Update status counts + self.status_counts[CrawlStatus.QUEUED.name] += 1 + + def update_task( + self, + task_id: str, + status: Optional[CrawlStatus] = None, + start_time: Optional[float] = None, + end_time: Optional[float] = None, + memory_usage: Optional[float] = None, + peak_memory: Optional[float] = None, + error_message: Optional[str] = None, + retry_count: Optional[int] = None, + wait_time: Optional[float] = None + ): + """ + Update statistics for a specific task. + + Args: + task_id: Unique identifier for the task + status: New status (QUEUED, IN_PROGRESS, COMPLETED, FAILED) + start_time: When task execution started + end_time: When task execution ended + memory_usage: Current memory usage in MB + peak_memory: Maximum memory usage in MB + error_message: Error description if failed + retry_count: Number of retry attempts + wait_time: Time spent in queue + + Updates task statistics and updates status counts. + If status changes, decrements old status count and + increments new status count. + """ + with self._lock: + # Check if task exists + if task_id not in self.stats: + return + + task_stats = self.stats[task_id] + + # Update status counts if status is changing + old_status = task_stats["status"] + if status and status.name != old_status: + self.status_counts[old_status] -= 1 + self.status_counts[status.name] += 1 + + # Track completion + if status == CrawlStatus.COMPLETED: + self.urls_completed += 1 + + # Track requeues + if old_status in [CrawlStatus.COMPLETED.name, CrawlStatus.FAILED.name] and not task_stats.get("counted_requeue", False): + self.requeued_count += 1 + task_stats["counted_requeue"] = True + + # Update task statistics + if status: + task_stats["status"] = status.name + if start_time is not None: + task_stats["start_time"] = start_time + if end_time is not None: + task_stats["end_time"] = end_time + if memory_usage is not None: + task_stats["memory_usage"] = memory_usage + + # Update peak memory if necessary + current_percent = (memory_usage / psutil.virtual_memory().total) * 100 + if current_percent > self.peak_memory_percent: + self.peak_memory_percent = current_percent + self.peak_memory_time = time.time() + + if peak_memory is not None: + task_stats["peak_memory"] = peak_memory + if error_message is not None: + task_stats["error_message"] = error_message + if retry_count is not None: + task_stats["retry_count"] = retry_count + if wait_time is not None: + task_stats["wait_time"] = wait_time + + # Calculate duration + if task_stats["start_time"]: + end = task_stats["end_time"] or time.time() + duration = end - task_stats["start_time"] + task_stats["duration"] = self._format_time(duration) + + def update_memory_status(self, status: str): + """ + Update the current memory status. + + Args: + status: Memory status (NORMAL, PRESSURE, CRITICAL, or custom) + + Also updates the UI to reflect the new status. + """ + with self._lock: + self.memory_status = status + + def update_queue_statistics( + self, + total_queued: int, + highest_wait_time: float, + avg_wait_time: float + ): + """ + Update statistics related to the task queue. + + Args: + total_queued: Number of tasks currently in queue + highest_wait_time: Longest wait time of any queued task + avg_wait_time: Average wait time across all queued tasks + """ + with self._lock: + self.queue_stats = { + "total_queued": total_queued, + "highest_wait_time": highest_wait_time, + "avg_wait_time": avg_wait_time + } + + def get_task_stats(self, task_id: str) -> Dict: + """ + Get statistics for a specific task. + + Args: + task_id: Unique identifier for the task + + Returns: + Dictionary containing all task statistics + """ + with self._lock: + return self.stats.get(task_id, {}).copy() + + def get_all_task_stats(self) -> Dict[str, Dict]: + """ + Get statistics for all tasks. + + Returns: + Dictionary mapping task_ids to their statistics + """ + with self._lock: + return self.stats.copy() + + def get_memory_status(self) -> str: + """ + Get the current memory status. + + Returns: + Current memory status string + """ + with self._lock: + return self.memory_status + + def get_queue_stats(self) -> Dict: + """ + Get current queue statistics. + + Returns: + Dictionary with queue statistics including: + - total_queued: Number of tasks in queue + - highest_wait_time: Longest wait time + - avg_wait_time: Average wait time + """ + with self._lock: + return self.queue_stats.copy() + + def get_summary(self) -> Dict: + """ + Get a summary of all crawler statistics. + + Returns: + Dictionary containing: + - runtime: Total runtime in seconds + - urls_total: Total URLs to process + - urls_completed: Number of completed URLs + - completion_percentage: Percentage complete + - status_counts: Count of tasks in each status + - memory_status: Current memory status + - peak_memory_percent: Highest memory usage + - peak_memory_time: When peak memory occurred + - avg_task_duration: Average task processing time + - estimated_completion_time: Projected finish time + - requeue_rate: Percentage of tasks requeued + """ + with self._lock: + # Calculate runtime + current_time = time.time() + runtime = current_time - (self.start_time or current_time) + + # Calculate completion percentage + completion_percentage = 0 + if self.urls_total > 0: + completion_percentage = (self.urls_completed / self.urls_total) * 100 + + # Calculate average task duration for completed tasks + completed_tasks = [ + task for task in self.stats.values() + if task["status"] == CrawlStatus.COMPLETED.name and task.get("start_time") and task.get("end_time") + ] + + avg_task_duration = 0 + if completed_tasks: + total_duration = sum(task["end_time"] - task["start_time"] for task in completed_tasks) + avg_task_duration = total_duration / len(completed_tasks) + + # Calculate requeue rate + requeue_rate = 0 + if len(self.stats) > 0: + requeue_rate = (self.requeued_count / len(self.stats)) * 100 + + # Calculate estimated completion time + estimated_completion_time = "N/A" + if avg_task_duration > 0 and self.urls_total > 0 and self.urls_completed > 0: + remaining_tasks = self.urls_total - self.urls_completed + estimated_seconds = remaining_tasks * avg_task_duration + estimated_completion_time = self._format_time(estimated_seconds) + + return { + "runtime": runtime, + "urls_total": self.urls_total, + "urls_completed": self.urls_completed, + "completion_percentage": completion_percentage, + "status_counts": self.status_counts.copy(), + "memory_status": self.memory_status, + "peak_memory_percent": self.peak_memory_percent, + "peak_memory_time": self.peak_memory_time, + "avg_task_duration": avg_task_duration, + "estimated_completion_time": estimated_completion_time, + "requeue_rate": requeue_rate, + "requeued_count": self.requeued_count + } + + def render(self): + """ + Render the terminal UI. + + This is the main UI rendering loop that: + 1. Updates all statistics + 2. Formats the display + 3. Renders the ASCII interface + 4. Handles keyboard input + + Note: The actual rendering is handled by the TerminalUI class + which uses the rich library's Live display. + """ + if self.enable_ui and self.terminal_ui: + # Force an update of the UI + if hasattr(self.terminal_ui, '_update_display'): + self.terminal_ui._update_display() + + def _format_time(self, seconds: float) -> str: + """ + Format time in hours:minutes:seconds. + + Args: + seconds: Time in seconds + + Returns: + Formatted time string (e.g., "1:23:45") + """ + delta = timedelta(seconds=int(seconds)) + hours, remainder = divmod(delta.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + if hours > 0: + return f"{hours}:{minutes:02}:{seconds:02}" + else: + return f"{minutes}:{seconds:02}" + + def _calculate_estimated_completion(self) -> str: + """ + Calculate estimated completion time based on current progress. + + Returns: + Formatted time string + """ + summary = self.get_summary() + return summary.get("estimated_completion_time", "N/A") + + +# Example code for testing +if __name__ == "__main__": + # Initialize the monitor + monitor = CrawlerMonitor(urls_total=100) + + # Start monitoring + monitor.start() + + try: + # Simulate some tasks + for i in range(20): + task_id = str(uuid.uuid4()) + url = f"https://example.com/page{i}" + monitor.add_task(task_id, url) + + # Simulate 20% of tasks are already running + if i < 4: + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=time.time() - 30, # Started 30 seconds ago + memory_usage=10.5 + ) + + # Simulate 10% of tasks are completed + if i >= 4 and i < 6: + start_time = time.time() - 60 + end_time = time.time() - 15 + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=start_time, + memory_usage=8.2 + ) + monitor.update_task( + task_id=task_id, + status=CrawlStatus.COMPLETED, + end_time=end_time, + memory_usage=0, + peak_memory=15.7 + ) + + # Simulate 5% of tasks fail + if i >= 6 and i < 7: + start_time = time.time() - 45 + end_time = time.time() - 20 + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=start_time, + memory_usage=12.3 + ) + monitor.update_task( + task_id=task_id, + status=CrawlStatus.FAILED, + end_time=end_time, + memory_usage=0, + peak_memory=18.2, + error_message="Connection timeout" + ) + + # Simulate memory pressure + monitor.update_memory_status("PRESSURE") + + # Simulate queue statistics + monitor.update_queue_statistics( + total_queued=16, # 20 - 4 (in progress) + highest_wait_time=120.5, + avg_wait_time=60.2 + ) + + # Keep the monitor running for a demonstration + print("Crawler Monitor is running. Press 'q' to exit.") + while monitor.is_running: + time.sleep(0.1) + + except KeyboardInterrupt: + print("\nExiting crawler monitor...") + finally: + # Stop the monitor + monitor.stop() + print("Crawler monitor exited successfully.") \ No newline at end of file diff --git a/crawl4ai/config.py b/crawl4ai/config.py index 790ba6d0..866c7dc0 100644 --- a/crawl4ai/config.py +++ b/crawl4ai/config.py @@ -4,7 +4,8 @@ from dotenv import load_dotenv load_dotenv() # Load environment variables from .env file # Default provider, ONLY used when the extraction strategy is LLMExtractionStrategy -DEFAULT_PROVIDER = "openai/gpt-4o-mini" +DEFAULT_PROVIDER = "openai/gpt-4o" +DEFAULT_PROVIDER_API_KEY = "OPENAI_API_KEY" MODEL_REPO_BRANCH = "new-release-0.0.2" # Provider-model dictionary, ONLY used when the extraction strategy is LLMExtractionStrategy PROVIDER_MODELS = { diff --git a/crawl4ai/crawlers/google_search/crawler.py b/crawl4ai/crawlers/google_search/crawler.py index cae5f81d..e1288de1 100644 --- a/crawl4ai/crawlers/google_search/crawler.py +++ b/crawl4ai/crawlers/google_search/crawler.py @@ -1,6 +1,6 @@ from crawl4ai import BrowserConfig, AsyncWebCrawler, CrawlerRunConfig, CacheMode from crawl4ai.hub import BaseCrawler -from crawl4ai.utils import optimize_html, get_home_folder +from crawl4ai.utils import optimize_html, get_home_folder, preprocess_html_for_schema from crawl4ai.extraction_strategy import JsonCssExtractionStrategy from pathlib import Path import json @@ -68,7 +68,8 @@ class GoogleSearchCrawler(BaseCrawler): home_dir = get_home_folder() if not schema_cache_path else schema_cache_path os.makedirs(f"{home_dir}/schema", exist_ok=True) - cleaned_html = optimize_html(html, threshold=100) + # cleaned_html = optimize_html(html, threshold=100) + cleaned_html = preprocess_html_for_schema(html) organic_schema = None if os.path.exists(f"{home_dir}/schema/organic_schema.json"): diff --git a/crawl4ai/extraction_strategy.py b/crawl4ai/extraction_strategy.py index 97512bf3..0e0300fb 100644 --- a/crawl4ai/extraction_strategy.py +++ b/crawl4ai/extraction_strategy.py @@ -34,7 +34,7 @@ from .model_loader import ( calculate_batch_size ) -from .types import LLMConfig +from .types import LLMConfig, create_llm_config from functools import partial import numpy as np @@ -757,8 +757,6 @@ class LLMExtractionStrategy(ExtractionStrategy): ####################################################### # New extraction strategies for JSON-based extraction # ####################################################### - - class JsonElementExtractionStrategy(ExtractionStrategy): """ Abstract base class for extracting structured JSON from HTML content. @@ -1049,7 +1047,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy): schema_type: str = "CSS", # or XPATH query: str = None, target_json_example: str = None, - llm_config: 'LLMConfig' = None, + llm_config: 'LLMConfig' = create_llm_config(), provider: str = None, api_token: str = None, **kwargs @@ -1140,7 +1138,6 @@ In this scenario, use your best judgment to generate the schema. Try to maximize except Exception as e: raise Exception(f"Failed to generate schema: {str(e)}") - class JsonCssExtractionStrategy(JsonElementExtractionStrategy): """ Concrete implementation of `JsonElementExtractionStrategy` using CSS selectors. diff --git a/crawl4ai/models.py b/crawl4ai/models.py index 474e679e..a904e385 100644 --- a/crawl4ai/models.py +++ b/crawl4ai/models.py @@ -28,6 +28,12 @@ class CrawlerTaskResult: start_time: Union[datetime, float] end_time: Union[datetime, float] error_message: str = "" + retry_count: int = 0 + wait_time: float = 0.0 + + @property + def success(self) -> bool: + return self.result.success class CrawlStatus(Enum): @@ -67,6 +73,9 @@ class CrawlStats: memory_usage: float = 0.0 peak_memory: float = 0.0 error_message: str = "" + wait_time: float = 0.0 + retry_count: int = 0 + counted_requeue: bool = False @property def duration(self) -> str: @@ -87,6 +96,7 @@ class CrawlStats: duration = end - start return str(timedelta(seconds=int(duration.total_seconds()))) + class DisplayMode(Enum): DETAILED = "DETAILED" AGGREGATED = "AGGREGATED" diff --git a/crawl4ai/types.py b/crawl4ai/types.py index 2f689e1c..63fd45ba 100644 --- a/crawl4ai/types.py +++ b/crawl4ai/types.py @@ -178,4 +178,10 @@ if TYPE_CHECKING: BestFirstCrawlingStrategy as BestFirstCrawlingStrategyType, DFSDeepCrawlStrategy as DFSDeepCrawlStrategyType, DeepCrawlDecorator as DeepCrawlDecoratorType, - ) \ No newline at end of file + ) + + + +def create_llm_config(*args, **kwargs) -> 'LLMConfigType': + from .async_configs import LLMConfig + return LLMConfig(*args, **kwargs) diff --git a/crawl4ai/utils.py b/crawl4ai/utils.py index 146ce06c..acaf7933 100644 --- a/crawl4ai/utils.py +++ b/crawl4ai/utils.py @@ -26,7 +26,7 @@ import cProfile import pstats from functools import wraps import asyncio - +from lxml import etree, html as lhtml import sqlite3 import hashlib @@ -2617,3 +2617,116 @@ class HeadPeekr: def get_title(head_content: str): title_match = re.search(r'(.*?)', head_content, re.IGNORECASE | re.DOTALL) return title_match.group(1) if title_match else None + +def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_threshold=200, max_size=100000): + """ + Preprocess HTML to reduce size while preserving structure for schema generation. + + Args: + html_content (str): Raw HTML content + text_threshold (int): Maximum length for text nodes before truncation + attr_value_threshold (int): Maximum length for attribute values before truncation + max_size (int): Target maximum size for output HTML + + Returns: + str: Preprocessed HTML content + """ + try: + # Parse HTML with error recovery + parser = etree.HTMLParser(remove_comments=True, remove_blank_text=True) + tree = lhtml.fromstring(html_content, parser=parser) + + # 1. Remove HEAD section (keep only BODY) + head_elements = tree.xpath('//head') + for head in head_elements: + if head.getparent() is not None: + head.getparent().remove(head) + + # 2. Define tags to remove completely + tags_to_remove = [ + 'script', 'style', 'noscript', 'iframe', 'canvas', 'svg', + 'video', 'audio', 'source', 'track', 'map', 'area' + ] + + # Remove unwanted elements + for tag in tags_to_remove: + elements = tree.xpath(f'//{tag}') + for element in elements: + if element.getparent() is not None: + element.getparent().remove(element) + + # 3. Process remaining elements to clean attributes and truncate text + for element in tree.iter(): + # Skip if we're at the root level + if element.getparent() is None: + continue + + # Clean non-essential attributes but preserve structural ones + # attribs_to_keep = {'id', 'class', 'name', 'href', 'src', 'type', 'value', 'data-'} + + # This is more aggressive than the previous version + attribs_to_keep = {'id', 'class', 'name', 'type', 'value'} + + # attributes_hates_truncate = ['id', 'class', "data-"] + + # This means, I don't care, if an attribute is too long, truncate it, go and find a better css selector to build a schema + attributes_hates_truncate = [] + + # Process each attribute + for attrib in list(element.attrib.keys()): + # Keep if it's essential or starts with data- + if not (attrib in attribs_to_keep or attrib.startswith('data-')): + element.attrib.pop(attrib) + # Truncate long attribute values except for selectors + elif attrib not in attributes_hates_truncate and len(element.attrib[attrib]) > attr_value_threshold: + element.attrib[attrib] = element.attrib[attrib][:attr_value_threshold] + '...' + + # Truncate text content if it's too long + if element.text and len(element.text.strip()) > text_threshold: + element.text = element.text.strip()[:text_threshold] + '...' + + # Also truncate tail text if present + if element.tail and len(element.tail.strip()) > text_threshold: + element.tail = element.tail.strip()[:text_threshold] + '...' + + # 4. Find repeated patterns and keep only a few examples + # This is a simplistic approach - more sophisticated pattern detection could be implemented + pattern_elements = {} + for element in tree.xpath('//*[contains(@class, "")]'): + parent = element.getparent() + if parent is None: + continue + + # Create a signature based on tag and classes + classes = element.get('class', '') + if not classes: + continue + signature = f"{element.tag}.{classes}" + + if signature in pattern_elements: + pattern_elements[signature].append(element) + else: + pattern_elements[signature] = [element] + + # Keep only 3 examples of each repeating pattern + for signature, elements in pattern_elements.items(): + if len(elements) > 3: + # Keep the first 2 and last elements + for element in elements[2:-1]: + if element.getparent() is not None: + element.getparent().remove(element) + + # 5. Convert back to string + result = etree.tostring(tree, encoding='unicode', method='html') + + # If still over the size limit, apply more aggressive truncation + if len(result) > max_size: + return result[:max_size] + "..." + + return result + + except Exception as e: + # Fallback for parsing errors + return html_content[:max_size] if len(html_content) > max_size else html_content + + diff --git a/deploy/docker/README.md b/deploy/docker/README.md index c4582031..b4b6e414 100644 --- a/deploy/docker/README.md +++ b/deploy/docker/README.md @@ -554,7 +554,7 @@ async def test_stream_crawl(session, token: str): "https://example.com/page3", ], "browser_config": {"headless": True, "viewport": {"width": 1200}}, - "crawler_config": {"stream": True, "cache_mode": "aggressive"} + "crawler_config": {"stream": True, "cache_mode": "bypass"} } # headers = {"Authorization": f"Bearer {token}"} # If JWT is enabled, more on this later diff --git a/deploy/docker/api.py b/deploy/docker/api.py index c5700a9e..1dfd5ef9 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -2,6 +2,7 @@ import os import json import asyncio from typing import List, Tuple +from functools import partial import logging from typing import Optional, AsyncGenerator @@ -391,12 +392,13 @@ async def handle_crawl_request( ) async with AsyncWebCrawler(config=browser_config) as crawler: - results = await crawler.arun_many( - urls=urls, - config=crawler_config, - dispatcher=dispatcher - ) - + results = [] + func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many") + partial_func = partial(func, + urls[0] if len(urls) == 1 else urls, + config=crawler_config, + dispatcher=dispatcher) + results = await partial_func() return { "success": True, "results": [result.model_dump() for result in results] diff --git a/docs/examples/arun_vs_arun_many.py b/docs/examples/arun_vs_arun_many.py new file mode 100644 index 00000000..40bc4381 --- /dev/null +++ b/docs/examples/arun_vs_arun_many.py @@ -0,0 +1,79 @@ +import asyncio +import time +from crawl4ai.async_webcrawler import AsyncWebCrawler, CacheMode +from crawl4ai.async_configs import CrawlerRunConfig +from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher, RateLimiter + +VERBOSE = False + +async def crawl_sequential(urls): + config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE) + results = [] + start_time = time.perf_counter() + async with AsyncWebCrawler() as crawler: + for url in urls: + result_container = await crawler.arun(url=url, config=config) + results.append(result_container[0]) + total_time = time.perf_counter() - start_time + return total_time, results + +async def crawl_parallel_dispatcher(urls): + config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE) + # Dispatcher with rate limiter enabled (default behavior) + dispatcher = MemoryAdaptiveDispatcher( + rate_limiter=RateLimiter(base_delay=(1.0, 3.0), max_delay=60.0, max_retries=3), + max_session_permit=50, + ) + start_time = time.perf_counter() + async with AsyncWebCrawler() as crawler: + result_container = await crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher) + results = [] + if isinstance(result_container, list): + results = result_container + else: + async for res in result_container: + results.append(res) + total_time = time.perf_counter() - start_time + return total_time, results + +async def crawl_parallel_no_rate_limit(urls): + config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE) + # Dispatcher with no rate limiter and a high session permit to avoid queuing + dispatcher = MemoryAdaptiveDispatcher( + rate_limiter=None, + max_session_permit=len(urls) # allow all URLs concurrently + ) + start_time = time.perf_counter() + async with AsyncWebCrawler() as crawler: + result_container = await crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher) + results = [] + if isinstance(result_container, list): + results = result_container + else: + async for res in result_container: + results.append(res) + total_time = time.perf_counter() - start_time + return total_time, results + +async def main(): + urls = ["https://example.com"] * 100 + print(f"Crawling {len(urls)} URLs sequentially...") + seq_time, seq_results = await crawl_sequential(urls) + print(f"Sequential crawling took: {seq_time:.2f} seconds\n") + + print(f"Crawling {len(urls)} URLs in parallel using arun_many with dispatcher (with rate limit)...") + disp_time, disp_results = await crawl_parallel_dispatcher(urls) + print(f"Parallel (dispatcher with rate limiter) took: {disp_time:.2f} seconds\n") + + print(f"Crawling {len(urls)} URLs in parallel using dispatcher with no rate limiter...") + no_rl_time, no_rl_results = await crawl_parallel_no_rate_limit(urls) + print(f"Parallel (dispatcher without rate limiter) took: {no_rl_time:.2f} seconds\n") + + print("Crawl4ai - Crawling Comparison") + print("--------------------------------------------------------") + print(f"Sequential crawling took: {seq_time:.2f} seconds") + print(f"Parallel (dispatcher with rate limiter) took: {disp_time:.2f} seconds") + print(f"Parallel (dispatcher without rate limiter) took: {no_rl_time:.2f} seconds") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/examples/crawler_monitor_example.py b/docs/examples/crawler_monitor_example.py new file mode 100644 index 00000000..85d80ae6 --- /dev/null +++ b/docs/examples/crawler_monitor_example.py @@ -0,0 +1,209 @@ +""" +CrawlerMonitor Example + +This example demonstrates how to use the CrawlerMonitor component +to visualize and track web crawler operations in real-time. +""" + +import time +import uuid +import random +import threading +from crawl4ai.components.crawler_monitor import CrawlerMonitor +from crawl4ai.models import CrawlStatus + +def simulate_webcrawler_operations(monitor, num_tasks=20): + """ + Simulates a web crawler's operations with multiple tasks and different states. + + Args: + monitor: The CrawlerMonitor instance + num_tasks: Number of tasks to simulate + """ + print(f"Starting simulation with {num_tasks} tasks...") + + # Create and register all tasks first + task_ids = [] + for i in range(num_tasks): + task_id = str(uuid.uuid4()) + url = f"https://example.com/page{i}" + monitor.add_task(task_id, url) + task_ids.append((task_id, url)) + + # Small delay between task creation + time.sleep(0.2) + + # Process tasks with a variety of different behaviors + threads = [] + for i, (task_id, url) in enumerate(task_ids): + # Create a thread for each task + thread = threading.Thread( + target=process_task, + args=(monitor, task_id, url, i) + ) + thread.daemon = True + threads.append(thread) + + # Start threads in batches to simulate concurrent processing + batch_size = 4 # Process 4 tasks at a time + for i in range(0, len(threads), batch_size): + batch = threads[i:i+batch_size] + for thread in batch: + thread.start() + time.sleep(0.5) # Stagger thread start times + + # Wait a bit before starting next batch + time.sleep(random.uniform(1.0, 3.0)) + + # Update queue statistics + update_queue_stats(monitor) + + # Simulate memory pressure changes + active_threads = [t for t in threads if t.is_alive()] + if len(active_threads) > 8: + monitor.update_memory_status("CRITICAL") + elif len(active_threads) > 4: + monitor.update_memory_status("PRESSURE") + else: + monitor.update_memory_status("NORMAL") + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Final updates + update_queue_stats(monitor) + monitor.update_memory_status("NORMAL") + + print("Simulation completed!") + +def process_task(monitor, task_id, url, index): + """Simulate processing of a single task.""" + # Tasks start in queued state (already added) + + # Simulate waiting in queue + wait_time = random.uniform(0.5, 3.0) + time.sleep(wait_time) + + # Start processing - move to IN_PROGRESS + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=time.time(), + wait_time=wait_time + ) + + # Simulate task processing with memory usage changes + total_process_time = random.uniform(2.0, 10.0) + step_time = total_process_time / 5 # Update in 5 steps + + for step in range(5): + # Simulate increasing then decreasing memory usage + if step < 3: # First 3 steps - increasing + memory_usage = random.uniform(5.0, 20.0) * (step + 1) + else: # Last 2 steps - decreasing + memory_usage = random.uniform(5.0, 20.0) * (5 - step) + + # Update peak memory if this is higher + peak = max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0)) + + monitor.update_task( + task_id=task_id, + memory_usage=memory_usage, + peak_memory=peak + ) + + time.sleep(step_time) + + # Determine final state - 80% success, 20% failure + if index % 5 == 0: # Every 5th task fails + monitor.update_task( + task_id=task_id, + status=CrawlStatus.FAILED, + end_time=time.time(), + memory_usage=0.0, + error_message="Connection timeout" + ) + else: + monitor.update_task( + task_id=task_id, + status=CrawlStatus.COMPLETED, + end_time=time.time(), + memory_usage=0.0 + ) + +def update_queue_stats(monitor): + """Update queue statistics based on current tasks.""" + task_stats = monitor.get_all_task_stats() + + # Count queued tasks + queued_tasks = [ + stats for stats in task_stats.values() + if stats["status"] == CrawlStatus.QUEUED.name + ] + + total_queued = len(queued_tasks) + + if total_queued > 0: + current_time = time.time() + # Calculate wait times + wait_times = [ + current_time - stats.get("enqueue_time", current_time) + for stats in queued_tasks + ] + highest_wait_time = max(wait_times) if wait_times else 0.0 + avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0 + else: + highest_wait_time = 0.0 + avg_wait_time = 0.0 + + # Update monitor + monitor.update_queue_statistics( + total_queued=total_queued, + highest_wait_time=highest_wait_time, + avg_wait_time=avg_wait_time + ) + +def main(): + # Initialize the monitor + monitor = CrawlerMonitor( + urls_total=20, # Total URLs to process + refresh_rate=0.5, # Update UI twice per second + enable_ui=True, # Enable terminal UI + max_width=120 # Set maximum width to 120 characters + ) + + # Start the monitor + monitor.start() + + try: + # Run simulation + simulate_webcrawler_operations(monitor) + + # Keep monitor running a bit to see final state + print("Waiting to view final state...") + time.sleep(5) + + except KeyboardInterrupt: + print("\nExample interrupted by user") + finally: + # Stop the monitor + monitor.stop() + print("Example completed!") + + # Print some statistics + summary = monitor.get_summary() + print("\nCrawler Statistics Summary:") + print(f"Total URLs: {summary['urls_total']}") + print(f"Completed: {summary['urls_completed']}") + print(f"Completion percentage: {summary['completion_percentage']:.1f}%") + print(f"Peak memory usage: {summary['peak_memory_percent']:.1f}%") + + # Print task status counts + status_counts = summary['status_counts'] + print("\nTask Status Counts:") + for status, count in status_counts.items(): + print(f" {status}: {count}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/docs/examples/docker_python_rest_api.py b/docs/examples/docker_python_rest_api.py index 000d6464..6650f8d5 100644 --- a/docs/examples/docker_python_rest_api.py +++ b/docs/examples/docker_python_rest_api.py @@ -73,7 +73,7 @@ async def test_stream_crawl(session, token: str): # "https://news.ycombinator.com/news" ], "browser_config": {"headless": True, "viewport": {"width": 1200}}, - "crawler_config": {"stream": True, "cache_mode": "aggressive"} + "crawler_config": {"stream": True, "cache_mode": "bypass"} } headers = {"Authorization": f"Bearer {token}"} print(f"\nTesting Streaming Crawl: {url}") diff --git a/pyproject.toml b/pyproject.toml index b4fb392f..ad07548d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ dependencies = [ "pyperclip>=1.8.2", "faust-cchardet>=2.1.19", "aiohttp>=3.11.11", - "humanize>=4.10.0" + "humanize>=4.10.0", ] classifiers = [ "Development Status :: 4 - Beta", diff --git a/tests/20241401/test_schema_builder.py b/tests/20241401/test_schema_builder.py index 431fb001..46d0e240 100644 --- a/tests/20241401/test_schema_builder.py +++ b/tests/20241401/test_schema_builder.py @@ -10,6 +10,7 @@ import asyncio from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, JsonXPathExtractionStrategy +from crawl4ai.utils import preprocess_html_for_schema, JsonXPathExtractionStrategy import json # Test HTML - A complex job board with companies, departments, and positions diff --git a/tests/docker/test_server_token.py b/tests/docker/test_server_token.py index d8c7df89..220b6ca2 100644 --- a/tests/docker/test_server_token.py +++ b/tests/docker/test_server_token.py @@ -73,7 +73,7 @@ async def test_stream_crawl(session, token: str): # "https://news.ycombinator.com/news" ], "browser_config": {"headless": True, "viewport": {"width": 1200}}, - "crawler_config": {"stream": True, "cache_mode": "aggressive"} + "crawler_config": {"stream": True, "cache_mode": "bypass"} } headers = {"Authorization": f"Bearer {token}"} print(f"\nTesting Streaming Crawl: {url}") diff --git a/tests/memory/test_crawler_monitor.py b/tests/memory/test_crawler_monitor.py new file mode 100644 index 00000000..89cc08b8 --- /dev/null +++ b/tests/memory/test_crawler_monitor.py @@ -0,0 +1,168 @@ +""" +Test script for the CrawlerMonitor component. +This script simulates a crawler with multiple tasks to demonstrate the real-time monitoring capabilities. +""" + +import time +import uuid +import random +import threading +import sys +import os + +# Add the parent directory to the path to import crawl4ai +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from crawl4ai.components.crawler_monitor import CrawlerMonitor +from crawl4ai.models import CrawlStatus + +def simulate_crawler_task(monitor, task_id, url, simulate_failure=False): + """Simulate a crawler task with different states.""" + # Task starts in the QUEUED state + wait_time = random.uniform(0.5, 3.0) + time.sleep(wait_time) + + # Update to IN_PROGRESS state + monitor.update_task( + task_id=task_id, + status=CrawlStatus.IN_PROGRESS, + start_time=time.time(), + wait_time=wait_time + ) + + # Simulate task running + process_time = random.uniform(1.0, 5.0) + for i in range(int(process_time * 2)): + # Simulate memory usage changes + memory_usage = random.uniform(5.0, 25.0) + monitor.update_task( + task_id=task_id, + memory_usage=memory_usage, + peak_memory=max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0)) + ) + time.sleep(0.5) + + # Update to COMPLETED or FAILED state + if simulate_failure and random.random() < 0.8: # 80% chance of failure if simulate_failure is True + monitor.update_task( + task_id=task_id, + status=CrawlStatus.FAILED, + end_time=time.time(), + error_message="Simulated failure: Connection timeout", + memory_usage=0.0 + ) + else: + monitor.update_task( + task_id=task_id, + status=CrawlStatus.COMPLETED, + end_time=time.time(), + memory_usage=0.0 + ) + +def update_queue_stats(monitor, num_queued_tasks): + """Update queue statistics periodically.""" + while monitor.is_running: + queued_tasks = [ + task for task_id, task in monitor.get_all_task_stats().items() + if task["status"] == CrawlStatus.QUEUED.name + ] + + total_queued = len(queued_tasks) + + if total_queued > 0: + current_time = time.time() + wait_times = [ + current_time - task.get("enqueue_time", current_time) + for task in queued_tasks + ] + highest_wait_time = max(wait_times) if wait_times else 0.0 + avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0 + else: + highest_wait_time = 0.0 + avg_wait_time = 0.0 + + monitor.update_queue_statistics( + total_queued=total_queued, + highest_wait_time=highest_wait_time, + avg_wait_time=avg_wait_time + ) + + # Simulate memory pressure based on number of active tasks + active_tasks = len([ + task for task_id, task in monitor.get_all_task_stats().items() + if task["status"] == CrawlStatus.IN_PROGRESS.name + ]) + + if active_tasks > 8: + monitor.update_memory_status("CRITICAL") + elif active_tasks > 4: + monitor.update_memory_status("PRESSURE") + else: + monitor.update_memory_status("NORMAL") + + time.sleep(1.0) + +def test_crawler_monitor(): + """Test the CrawlerMonitor with simulated crawler tasks.""" + # Total number of URLs to crawl + total_urls = 50 + + # Initialize the monitor + monitor = CrawlerMonitor(urls_total=total_urls, refresh_rate=0.5) + + # Start the monitor + monitor.start() + + # Start thread to update queue statistics + queue_stats_thread = threading.Thread(target=update_queue_stats, args=(monitor, total_urls)) + queue_stats_thread.daemon = True + queue_stats_thread.start() + + try: + # Create task threads + threads = [] + for i in range(total_urls): + task_id = str(uuid.uuid4()) + url = f"https://example.com/page{i}" + + # Add task to monitor + monitor.add_task(task_id, url) + + # Determine if this task should simulate failure + simulate_failure = (i % 10 == 0) # Every 10th task + + # Create and start thread for this task + thread = threading.Thread( + target=simulate_crawler_task, + args=(monitor, task_id, url, simulate_failure) + ) + thread.daemon = True + threads.append(thread) + + # Start threads with delay to simulate tasks being added over time + batch_size = 5 + for i in range(0, len(threads), batch_size): + batch = threads[i:i+batch_size] + for thread in batch: + thread.start() + time.sleep(0.5) # Small delay between starting threads + + # Wait a bit before starting the next batch + time.sleep(2.0) + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Keep monitor running a bit longer to see the final state + time.sleep(5.0) + + except KeyboardInterrupt: + print("\nTest interrupted by user") + finally: + # Stop the monitor + monitor.stop() + print("\nCrawler monitor test completed") + +if __name__ == "__main__": + test_crawler_monitor() \ No newline at end of file diff --git a/tests/memory/test_dispatcher_stress.py b/tests/memory/test_dispatcher_stress.py new file mode 100644 index 00000000..f81f78f6 --- /dev/null +++ b/tests/memory/test_dispatcher_stress.py @@ -0,0 +1,410 @@ +import asyncio +import time +import psutil +import logging +import random +from typing import List, Dict +import uuid +import sys +import os + +# Import your crawler components +from crawl4ai.models import DisplayMode, CrawlStatus, CrawlResult +from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig, CacheMode +from crawl4ai import AsyncWebCrawler +from crawl4ai import MemoryAdaptiveDispatcher, CrawlerMonitor + +# Global configuration +STREAM = False # Toggle between streaming and non-streaming modes + +# Configure logging to file only (to avoid breaking the rich display) +os.makedirs("logs", exist_ok=True) +file_handler = logging.FileHandler("logs/memory_stress_test.log") +file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) + +# Root logger - only to file, not console +root_logger = logging.getLogger() +root_logger.setLevel(logging.INFO) +root_logger.addHandler(file_handler) + +# Our test logger also writes to file only +logger = logging.getLogger("memory_stress_test") +logger.setLevel(logging.INFO) +logger.addHandler(file_handler) +logger.propagate = False # Don't propagate to root logger + +# Create a memory restrictor to simulate limited memory environment +class MemorySimulator: + def __init__(self, target_percent: float = 85.0, aggressive: bool = False): + """Simulates memory pressure by allocating memory""" + self.target_percent = target_percent + self.memory_blocks: List[bytearray] = [] + self.aggressive = aggressive + + def apply_pressure(self, additional_percent: float = 0.0): + """Fill memory until we reach target percentage""" + current_percent = psutil.virtual_memory().percent + target = self.target_percent + additional_percent + + if current_percent >= target: + return # Already at target + + logger.info(f"Current memory: {current_percent}%, target: {target}%") + + # Calculate how much memory we need to allocate + total_memory = psutil.virtual_memory().total + target_usage = (target / 100.0) * total_memory + current_usage = (current_percent / 100.0) * total_memory + bytes_to_allocate = int(target_usage - current_usage) + + if bytes_to_allocate <= 0: + return + + # Allocate in smaller chunks to avoid overallocation + if self.aggressive: + # Use larger chunks for faster allocation in aggressive mode + chunk_size = min(bytes_to_allocate, 200 * 1024 * 1024) # 200MB chunks + else: + chunk_size = min(bytes_to_allocate, 50 * 1024 * 1024) # 50MB chunks + + try: + logger.info(f"Allocating {chunk_size / (1024 * 1024):.1f}MB to reach target memory usage") + self.memory_blocks.append(bytearray(chunk_size)) + time.sleep(0.5) # Give system time to register the allocation + except MemoryError: + logger.warning("Unable to allocate more memory") + + def release_pressure(self, percent: float = None): + """ + Release allocated memory + If percent is specified, release that percentage of blocks + """ + if not self.memory_blocks: + return + + if percent is None: + # Release all + logger.info(f"Releasing all {len(self.memory_blocks)} memory blocks") + self.memory_blocks.clear() + else: + # Release specified percentage + blocks_to_release = int(len(self.memory_blocks) * (percent / 100.0)) + if blocks_to_release > 0: + logger.info(f"Releasing {blocks_to_release} of {len(self.memory_blocks)} memory blocks ({percent}%)") + self.memory_blocks = self.memory_blocks[blocks_to_release:] + + def spike_pressure(self, duration: float = 5.0): + """ + Create a temporary spike in memory pressure then release + Useful for forcing requeues + """ + logger.info(f"Creating memory pressure spike for {duration} seconds") + # Save current blocks count + initial_blocks = len(self.memory_blocks) + + # Create spike with extra 5% + self.apply_pressure(additional_percent=5.0) + + # Schedule release after duration + asyncio.create_task(self._delayed_release(duration, initial_blocks)) + + async def _delayed_release(self, delay: float, target_blocks: int): + """Helper for spike_pressure - releases extra blocks after delay""" + await asyncio.sleep(delay) + + # Remove blocks added since spike started + if len(self.memory_blocks) > target_blocks: + logger.info(f"Releasing memory spike ({len(self.memory_blocks) - target_blocks} blocks)") + self.memory_blocks = self.memory_blocks[:target_blocks] + +# Test statistics collector +class TestResults: + def __init__(self): + self.start_time = time.time() + self.completed_urls: List[str] = [] + self.failed_urls: List[str] = [] + self.requeued_count = 0 + self.memory_warnings = 0 + self.max_memory_usage = 0.0 + self.max_queue_size = 0 + self.max_wait_time = 0.0 + self.url_to_attempt: Dict[str, int] = {} # Track retries per URL + + def log_summary(self): + duration = time.time() - self.start_time + logger.info("===== TEST SUMMARY =====") + logger.info(f"Stream mode: {'ON' if STREAM else 'OFF'}") + logger.info(f"Total duration: {duration:.1f} seconds") + logger.info(f"Completed URLs: {len(self.completed_urls)}") + logger.info(f"Failed URLs: {len(self.failed_urls)}") + logger.info(f"Requeue events: {self.requeued_count}") + logger.info(f"Memory warnings: {self.memory_warnings}") + logger.info(f"Max memory usage: {self.max_memory_usage:.1f}%") + logger.info(f"Max queue size: {self.max_queue_size}") + logger.info(f"Max wait time: {self.max_wait_time:.1f} seconds") + + # Log URLs with multiple attempts + retried_urls = {url: count for url, count in self.url_to_attempt.items() if count > 1} + if retried_urls: + logger.info(f"URLs with retries: {len(retried_urls)}") + # Log the top 5 most retried + top_retries = sorted(retried_urls.items(), key=lambda x: x[1], reverse=True)[:5] + for url, count in top_retries: + logger.info(f" URL {url[-30:]} had {count} attempts") + + # Write summary to a separate human-readable file + with open("logs/test_summary.txt", "w") as f: + f.write(f"Stream mode: {'ON' if STREAM else 'OFF'}\n") + f.write(f"Total duration: {duration:.1f} seconds\n") + f.write(f"Completed URLs: {len(self.completed_urls)}\n") + f.write(f"Failed URLs: {len(self.failed_urls)}\n") + f.write(f"Requeue events: {self.requeued_count}\n") + f.write(f"Memory warnings: {self.memory_warnings}\n") + f.write(f"Max memory usage: {self.max_memory_usage:.1f}%\n") + f.write(f"Max queue size: {self.max_queue_size}\n") + f.write(f"Max wait time: {self.max_wait_time:.1f} seconds\n") + +# Custom monitor with stats tracking +# Custom monitor that extends CrawlerMonitor with test-specific tracking +class StressTestMonitor(CrawlerMonitor): + def __init__(self, test_results: TestResults, **kwargs): + # Initialize the parent CrawlerMonitor + super().__init__(**kwargs) + self.test_results = test_results + + def update_memory_status(self, status: str): + if status != self.memory_status: + logger.info(f"Memory status changed: {self.memory_status} -> {status}") + if "CRITICAL" in status or "PRESSURE" in status: + self.test_results.memory_warnings += 1 + + # Track peak memory usage in test results + current_memory = psutil.virtual_memory().percent + self.test_results.max_memory_usage = max(self.test_results.max_memory_usage, current_memory) + + # Call parent method to update the dashboard + super().update_memory_status(status) + + def update_queue_statistics(self, total_queued: int, highest_wait_time: float, avg_wait_time: float): + # Track queue metrics in test results + self.test_results.max_queue_size = max(self.test_results.max_queue_size, total_queued) + self.test_results.max_wait_time = max(self.test_results.max_wait_time, highest_wait_time) + + # Call parent method to update the dashboard + super().update_queue_statistics(total_queued, highest_wait_time, avg_wait_time) + + def update_task(self, task_id: str, **kwargs): + # Track URL status changes for test results + if task_id in self.stats: + old_status = self.stats[task_id].status + + # If this is a requeue event (requeued due to memory pressure) + if 'error_message' in kwargs and 'requeued' in kwargs['error_message']: + if not hasattr(self.stats[task_id], 'counted_requeue') or not self.stats[task_id].counted_requeue: + self.test_results.requeued_count += 1 + self.stats[task_id].counted_requeue = True + + # Track completion status for test results + if 'status' in kwargs: + new_status = kwargs['status'] + if old_status != new_status: + if new_status == CrawlStatus.COMPLETED: + if task_id not in self.test_results.completed_urls: + self.test_results.completed_urls.append(task_id) + elif new_status == CrawlStatus.FAILED: + if task_id not in self.test_results.failed_urls: + self.test_results.failed_urls.append(task_id) + + # Call parent method to update the dashboard + super().update_task(task_id, **kwargs) + self.live.update(self._create_table()) + +# Generate test URLs - use example.com with unique paths to avoid browser caching +def generate_test_urls(count: int) -> List[str]: + urls = [] + for i in range(count): + # Add random path and query parameters to create unique URLs + path = f"/path/{uuid.uuid4()}" + query = f"?test={i}&random={random.randint(1, 100000)}" + urls.append(f"https://example.com{path}{query}") + return urls + +# Process result callback +async def process_result(result, test_results: TestResults): + # Track attempt counts + if result.url not in test_results.url_to_attempt: + test_results.url_to_attempt[result.url] = 1 + else: + test_results.url_to_attempt[result.url] += 1 + + if "requeued" in result.error_message: + test_results.requeued_count += 1 + logger.debug(f"Requeued due to memory pressure: {result.url}") + elif result.success: + test_results.completed_urls.append(result.url) + logger.debug(f"Successfully processed: {result.url}") + else: + test_results.failed_urls.append(result.url) + logger.warning(f"Failed to process: {result.url} - {result.error_message}") + +# Process multiple results (used in non-streaming mode) +async def process_results(results, test_results: TestResults): + for result in results: + await process_result(result, test_results) + +# Main test function for extreme memory pressure simulation +async def run_memory_stress_test( + url_count: int = 100, + target_memory_percent: float = 92.0, # Push to dangerous levels + chunk_size: int = 20, # Larger chunks for more chaos + aggressive: bool = False, + spikes: bool = True +): + test_results = TestResults() + memory_simulator = MemorySimulator(target_percent=target_memory_percent, aggressive=aggressive) + + logger.info(f"Starting stress test with {url_count} URLs in {'STREAM' if STREAM else 'NON-STREAM'} mode") + logger.info(f"Target memory usage: {target_memory_percent}%") + + # First, elevate memory usage to create pressure + logger.info("Creating initial memory pressure...") + memory_simulator.apply_pressure() + + # Create test URLs in chunks to simulate real-world crawling where URLs are discovered + all_urls = generate_test_urls(url_count) + url_chunks = [all_urls[i:i+chunk_size] for i in range(0, len(all_urls), chunk_size)] + + # Set up the crawler components - low memory thresholds to create more requeues + browser_config = BrowserConfig(headless=True, verbose=False) + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + verbose=False, + stream=STREAM # Use the global STREAM variable to set mode + ) + + # Create monitor with reference to test results + monitor = StressTestMonitor( + test_results=test_results, + display_mode=DisplayMode.DETAILED, + max_visible_rows=20, + total_urls=url_count # Pass total URLs count + ) + + # Create dispatcher with EXTREME settings - pure survival mode + # These settings are designed to create a memory battleground + dispatcher = MemoryAdaptiveDispatcher( + memory_threshold_percent=63.0, # Start throttling at just 60% memory + critical_threshold_percent=70.0, # Start requeuing at 70% - incredibly aggressive + recovery_threshold_percent=55.0, # Only resume normal ops when plenty of memory available + check_interval=0.1, # Check extremely frequently (100ms) + max_session_permit=20 if aggressive else 10, # Double the concurrent sessions - pure chaos + fairness_timeout=10.0, # Extremely low timeout - rapid priority changes + monitor=monitor + ) + + # Set up spike schedule if enabled + if spikes: + spike_intervals = [] + # Create 3-5 random spike times + num_spikes = random.randint(3, 5) + for _ in range(num_spikes): + # Schedule spikes at random chunks + chunk_index = random.randint(1, len(url_chunks) - 1) + spike_intervals.append(chunk_index) + logger.info(f"Scheduled memory spikes at chunks: {spike_intervals}") + + try: + async with AsyncWebCrawler(config=browser_config) as crawler: + # Process URLs in chunks to simulate discovering URLs over time + for chunk_index, url_chunk in enumerate(url_chunks): + logger.info(f"Processing chunk {chunk_index+1}/{len(url_chunks)} ({len(url_chunk)} URLs)") + + # Regular pressure increases + if chunk_index % 2 == 0: + logger.info("Increasing memory pressure...") + memory_simulator.apply_pressure() + + # Memory spike if scheduled for this chunk + if spikes and chunk_index in spike_intervals: + logger.info(f"⚠️ CREATING MASSIVE MEMORY SPIKE at chunk {chunk_index+1} ⚠️") + # Create a nightmare scenario - multiple overlapping spikes + memory_simulator.spike_pressure(duration=10.0) # 10-second spike + + # 50% chance of double-spike (pure evil) + if random.random() < 0.5: + await asyncio.sleep(2.0) # Wait 2 seconds + logger.info("💀 DOUBLE SPIKE - EXTREME MEMORY PRESSURE 💀") + memory_simulator.spike_pressure(duration=8.0) # 8-second overlapping spike + + if STREAM: + # Stream mode - process results as they come in + async for result in dispatcher.run_urls_stream( + urls=url_chunk, + crawler=crawler, + config=run_config + ): + await process_result(result, test_results) + else: + # Non-stream mode - get all results at once + results = await dispatcher.run_urls( + urls=url_chunk, + crawler=crawler, + config=run_config + ) + await process_results(results, test_results) + + # Simulate discovering more URLs while others are still processing + await asyncio.sleep(1) + + # RARELY release pressure - make the system fight for resources + if chunk_index % 5 == 4: # Less frequent releases + release_percent = random.choice([10, 15, 20]) # Smaller, inconsistent releases + logger.info(f"Releasing {release_percent}% of memory blocks - brief respite") + memory_simulator.release_pressure(percent=release_percent) + + except Exception as e: + logger.error(f"Test error: {str(e)}") + raise + finally: + # Release memory pressure + memory_simulator.release_pressure() + # Log final results + test_results.log_summary() + + # Check for success criteria + if len(test_results.completed_urls) + len(test_results.failed_urls) < url_count: + logger.error(f"TEST FAILED: Not all URLs were processed. {url_count - len(test_results.completed_urls) - len(test_results.failed_urls)} URLs missing.") + return False + + logger.info("TEST PASSED: All URLs were processed without crashing.") + return True + +# Command-line entry point +if __name__ == "__main__": + # Parse command line arguments + url_count = int(sys.argv[1]) if len(sys.argv) > 1 else 100 + target_memory = float(sys.argv[2]) if len(sys.argv) > 2 else 85.0 + + # Check if stream mode is specified + if len(sys.argv) > 3: + STREAM = sys.argv[3].lower() in ('true', 'yes', '1', 'stream') + + # Check if aggressive mode is specified + aggressive = False + if len(sys.argv) > 4: + aggressive = sys.argv[4].lower() in ('true', 'yes', '1', 'aggressive') + + print(f"Starting test with {url_count} URLs, {target_memory}% memory target") + print(f"Stream mode: {STREAM}, Aggressive: {aggressive}") + print("Logs will be written to the logs directory") + print("Live display starting now...") + + # Run the test + result = asyncio.run(run_memory_stress_test( + url_count=url_count, + target_memory_percent=target_memory, + aggressive=aggressive + )) + + # Exit with status code + sys.exit(0 if result else 1) \ No newline at end of file