diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py
index 0d921af5..2fd919e1 100644
--- a/crawl4ai/scraper/async_web_scraper.py
+++ b/crawl4ai/scraper/async_web_scraper.py
@@ -4,12 +4,12 @@ from ..async_webcrawler import AsyncWebCrawler
 from typing import Union, AsyncGenerator
 
 class AsyncWebScraper:
-    def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5):
+    def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy):
         self.crawler = crawler
         self.strategy = strategy
 
-    async def ascrape(self, url: str, parallel_processing: bool = True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
-        if yield_results:
+    async def ascrape(self, url: str, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+        if stream:
             return self._ascrape_yielding(url, parallel_processing)
         else:
             return await self._ascrape_collecting(url, parallel_processing)
diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py
index 6fc39e73..b6cdaa80 100644
--- a/crawl4ai/scraper/bfs_scraper_strategy.py
+++ b/crawl4ai/scraper/bfs_scraper_strategy.py
@@ -71,7 +71,7 @@ class BFSScraperStrategy(ScraperStrategy):
         if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
             logging.info(f"Skipping {url} as per robots.txt")
             return None
-        
+
         # Crawl Politeness
         domain = urlparse(url).netloc
         time_since_last_crawl = time.time() - self.last_crawl_time[domain]
@@ -97,8 +97,6 @@ class BFSScraperStrategy(ScraperStrategy):
         elif crawl_result.status_code == 503:
             await self.add_to_retry_queue(url)
             return crawl_result
-
-        visited.add(url)
 
         # Process links
         for link_type in ["internal", "external"]:
@@ -114,27 +112,33 @@ class BFSScraperStrategy(ScraperStrategy):
                     depths[normalized_link] = new_depth
         return crawl_result
 
-    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> CrawlResult:
+    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True) -> AsyncGenerator[CrawlResult, None]:
         queue = asyncio.PriorityQueue()
         queue.put_nowait((0, 0, start_url))
         visited = set()
         depths = {start_url: 0}
+        pending_tasks = set()
 
-        while not queue.empty():
-            tasks = []
-            while not queue.empty() and len(tasks) < self.max_concurrent:
+        while not queue.empty() or pending_tasks:
+            while not queue.empty() and len(pending_tasks) < self.max_concurrent:
                 _, depth, url = await queue.get()
                 if url not in visited:
+                    # Add the URL to the visited set here (instead of after result generation)
+                    # so that other tasks do not queue the same URL, found at a different depth,
+                    # before this task finishes crawling and extraction.
+                    visited.add(url)
                     if parallel_processing:
                         task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
-                        tasks.append(task)
+                        pending_tasks.add(task)
                     else:
                         result = await self.process_url(url, depth, crawler, queue, visited, depths)
                         if result:
                             yield result
 
-            if parallel_processing and tasks:
-                results = await asyncio.gather(*tasks)
-                for result in results:
+            # Wait for the first task to complete, yielding results incrementally as each task finishes
+            if pending_tasks:
+                done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
+                for task in done:
+                    result = await task
                     if result:
                         yield result
\ No newline at end of file
diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py
index e08a980d..e4872de7 100644
--- a/crawl4ai/scraper/scraper_strategy.py
+++ b/crawl4ai/scraper/scraper_strategy.py
@@ -6,21 +6,21 @@ from typing import Union, AsyncGenerator
 
 class ScraperStrategy(ABC):
     @abstractmethod
-    async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool=True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+    async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
         """Scrape the given URL using the specified crawler.
 
         Args:
             url (str): The starting URL for the scrape.
             crawler (AsyncWebCrawler): The web crawler instance.
            parallel_processing (bool): Whether to use parallel processing. Defaults to True.
-            yield_results (bool): If True, yields individual crawl results as they are ready;
+            stream (bool): If True, yields individual crawl results as they are ready;
                 if False, accumulates results and returns a final ScraperResult.
 
         Yields:
-            CrawlResult: Individual crawl results if yield_results is True.
+            CrawlResult: Individual crawl results if stream is True.
 
         Returns:
             ScraperResult: A summary of the scrape results containing the final extracted data
-                and the list of crawled URLs if yield_results is False.
+                and the list of crawled URLs if stream is False.
         """
         pass
\ No newline at end of file
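
For reviewers, a minimal usage sketch of the renamed `stream` flag. The import paths follow the files touched above; the `BFSScraperStrategy` constructor arguments are assumptions for illustration, not confirmed by this diff:

```python
import asyncio

from crawl4ai import AsyncWebCrawler
from crawl4ai.scraper.async_web_scraper import AsyncWebScraper
from crawl4ai.scraper.bfs_scraper_strategy import BFSScraperStrategy


async def main() -> None:
    async with AsyncWebCrawler() as crawler:
        # Constructor arguments are illustrative; adjust to the actual signature.
        strategy = BFSScraperStrategy(max_depth=2, max_concurrent=5)
        scraper = AsyncWebScraper(crawler, strategy)

        # stream=True: ascrape returns an async generator, so each CrawlResult
        # is yielded as soon as its task completes (FIRST_COMPLETED above),
        # rather than after the whole batch is gathered.
        async for result in await scraper.ascrape("https://example.com", stream=True):
            print("crawled:", result.url)

        # stream=False (default): results are accumulated and returned as a
        # single ScraperResult once the whole BFS traversal finishes.
        summary = await scraper.ascrape("https://example.com", stream=False)
        print(summary)


asyncio.run(main())
```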