diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py index 0f3fab3d..ce130d02 100644 --- a/crawl4ai/async_dispatcher.py +++ b/crawl4ai/async_dispatcher.py @@ -407,32 +407,34 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): t.cancel() raise exc - # If memory pressure is low, start new tasks - if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: - try: - # Try to get a task with timeout to avoid blocking indefinitely - priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( - self.task_queue.get(), timeout=0.1 - ) - - # Create and start the task - task = asyncio.create_task( - self.crawl_url(url, config, task_id, retry_count) - ) - active_tasks.append(task) - - # Update waiting time in monitor - if self.monitor: - wait_time = time.time() - enqueue_time - self.monitor.update_task( - task_id, - wait_time=wait_time, - status=CrawlStatus.IN_PROGRESS - ) + # If memory pressure is low, greedily fill all available slots + if not self.memory_pressure_mode: + slots = self.max_session_permit - len(active_tasks) + while slots > 0: + try: + # Use get_nowait() to immediately get tasks without blocking + priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait() - except asyncio.TimeoutError: - # No tasks in queue, that's fine - pass + # Create and start the task + task = asyncio.create_task( + self.crawl_url(url, config, task_id, retry_count) + ) + active_tasks.append(task) + + # Update waiting time in monitor + if self.monitor: + wait_time = time.time() - enqueue_time + self.monitor.update_task( + task_id, + wait_time=wait_time, + status=CrawlStatus.IN_PROGRESS + ) + + slots -= 1 + + except asyncio.QueueEmpty: + # No more tasks in queue, exit the loop + break # Wait for completion even if queue is starved if active_tasks: @@ -559,32 +561,34 @@ class MemoryAdaptiveDispatcher(BaseDispatcher): for t in active_tasks: t.cancel() raise exc - # If memory pressure is low, start new tasks - if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: - try: - # Try to get a task with timeout - priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for( - self.task_queue.get(), timeout=0.1 - ) - - # Create and start the task - task = asyncio.create_task( - self.crawl_url(url, config, task_id, retry_count) - ) - active_tasks.append(task) - - # Update waiting time in monitor - if self.monitor: - wait_time = time.time() - enqueue_time - self.monitor.update_task( - task_id, - wait_time=wait_time, - status=CrawlStatus.IN_PROGRESS - ) + # If memory pressure is low, greedily fill all available slots + if not self.memory_pressure_mode: + slots = self.max_session_permit - len(active_tasks) + while slots > 0: + try: + # Use get_nowait() to immediately get tasks without blocking + priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait() - except asyncio.TimeoutError: - # No tasks in queue, that's fine - pass + # Create and start the task + task = asyncio.create_task( + self.crawl_url(url, config, task_id, retry_count) + ) + active_tasks.append(task) + + # Update waiting time in monitor + if self.monitor: + wait_time = time.time() - enqueue_time + self.monitor.update_task( + task_id, + wait_time=wait_time, + status=CrawlStatus.IN_PROGRESS + ) + + slots -= 1 + + except asyncio.QueueEmpty: + # No more tasks in queue, exit the loop + break # Process completed tasks and yield results if active_tasks: