1. Added a flag to yield each crawl result as it becomes ready, along with the final scraper result, as another option

2. Removed the ascrape_many method, as I'm currently not focusing on it in the first cut of the scraper
3. Added some error handling for cases where robots.txt cannot be fetched or parsed.
This commit is contained in:
Aravind Karnam
2024-10-16 22:05:29 +05:30
parent 8a7d29ce85
commit 2943feeecf
3 changed files with 62 additions and 40 deletions

View File

@@ -1,35 +1,35 @@
import asyncio import asyncio
from typing import List, Dict from typing import List, Dict
from .scraper_strategy import ScraperStrategy from .scraper_strategy import ScraperStrategy
from .bfs_scraper_strategy import BFSScraperStrategy from .models import ScraperResult, CrawlResult
from .models import ScraperResult
from ..async_webcrawler import AsyncWebCrawler from ..async_webcrawler import AsyncWebCrawler
from typing import Union, AsyncGenerator
class BatchProcessor:
def __init__(self, batch_size: int, concurrency_limit: int):
self.batch_size = batch_size
self.concurrency_limit = concurrency_limit
async def process_batch(self, scraper: 'AsyncWebScraper', urls: List[str]) -> List[ScraperResult]:
semaphore = asyncio.Semaphore(self.concurrency_limit)
async def scrape_with_semaphore(url):
async with semaphore:
return await scraper.ascrape(url)
return await asyncio.gather(*[scrape_with_semaphore(url) for url in urls])
class AsyncWebScraper: class AsyncWebScraper:
def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5): def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5):
self.crawler = crawler self.crawler = crawler
self.strategy = strategy self.strategy = strategy
self.batch_processor = BatchProcessor(batch_size, concurrency_limit)
async def ascrape(self, url: str, parallel_processing: bool = True) -> ScraperResult: async def ascrape(self, url: str, parallel_processing: bool = True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
return await self.strategy.ascrape(url, self.crawler, parallel_processing) if yield_results:
return self._ascrape_yielding(url, parallel_processing)
else:
return await self._ascrape_collecting(url, parallel_processing)
async def ascrape_many(self, urls: List[str]) -> List[ScraperResult]: async def _ascrape_yielding(self, url: str, parallel_processing: bool) -> AsyncGenerator[CrawlResult, None]:
all_results = [] result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
for i in range(0, len(urls), self.batch_processor.batch_size): async for res in result_generator: # Consume the async generator
batch = urls[i:i+self.batch_processor.batch_size] yield res # Yielding individual results
batch_results = await self.batch_processor.process_batch(self, batch)
all_results.extend(batch_results) async def _ascrape_collecting(self, url: str, parallel_processing: bool) -> ScraperResult:
return all_results extracted_data = {}
result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
async for res in result_generator: # Consume the async generator
extracted_data[res.url] = res
# Return a final ScraperResult
return ScraperResult(
url=url,
crawled_urls=list(extracted_data.keys()),
extracted_data=extracted_data
)

View File

@@ -1,7 +1,6 @@
from .scraper_strategy import ScraperStrategy from .scraper_strategy import ScraperStrategy
from .filters import FilterChain from .filters import FilterChain
from .scorers import URLScorer from .scorers import URLScorer
from .models import ScraperResult
from ..models import CrawlResult from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler from ..async_webcrawler import AsyncWebCrawler
import asyncio import asyncio
@@ -13,7 +12,7 @@ from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict from collections import defaultdict
import logging import logging
from typing import Dict from typing import Dict, AsyncGenerator
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
rate_limiter = AsyncLimiter(1, 1) # 1 request per second rate_limiter = AsyncLimiter(1, 1) # 1 request per second
@@ -38,7 +37,12 @@ class BFSScraperStrategy(ScraperStrategy):
if netloc not in self.robot_parsers: if netloc not in self.robot_parsers:
rp = RobotFileParser() rp = RobotFileParser()
rp.set_url(f"{scheme}://{netloc}/robots.txt") rp.set_url(f"{scheme}://{netloc}/robots.txt")
rp.read() try:
rp.read()
except Exception as e:
# Log the type of error, message, and the URL
logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
return None
self.robot_parsers[netloc] = rp self.robot_parsers[netloc] = rp
return self.robot_parsers[netloc] return self.robot_parsers[netloc]
@@ -48,7 +52,7 @@ class BFSScraperStrategy(ScraperStrategy):
async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult: async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
return await crawler.arun(url) return await crawler.arun(url)
async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> CrawlResult: async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
def normalize_url(url: str) -> str: def normalize_url(url: str) -> str:
parsed = urlparse(url) parsed = urlparse(url)
return urlunparse(parsed._replace(fragment="")) return urlunparse(parsed._replace(fragment=""))
@@ -59,9 +63,14 @@ class BFSScraperStrategy(ScraperStrategy):
return None return None
# Robots.txt Compliance # Robots.txt Compliance
if not self.get_robot_parser(url).can_fetch(crawler.crawler_strategy.user_agent, url): robot_parser = self.get_robot_parser(url)
logging.info(f"Skipping {url} as per robots.txt") if robot_parser is None:
return None logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
else:
# If robots.txt was fetched, check if crawling is allowed
if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
logging.info(f"Skipping {url} as per robots.txt")
return None
# Crawl Politeness # Crawl Politeness
domain = urlparse(url).netloc domain = urlparse(url).netloc
@@ -103,14 +112,12 @@ class BFSScraperStrategy(ScraperStrategy):
score = self.url_scorer.score(normalized_link) score = self.url_scorer.score(normalized_link)
await queue.put((score, new_depth, normalized_link)) await queue.put((score, new_depth, normalized_link))
depths[normalized_link] = new_depth depths[normalized_link] = new_depth
return crawl_result return crawl_result
async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> ScraperResult: async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> CrawlResult:
queue = asyncio.PriorityQueue() queue = asyncio.PriorityQueue()
queue.put_nowait((0, 0, start_url)) queue.put_nowait((0, 0, start_url))
visited = set() visited = set()
extracted_data = {}
depths = {start_url: 0} depths = {start_url: 0}
while not queue.empty(): while not queue.empty():
@@ -124,12 +131,10 @@ class BFSScraperStrategy(ScraperStrategy):
else: else:
result = await self.process_url(url, depth, crawler, queue, visited, depths) result = await self.process_url(url, depth, crawler, queue, visited, depths)
if result: if result:
extracted_data[result.url] = result yield result
if parallel_processing and tasks: if parallel_processing and tasks:
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
for result in results: for result in results:
if result: if result:
extracted_data[result.url] = result yield result
return ScraperResult(url=start_url, crawled_urls=list(visited), extracted_data=extracted_data)

View File

@@ -1,9 +1,26 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from .models import ScraperResult from .models import ScraperResult, CrawlResult
from ..models import CrawlResult from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler from ..async_webcrawler import AsyncWebCrawler
from typing import Union, AsyncGenerator
class ScraperStrategy(ABC): class ScraperStrategy(ABC):
@abstractmethod @abstractmethod
async def ascrape(self, url: str, crawler: AsyncWebCrawler) -> ScraperResult: async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool=True, yield_results: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Scrape the given URL using the specified crawler.
Args:
url (str): The starting URL for the scrape.
crawler (AsyncWebCrawler): The web crawler instance.
parallel_processing (bool): Whether to use parallel processing. Defaults to True.
yield_results (bool): If True, yields individual crawl results as they are ready;
if False, accumulates results and returns a final ScraperResult.
Yields:
CrawlResult: Individual crawl results if yield_results is True.
Returns:
ScraperResult: A summary of the scrape results containing the final extracted data
and the list of crawled URLs if yield_results is False.
"""
pass pass