Parallel URL processing with retry and exponential backoff on failure - simplified URL validation and normalisation - robots.txt compliance

Aravind Karnam
2024-09-19 12:34:12 +05:30
parent 78f26ac263
commit 7f3e2e47ed
5 changed files with 116 additions and 33 deletions
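
The headline feature is the retry path: tenacity re-invokes a failed coroutine with exponentially growing waits between attempts. Below is a standalone sketch of the same decorator configuration this commit adds; the flaky function is illustrative only, not part of the codebase.

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

calls = {"count": 0}

# Same policy as in the diff: give up after 3 attempts, with waits that
# grow exponentially but are clamped to the 4-10 second window.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def flaky_fetch(url: str) -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return f"fetched {url} on attempt {calls['count']}"

print(asyncio.run(flaky_fetch("https://example.com")))
# Succeeds on the third attempt. Had the third attempt also failed,
# tenacity would raise RetryError (by default it does not re-raise
# the original exception).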


@@ -0,0 +1,2 @@
+from .async_web_scraper import AsyncWebScraper
+from .bfs_scraper_strategy import BFSScraperStrategy


@@ -24,8 +24,7 @@ class AsyncWebScraper:
         self.batch_processor = BatchProcessor(batch_size, concurrency_limit)
 
     async def ascrape(self, url: str) -> ScraperResult:
-        crawl_result = await self.crawler.arun(url)
-        return await self.strategy.ascrape(url, crawl_result, self.crawler)
+        return await self.strategy.ascrape(url, self.crawler)
 
     async def ascrape_many(self, urls: List[str]) -> List[ScraperResult]:
         all_results = []


@@ -5,46 +5,127 @@ from .models import ScraperResult
 from ..models import CrawlResult
 from ..async_webcrawler import AsyncWebCrawler
 import asyncio
-from urllib.parse import urljoin
+import validators
+from urllib.parse import urljoin,urlparse,urlunparse
+from urllib.robotparser import RobotFileParser
+import time
+from aiolimiter import AsyncLimiter
+from tenacity import retry, stop_after_attempt, wait_exponential
+from collections import defaultdict
+import logging
+
+logging.basicConfig(level=logging.DEBUG)
+
+rate_limiter = AsyncLimiter(1, 1)  # 1 request per second
 
 class BFSScraperStrategy(ScraperStrategy):
-    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer):
+    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
         self.url_scorer = url_scorer
+        self.max_concurrent = max_concurrent
+        # 9. Crawl Politeness
+        self.last_crawl_time = defaultdict(float)
+        self.min_crawl_delay = 1  # 1 second delay between requests to the same domain
+        # 5. Robots.txt Compliance
+        self.robot_parsers = {}
+
+    # Robots.txt Parser
+    def get_robot_parser(self, url: str) -> RobotFileParser:
+        domain = urlparse(url).netloc
+        if domain not in self.robot_parsers:
+            rp = RobotFileParser()
+            rp.set_url(f"https://{domain}/robots.txt")
+            rp.read()
+            self.robot_parsers[domain] = rp
+        return self.robot_parsers[domain]
+
+    # Retry with exponential backoff
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
+    async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
+        return await crawler.arun(url)
+
+    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set) -> CrawlResult:
+        def normalize_url(url: str) -> str:
+            parsed = urlparse(url)
+            return urlunparse(parsed._replace(fragment=""))
+
+        # URL Validation
+        if not validators.url(url):
+            logging.warning(f"Invalid URL: {url}")
+            return None
+
+        # Robots.txt Compliance
+        if not self.get_robot_parser(url).can_fetch("YourUserAgent", url):
+            logging.info(f"Skipping {url} as per robots.txt")
+            return None
+
+        # Crawl Politeness
+        domain = urlparse(url).netloc
+        time_since_last_crawl = time.time() - self.last_crawl_time[domain]
+        if time_since_last_crawl < self.min_crawl_delay:
+            await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
+        self.last_crawl_time[domain] = time.time()
 
-    async def ascrape(self, start_url: str, initial_crawl_result: CrawlResult, crawler: AsyncWebCrawler) -> ScraperResult:
+        # Rate Limiting
+        async with rate_limiter:
+            # Error Handling
+            try:
+                crawl_result = await self.retry_crawl(crawler, url)
+            except Exception as e:
+                logging.error(f"Error crawling {url}: {str(e)}")
+                crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
+
+        if not crawl_result.success:
+            # Logging and Monitoring
+            logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
+            # Error Categorization
+            if crawl_result.status_code == 404:
+                self.remove_from_future_crawls(url)
+            elif crawl_result.status_code == 503:
+                await self.add_to_retry_queue(url)
+            return crawl_result
+
+        # Content Type Checking
+        # if 'text/html' not in crawl_result.response_header.get('Content-Type', ''):
+        #     logging.info(f"Skipping non-HTML content: {url}")
+        #     return crawl_result
+
+        visited.add(url)
+
+        # Process links
+        for link_type in ["internal", "external"]:
+            for link in crawl_result.links[link_type]:
+                absolute_link = urljoin(url, link['href'])
+                normalized_link = normalize_url(absolute_link)
+                if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
+                    new_depth = depth + 1
+                    if new_depth <= self.max_depth:
+                        # URL Scoring
+                        score = self.url_scorer.score(normalized_link)
+                        await queue.put((score, new_depth, normalized_link))
+
+        return crawl_result
+
+    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler) -> ScraperResult:
         queue = asyncio.PriorityQueue()
-        queue.put_nowait((0, 0, start_url))  # (score, depth, url)
+        queue.put_nowait((0, 0, start_url))
         visited = set()
         crawled_urls = []
         extracted_data = {}
+
         while not queue.empty():
-            _, depth, url = await queue.get()
-            if depth > self.max_depth or url in visited:
-                continue
-            crawl_result = initial_crawl_result if url == start_url else await crawler.arun(url)
-            visited.add(url)
-            crawled_urls.append(url)
-            extracted_data[url]=crawl_result
-            if crawl_result.success == False:
-                print(f"failed to crawl -- {url}")
-                continue
-            for internal in crawl_result.links["internal"]:
-                link = internal['href']
-                is_special_uri = any(link.startswith(scheme) for scheme in ('tel:', 'mailto:', 'sms:', 'geo:', 'fax:', 'file:', 'data:', 'sip:', 'ircs:', 'magnet:'))
-                is_fragment = '#' in link
-                if not (is_fragment or is_special_uri):
-                    # To fix partial links: eg:'/support' to 'https://example.com/support'
-                    absolute_link = urljoin(url, link)
-                    if self.filter_chain.apply(absolute_link) and absolute_link not in visited:
-                        score = self.url_scorer.score(absolute_link)
-                        await queue.put((1 / score, depth + 1, absolute_link))
-            for external in crawl_result.links["external"]:
-                link = external['href']
-                if self.filter_chain.apply(link) and link not in visited:
-                    score = self.url_scorer.score(link)
-                    await queue.put((1 / score, depth + 1, link))
+            tasks = []
+            while not queue.empty() and len(tasks) < self.max_concurrent:
+                _, depth, url = await queue.get()
+                if url not in visited:
+                    task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited))
+                    tasks.append(task)
+
+            if tasks:
+                results = await asyncio.gather(*tasks)
+                for result in results:
+                    if result:
+                        crawled_urls.append(result.url)
+                        extracted_data[result.url] = result
+
         return ScraperResult(url=start_url, crawled_urls=crawled_urls, extracted_data=extracted_data)
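
The normalize_url helper added above strips only the URL fragment; query strings and trailing slashes are untouched. The same two lines shown standalone, with the effect spelled out:

from urllib.parse import urlparse, urlunparse

def normalize_url(url: str) -> str:
    # Drop the #fragment so in-page anchors don't register as distinct URLs
    return urlunparse(urlparse(url)._replace(fragment=""))

print(normalize_url("https://example.com/docs#install"))
# https://example.com/docs
print(normalize_url("https://example.com/docs?page=2#top"))
# https://example.com/docs?page=2  (query string is preserved)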

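How the pieces fit together after this commit, as a hedged sketch: the import paths and the FilterChain / URLScorer / AsyncWebCrawler / AsyncWebScraper constructor arguments are assumptions inferred from names in the diff, not confirmed by it.

import asyncio
# Hypothetical import paths; only the class names appear in this diff.
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.scraper import AsyncWebScraper, BFSScraperStrategy
from crawl4ai.scraper.filters import FilterChain, URLScorer  # assumed module

async def main():
    strategy = BFSScraperStrategy(
        max_depth=2,
        filter_chain=FilterChain(),   # assumed no-arg constructor
        url_scorer=URLScorer(),       # assumed no-arg constructor
        max_concurrent=5,             # parameter added in this commit
    )
    crawler = AsyncWebCrawler()                   # assumed constructor
    scraper = AsyncWebScraper(crawler, strategy)  # assumed argument order
    result = await scraper.ascrape("https://example.com")
    print(result.crawled_urls)  # per ScraperResult below, a List[str]

asyncio.run(main())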

@@ -1,7 +1,8 @@
 from pydantic import BaseModel
 from typing import List, Dict
+from ..models import CrawlResult
 
 class ScraperResult(BaseModel):
     url: str
     crawled_urls: List[str]
-    extracted_data: Dict
+    extracted_data: Dict[str,CrawlResult]
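
The effect of tightening extracted_data from a bare Dict to Dict[str, CrawlResult], shown with simplified stand-in models (the real CrawlResult has more fields than this):

from typing import Dict, List
from pydantic import BaseModel, ValidationError

# Simplified stand-ins for illustration only, not the crawl4ai models.
class CrawlResult(BaseModel):
    url: str
    html: str
    success: bool

class ScraperResult(BaseModel):
    url: str
    crawled_urls: List[str]
    extracted_data: Dict[str, CrawlResult]  # was: Dict (values unchecked)

try:
    ScraperResult(url="https://example.com", crawled_urls=[],
                  extracted_data={"x": "not a CrawlResult"})
except ValidationError as e:
    print(e)  # pydantic now rejects values that aren't CrawlResult-shaped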


@@ -5,5 +5,5 @@ from ..async_webcrawler import AsyncWebCrawler
 
 class ScraperStrategy(ABC):
     @abstractmethod
-    async def ascrape(self, url: str, crawl_result: CrawlResult, crawler: AsyncWebCrawler) -> ScraperResult:
+    async def ascrape(self, url: str, crawler: AsyncWebCrawler) -> ScraperResult:
         pass
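
With crawl_result dropped from the abstract signature, a strategy now owns every fetch, including the start URL. A minimal illustrative implementation against the new interface (not part of the codebase):

class SinglePageStrategy(ScraperStrategy):
    """Illustrative only: fetch the start URL and follow no links."""
    async def ascrape(self, url: str, crawler: AsyncWebCrawler) -> ScraperResult:
        result = await crawler.arun(url)
        return ScraperResult(
            url=url,
            crawled_urls=[url],
            extracted_data={url: result},
        )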