diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py
index b97d59a7..06f03b0c 100644
--- a/crawl4ai/async_dispatcher.py
+++ b/crawl4ai/async_dispatcher.py
@@ -1,4 +1,4 @@
-from typing import Dict, Optional, List, Tuple
+from typing import Dict, Optional, List, Tuple, Union
 from .async_configs import CrawlerRunConfig
 from .models import (
     CrawlResult,
@@ -183,7 +183,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
         config: CrawlerRunConfig,
         task_id: str,
         retry_count: int = 0,
-    ) -> CrawlerTaskResult:
+    ) -> Union[CrawlerTaskResult, List[CrawlerTaskResult]]:
         start_time = time.time()
         error_message = ""
         memory_usage = peak_memory = 0.0
@@ -244,8 +244,53 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
             end_memory = process.memory_info().rss / (1024 * 1024)
             memory_usage = peak_memory = end_memory - start_memory
 
-            # Handle rate limiting
-            if self.rate_limiter and result.status_code:
+            # Check if we have a container with multiple results (deep crawl result)
+            if isinstance(result, list) or (hasattr(result, '_results') and len(result._results) > 1):
+                # Handle deep crawling results - create a list of task results
+                task_results = []
+                result_list = result if isinstance(result, list) else result._results
+
+                for idx, single_result in enumerate(result_list):
+                    # Create individual task result for each crawled page
+                    sub_task_id = f"{task_id}_{idx}"
+                    single_memory = memory_usage / len(result_list)  # Distribute memory usage
+
+                    # Only update rate limiter for first result which corresponds to the original URL
+                    if idx == 0 and self.rate_limiter and hasattr(single_result, 'status_code') and single_result.status_code:
+                        if not self.rate_limiter.update_delay(url, single_result.status_code):
+                            error_msg = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
+                            if self.monitor:
+                                self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
+
+                    task_result = CrawlerTaskResult(
+                        task_id=sub_task_id,
+                        url=single_result.url,
+                        result=single_result,
+                        memory_usage=single_memory,
+                        peak_memory=single_memory,
+                        start_time=start_time,
+                        end_time=time.time(),
+                        error_message=single_result.error_message if not single_result.success else "",
+                        retry_count=retry_count
+                    )
+                    task_results.append(task_result)
+
+                # Update monitor with completion status based on the first/primary result
+                if self.monitor:
+                    primary_result = result_list[0]
+                    if not primary_result.success:
+                        self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
+                    else:
+                        self.monitor.update_task(
+                            task_id,
+                            status=CrawlStatus.COMPLETED,
+                            extra_info=f"Deep crawl: {len(result_list)} pages"
+                        )
+
+                return task_results
+
+            # Handle single result (original behavior)
+            if self.rate_limiter and hasattr(result, 'status_code') and result.status_code:
                 if not self.rate_limiter.update_delay(url, result.status_code):
                     error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
                     if self.monitor:
@@ -291,7 +336,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
             error_message=error_message,
             retry_count=retry_count
         )
-        
+
     async def run_urls(
         self,
         urls: List[str],
@@ -356,8 +401,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
 
                 # Process completed tasks
                 for completed_task in done:
-                    result = await completed_task
-                    results.append(result)
+                    task_result = await completed_task
+
+                    # Handle both single results and lists of results
+                    if isinstance(task_result, list):
+                        results.extend(task_result)
+                    else:
+                        results.append(task_result)
 
                 # Update active tasks list
                 active_tasks = list(pending)
@@ -379,7 +429,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
             memory_monitor.cancel()
             if self.monitor:
                 self.monitor.stop()
-        
+
     async def _update_queue_priorities(self):
         """Periodically update priorities of items in the queue to prevent starvation"""
         # Skip if queue is empty
diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index bbee502b..05c41312 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -673,18 +673,6 @@ class AsyncWebCrawler:
         urls: List[str],
         config: Optional[CrawlerRunConfig] = None,
         dispatcher: Optional[BaseDispatcher] = None,
-        # Legacy parameters maintained for backwards compatibility
-        # word_count_threshold=MIN_WORD_THRESHOLD,
-        # extraction_strategy: ExtractionStrategy = None,
-        # chunking_strategy: ChunkingStrategy = RegexChunking(),
-        # content_filter: RelevantContentFilter = None,
-        # cache_mode: Optional[CacheMode] = None,
-        # bypass_cache: bool = False,
-        # css_selector: str = None,
-        # screenshot: bool = False,
-        # pdf: bool = False,
-        # user_agent: str = None,
-        # verbose=True,
         **kwargs
     ) -> RunManyReturn:
         """
@@ -718,20 +706,7 @@ class AsyncWebCrawler:
                     print(f"Processed {result.url}: {len(result.markdown)} chars")
         """
         config = config or CrawlerRunConfig()
-        # if config is None:
-        #     config = CrawlerRunConfig(
-        #         word_count_threshold=word_count_threshold,
-        #         extraction_strategy=extraction_strategy,
-        #         chunking_strategy=chunking_strategy,
-        #         content_filter=content_filter,
-        #         cache_mode=cache_mode,
-        #         bypass_cache=bypass_cache,
-        #         css_selector=css_selector,
-        #         screenshot=screenshot,
-        #         pdf=pdf,
-        #         verbose=verbose,
-        #         **kwargs,
-        #     )
+
 
         if dispatcher is None:
             dispatcher = MemoryAdaptiveDispatcher(
diff --git a/crawl4ai/deep_crawling/base_strategy.py b/crawl4ai/deep_crawling/base_strategy.py
index e1b3fe6b..7d86fc97 100644
--- a/crawl4ai/deep_crawling/base_strategy.py
+++ b/crawl4ai/deep_crawling/base_strategy.py
@@ -7,6 +7,7 @@ from contextvars import ContextVar
 
 from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
 
+
 class DeepCrawlDecorator:
     """Decorator that adds deep crawling capability to arun method."""
     deep_crawl_active = ContextVar("deep_crawl_active", default=False)
@@ -59,7 +60,8 @@ class DeepCrawlStrategy(ABC):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> List[CrawlResult]:
+    # ) -> List[CrawlResult]:
+    ) -> RunManyReturn:
         """
         Batch (non-streaming) mode:
         Processes one BFS level at a time, then yields all the results.
@@ -72,7 +74,8 @@ class DeepCrawlStrategy(ABC):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    # ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> RunManyReturn:
         """
         Streaming mode:
         Processes one BFS level at a time and yields results immediately as they arrive.
diff --git a/crawl4ai/deep_crawling/bfs_strategy.py b/crawl4ai/deep_crawling/bfs_strategy.py
index 54b72ea3..efe3e279 100644
--- a/crawl4ai/deep_crawling/bfs_strategy.py
+++ b/crawl4ai/deep_crawling/bfs_strategy.py
@@ -9,7 +9,7 @@ from ..models import TraversalStats
 from .filters import FilterChain
 from .scorers import URLScorer
 from . import DeepCrawlStrategy
-from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
+from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
 from ..utils import normalize_url_for_deep_crawl, efficient_normalize_url_for_deep_crawl
 from math import inf as infinity
 
@@ -143,7 +143,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> List[CrawlResult]:
+    # ) -> List[CrawlResult]:
+    ) -> RunManyReturn:
         """
         Batch (non-streaming) mode:
         Processes one BFS level at a time, then yields all the results.
@@ -191,7 +192,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    # ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> RunManyReturn:
         """
         Streaming mode:
         Processes one BFS level at a time and yields results immediately as they arrive.
diff --git a/crawl4ai/deep_crawling/dfs_strategy.py b/crawl4ai/deep_crawling/dfs_strategy.py
index f79f9628..f6318b8a 100644
--- a/crawl4ai/deep_crawling/dfs_strategy.py
+++ b/crawl4ai/deep_crawling/dfs_strategy.py
@@ -3,7 +3,7 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
 
 from ..models import CrawlResult
 from .bfs_strategy import BFSDeepCrawlStrategy  # noqa
-from ..types import AsyncWebCrawler, CrawlerRunConfig
+from ..types import AsyncWebCrawler, CrawlerRunConfig, RunManyReturn
 
 class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
     """
@@ -17,7 +17,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> List[CrawlResult]:
+    # ) -> List[CrawlResult]:
+    ) -> RunManyReturn:
         """
         Batch (non-streaming) DFS mode.
         Uses a stack to traverse URLs in DFS order, aggregating CrawlResults into a list.
@@ -65,7 +66,8 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
         start_url: str,
         crawler: AsyncWebCrawler,
         config: CrawlerRunConfig,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    # ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> RunManyReturn:
         """
         Streaming DFS mode.
         Uses a stack to traverse URLs in DFS order and yields CrawlResults as they become available.
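
Reviewer note (not part of the patch): a minimal usage sketch of what this change enables. It assumes the existing deep-crawl API surface, notably BFSDeepCrawlStrategy(max_depth=...) and CrawlerRunConfig(deep_crawl_strategy=..., stream=...), which this diff does not show; the start URL is a placeholder. With a deep-crawl strategy configured, MemoryAdaptiveDispatcher._crawl_url() now returns one CrawlerTaskResult per crawled page (task ids suffixed _0, _1, ...), and run_urls() flattens them into the list that arun_many() ultimately hands back.

    # Hypothetical example, not part of this diff. Parameters marked "assumed"
    # come from crawl4ai's documented deep-crawl API, not from this patch.
    import asyncio

    from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
    from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
    from crawl4ai.deep_crawling.bfs_strategy import BFSDeepCrawlStrategy


    async def main() -> None:
        config = CrawlerRunConfig(
            deep_crawl_strategy=BFSDeepCrawlStrategy(max_depth=1),  # assumed signature
            stream=False,  # batch mode: dispatcher collects results before returning
        )
        dispatcher = MemoryAdaptiveDispatcher()

        async with AsyncWebCrawler() as crawler:
            # After this patch, each page reached by the deep crawl is reported
            # as its own task result instead of being collapsed into the single
            # entry for the start URL.
            results = await crawler.arun_many(
                ["https://example.com"],  # placeholder start URL
                config=config,
                dispatcher=dispatcher,
            )
            for item in results:
                # Depending on how arun_many unwraps the dispatcher output, items
                # may be CrawlerTaskResult (with a .result CrawlResult) or CrawlResult.
                crawl_result = getattr(item, "result", item)
                print(crawl_result.url, crawl_result.success)


    if __name__ == "__main__":
        asyncio.run(main())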