This commit resolves issue #1055 where LLM extraction was blocking async

execution, causing URLs to be processed sequentially instead of in parallel.

  Changes:
  - Added aperform_completion_with_backoff() using litellm.acompletion for async LLM calls
  - Implemented arun() method in ExtractionStrategy base class with thread pool fallback
  - Created async arun() and aextract() methods in LLMExtractionStrategy using asyncio.gather
  - Updated AsyncWebCrawler.arun() to detect and use arun() when available
  - Added comprehensive test suite to verify parallel execution

  Impact:
  - LLM extraction now runs truly in parallel across multiple URLs
  - Significant performance improvement for multi-URL crawls with LLM strategies
  - Backward compatible - existing extraction strategies continue to work
  - No breaking changes to public API

  Technical details:
  - Uses litellm.acompletion for non-blocking LLM calls
  - Leverages asyncio.gather for concurrent chunk processing
  - Maintains backward compatibility via asyncio.to_thread fallback
  - Works seamlessly with MemoryAdaptiveDispatcher and other dispatchers
This commit is contained in:
ntohidi
2025-11-06 11:22:45 +01:00
parent 2c918155aa
commit a30548a98f
4 changed files with 492 additions and 1 deletions

View File

@@ -1825,6 +1825,82 @@ def perform_completion_with_backoff(
# ]
async def aperform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
json_response=False,
base_url=None,
**kwargs,
):
"""
Async version: Perform an API completion request with exponential backoff.
How it works:
1. Sends an async completion request to the API.
2. Retries on rate-limit errors with exponential delays (async).
3. Returns the API response or an error after all retries.
Args:
provider (str): The name of the API provider.
prompt_with_variables (str): The input prompt for the completion request.
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
**kwargs: Additional arguments for the API request.
Returns:
dict: The API response or an error message after all retries.
"""
from litellm import acompletion
from litellm.exceptions import RateLimitError
import asyncio
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {"temperature": 0.01, "api_key": api_token, "base_url": base_url}
if json_response:
extra_args["response_format"] = {"type": "json_object"}
if kwargs.get("extra_args"):
extra_args.update(kwargs["extra_args"])
for attempt in range(max_attempts):
try:
response = await acompletion(
model=provider,
messages=[{"role": "user", "content": prompt_with_variables}],
**extra_args,
)
return response # Return the successful response
except RateLimitError as e:
print("Rate limit error:", str(e))
if attempt == max_attempts - 1:
# Last attempt failed, raise the error.
raise
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (2**attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
await asyncio.sleep(delay)
else:
# Return an error response after exhausting all retries
return [
{
"index": 0,
"tags": ["error"],
"content": ["Rate limit error. Please try again later."],
}
]
except Exception as e:
raise e # Raise any other exceptions immediately
def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
"""
Extract content blocks from website HTML using an AI provider.