From 3d1c9a84349ad4b7507ba257760071d4177c205a Mon Sep 17 00:00:00 2001
From: UncleCode
Date: Thu, 7 Nov 2024 18:54:53 +0800
Subject: [PATCH] Reviewing the BFS strategy.

---
 crawl4ai/scraper/bfs_scraper_strategy copy.py | 138 +++++++
 crawl4ai/scraper/bfs_scraper_strategy.py      | 341 ++++++++++++------
 2 files changed, 368 insertions(+), 111 deletions(-)
 create mode 100644 crawl4ai/scraper/bfs_scraper_strategy copy.py

diff --git a/crawl4ai/scraper/bfs_scraper_strategy copy.py b/crawl4ai/scraper/bfs_scraper_strategy copy.py
new file mode 100644
index 00000000..51bf9cb3
--- /dev/null
+++ b/crawl4ai/scraper/bfs_scraper_strategy copy.py
@@ -0,0 +1,138 @@
+from .scraper_strategy import ScraperStrategy
+from .filters import FilterChain
+from .scorers import URLScorer
+from ..models import CrawlResult
+from ..async_webcrawler import AsyncWebCrawler
+import asyncio
+import validators
+from urllib.parse import urljoin,urlparse,urlunparse
+from urllib.robotparser import RobotFileParser
+import time
+from aiolimiter import AsyncLimiter
+from tenacity import retry, stop_after_attempt, wait_exponential
+from collections import defaultdict
+import logging
+from typing import Dict, AsyncGenerator
+logging.basicConfig(level=logging.DEBUG)
+
+rate_limiter = AsyncLimiter(1, 1) # 1 request per second
+
+class BFSScraperStrategy(ScraperStrategy):
+    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5, min_crawl_delay: int=1):
+        self.max_depth = max_depth
+        self.filter_chain = filter_chain
+        self.url_scorer = url_scorer
+        self.max_concurrent = max_concurrent
+        # For Crawl Politeness
+        self.last_crawl_time = defaultdict(float)
+        self.min_crawl_delay = min_crawl_delay # 1 second delay between requests to the same domain
+        # For Robots.txt Compliance
+        self.robot_parsers = {}
+
+    # Robots.txt Parser
+    def get_robot_parser(self, url: str) -> RobotFileParser:
+        domain = urlparse(url)
+        scheme = domain.scheme if domain.scheme else 'http' # Default to 'http' if no scheme provided
+        netloc = domain.netloc
+        if netloc not in self.robot_parsers:
+            rp = RobotFileParser()
+            rp.set_url(f"{scheme}://{netloc}/robots.txt")
+            try:
+                rp.read()
+            except Exception as e:
+                # Log the type of error, message, and the URL
+                logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
+                return None
+            self.robot_parsers[netloc] = rp
+        return self.robot_parsers[netloc]
+
+    # Retry with exponential backoff
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
+    async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
+        return await crawler.arun(url)
+
+    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
+        def normalize_url(url: str) -> str:
+            parsed = urlparse(url)
+            return urlunparse(parsed._replace(fragment=""))
+
+        # URL Validation
+        if not validators.url(url):
+            logging.warning(f"Invalid URL: {url}")
+            return None
+
+        # Robots.txt Compliance
+        robot_parser = self.get_robot_parser(url)
+        if robot_parser is None:
+            logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
+        else:
+            # If robots.txt was fetched, check if crawling is allowed
+            if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
+                logging.info(f"Skipping {url} as per robots.txt")
+                return None
+
+        # Crawl Politeness
+        domain = urlparse(url).netloc
+        time_since_last_crawl = time.time() - self.last_crawl_time[domain]
+        if time_since_last_crawl < self.min_crawl_delay:
+            await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
+        self.last_crawl_time[domain] = time.time()
+
+        # Rate Limiting
+        async with rate_limiter:
+            # Error Handling
+            try:
+                crawl_result = await self.retry_crawl(crawler, url)
+            except Exception as e:
+                logging.error(f"Error crawling {url}: {str(e)}")
+                crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
+
+        if not crawl_result.success:
+            # Logging and Monitoring
+            logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
+            return crawl_result
+
+        # Process links
+        for link_type in ["internal", "external"]:
+            for link in crawl_result.links[link_type]:
+                absolute_link = urljoin(url, link['href'])
+                normalized_link = normalize_url(absolute_link)
+                if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
+                    new_depth = depths[url] + 1
+                    if new_depth <= self.max_depth:
+                        # URL Scoring
+                        score = self.url_scorer.score(normalized_link)
+                        await queue.put((score, new_depth, normalized_link))
+                        depths[normalized_link] = new_depth
+        return crawl_result
+
+    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
+        queue = asyncio.PriorityQueue()
+        queue.put_nowait((0, 0, start_url))
+        visited = set()
+        depths = {start_url: 0}
+        pending_tasks = set()
+
+        while not queue.empty() or pending_tasks:
+            while not queue.empty() and len(pending_tasks) < self.max_concurrent:
+                _, depth, url = await queue.get()
+                if url not in visited:
+                    # Adding URL to the visited set here itself, (instead of after result generation)
+                    # so that other tasks are not queued for same URL, found at different depth before
+                    # crawling and extraction of this task is completed.
+                    visited.add(url)
+                    if parallel_processing:
+                        task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
+                        pending_tasks.add(task)
+                    else:
+                        result = await self.process_url(url, depth, crawler, queue, visited, depths)
+                        if result:
+                            yield result
+
+            # Wait for the first task to complete and yield results incrementally as each task is completed
+            if pending_tasks:
+                done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
+                for task in done:
+                    result = await task
+                    if result:
+                        yield result
\ No newline at end of file
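
Reviewer note (not part of the patch): both the old implementation above and the reworked one below order the frontier with an asyncio.PriorityQueue holding (score, depth, url) tuples. Tuples compare element-wise, so the entry with the lowest score is dequeued first and depth breaks ties between equal scores; whether that matches the intended ranking depends on whether URLScorer treats lower scores as higher priority. A minimal, self-contained sketch of that ordering (the URLs are made up):

import asyncio

async def demo() -> None:
    # Same tuple layout the strategy uses: (score, depth, url).
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    await queue.put((0.7, 2, "https://example.com/high-score"))
    await queue.put((0.1, 3, "https://example.com/deep-but-low-score"))
    await queue.put((0.1, 1, "https://example.com/shallow-and-low-score"))

    while not queue.empty():
        score, depth, url = await queue.get()
        # Lowest score comes out first; equal scores fall back to the smaller depth.
        print(score, depth, url)

asyncio.run(demo())
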
diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py
index ce4d0127..4506dbfe 100644
--- a/crawl4ai/scraper/bfs_scraper_strategy.py
+++ b/crawl4ai/scraper/bfs_scraper_strategy.py
@@ -1,139 +1,258 @@
-from .scraper_strategy import ScraperStrategy
-from .filters import FilterChain
-from .scorers import URLScorer
-from ..models import CrawlResult
-from ..async_webcrawler import AsyncWebCrawler
+from abc import ABC, abstractmethod
+from typing import Union, AsyncGenerator, Optional, Dict, Set
+from dataclasses import dataclass
+from datetime import datetime
 import asyncio
-import validators
-from urllib.parse import urljoin,urlparse,urlunparse
+import logging
+from urllib.parse import urljoin, urlparse, urlunparse
 from urllib.robotparser import RobotFileParser
+import validators
 import time
 from aiolimiter import AsyncLimiter
 from tenacity import retry, stop_after_attempt, wait_exponential
 from collections import defaultdict
-import logging
-from typing import Dict, AsyncGenerator
-logging.basicConfig(level=logging.DEBUG)
-rate_limiter = AsyncLimiter(1, 1) # 1 request per second
+from .models import ScraperResult, CrawlResult
+from .filters import FilterChain
+from .scorers import URLScorer
+from ..async_webcrawler import AsyncWebCrawler
+
+@dataclass
+class CrawlStats:
+    """Statistics for the crawling process"""
+    start_time: datetime
+    urls_processed: int = 0
+    urls_failed: int = 0
+    urls_skipped: int = 0
+    total_depth_reached: int = 0
+    current_depth: int = 0
+    robots_blocked: int = 0
+
+class ScraperStrategy(ABC):
+    """Base class for scraping strategies"""
+
+    @abstractmethod
+    async def ascrape(
+        self,
+        url: str,
+        crawler: AsyncWebCrawler,
+        parallel_processing: bool = True,
+        stream: bool = False
+    ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+        """Abstract method for scraping implementation"""
+        pass
+
+    @abstractmethod
+    async def can_process_url(self, url: str) -> bool:
+        """Check if URL can be processed based on strategy rules"""
+        pass
+
+    @abstractmethod
+    async def shutdown(self):
+        """Clean up resources used by the strategy"""
+        pass

 class BFSScraperStrategy(ScraperStrategy):
-    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5, min_crawl_delay: int=1):
+    """Breadth-First Search scraping strategy with politeness controls"""
+
+    def __init__(
+        self,
+        max_depth: int,
+        filter_chain: FilterChain,
+        url_scorer: URLScorer,
+        max_concurrent: int = 5,
+        min_crawl_delay: int = 1,
+        timeout: int = 30,
+        logger: Optional[logging.Logger] = None
+    ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
         self.url_scorer = url_scorer
         self.max_concurrent = max_concurrent
-        # For Crawl Politeness
+        self.min_crawl_delay = min_crawl_delay
+        self.timeout = timeout
+        self.logger = logger or logging.getLogger(__name__)
+
+        # Crawl control
+        self.stats = CrawlStats(start_time=datetime.now())
+        self._cancel_event = asyncio.Event()
+
+        # Rate limiting and politeness
+        self.rate_limiter = AsyncLimiter(1, 1)
         self.last_crawl_time = defaultdict(float)
-        self.min_crawl_delay = min_crawl_delay # 1 second delay between requests to the same domain
-        # For Robots.txt Compliance
-        self.robot_parsers = {}
+        self.robot_parsers: Dict[str, RobotFileParser] = {}
+        self.domain_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

-    # Robots.txt Parser
-    def get_robot_parser(self, url: str) -> RobotFileParser:
-        domain = urlparse(url)
-        scheme = domain.scheme if domain.scheme else 'http' # Default to 'http' if no scheme provided
-        netloc = domain.netloc
-        if netloc not in self.robot_parsers:
-            rp = RobotFileParser()
-            rp.set_url(f"{scheme}://{netloc}/robots.txt")
-            try:
-                rp.read()
-            except Exception as e:
-                # Log the type of error, message, and the URL
-                logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
-                return None
-            self.robot_parsers[netloc] = rp
-        return self.robot_parsers[netloc]
-
-
-    # Retry with exponential backoff
-    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
-    async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
-        return await crawler.arun(url)
-
-    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
-        def normalize_url(url: str) -> str:
-            parsed = urlparse(url)
-            return urlunparse(parsed._replace(fragment=""))
-
-        # URL Validation
+    async def can_process_url(self, url: str) -> bool:
+        """Check if URL can be processed based on robots.txt and filters"""
         if not validators.url(url):
-            logging.warning(f"Invalid URL: {url}")
-            return None
-
-        # Robots.txt Compliance
-        robot_parser = self.get_robot_parser(url)
-        if robot_parser is None:
-            logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
-        else:
-            # If robots.txt was fetched, check if crawling is allowed
-            if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
-                logging.info(f"Skipping {url} as per robots.txt")
-                return None
-
-        # Crawl Politeness
+            self.logger.warning(f"Invalid URL: {url}")
+            return False
+
+        robot_parser = await self._get_robot_parser(url)
+        if robot_parser and not robot_parser.can_fetch("*", url):
+            self.stats.robots_blocked += 1
+            self.logger.info(f"Blocked by robots.txt: {url}")
+            return False
+
+        return self.filter_chain.apply(url)
+
+    async def _get_robot_parser(self, url: str) -> Optional[RobotFileParser]:
+        """Get or create robots.txt parser for domain"""
         domain = urlparse(url).netloc
-        time_since_last_crawl = time.time() - self.last_crawl_time[domain]
-        if time_since_last_crawl < self.min_crawl_delay:
-            await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
+        if domain not in self.robot_parsers:
+            parser = RobotFileParser()
+            try:
+                robots_url = f"{urlparse(url).scheme}://{domain}/robots.txt"
+                parser.set_url(robots_url)
+                parser.read()
+                self.robot_parsers[domain] = parser
+            except Exception as e:
+                self.logger.warning(f"Error fetching robots.txt for {domain}: {e}")
+                return None
+        return self.robot_parsers[domain]
+
+    @retry(stop=stop_after_attempt(3),
+           wait=wait_exponential(multiplier=1, min=4, max=10))
+    async def _crawl_with_retry(
+        self,
+        crawler: AsyncWebCrawler,
+        url: str
+    ) -> CrawlResult:
+        """Crawl URL with retry logic"""
+        try:
+            async with asyncio.timeout(self.timeout):
+                return await crawler.arun(url)
+        except asyncio.TimeoutError:
+            self.logger.error(f"Timeout crawling {url}")
+            raise
+
+    async def process_url(
+        self,
+        url: str,
+        depth: int,
+        crawler: AsyncWebCrawler,
+        queue: asyncio.PriorityQueue,
+        visited: Set[str],
+        depths: Dict[str, int]
+    ) -> Optional[CrawlResult]:
+        """Process a single URL and extract links"""
+
+        if self._cancel_event.is_set():
+            return None
+
+        if not await self.can_process_url(url):
+            self.stats.urls_skipped += 1
+            return None
+
+        # Politeness delay
+        domain = urlparse(url).netloc
+        time_since_last = time.time() - self.last_crawl_time[domain]
+        if time_since_last < self.min_crawl_delay:
+            await asyncio.sleep(self.min_crawl_delay - time_since_last)
         self.last_crawl_time[domain] = time.time()

-        # Rate Limiting
-        async with rate_limiter:
-            # Error Handling
-            try:
-                crawl_result = await self.retry_crawl(crawler, url)
-            except Exception as e:
-                logging.error(f"Error crawling {url}: {str(e)}")
-                crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
-
-        if not crawl_result.success:
-            # Logging and Monitoring
-            logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
-            return crawl_result
+        # Crawl with rate limiting
+        try:
+            async with self.rate_limiter:
+                result = await self._crawl_with_retry(crawler, url)
+                self.stats.urls_processed += 1
+        except Exception as e:
+            self.logger.error(f"Error crawling {url}: {e}")
+            self.stats.urls_failed += 1
+            return None

         # Process links
-        for link_type in ["internal", "external"]:
-            for link in crawl_result.links[link_type]:
-                absolute_link = urljoin(url, link['href'])
-                normalized_link = normalize_url(absolute_link)
-                if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
-                    new_depth = depths[url] + 1
-                    if new_depth <= self.max_depth:
-                        # URL Scoring
-                        score = self.url_scorer.score(normalized_link)
-                        await queue.put((score, new_depth, normalized_link))
-                        depths[normalized_link] = new_depth
-        return crawl_result
+        await self._process_links(result, url, depth, queue, visited, depths)
+
+        return result

-    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
+    async def _process_links(
+        self,
+        result: CrawlResult,
+        source_url: str,
+        depth: int,
+        queue: asyncio.PriorityQueue,
+        visited: Set[str],
+        depths: Dict[str, int]
+    ):
+        """Process extracted links from crawl result"""
+        for link_type in ["internal", "external"]:
+            for link in result.links[link_type]:
+                url = urljoin(source_url, link['href'])
+                url = urlunparse(urlparse(url)._replace(fragment=""))
+
+                if url not in visited and await self.can_process_url(url):
+                    new_depth = depths[source_url] + 1
+                    if new_depth <= self.max_depth:
+                        score = self.url_scorer.score(url)
+                        await queue.put((score, new_depth, url))
+                        depths[url] = new_depth
+                        self.stats.total_depth_reached = max(
+                            self.stats.total_depth_reached,
+                            new_depth
+                        )
+
+    async def ascrape(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        parallel_processing: bool = True
+    ) -> AsyncGenerator[CrawlResult, None]:
+        """Implement BFS crawling strategy"""
+
+        # Initialize crawl state
         queue = asyncio.PriorityQueue()
-        queue.put_nowait((0, 0, start_url))
-        visited = set()
+        await queue.put((0, 0, start_url))
+        visited: Set[str] = set()
         depths = {start_url: 0}
         pending_tasks = set()
+
+        try:
+            while (not queue.empty() or pending_tasks) and not self._cancel_event.is_set():
+                # Start new tasks up to max_concurrent
+                while not queue.empty() and len(pending_tasks) < self.max_concurrent:
+                    _, depth, url = await queue.get()
+                    if url not in visited:
+                        visited.add(url)
+                        self.stats.current_depth = depth
+
+                        if parallel_processing:
+                            task = asyncio.create_task(
+                                self.process_url(url, depth, crawler, queue, visited, depths)
+                            )
+                            pending_tasks.add(task)
+                        else:
+                            result = await self.process_url(
+                                url, depth, crawler, queue, visited, depths
+                            )
+                            if result:
+                                yield result
-        while not queue.empty() or pending_tasks:
-            while not queue.empty() and len(pending_tasks) < self.max_concurrent:
-                _, depth, url = await queue.get()
-                if url not in visited:
-                    # Adding URL to the visited set here itself, (instead of after result generation)
-                    # so that other tasks are not queued for same URL, found at different depth before
-                    # crawling and extraction of this task is completed.
-                    visited.add(url)
-                    if parallel_processing:
-                        task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
-                        pending_tasks.add(task)
-                    else:
-                        result = await self.process_url(url, depth, crawler, queue, visited, depths)
+                # Process completed tasks
+                if pending_tasks:
+                    done, pending_tasks = await asyncio.wait(
+                        pending_tasks,
+                        return_when=asyncio.FIRST_COMPLETED
+                    )
+                    for task in done:
+                        result = await task
                         if result:
-                            yield result
+                            yield result
+
+        except Exception as e:
+            self.logger.error(f"Error in crawl process: {e}")
+            raise
+
+        finally:
+            # Clean up any remaining tasks
+            for task in pending_tasks:
+                task.cancel()
+            self.stats.end_time = datetime.now()

-            # Wait for the first task to complete and yield results incrementally as each task is completed
-            if pending_tasks:
-                done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
-                for task in done:
-                    result = await task
-                    if result:
-                        yield result
\ No newline at end of file
+    async def shutdown(self):
+        """Clean up resources and stop crawling"""
+        self._cancel_event.set()
+        # Clear caches and close connections
+        self.robot_parsers.clear()
+        self.domain_queues.clear()
\ No newline at end of file
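
Reviewer note (illustrative only, not part of the patch): a rough sketch of how the reworked strategy might be driven end to end. The FilterChain([]) and URLScorer() constructors below are assumptions about the helper classes in crawl4ai/scraper/filters.py and crawl4ai/scraper/scorers.py (the real ones may require concrete filters or scoring weights), and the module path simply follows the file layout in this patch.

import asyncio

from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.scraper.bfs_scraper_strategy import BFSScraperStrategy
from crawl4ai.scraper.filters import FilterChain   # constructor below is an assumption
from crawl4ai.scraper.scorers import URLScorer     # assumed default-constructible

async def main() -> None:
    strategy = BFSScraperStrategy(
        max_depth=2,                   # links found deeper than this are not queued
        filter_chain=FilterChain([]),  # assumption: accepts a list of URL filters
        url_scorer=URLScorer(),        # assumption: lower scores pop first from the priority queue
        max_concurrent=5,
        min_crawl_delay=1,
    )

    async with AsyncWebCrawler() as crawler:
        # ascrape is an async generator in this patch: pages are yielded as their
        # crawls finish, not only after the whole traversal completes.
        async for result in strategy.ascrape("https://example.com", crawler):
            print(result.url, result.success)

    await strategy.shutdown()

asyncio.run(main())

shutdown() is called explicitly here because, in this patch, the strategy rather than the crawler owns the cancel event and the cached robots.txt parsers.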