Set memory_wait_timeout default to 10 minutes
This commit is contained in:
@@ -126,6 +126,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
check_interval: float = 1.0,
|
||||
max_session_permit: int = 20,
|
||||
fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs
|
||||
memory_wait_timeout: Optional[float] = 600.0,
|
||||
rate_limiter: Optional[RateLimiter] = None,
|
||||
monitor: Optional[CrawlerMonitor] = None,
|
||||
):
|
||||
@@ -136,10 +137,12 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
self.check_interval = check_interval
|
||||
self.max_session_permit = max_session_permit
|
||||
self.fairness_timeout = fairness_timeout
|
||||
self.memory_wait_timeout = memory_wait_timeout
|
||||
self.result_queue = asyncio.Queue()
|
||||
self.task_queue = asyncio.PriorityQueue() # Priority queue for better management
|
||||
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode
|
||||
self.current_memory_percent = 0.0 # Track current memory usage
|
||||
self._high_memory_start_time: Optional[float] = None
|
||||
|
||||
async def _memory_monitor_task(self):
|
||||
"""Background task to continuously monitor memory usage and update state"""
|
||||
@@ -147,16 +150,33 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
self.current_memory_percent = psutil.virtual_memory().percent
|
||||
|
||||
# Enter memory pressure mode if we cross the threshold
|
||||
if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent:
|
||||
self.memory_pressure_mode = True
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status("PRESSURE")
|
||||
if self.current_memory_percent >= self.memory_threshold_percent:
|
||||
if not self.memory_pressure_mode:
|
||||
self.memory_pressure_mode = True
|
||||
self._high_memory_start_time = time.time()
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status("PRESSURE")
|
||||
else:
|
||||
if self._high_memory_start_time is None:
|
||||
self._high_memory_start_time = time.time()
|
||||
if (
|
||||
self.memory_wait_timeout is not None
|
||||
and self._high_memory_start_time is not None
|
||||
and time.time() - self._high_memory_start_time >= self.memory_wait_timeout
|
||||
):
|
||||
raise MemoryError(
|
||||
"Memory usage exceeded threshold for"
|
||||
f" {self.memory_wait_timeout} seconds"
|
||||
)
|
||||
|
||||
# Exit memory pressure mode if we go below recovery threshold
|
||||
elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent:
|
||||
self.memory_pressure_mode = False
|
||||
self._high_memory_start_time = None
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status("NORMAL")
|
||||
elif self.current_memory_percent < self.memory_threshold_percent:
|
||||
self._high_memory_start_time = None
|
||||
|
||||
# In critical mode, we might need to take more drastic action
|
||||
if self.current_memory_percent >= self.critical_threshold_percent:
|
||||
@@ -321,6 +341,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
|
||||
# Process until both queues are empty
|
||||
while not self.task_queue.empty() or active_tasks:
|
||||
if memory_monitor.done():
|
||||
exc = memory_monitor.exception()
|
||||
if exc:
|
||||
for t in active_tasks:
|
||||
t.cancel()
|
||||
raise exc
|
||||
|
||||
# If memory pressure is low, start new tasks
|
||||
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
|
||||
try:
|
||||
@@ -467,6 +494,12 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
total_urls = len(urls)
|
||||
|
||||
while completed_count < total_urls:
|
||||
if memory_monitor.done():
|
||||
exc = memory_monitor.exception()
|
||||
if exc:
|
||||
for t in active_tasks:
|
||||
t.cancel()
|
||||
raise exc
|
||||
# If memory pressure is low, start new tasks
|
||||
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
|
||||
try:
|
||||
|
||||
@@ -6705,7 +6705,7 @@ dispatcher = MemoryAdaptiveDispatcher(
|
||||
3. **`max_session_permit`** (`int`, default: `10`)
|
||||
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
|
||||
|
||||
4. **`memory_wait_timeout`** (`float`, default: `300.0`)
|
||||
4. **`memory_wait_timeout`** (`float`, default: `600.0`)
|
||||
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
|
||||
|
||||
5. **`rate_limiter`** (`RateLimiter`, default: `None`)
|
||||
|
||||
@@ -172,7 +172,7 @@ dispatcher = MemoryAdaptiveDispatcher(
|
||||
3. **`max_session_permit`** (`int`, default: `10`)
|
||||
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
|
||||
|
||||
4. **`memory_wait_timeout`** (`float`, default: `300.0`)
|
||||
4. **`memory_wait_timeout`** (`float`, default: `600.0`)
|
||||
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
|
||||
|
||||
5. **`rate_limiter`** (`RateLimiter`, default: `None`)
|
||||
|
||||
@@ -6732,7 +6732,7 @@ dispatcher = MemoryAdaptiveDispatcher(
|
||||
3. **`max_session_permit`** (`int`, default: `10`)
|
||||
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
|
||||
|
||||
4. **`memory_wait_timeout`** (`float`, default: `300.0`)
|
||||
4. **`memory_wait_timeout`** (`float`, default: `600.0`)
|
||||
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
|
||||
|
||||
5. **`rate_limiter`** (`RateLimiter`, default: `None`)
|
||||
|
||||
Reference in New Issue
Block a user