From 7f3e2e47ed99de1db4bc99d69d0a6f1ddaef962f Mon Sep 17 00:00:00 2001
From: Aravind Karnam
Date: Thu, 19 Sep 2024 12:34:12 +0530
Subject: [PATCH] Parallel processing with retry and exponential backoff on failure

- Simplified URL validation and normalisation
- Respect robots.txt

---
 crawl4ai/scraper/__init__.py             |   2 +
 crawl4ai/scraper/async_web_scraper.py    |   3 +-
 crawl4ai/scraper/bfs_scraper_strategy.py | 139 ++++++++++++++++++-----
 crawl4ai/scraper/models.py               |   3 +-
 crawl4ai/scraper/scraper_strategy.py     |   2 +-
 5 files changed, 116 insertions(+), 33 deletions(-)

diff --git a/crawl4ai/scraper/__init__.py b/crawl4ai/scraper/__init__.py
index e69de29b..1997e162 100644
--- a/crawl4ai/scraper/__init__.py
+++ b/crawl4ai/scraper/__init__.py
@@ -0,0 +1,2 @@
+from .async_web_scraper import AsyncWebScraper
+from .bfs_scraper_strategy import BFSScraperStrategy
\ No newline at end of file
diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py
index c67f0e14..6cf5488c 100644
--- a/crawl4ai/scraper/async_web_scraper.py
+++ b/crawl4ai/scraper/async_web_scraper.py
@@ -24,8 +24,7 @@ class AsyncWebScraper:
         self.batch_processor = BatchProcessor(batch_size, concurrency_limit)
 
     async def ascrape(self, url: str) -> ScraperResult:
-        crawl_result = await self.crawler.arun(url)
-        return await self.strategy.ascrape(url, crawl_result, self.crawler)
+        return await self.strategy.ascrape(url, self.crawler)
 
     async def ascrape_many(self, urls: List[str]) -> List[ScraperResult]:
         all_results = []
diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py
index 9add962e..a8fb1fe1 100644
--- a/crawl4ai/scraper/bfs_scraper_strategy.py
+++ b/crawl4ai/scraper/bfs_scraper_strategy.py
@@ -5,46 +5,127 @@ from .models import ScraperResult
 from ..models import CrawlResult
 from ..async_webcrawler import AsyncWebCrawler
 import asyncio
-from urllib.parse import urljoin
+import validators
+from urllib.parse import urljoin, urlparse, urlunparse
+from urllib.robotparser import RobotFileParser
+import time
+from aiolimiter import AsyncLimiter
+from tenacity import retry, stop_after_attempt, wait_exponential
+from collections import defaultdict
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+rate_limiter = AsyncLimiter(1, 1) # 1 request per second
 
 class BFSScraperStrategy(ScraperStrategy):
-    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer):
+    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
         self.url_scorer = url_scorer
+        self.max_concurrent = max_concurrent
+        # Crawl Politeness
+        self.last_crawl_time = defaultdict(float)
+        self.min_crawl_delay = 1 # 1 second delay between requests to the same domain
+        # Robots.txt Compliance
+        self.robot_parsers = {}
+
+    # Robots.txt parser, cached per domain
+    def get_robot_parser(self, url: str) -> RobotFileParser:
+        domain = urlparse(url).netloc
+        if domain not in self.robot_parsers:
+            rp = RobotFileParser()
+            rp.set_url(f"https://{domain}/robots.txt")
+            try:
+                rp.read()
+            except Exception as e:
+                # If robots.txt cannot be fetched, can_fetch() stays conservative (False)
+                logging.warning(f"Could not read robots.txt for {domain}: {e}")
+            self.robot_parsers[domain] = rp
+        return self.robot_parsers[domain]
+
+    # Retry with exponential backoff
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
+    async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
+        return await crawler.arun(url)
+
+    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set) -> CrawlResult:
+        def normalize_url(url: str) -> str:
+            # Strip fragments so page.html#section and page.html are treated as one URL
+            parsed = urlparse(url)
+            return urlunparse(parsed._replace(fragment=""))
+
+        # URL Validation
+        if not validators.url(url):
+            logging.warning(f"Invalid URL: {url}")
+            return None
+
+        # Robots.txt Compliance
+        if not self.get_robot_parser(url).can_fetch("YourUserAgent", url):
+            logging.info(f"Skipping {url} as per robots.txt")
+            return None
+
+        # Crawl Politeness
+        domain = urlparse(url).netloc
+        time_since_last_crawl = time.time() - self.last_crawl_time[domain]
+        if time_since_last_crawl < self.min_crawl_delay:
+            await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
+        self.last_crawl_time[domain] = time.time()
 
-    async def ascrape(self, start_url: str, initial_crawl_result: CrawlResult, crawler: AsyncWebCrawler) -> ScraperResult:
+        # Rate Limiting
+        async with rate_limiter:
+            # Error Handling
+            try:
+                crawl_result = await self.retry_crawl(crawler, url)
+            except Exception as e:
+                logging.error(f"Error crawling {url}: {str(e)}")
+                crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
+
+        if not crawl_result.success:
+            # Logging and Monitoring
+            logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
Error: {crawl_result.error_message}") + # Error Categorization + if crawl_result.status_code == 404: + self.remove_from_future_crawls(url) + elif crawl_result.status_code == 503: + await self.add_to_retry_queue(url) + return crawl_result + + # Content Type Checking + # if 'text/html' not in crawl_result.response_header.get('Content-Type', ''): + # logging.info(f"Skipping non-HTML content: {url}") + # return crawl_result + + visited.add(url) + + # Process links + for link_type in ["internal", "external"]: + for link in crawl_result.links[link_type]: + absolute_link = urljoin(url, link['href']) + normalized_link = normalize_url(absolute_link) + if self.filter_chain.apply(normalized_link) and normalized_link not in visited: + new_depth = depth + 1 + if new_depth <= self.max_depth: + # URL Scoring + score = self.url_scorer.score(normalized_link) + await queue.put((score, new_depth, normalized_link)) + + return crawl_result + + async def ascrape(self, start_url: str, crawler: AsyncWebCrawler) -> ScraperResult: queue = asyncio.PriorityQueue() - queue.put_nowait((0, 0, start_url)) # (score, depth, url) + queue.put_nowait((0, 0, start_url)) visited = set() crawled_urls = [] extracted_data = {} while not queue.empty(): - _, depth, url = await queue.get() - if depth > self.max_depth or url in visited: - continue - crawl_result = initial_crawl_result if url == start_url else await crawler.arun(url) - visited.add(url) - crawled_urls.append(url) - extracted_data[url]=crawl_result - if crawl_result.success == False: - print(f"failed to crawl -- {url}") - continue - for internal in crawl_result.links["internal"]: - link = internal['href'] - is_special_uri = any(link.startswith(scheme) for scheme in ('tel:', 'mailto:', 'sms:', 'geo:', 'fax:', 'file:', 'data:', 'sip:', 'ircs:', 'magnet:')) - is_fragment = '#' in link - if not (is_fragment or is_special_uri): - # To fix partial links: eg:'/support' to 'https://example.com/support' - absolute_link = urljoin(url, link) - if self.filter_chain.apply(absolute_link) and absolute_link not in visited: - score = self.url_scorer.score(absolute_link) - await queue.put((1 / score, depth + 1, absolute_link)) - for external in crawl_result.links["external"]: - link = external['href'] - if self.filter_chain.apply(link) and link not in visited: - score = self.url_scorer.score(link) - await queue.put((1 / score, depth + 1, link)) + tasks = [] + while not queue.empty() and len(tasks) < self.max_concurrent: + _, depth, url = await queue.get() + if url not in visited: + task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited)) + tasks.append(task) + + if tasks: + results = await asyncio.gather(*tasks) + for result in results: + if result: + crawled_urls.append(result.url) + extracted_data[result.url] = result return ScraperResult(url=start_url, crawled_urls=crawled_urls, extracted_data=extracted_data) \ No newline at end of file diff --git a/crawl4ai/scraper/models.py b/crawl4ai/scraper/models.py index 9ffdac52..735d1d58 100644 --- a/crawl4ai/scraper/models.py +++ b/crawl4ai/scraper/models.py @@ -1,7 +1,8 @@ from pydantic import BaseModel from typing import List, Dict +from ..models import CrawlResult class ScraperResult(BaseModel): url: str crawled_urls: List[str] - extracted_data: Dict \ No newline at end of file + extracted_data: Dict[str,CrawlResult] \ No newline at end of file diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py index 16df9ece..6d1cdc74 100644 --- a/crawl4ai/scraper/scraper_strategy.py +++ 
+++ b/crawl4ai/scraper/scraper_strategy.py
@@ -5,5 +5,5 @@ from ..async_webcrawler import AsyncWebCrawler
 
 class ScraperStrategy(ABC):
     @abstractmethod
-    async def ascrape(self, url: str, crawl_result: CrawlResult, crawler: AsyncWebCrawler) -> ScraperResult:
+    async def ascrape(self, url: str, crawler: AsyncWebCrawler) -> ScraperResult:
         pass
\ No newline at end of file
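
Usage sketch (not part of the commit): a minimal driver showing how the reworked entry points might be wired together after this patch. The FilterChain/URLScorer stand-ins, the AsyncWebScraper argument order, and the start URL are assumptions for illustration; only the .apply()/.score() calls, the BFSScraperStrategy constructor parameters, and the ScraperResult fields come from the patch itself.

import asyncio

from crawl4ai import AsyncWebCrawler
from crawl4ai.scraper import AsyncWebScraper, BFSScraperStrategy


# Stand-ins for the real FilterChain / URLScorer, which this patch does not define;
# only the .apply(url) and .score(url) interface used by BFSScraperStrategy is assumed.
class AcceptAllFilterChain:
    def apply(self, url: str) -> bool:
        return True


class ConstantURLScorer:
    def score(self, url: str) -> float:
        return 1.0


async def main():
    strategy = BFSScraperStrategy(
        max_depth=2,
        filter_chain=AcceptAllFilterChain(),
        url_scorer=ConstantURLScorer(),
        max_concurrent=5,  # upper bound on tasks gathered per batch, added by this patch
    )
    async with AsyncWebCrawler() as crawler:
        scraper = AsyncWebScraper(crawler, strategy)  # argument order assumed
        result = await scraper.ascrape("https://example.com")
        print(f"Crawled {len(result.crawled_urls)} pages starting from {result.url}")


if __name__ == "__main__":
    asyncio.run(main())

Note that the module-level rate_limiter still caps throughput at roughly one request per second overall, so max_concurrent mainly controls how many crawl tasks are gathered per batch rather than the effective request rate.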