fix(dispatcher): enable true concurrency for fast-completing tasks in arun_many. REF: #560

The MemoryAdaptiveDispatcher was processing tasks sequentially despite
  max_session_permit > 1 due to fetching only one task per event loop iteration.
  This particularly affected raw:// URLs which complete in microseconds.

  Changes:
  - Replace single task fetch with greedy slot filling using get_nowait()
  - Fill all available slots (up to max_session_permit) immediately
  - Break on empty queue instead of waiting with timeout

  This ensures proper parallelization for all task types, especially
  ultra-fast operations like raw HTML processing.
This commit is contained in:
ntohidi
2025-08-12 16:51:22 +08:00
parent 955110a8b0
commit dfcfd8ae57

View File

@@ -407,32 +407,34 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
                     t.cancel()
                 raise exc

-            # If memory pressure is low, start new tasks
-            if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
-                try:
-                    # Try to get a task with timeout to avoid blocking indefinitely
-                    priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
-                        self.task_queue.get(), timeout=0.1
-                    )
-                    # Create and start the task
-                    task = asyncio.create_task(
-                        self.crawl_url(url, config, task_id, retry_count)
-                    )
-                    active_tasks.append(task)
-                    # Update waiting time in monitor
-                    if self.monitor:
-                        wait_time = time.time() - enqueue_time
-                        self.monitor.update_task(
-                            task_id,
-                            wait_time=wait_time,
-                            status=CrawlStatus.IN_PROGRESS
-                        )
-                except asyncio.TimeoutError:
-                    # No tasks in queue, that's fine
-                    pass
+            # If memory pressure is low, greedily fill all available slots
+            if not self.memory_pressure_mode:
+                slots = self.max_session_permit - len(active_tasks)
+                while slots > 0:
+                    try:
+                        # Use get_nowait() to immediately get tasks without blocking
+                        priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait()
+                        # Create and start the task
+                        task = asyncio.create_task(
+                            self.crawl_url(url, config, task_id, retry_count)
+                        )
+                        active_tasks.append(task)
+                        # Update waiting time in monitor
+                        if self.monitor:
+                            wait_time = time.time() - enqueue_time
+                            self.monitor.update_task(
+                                task_id,
+                                wait_time=wait_time,
+                                status=CrawlStatus.IN_PROGRESS
+                            )
+                        slots -= 1
+                    except asyncio.QueueEmpty:
+                        # No more tasks in queue, exit the loop
+                        break

             # Wait for completion even if queue is starved
             if active_tasks:
@@ -559,32 +561,34 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
                 for t in active_tasks:
                     t.cancel()
                 raise exc

-            # If memory pressure is low, start new tasks
-            if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
-                try:
-                    # Try to get a task with timeout
-                    priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
-                        self.task_queue.get(), timeout=0.1
-                    )
-                    # Create and start the task
-                    task = asyncio.create_task(
-                        self.crawl_url(url, config, task_id, retry_count)
-                    )
-                    active_tasks.append(task)
-                    # Update waiting time in monitor
-                    if self.monitor:
-                        wait_time = time.time() - enqueue_time
-                        self.monitor.update_task(
-                            task_id,
-                            wait_time=wait_time,
-                            status=CrawlStatus.IN_PROGRESS
-                        )
-                except asyncio.TimeoutError:
-                    # No tasks in queue, that's fine
-                    pass
+            # If memory pressure is low, greedily fill all available slots
+            if not self.memory_pressure_mode:
+                slots = self.max_session_permit - len(active_tasks)
+                while slots > 0:
+                    try:
+                        # Use get_nowait() to immediately get tasks without blocking
+                        priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait()
+                        # Create and start the task
+                        task = asyncio.create_task(
+                            self.crawl_url(url, config, task_id, retry_count)
+                        )
+                        active_tasks.append(task)
+                        # Update waiting time in monitor
+                        if self.monitor:
+                            wait_time = time.time() - enqueue_time
+                            self.monitor.update_task(
+                                task_id,
+                                wait_time=wait_time,
+                                status=CrawlStatus.IN_PROGRESS
+                            )
+                        slots -= 1
+                    except asyncio.QueueEmpty:
+                        # No more tasks in queue, exit the loop
+                        break

             # Process completed tasks and yield results
             if active_tasks: