From ce7fce4b1648761b90ad95cc699b2e13abe19be2 Mon Sep 17 00:00:00 2001 From: Aravind Karnam Date: Thu, 17 Oct 2024 12:25:17 +0530 Subject: [PATCH] 1. Moved to asyncio.wait instead of gather so that results can be yeilded just as they are ready, rather than in batches 2. Moved the visted.add(url), to before the task is put in queue rather than after the crawl is completed. This makes sure that duplicate crawls doesn't happen when same URL is found at different depth and that get's queued too because the crawl is not yet completed and visted set is not updated. 3. Named the yield_results attribute to stream instead. Since that seems to be popularly used in all other AI libraries for intermediate results. --- crawl4ai/scraper/async_web_scraper.py | 6 +++--- crawl4ai/scraper/bfs_scraper_strategy.py | 26 ++++++++++++++---------- crawl4ai/scraper/scraper_strategy.py | 8 ++++---- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py index 0d921af5..2fd919e1 100644 --- a/crawl4ai/scraper/async_web_scraper.py +++ b/crawl4ai/scraper/async_web_scraper.py @@ -4,12 +4,12 @@ from ..async_webcrawler import AsyncWebCrawler from typing import Union, AsyncGenerator class AsyncWebScraper: - def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5): + def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy): self.crawler = crawler self.strategy = strategy - async def ascrape(self, url: str, parallel_processing: bool = True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: - if yield_results: + async def ascrape(self, url: str, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: + if stream: return self._ascrape_yielding(url, parallel_processing) else: return await self._ascrape_collecting(url, parallel_processing) diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py index 6fc39e73..b6cdaa80 100644 --- a/crawl4ai/scraper/bfs_scraper_strategy.py +++ b/crawl4ai/scraper/bfs_scraper_strategy.py @@ -71,7 +71,7 @@ class BFSScraperStrategy(ScraperStrategy): if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url): logging.info(f"Skipping {url} as per robots.txt") return None - + # Crawl Politeness domain = urlparse(url).netloc time_since_last_crawl = time.time() - self.last_crawl_time[domain] @@ -97,8 +97,6 @@ class BFSScraperStrategy(ScraperStrategy): elif crawl_result.status_code == 503: await self.add_to_retry_queue(url) return crawl_result - - visited.add(url) # Process links for link_type in ["internal", "external"]: @@ -114,27 +112,33 @@ class BFSScraperStrategy(ScraperStrategy): depths[normalized_link] = new_depth return crawl_result - async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> CrawlResult: + async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]: queue = asyncio.PriorityQueue() queue.put_nowait((0, 0, start_url)) visited = set() depths = {start_url: 0} + pending_tasks = set() - while not queue.empty(): - tasks = [] - while not queue.empty() and len(tasks) < self.max_concurrent: + while not queue.empty() or pending_tasks: + while not queue.empty() and len(pending_tasks) < self.max_concurrent: _, depth, url = await queue.get() if url not in visited: + # Adding URL to the visited set here itself, (instead of after result generation) + # so that other tasks are not queued for same URL, found at different depth before + # crawling and extraction of this task is completed. + visited.add(url) if parallel_processing: task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths)) - tasks.append(task) + pending_tasks.add(task) else: result = await self.process_url(url, depth, crawler, queue, visited, depths) if result: yield result - if parallel_processing and tasks: - results = await asyncio.gather(*tasks) - for result in results: + # Wait for the first task to complete and yield results incrementally as each task is completed + if pending_tasks: + done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + result = await task if result: yield result \ No newline at end of file diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py index e08a980d..e4872de7 100644 --- a/crawl4ai/scraper/scraper_strategy.py +++ b/crawl4ai/scraper/scraper_strategy.py @@ -6,21 +6,21 @@ from typing import Union, AsyncGenerator class ScraperStrategy(ABC): @abstractmethod - async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool=True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: + async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: """Scrape the given URL using the specified crawler. Args: url (str): The starting URL for the scrape. crawler (AsyncWebCrawler): The web crawler instance. parallel_processing (bool): Whether to use parallel processing. Defaults to True. - yield_results (bool): If True, yields individual crawl results as they are ready; + stream (bool): If True, yields individual crawl results as they are ready; if False, accumulates results and returns a final ScraperResult. Yields: - CrawlResult: Individual crawl results if yield_results is True. + CrawlResult: Individual crawl results if stream is True. Returns: ScraperResult: A summary of the scrape results containing the final extracted data - and the list of crawled URLs if yield_results is False. + and the list of crawled URLs if stream is False. """ pass \ No newline at end of file