diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py
index 45a35306..b5c68a79 100644
--- a/crawl4ai/scraper/async_web_scraper.py
+++ b/crawl4ai/scraper/async_web_scraper.py
@@ -57,7 +57,6 @@ class AsyncWebScraper:
     async def ascrape(
         self,
         url: str,
-        parallel_processing: bool = True,
         stream: bool = False
     ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
         """
@@ -65,7 +64,6 @@ class AsyncWebScraper:
 
         Args:
             url: Starting URL for scraping
-            parallel_processing: Whether to process URLs in parallel
             stream: If True, yield results as they come; if False, collect all results
 
         Returns:
@@ -75,17 +73,16 @@ class AsyncWebScraper:
 
         async with self._error_handling_context(url):
             if stream:
-                return self._ascrape_yielding(url, parallel_processing)
-            return await self._ascrape_collecting(url, parallel_processing)
+                return self._ascrape_yielding(url)
+            return await self._ascrape_collecting(url)
 
     async def _ascrape_yielding(
         self,
         url: str,
-        parallel_processing: bool
     ) -> AsyncGenerator[CrawlResult, None]:
         """Stream scraping results as they become available."""
         try:
-            result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
+            result_generator = self.strategy.ascrape(url, self.crawler)
             async for res in result_generator:
                 self._progress.processed_urls += 1
                 self._progress.current_url = res.url
@@ -97,13 +94,12 @@ class AsyncWebScraper:
 
     async def _ascrape_collecting(
         self,
         url: str,
-        parallel_processing: bool
     ) -> ScraperResult:
         """Collect all scraping results before returning."""
         extracted_data = {}
         try:
-            result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
+            result_generator = self.strategy.ascrape(url, self.crawler)
             async for res in result_generator:
                 self._progress.processed_urls += 1
                 self._progress.current_url = res.url
diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py
index eb7f4cd8..956afbda 100644
--- a/crawl4ai/scraper/bfs_scraper_strategy.py
+++ b/crawl4ai/scraper/bfs_scraper_strategy.py
@@ -6,15 +6,12 @@ import logging
 from urllib.parse import urlparse
 from urllib.robotparser import RobotFileParser
 import validators
-import time
-from aiolimiter import AsyncLimiter
-from tenacity import retry, stop_after_attempt, wait_exponential
-from collections import defaultdict
+from crawl4ai.async_configs import CrawlerRunConfig
 
 from .models import CrawlResult
 from .filters import FilterChain
 from .scorers import URLScorer
-from ..async_webcrawler import AsyncWebCrawler, CrawlerRunConfig
+from ..async_webcrawler import AsyncWebCrawler
 from .scraper_strategy import ScraperStrategy
 
 @dataclass
@@ -37,29 +34,18 @@ class BFSScraperStrategy(ScraperStrategy):
         filter_chain: FilterChain,
         url_scorer: URLScorer,
         process_external_links: bool = False,
-        max_concurrent: int = 5,
-        min_crawl_delay: int = 1,
-        timeout: int = 30,
         logger: Optional[logging.Logger] = None
     ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
         self.url_scorer = url_scorer
-        self.max_concurrent = max_concurrent
-        self.min_crawl_delay = min_crawl_delay
-        self.timeout = timeout
         self.logger = logger or logging.getLogger(__name__)
 
         # Crawl control
         self.stats = CrawlStats(start_time=datetime.now())
         self._cancel_event = asyncio.Event()
         self.process_external_links = process_external_links
-
-        # Rate limiting and politeness
-        self.rate_limiter = AsyncLimiter(1, 1)
-        self.last_crawl_time = defaultdict(float)
         self.robot_parsers: Dict[str, RobotFileParser] = {}
-        self.domain_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
 
     async def can_process_url(self, url: str, depth: int) -> bool:
         """Check if URL can be processed based on robots.txt and filters
@@ -107,74 +93,6 @@ class BFSScraperStrategy(ScraperStrategy):
             return None
         return self.robot_parsers[domain]
 
-    @retry(stop=stop_after_attempt(3),
-           wait=wait_exponential(multiplier=1, min=4, max=10))
-    async def _crawl_with_retry(
-        self,
-        crawler: AsyncWebCrawler,
-        url: str
-    ) -> CrawlResult:
-        """Crawl URL with retry logic"""
-        try:
-            crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
-            return await asyncio.wait_for(crawler.arun(url, config=crawler_config), timeout=self.timeout)
-        except asyncio.TimeoutError:
-            self.logger.error(f"Timeout crawling {url}")
-            raise
-        except Exception as e:
-            # Catch any other exceptions that may cause retries
-            self.logger.error(f"Error crawling {url}: {e}")
-            raise
-
-
-    async def process_url(
-        self,
-        url: str,
-        depth: int,
-        crawler: AsyncWebCrawler,
-        queue: asyncio.PriorityQueue,
-        visited: Set[str],
-        depths: Dict[str, int]
-    ) -> Optional[CrawlResult]:
-        """Process a single URL and extract links.
-        This is our main URL processing workhorse that:
-        - Checks for cancellation
-        - Validates URLs through can_process_url
-        - Implements politeness delays per domain
-        - Applies rate limiting
-        - Handles crawling with retries
-        - Updates various statistics
-        - Processes extracted links
-        - Returns the crawl result or None on failure
-        """
-
-        if self._cancel_event.is_set():
-            return None
-
-        if not await self.can_process_url(url, depth):
-            self.stats.urls_skipped += 1
-            return None
-
-        # Politeness delay
-        domain = urlparse(url).netloc
-        time_since_last = time.time() - self.last_crawl_time[domain]
-        if time_since_last < self.min_crawl_delay:
-            await asyncio.sleep(self.min_crawl_delay - time_since_last)
-        self.last_crawl_time[domain] = time.time()
-
-        # Crawl with rate limiting
-        try:
-            async with self.rate_limiter:
-                result = await self._crawl_with_retry(crawler, url)
-                self.stats.urls_processed += 1
-                # Process links
-                await self._process_links(result, url, depth, queue, visited, depths)
-                return result
-        except Exception as e:
-            self.logger.error(f"Error crawling {url}: {e}")
-            self.stats.urls_failed += 1
-            return None
-
     async def _process_links(
         self,
         result: CrawlResult,
@@ -187,7 +105,7 @@ class BFSScraperStrategy(ScraperStrategy):
         """Process extracted links from crawl result.
         This is our link processor that:
         Handles both internal and external links
-        Normalizes URLs (removes fragments)
+        Checks whether each URL can be processed (URL validation, filter chain, and robots.txt compliance via can_process_url)
         Checks depth limits
         Scores URLs for priority
         Updates depth tracking
@@ -199,6 +117,9 @@ class BFSScraperStrategy(ScraperStrategy):
             links_to_process += result.links["external"]
         for link in links_to_process:
             url = link['href']
+            if not await self.can_process_url(url, depth):
+                self.stats.urls_skipped += 1
+                continue
             if url not in visited:
                 new_depth = depths[source_url] + 1
                 if new_depth <= self.max_depth:
@@ -219,7 +140,6 @@ class BFSScraperStrategy(ScraperStrategy):
         self,
         start_url: str,
         crawler: AsyncWebCrawler,
-        parallel_processing: bool = True
     ) -> AsyncGenerator[CrawlResult, None]:
         """Implement BFS crawling strategy"""
 
@@ -237,62 +157,38 @@ class BFSScraperStrategy(ScraperStrategy):
         await queue.put((0, 0, start_url))
         visited: Set[str] = set()
         depths = {start_url: 0}
-        pending_tasks = set()
 
         try:
-            while (not queue.empty() or pending_tasks) and not self._cancel_event.is_set():
+            while not queue.empty() and not self._cancel_event.is_set():
                 """
                 This sets up our main control loop which:
                 - Continues while there are URLs to process (not queue.empty())
-                - Or while there are tasks still running (pending_tasks)
+                - Pulls URLs off the priority queue in small batches and crawls them together
                 - Can be interrupted via cancellation (not self._cancel_event.is_set())
                 """
-                # Start new tasks up to max_concurrent
-                while not queue.empty() and len(pending_tasks) < self.max_concurrent:
-                    """
-                    This section manages task creation:
-                    Checks if we can start more tasks (under max_concurrent limit)
-                    Gets the next URL from the priority queue
-                    Marks URLs as visited immediately to prevent duplicates
-                    Updates current depth in stats
-                    Either:
-                    Creates a new async task (parallel mode)
-                    Processes URL directly (sequential mode)
-                    """
-                    _, depth, url = await queue.get()
+                n = 3  # batch size: how many queued URLs to hand to arun_many per iteration
+                jobs = []
+                for _ in range(n):
+                    if queue.empty():
+                        break
+                    jobs.append(await queue.get())
+
+                # Filter jobs directly, ensuring uniqueness and checking against visited
+                filtered_jobs = []
+                for job in jobs:
+                    _, depth, url = job
+                    self.stats.current_depth = depth
                     if url not in visited:
                         visited.add(url)
-                        self.stats.current_depth = depth
-
-                        if parallel_processing:
-                            task = asyncio.create_task(
-                                self.process_url(url, depth, crawler, queue, visited, depths)
-                            )
-                            pending_tasks.add(task)
-                        else:
-                            result = await self.process_url(
-                                url, depth, crawler, queue, visited, depths
-                            )
-                            if result:
-                                yield result
-
-                # Process completed tasks
-                """
-                This section manages completed tasks:
-                Waits for any task to complete using asyncio.wait
-                Uses FIRST_COMPLETED to handle results as soon as they're ready
-                Yields successful results to the caller
-                Updates pending_tasks to remove completed ones
-                """
-                if pending_tasks:
-                    done, pending_tasks = await asyncio.wait(
-                        pending_tasks,
-                        return_when=asyncio.FIRST_COMPLETED
-                    )
-                    for task in done:
-                        result = await task
-                        if result:
-                            yield result
+                        filtered_jobs.append(job)
+
+                # Crawl the batch concurrently and stream results back as they finish
+                crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
+                async for result in await crawler.arun_many(urls=[url for _, _, url in filtered_jobs],
+                                                            config=crawler_config.clone(stream=True)):
+                    self.logger.debug(f"Received result for: {result.url} - Success: {result.success}")
+                    source_url, depth = next((url, depth) for _, depth, url in filtered_jobs if url == result.url)
+                    await self._process_links(result, source_url, depth, queue, visited, depths)
+                    yield result
 
         except Exception as e:
             self.logger.error(f"Error in crawl process: {e}")
@@ -300,13 +196,12 @@ class BFSScraperStrategy(ScraperStrategy):
 
         finally:
-            # Clean up any remaining tasks
-            for task in pending_tasks:
-                task.cancel()
             self.stats.end_time = datetime.now()
 
     async def shutdown(self):
         """Clean up resources and stop crawling"""
         self._cancel_event.set()
         # Clear caches and close connections
-        self.robot_parsers.clear()
-        self.domain_queues.clear()
\ No newline at end of file
+        self.robot_parsers.clear()
\ No newline at end of file
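
Below is a standalone sketch of the arun_many streaming pattern the rewritten BFS loop relies on. It mirrors the call shape used in the diff (CrawlerRunConfig with cache_mode="BYPASS", clone(stream=True), then async-for over the awaited call); the URLs are placeholders and the surrounding scaffolding is illustrative, not part of this change.

import asyncio

from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig

async def main() -> None:
    # Placeholder batch, standing in for the URLs pulled off the priority queue
    batch = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]
    crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
    async with AsyncWebCrawler() as crawler:
        # With stream=True, the awaited arun_many call yields CrawlResult objects
        # as each URL finishes, instead of returning one list at the end.
        async for result in await crawler.arun_many(urls=batch,
                                                    config=crawler_config.clone(stream=True)):
            print(f"{result.url} -> success={result.success}")

if __name__ == "__main__":
    asyncio.run(main())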
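
A second sketch shows how the slimmed-down public API reads once parallel_processing is gone. The AsyncWebScraper constructor is not part of this diff, so the crawler/strategy keyword arguments below are assumptions, and my_filter_chain / my_url_scorer are placeholders for whatever FilterChain and URLScorer instances the caller already builds.

import asyncio

from crawl4ai import AsyncWebCrawler
from crawl4ai.scraper.async_web_scraper import AsyncWebScraper
from crawl4ai.scraper.bfs_scraper_strategy import BFSScraperStrategy

async def run_scrape(my_filter_chain, my_url_scorer) -> None:
    strategy = BFSScraperStrategy(
        max_depth=2,
        filter_chain=my_filter_chain,   # placeholder FilterChain instance
        url_scorer=my_url_scorer,       # placeholder URLScorer instance
    )
    async with AsyncWebCrawler() as crawler:
        scraper = AsyncWebScraper(crawler=crawler, strategy=strategy)  # assumed constructor shape

        # Streaming mode: ascrape(url, stream=True) no longer takes a parallel_processing flag
        async for result in await scraper.ascrape("https://example.com", stream=True):
            print(result.url)

        # Collecting mode: a single ScraperResult once the whole crawl finishes
        scraper_result = await scraper.ascrape("https://example.com", stream=False)
        print(scraper_result)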