From 2943feeecf44806a000f5c01502798e52278bce7 Mon Sep 17 00:00:00 2001
From: Aravind Karnam
Date: Wed, 16 Oct 2024 22:05:29 +0530
Subject: [PATCH] 1. Added a flag to yield each crawl result as it becomes
 ready, along with the final scraper result as another option 2. Removed the
 ascrape_many method, as I'm currently not focusing on it in the first cut of
 the scraper 3. Added error handling for cases where robots.txt cannot be
 fetched or parsed.

---
 crawl4ai/scraper/async_web_scraper.py    | 48 ++++++++++++------------
 crawl4ai/scraper/bfs_scraper_strategy.py | 33 +++++++++-------
 crawl4ai/scraper/scraper_strategy.py     | 21 ++++++++++-
 3 files changed, 62 insertions(+), 40 deletions(-)

diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py
index fadfa61f..811aeacc 100644
--- a/crawl4ai/scraper/async_web_scraper.py
+++ b/crawl4ai/scraper/async_web_scraper.py
@@ -1,35 +1,35 @@
 import asyncio
 from typing import List, Dict
 from .scraper_strategy import ScraperStrategy
-from .bfs_scraper_strategy import BFSScraperStrategy
-from .models import ScraperResult
+from .models import ScraperResult, CrawlResult
 from ..async_webcrawler import AsyncWebCrawler
-
-class BatchProcessor:
-    def __init__(self, batch_size: int, concurrency_limit: int):
-        self.batch_size = batch_size
-        self.concurrency_limit = concurrency_limit
-
-    async def process_batch(self, scraper: 'AsyncWebScraper', urls: List[str]) -> List[ScraperResult]:
-        semaphore = asyncio.Semaphore(self.concurrency_limit)
-        async def scrape_with_semaphore(url):
-            async with semaphore:
-                return await scraper.ascrape(url)
-        return await asyncio.gather(*[scrape_with_semaphore(url) for url in urls])
+from typing import Union, AsyncGenerator
 
 class AsyncWebScraper:
     def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5):
         self.crawler = crawler
         self.strategy = strategy
-        self.batch_processor = BatchProcessor(batch_size, concurrency_limit)
 
-    async def ascrape(self, url: str, parallel_processing: bool = True) -> ScraperResult:
-        return await self.strategy.ascrape(url, self.crawler, parallel_processing)
+    async def ascrape(self, url: str, parallel_processing: bool = True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+        if yield_results:
+            return self._ascrape_yielding(url, parallel_processing)
+        else:
+            return await self._ascrape_collecting(url, parallel_processing)
 
-    async def ascrape_many(self, urls: List[str]) -> List[ScraperResult]:
-        all_results = []
-        for i in range(0, len(urls), self.batch_processor.batch_size):
-            batch = urls[i:i+self.batch_processor.batch_size]
-            batch_results = await self.batch_processor.process_batch(self, batch)
-            all_results.extend(batch_results)
-        return all_results
\ No newline at end of file
+    async def _ascrape_yielding(self, url: str, parallel_processing: bool) -> AsyncGenerator[CrawlResult, None]:
+        result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
+        async for res in result_generator:  # Consume the async generator
+            yield res  # Yielding individual results
+
+    async def _ascrape_collecting(self, url: str, parallel_processing: bool) -> ScraperResult:
+        extracted_data = {}
+        result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
+        async for res in result_generator:  # Consume the async generator
+            extracted_data[res.url] = res
+
+        # Return a final ScraperResult
+        return ScraperResult(
+            url=url,
+            crawled_urls=list(extracted_data.keys()),
+            extracted_data=extracted_data
+        )
\ No newline at end of file

diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py
index 9022cd90..1146714d 100644
--- a/crawl4ai/scraper/bfs_scraper_strategy.py
+++ b/crawl4ai/scraper/bfs_scraper_strategy.py
@@ -1,7 +1,6 @@
 from .scraper_strategy import ScraperStrategy
 from .filters import FilterChain
 from .scorers import URLScorer
-from .models import ScraperResult
 from ..models import CrawlResult
 from ..async_webcrawler import AsyncWebCrawler
 import asyncio
@@ -13,7 +12,7 @@ from aiolimiter import AsyncLimiter
 from tenacity import retry, stop_after_attempt, wait_exponential
 from collections import defaultdict
 import logging
-from typing import Dict
+from typing import Dict, AsyncGenerator
 
 logging.basicConfig(level=logging.DEBUG)
 rate_limiter = AsyncLimiter(1, 1)  # 1 request per second
@@ -38,7 +37,12 @@ class BFSScraperStrategy(ScraperStrategy):
         if netloc not in self.robot_parsers:
             rp = RobotFileParser()
             rp.set_url(f"{scheme}://{netloc}/robots.txt")
-            rp.read()
+            try:
+                rp.read()
+            except Exception as e:
+                # Log the type of error, message, and the URL
+                logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
+                return None
             self.robot_parsers[netloc] = rp
         return self.robot_parsers[netloc]
 
@@ -48,7 +52,7 @@ class BFSScraperStrategy(ScraperStrategy):
     async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
         return await crawler.arun(url)
 
-    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> CrawlResult:
+    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
         def normalize_url(url: str) -> str:
             parsed = urlparse(url)
             return urlunparse(parsed._replace(fragment=""))
@@ -59,9 +63,14 @@ class BFSScraperStrategy(ScraperStrategy):
             return None
 
         # Robots.txt Compliance
-        if not self.get_robot_parser(url).can_fetch(crawler.crawler_strategy.user_agent, url):
-            logging.info(f"Skipping {url} as per robots.txt")
-            return None
+        robot_parser = self.get_robot_parser(url)
+        if robot_parser is None:
+            logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
+        else:
+            # If robots.txt was fetched, check if crawling is allowed
+            if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
+                logging.info(f"Skipping {url} as per robots.txt")
+                return None
 
         # Crawl Politeness
         domain = urlparse(url).netloc
@@ -103,14 +112,12 @@ class BFSScraperStrategy(ScraperStrategy):
                     score = self.url_scorer.score(normalized_link)
                     await queue.put((score, new_depth, normalized_link))
                     depths[normalized_link] = new_depth
-        return crawl_result
 
-    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> ScraperResult:
+    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> CrawlResult:
         queue = asyncio.PriorityQueue()
         queue.put_nowait((0, 0, start_url))
         visited = set()
-        extracted_data = {}
         depths = {start_url: 0}
 
         while not queue.empty():
@@ -124,12 +131,10 @@ class BFSScraperStrategy(ScraperStrategy):
             else:
                 result = await self.process_url(url, depth, crawler, queue, visited, depths)
                 if result:
-                    extracted_data[result.url] = result
+                    yield result
 
         if parallel_processing and tasks:
             results = await asyncio.gather(*tasks)
             for result in results:
                 if result:
-                    extracted_data[result.url] = result
-
-        return ScraperResult(url=start_url, crawled_urls=list(visited), extracted_data=extracted_data)
\ No newline at end of file
+                    yield result
\ No newline at end of file

diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py
index 6d1cdc74..e08a980d 100644
--- a/crawl4ai/scraper/scraper_strategy.py
+++ b/crawl4ai/scraper/scraper_strategy.py
@@ -1,9 +1,26 @@
 from abc import ABC, abstractmethod
-from .models import ScraperResult
+from .models import ScraperResult, CrawlResult
 from ..models import CrawlResult
 from ..async_webcrawler import AsyncWebCrawler
+from typing import Union, AsyncGenerator
 
 class ScraperStrategy(ABC):
     @abstractmethod
-    async def ascrape(self, url: str, crawler: AsyncWebCrawler) -> ScraperResult:
+    async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool=True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+        """Scrape the given URL using the specified crawler.
+
+        Args:
+            url (str): The starting URL for the scrape.
+            crawler (AsyncWebCrawler): The web crawler instance.
+            parallel_processing (bool): Whether to use parallel processing. Defaults to True.
+            yield_results (bool): If True, yields individual crawl results as they are ready;
+                if False, accumulates results and returns a final ScraperResult.
+
+        Yields:
+            CrawlResult: Individual crawl results if yield_results is True.
+
+        Returns:
+            ScraperResult: A summary of the scrape results containing the final extracted data
+                and the list of crawled URLs if yield_results is False.
+        """
         pass
\ No newline at end of file
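For reviewers, a minimal usage sketch of the new yield_results flag follows. It is illustrative only and not part of the patch: the import paths are assumed from the file layout above, the strategy instance is taken as a parameter because BFSScraperStrategy's constructor arguments are not shown in this diff, and AsyncWebCrawler is assumed to be usable as an async context manager as elsewhere in crawl4ai.

import asyncio

from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.scraper.async_web_scraper import AsyncWebScraper
from crawl4ai.scraper.scraper_strategy import ScraperStrategy


async def scrape_site(strategy: ScraperStrategy, start_url: str) -> None:
    async with AsyncWebCrawler() as crawler:
        scraper = AsyncWebScraper(crawler, strategy)

        # Streaming mode: with yield_results=True, ascrape() hands back an async
        # generator, so each CrawlResult can be consumed as soon as it is ready.
        async for crawl_result in await scraper.ascrape(start_url, yield_results=True):
            print("crawled:", crawl_result.url)

        # Collecting mode (the default): results are accumulated internally and a
        # single ScraperResult summarising the whole run is returned.
        scraper_result = await scraper.ascrape(start_url)
        print("total pages crawled:", len(scraper_result.crawled_urls))


# Hypothetical invocation; a concrete ScraperStrategy (e.g. a configured
# BFSScraperStrategy) must be supplied by the caller.
# asyncio.run(scrape_site(my_strategy, "https://example.com"))

In streaming mode the caller decides how to aggregate results; in collecting mode the scraper builds the ScraperResult itself, mirroring the split between _ascrape_yielding and _ascrape_collecting in async_web_scraper.py.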