feat(crawler): add memory-adaptive dispatcher with rate limiting

Implements a new MemoryAdaptiveDispatcher class that manages concurrent crawling with memory monitoring and rate limiting. Changes include:

- Added RateLimitConfig dataclass for configuring rate limiting behavior
- Extended CrawlerRunConfig with dispatcher-related settings
- Refactored arun_many to use the new dispatcher system
- Added memory threshold and session permit controls
- Integrated optional progress monitoring display

BREAKING CHANGE: The arun_many method now uses MemoryAdaptiveDispatcher by default, which may affect concurrent crawling behavior.
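Usage sketch of the new API described above (illustrative only: the import locations, the numeric values, and the display-mode string are assumptions; the diff below establishes only the parameter names):

import asyncio

from crawl4ai import AsyncWebCrawler, CrawlerRunConfig  # assumed import path
from crawl4ai.dispatcher import RateLimitConfig          # assumed import path

async def main():
    config = CrawlerRunConfig(
        word_count_threshold=200,
        enable_rate_limiting=True,
        rate_limit_config=RateLimitConfig(),  # defaults; the dataclass fields are not shown in this diff
        memory_threshold_percent=70.0,        # illustrative: hold new tasks above 70% memory use
        check_interval=1.0,                   # illustrative: re-check memory every second
        max_session_permit=20,                # illustrative: cap on concurrent crawl sessions
        display_mode="DETAILED",              # assumed DisplayMode value; enables the progress monitor
    )
    async with AsyncWebCrawler() as crawler:
        results = await crawler.arun_many(
            ["https://example.com", "https://example.org"],
            config=config,
        )
        for result in results:
            print(result.url, result.success)

asyncio.run(main())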
Author: UncleCode
Date:   2025-01-10 16:01:18 +08:00
Commit: ac5f461d40 (parent: 196dc79ec7)
6 changed files with 675 additions and 70 deletions


@@ -690,7 +690,7 @@ class AsyncWebCrawler:
         **kwargs,
     ) -> List[CrawlResult]:
         """
-        Runs the crawler for multiple URLs concurrently.
+        Runs the crawler for multiple URLs concurrently using MemoryAdaptiveDispatcher.
         Migration Guide:
         Old way (deprecated):
@@ -700,35 +700,35 @@ class AsyncWebCrawler:
                 screenshot=True,
                 ...
             )
         New way (recommended):
             config = CrawlerRunConfig(
                 word_count_threshold=200,
                 screenshot=True,
+                enable_rate_limiting=True,
+                rate_limit_config=RateLimitConfig(...),
                 ...
             )
-            results = await crawler.arun_many(urls, crawler_config=config)
+            results = await crawler.arun_many(urls, config=config)
         Args:
             urls: List of URLs to crawl
-            crawler_config: Configuration object controlling crawl behavior for all URLs
+            config: Configuration object controlling crawl behavior for all URLs
             [other parameters maintained for backwards compatibility]
         Returns:
             List[CrawlResult]: Results for each URL
         """
+        crawler_config = config
         # Handle configuration
-        if crawler_config is not None:
+        if config is not None:
             if any(param is not None for param in [
                 word_count_threshold, extraction_strategy, chunking_strategy,
                 content_filter, cache_mode, css_selector, screenshot, pdf
             ]):
                 self.logger.warning(
-                    message="Both crawler_config and legacy parameters provided. crawler_config will take precedence.",
+                    message="Both config and legacy parameters provided. config will take precedence.",
                     tag="WARNING"
                 )
-            config = crawler_config
         else:
             # Merge all parameters into a single kwargs dict for config creation
             config_kwargs = {
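The hunk below swaps the fixed semaphore for the dispatcher. The dispatcher itself lives in the new dispatcher module rather than this file, so as a rough sketch of the memory-adaptive idea behind memory_threshold_percent and check_interval (assuming psutil for reading system memory; an illustration, not the shipped dispatcher):

import asyncio
import psutil  # assumed dependency for reading system memory usage

async def wait_for_memory_headroom(threshold_percent: float, check_interval: float) -> None:
    # Cooperatively wait while system memory usage sits above the threshold,
    # re-checking every check_interval seconds before admitting another crawl task.
    while psutil.virtual_memory().percent >= threshold_percent:
        await asyncio.sleep(check_interval)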
@@ -758,67 +758,31 @@ class AsyncWebCrawler:
         if config.cache_mode is None:
             config.cache_mode = CacheMode.BYPASS
-        semaphore_count = config.semaphore_count or 5
-        semaphore = asyncio.Semaphore(semaphore_count)
-        async def crawl_with_semaphore(url):
-            # Handle rate limiting per domain
-            domain = urlparse(url).netloc
-            current_time = time.time()
-            self.logger.debug(
-                message="Started task for {url:.50}...",
-                tag="PARALLEL",
-                params={"url": url}
+        from .dispatcher import MemoryAdaptiveDispatcher, CrawlerMonitor, DisplayMode
+        # Create dispatcher with configuration from CrawlerRunConfig
+        dispatcher = MemoryAdaptiveDispatcher(
+            crawler=self,
+            memory_threshold_percent=config.memory_threshold_percent,
+            check_interval=config.check_interval,
+            max_session_permit=config.max_session_permit,
+            enable_rate_limiting=config.enable_rate_limiting,
+            rate_limit_config=vars(config.rate_limit_config) if config.rate_limit_config else None
         )
-            # Get delay settings from config
-            mean_delay = config.mean_delay
-            max_range = config.max_range
-            # Apply rate limiting
-            if domain in self._domain_last_hit:
-                time_since_last = current_time - self._domain_last_hit[domain]
-                if time_since_last < mean_delay:
-                    delay = mean_delay + random.uniform(0, max_range)
-                    await asyncio.sleep(delay)
-            self._domain_last_hit[domain] = current_time
-            async with semaphore:
-                return await self.arun(
-                    url,
-                    crawler_config=config,  # Pass the entire config object
-                    user_agent=user_agent  # Maintain user_agent override capability
-                )
+        # Create monitor if display mode is specified
+        monitor = None
+        if config.display_mode:
+            monitor = CrawlerMonitor(
+                max_visible_rows=15,
+                display_mode=DisplayMode(config.display_mode)
+            )
         # Log start of concurrent crawling
         self.logger.info(
             message="Starting concurrent crawling for {count} URLs...",
             tag="INIT",
             params={"count": len(urls)}
         )
-        # Execute concurrent crawls
-        start_time = time.perf_counter()
-        tasks = [crawl_with_semaphore(url) for url in urls]
-        results = await asyncio.gather(*tasks, return_exceptions=True)
-        end_time = time.perf_counter()
-        # Log completion
-        self.logger.success(
-            message="Concurrent crawling completed for {count} URLs | Total time: {timing}",
-            tag="COMPLETE",
-            params={
-                "count": len(urls),
-                "timing": f"{end_time - start_time:.2f}s"
-            },
-            colors={
-                "timing": Fore.YELLOW
-            }
-        )
-        return [result if not isinstance(result, Exception) else str(result) for result in results]
+        # Run URLs through dispatcher
+        task_results = await dispatcher.run_urls(urls, config, monitor=monitor)
+        # Convert CrawlerTaskResult to CrawlResult
+        return [task_result.result for task_result in task_results]
     async def aclear_cache(self):
         """Clear the cache database."""