Set memory_wait_timeout default to 10 minutes (#1193)

This commit is contained in:
UncleCode
2025-06-08 07:53:09 +02:00
parent ef6f4329fa
commit c73a130c50
4 changed files with 7753 additions and 12 deletions

View File

@@ -126,6 +126,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
check_interval: float = 1.0, check_interval: float = 1.0,
max_session_permit: int = 20, max_session_permit: int = 20,
fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs
memory_wait_timeout: Optional[float] = 600.0,
rate_limiter: Optional[RateLimiter] = None, rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None, monitor: Optional[CrawlerMonitor] = None,
): ):
@@ -136,10 +137,12 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.check_interval = check_interval self.check_interval = check_interval
self.max_session_permit = max_session_permit self.max_session_permit = max_session_permit
self.fairness_timeout = fairness_timeout self.fairness_timeout = fairness_timeout
self.memory_wait_timeout = memory_wait_timeout
self.result_queue = asyncio.Queue() self.result_queue = asyncio.Queue()
self.task_queue = asyncio.PriorityQueue() # Priority queue for better management self.task_queue = asyncio.PriorityQueue() # Priority queue for better management
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode
self.current_memory_percent = 0.0 # Track current memory usage self.current_memory_percent = 0.0 # Track current memory usage
self._high_memory_start_time: Optional[float] = None
async def _memory_monitor_task(self): async def _memory_monitor_task(self):
"""Background task to continuously monitor memory usage and update state""" """Background task to continuously monitor memory usage and update state"""
@@ -147,16 +150,33 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.current_memory_percent = psutil.virtual_memory().percent self.current_memory_percent = psutil.virtual_memory().percent
# Enter memory pressure mode if we cross the threshold # Enter memory pressure mode if we cross the threshold
if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent: if self.current_memory_percent >= self.memory_threshold_percent:
if not self.memory_pressure_mode:
self.memory_pressure_mode = True self.memory_pressure_mode = True
self._high_memory_start_time = time.time()
if self.monitor: if self.monitor:
self.monitor.update_memory_status("PRESSURE") self.monitor.update_memory_status("PRESSURE")
else:
if self._high_memory_start_time is None:
self._high_memory_start_time = time.time()
if (
self.memory_wait_timeout is not None
and self._high_memory_start_time is not None
and time.time() - self._high_memory_start_time >= self.memory_wait_timeout
):
raise MemoryError(
"Memory usage exceeded threshold for"
f" {self.memory_wait_timeout} seconds"
)
# Exit memory pressure mode if we go below recovery threshold # Exit memory pressure mode if we go below recovery threshold
elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent: elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent:
self.memory_pressure_mode = False self.memory_pressure_mode = False
self._high_memory_start_time = None
if self.monitor: if self.monitor:
self.monitor.update_memory_status("NORMAL") self.monitor.update_memory_status("NORMAL")
elif self.current_memory_percent < self.memory_threshold_percent:
self._high_memory_start_time = None
# In critical mode, we might need to take more drastic action # In critical mode, we might need to take more drastic action
if self.current_memory_percent >= self.critical_threshold_percent: if self.current_memory_percent >= self.critical_threshold_percent:
@@ -321,6 +341,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
# Process until both queues are empty # Process until both queues are empty
while not self.task_queue.empty() or active_tasks: while not self.task_queue.empty() or active_tasks:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks # If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try: try:
@@ -467,6 +494,12 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
total_urls = len(urls) total_urls = len(urls)
while completed_count < total_urls: while completed_count < total_urls:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks # If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try: try:

View File

@@ -6705,7 +6705,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`) 3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `300.0`) 4.**`memory_wait_timeout`** (`float`, default: `600.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`) 5.**`rate_limiter`** (`RateLimiter`, default: `None`)

View File

@@ -172,7 +172,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`) 3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `300.0`) 4.**`memory_wait_timeout`** (`float`, default: `600.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`) 5.**`rate_limiter`** (`RateLimiter`, default: `None`)

File diff suppressed because it is too large Load Diff