From c8456d8a01ce0db8aa3738bff7a4fa5eece55e64 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Sun, 8 Jun 2025 07:52:21 +0200 Subject: [PATCH] Set memory_wait_timeout default to 10 minutes --- crawl4ai/async_dispatcher.py | 53 +++++++++++++++---- deploy/docker/c4ai-doc-context.md | 2 +- docs/md_v2/advanced/multi-url-crawling.md | 2 +- .../crawl4ai_all_reasoning_content.llm.txt | 2 +- 4 files changed, 46 insertions(+), 13 deletions(-) diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py index b97d59a7..1558efc0 100644 --- a/crawl4ai/async_dispatcher.py +++ b/crawl4ai/async_dispatcher.py @@ -126,6 +126,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): check_interval: float = 1.0, max_session_permit: int = 20, fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs + memory_wait_timeout: Optional[float] = 600.0, rate_limiter: Optional[RateLimiter] = None, monitor: Optional[CrawlerMonitor] = None, ): @@ -136,27 +137,46 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): self.check_interval = check_interval self.max_session_permit = max_session_permit self.fairness_timeout = fairness_timeout + self.memory_wait_timeout = memory_wait_timeout self.result_queue = asyncio.Queue() self.task_queue = asyncio.PriorityQueue() # Priority queue for better management self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode self.current_memory_percent = 0.0 # Track current memory usage + self._high_memory_start_time: Optional[float] = None async def _memory_monitor_task(self): """Background task to continuously monitor memory usage and update state""" while True: self.current_memory_percent = psutil.virtual_memory().percent - + # Enter memory pressure mode if we cross the threshold - if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent: - self.memory_pressure_mode = True - if self.monitor: - self.monitor.update_memory_status("PRESSURE") - + if self.current_memory_percent >= self.memory_threshold_percent: + if not self.memory_pressure_mode: + self.memory_pressure_mode = True + self._high_memory_start_time = time.time() + if self.monitor: + self.monitor.update_memory_status("PRESSURE") + else: + if self._high_memory_start_time is None: + self._high_memory_start_time = time.time() + if ( + self.memory_wait_timeout is not None + and self._high_memory_start_time is not None + and time.time() - self._high_memory_start_time >= self.memory_wait_timeout + ): + raise MemoryError( + "Memory usage exceeded threshold for" + f" {self.memory_wait_timeout} seconds" + ) + # Exit memory pressure mode if we go below recovery threshold elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent: self.memory_pressure_mode = False + self._high_memory_start_time = None if self.monitor: self.monitor.update_memory_status("NORMAL") + elif self.current_memory_percent < self.memory_threshold_percent: + self._high_memory_start_time = None # In critical mode, we might need to take more drastic action if self.current_memory_percent >= self.critical_threshold_percent: @@ -307,7 +327,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): self.monitor.start() results = [] - + try: # Initialize task queue for url in urls: @@ -316,11 +336,18 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): self.monitor.add_task(task_id, url) # Add to queue with initial priority 0, retry count 0, and current time await self.task_queue.put((0, (url, task_id, 0, time.time()))) - + active_tasks = [] - + # Process until both queues are empty while not self.task_queue.empty() or active_tasks: + if memory_monitor.done(): + exc = memory_monitor.exception() + if exc: + for t in active_tasks: + t.cancel() + raise exc + # If memory pressure is low, start new tasks if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: try: @@ -465,8 +492,14 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): active_tasks = [] completed_count = 0 total_urls = len(urls) - + while completed_count < total_urls: + if memory_monitor.done(): + exc = memory_monitor.exception() + if exc: + for t in active_tasks: + t.cancel() + raise exc # If memory pressure is low, start new tasks if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: try: diff --git a/deploy/docker/c4ai-doc-context.md b/deploy/docker/c4ai-doc-context.md index 1642f85e..77785cec 100644 --- a/deploy/docker/c4ai-doc-context.md +++ b/deploy/docker/c4ai-doc-context.md @@ -6705,7 +6705,7 @@ dispatcher = MemoryAdaptiveDispatcher( 3. **`max_session_permit`** (`int`, default: `10`)   The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. -4. **`memory_wait_timeout`** (`float`, default: `300.0`) +4. **`memory_wait_timeout`** (`float`, default: `600.0`)   Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. 5. **`rate_limiter`** (`RateLimiter`, default: `None`) diff --git a/docs/md_v2/advanced/multi-url-crawling.md b/docs/md_v2/advanced/multi-url-crawling.md index f6d944d6..40493c21 100644 --- a/docs/md_v2/advanced/multi-url-crawling.md +++ b/docs/md_v2/advanced/multi-url-crawling.md @@ -172,7 +172,7 @@ dispatcher = MemoryAdaptiveDispatcher( 3. **`max_session_permit`** (`int`, default: `10`)   The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. -4. **`memory_wait_timeout`** (`float`, default: `300.0`) +4. **`memory_wait_timeout`** (`float`, default: `600.0`)   Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. 5. **`rate_limiter`** (`RateLimiter`, default: `None`) diff --git a/docs/md_v2/assets/llmtxt/crawl4ai_all_reasoning_content.llm.txt b/docs/md_v2/assets/llmtxt/crawl4ai_all_reasoning_content.llm.txt index 850c1237..846b6914 100644 --- a/docs/md_v2/assets/llmtxt/crawl4ai_all_reasoning_content.llm.txt +++ b/docs/md_v2/assets/llmtxt/crawl4ai_all_reasoning_content.llm.txt @@ -6732,7 +6732,7 @@ dispatcher = MemoryAdaptiveDispatcher( 3. **`max_session_permit`** (`int`, default: `10`)   The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. -4. **`memory_wait_timeout`** (`float`, default: `300.0`) +4. **`memory_wait_timeout`** (`float`, default: `600.0`)   Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. 5. **`rate_limiter`** (`RateLimiter`, default: `None`)