1. Moved to asyncio.wait instead of gather so that results can be yielded as soon as they are ready, rather than in batches.

2. Moved visited.add(url) to before the task is put in the queue, rather than after the crawl is completed. This makes sure that duplicate crawls don't happen when the same URL is found at a different depth and gets queued again because the crawl is not yet completed and the visited set is not updated.
3. Renamed the yield_results attribute to stream, since that term is commonly used in other AI libraries for intermediate results.
This commit is contained in:
Aravind Karnam
2024-10-17 12:25:17 +05:30
parent de28b59aca
commit ce7fce4b16
3 changed files with 22 additions and 18 deletions

View File

@@ -4,12 +4,12 @@ from ..async_webcrawler import AsyncWebCrawler
from typing import Union, AsyncGenerator from typing import Union, AsyncGenerator
class AsyncWebScraper: class AsyncWebScraper:
def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5): def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy):
self.crawler = crawler self.crawler = crawler
self.strategy = strategy self.strategy = strategy
async def ascrape(self, url: str, parallel_processing: bool = True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: async def ascrape(self, url: str, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
if yield_results: if stream:
return self._ascrape_yielding(url, parallel_processing) return self._ascrape_yielding(url, parallel_processing)
else: else:
return await self._ascrape_collecting(url, parallel_processing) return await self._ascrape_collecting(url, parallel_processing)

View File

@@ -71,7 +71,7 @@ class BFSScraperStrategy(ScraperStrategy):
if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url): if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
logging.info(f"Skipping {url} as per robots.txt") logging.info(f"Skipping {url} as per robots.txt")
return None return None
# Crawl Politeness # Crawl Politeness
domain = urlparse(url).netloc domain = urlparse(url).netloc
time_since_last_crawl = time.time() - self.last_crawl_time[domain] time_since_last_crawl = time.time() - self.last_crawl_time[domain]
@@ -97,8 +97,6 @@ class BFSScraperStrategy(ScraperStrategy):
elif crawl_result.status_code == 503: elif crawl_result.status_code == 503:
await self.add_to_retry_queue(url) await self.add_to_retry_queue(url)
return crawl_result return crawl_result
visited.add(url)
# Process links # Process links
for link_type in ["internal", "external"]: for link_type in ["internal", "external"]:
@@ -114,27 +112,33 @@ class BFSScraperStrategy(ScraperStrategy):
depths[normalized_link] = new_depth depths[normalized_link] = new_depth
return crawl_result return crawl_result
async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> CrawlResult: async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
queue = asyncio.PriorityQueue() queue = asyncio.PriorityQueue()
queue.put_nowait((0, 0, start_url)) queue.put_nowait((0, 0, start_url))
visited = set() visited = set()
depths = {start_url: 0} depths = {start_url: 0}
pending_tasks = set()
while not queue.empty(): while not queue.empty() or pending_tasks:
tasks = [] while not queue.empty() and len(pending_tasks) < self.max_concurrent:
while not queue.empty() and len(tasks) < self.max_concurrent:
_, depth, url = await queue.get() _, depth, url = await queue.get()
if url not in visited: if url not in visited:
# Adding URL to the visited set here itself, (instead of after result generation)
# so that other tasks are not queued for same URL, found at different depth before
# crawling and extraction of this task is completed.
visited.add(url)
if parallel_processing: if parallel_processing:
task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths)) task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
tasks.append(task) pending_tasks.add(task)
else: else:
result = await self.process_url(url, depth, crawler, queue, visited, depths) result = await self.process_url(url, depth, crawler, queue, visited, depths)
if result: if result:
yield result yield result
if parallel_processing and tasks: # Wait for the first task to complete and yield results incrementally as each task is completed
results = await asyncio.gather(*tasks) if pending_tasks:
for result in results: done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
if result: if result:
yield result yield result

View File

@@ -6,21 +6,21 @@ from typing import Union, AsyncGenerator
class ScraperStrategy(ABC): class ScraperStrategy(ABC):
@abstractmethod @abstractmethod
async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool=True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Scrape the given URL using the specified crawler. """Scrape the given URL using the specified crawler.
Args: Args:
url (str): The starting URL for the scrape. url (str): The starting URL for the scrape.
crawler (AsyncWebCrawler): The web crawler instance. crawler (AsyncWebCrawler): The web crawler instance.
parallel_processing (bool): Whether to use parallel processing. Defaults to True. parallel_processing (bool): Whether to use parallel processing. Defaults to True.
yield_results (bool): If True, yields individual crawl results as they are ready; stream (bool): If True, yields individual crawl results as they are ready;
if False, accumulates results and returns a final ScraperResult. if False, accumulates results and returns a final ScraperResult.
Yields: Yields:
CrawlResult: Individual crawl results if yield_results is True. CrawlResult: Individual crawl results if stream is True.
Returns: Returns:
ScraperResult: A summary of the scrape results containing the final extracted data ScraperResult: A summary of the scrape results containing the final extracted data
and the list of crawled URLs if yield_results is False. and the list of crawled URLs if stream is False.
""" """
pass pass