Revieweing the BFS strategy.

This commit is contained in:
UncleCode
2024-11-07 18:54:53 +08:00
parent be472c624c
commit 3d1c9a8434
2 changed files with 368 additions and 111 deletions

View File

@@ -0,0 +1,138 @@
from .scraper_strategy import ScraperStrategy
from .filters import FilterChain
from .scorers import URLScorer
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
import asyncio
import validators
from urllib.parse import urljoin,urlparse,urlunparse
from urllib.robotparser import RobotFileParser
import time
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict
import logging
from typing import Dict, AsyncGenerator
logging.basicConfig(level=logging.DEBUG)
rate_limiter = AsyncLimiter(1, 1) # 1 request per second
class BFSScraperStrategy(ScraperStrategy):
def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5, min_crawl_delay: int=1):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.max_concurrent = max_concurrent
# For Crawl Politeness
self.last_crawl_time = defaultdict(float)
self.min_crawl_delay = min_crawl_delay # 1 second delay between requests to the same domain
# For Robots.txt Compliance
self.robot_parsers = {}
# Robots.txt Parser
def get_robot_parser(self, url: str) -> RobotFileParser:
domain = urlparse(url)
scheme = domain.scheme if domain.scheme else 'http' # Default to 'http' if no scheme provided
netloc = domain.netloc
if netloc not in self.robot_parsers:
rp = RobotFileParser()
rp.set_url(f"{scheme}://{netloc}/robots.txt")
try:
rp.read()
except Exception as e:
# Log the type of error, message, and the URL
logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
return None
self.robot_parsers[netloc] = rp
return self.robot_parsers[netloc]
# Retry with exponential backoff
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
return await crawler.arun(url)
async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
def normalize_url(url: str) -> str:
parsed = urlparse(url)
return urlunparse(parsed._replace(fragment=""))
# URL Validation
if not validators.url(url):
logging.warning(f"Invalid URL: {url}")
return None
# Robots.txt Compliance
robot_parser = self.get_robot_parser(url)
if robot_parser is None:
logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
else:
# If robots.txt was fetched, check if crawling is allowed
if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
logging.info(f"Skipping {url} as per robots.txt")
return None
# Crawl Politeness
domain = urlparse(url).netloc
time_since_last_crawl = time.time() - self.last_crawl_time[domain]
if time_since_last_crawl < self.min_crawl_delay:
await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
self.last_crawl_time[domain] = time.time()
# Rate Limiting
async with rate_limiter:
# Error Handling
try:
crawl_result = await self.retry_crawl(crawler, url)
except Exception as e:
logging.error(f"Error crawling {url}: {str(e)}")
crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
if not crawl_result.success:
# Logging and Monitoring
logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
return crawl_result
# Process links
for link_type in ["internal", "external"]:
for link in crawl_result.links[link_type]:
absolute_link = urljoin(url, link['href'])
normalized_link = normalize_url(absolute_link)
if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
new_depth = depths[url] + 1
if new_depth <= self.max_depth:
# URL Scoring
score = self.url_scorer.score(normalized_link)
await queue.put((score, new_depth, normalized_link))
depths[normalized_link] = new_depth
return crawl_result
async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
queue = asyncio.PriorityQueue()
queue.put_nowait((0, 0, start_url))
visited = set()
depths = {start_url: 0}
pending_tasks = set()
while not queue.empty() or pending_tasks:
while not queue.empty() and len(pending_tasks) < self.max_concurrent:
_, depth, url = await queue.get()
if url not in visited:
# Adding URL to the visited set here itself, (instead of after result generation)
# so that other tasks are not queued for same URL, found at different depth before
# crawling and extraction of this task is completed.
visited.add(url)
if parallel_processing:
task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
pending_tasks.add(task)
else:
result = await self.process_url(url, depth, crawler, queue, visited, depths)
if result:
yield result
# Wait for the first task to complete and yield results incrementally as each task is completed
if pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
if result:
yield result

View File

@@ -1,139 +1,258 @@
from .scraper_strategy import ScraperStrategy
from .filters import FilterChain
from .scorers import URLScorer
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
from abc import ABC, abstractmethod
from typing import Union, AsyncGenerator, Optional, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import asyncio
import validators
from urllib.parse import urljoin,urlparse,urlunparse
import logging
from urllib.parse import urljoin, urlparse, urlunparse
from urllib.robotparser import RobotFileParser
import validators
import time
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict
import logging
from typing import Dict, AsyncGenerator
logging.basicConfig(level=logging.DEBUG)
rate_limiter = AsyncLimiter(1, 1) # 1 request per second
from .models import ScraperResult, CrawlResult
from .filters import FilterChain
from .scorers import URLScorer
from ..async_webcrawler import AsyncWebCrawler
@dataclass
class CrawlStats:
"""Statistics for the crawling process"""
start_time: datetime
urls_processed: int = 0
urls_failed: int = 0
urls_skipped: int = 0
total_depth_reached: int = 0
current_depth: int = 0
robots_blocked: int = 0
class ScraperStrategy(ABC):
"""Base class for scraping strategies"""
@abstractmethod
async def ascrape(
self,
url: str,
crawler: AsyncWebCrawler,
parallel_processing: bool = True,
stream: bool = False
) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Abstract method for scraping implementation"""
pass
@abstractmethod
async def can_process_url(self, url: str) -> bool:
"""Check if URL can be processed based on strategy rules"""
pass
@abstractmethod
async def shutdown(self):
"""Clean up resources used by the strategy"""
pass
class BFSScraperStrategy(ScraperStrategy):
def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5, min_crawl_delay: int=1):
"""Breadth-First Search scraping strategy with politeness controls"""
def __init__(
self,
max_depth: int,
filter_chain: FilterChain,
url_scorer: URLScorer,
max_concurrent: int = 5,
min_crawl_delay: int = 1,
timeout: int = 30,
logger: Optional[logging.Logger] = None
):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.max_concurrent = max_concurrent
# For Crawl Politeness
self.min_crawl_delay = min_crawl_delay
self.timeout = timeout
self.logger = logger or logging.getLogger(__name__)
# Crawl control
self.stats = CrawlStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
# Rate limiting and politeness
self.rate_limiter = AsyncLimiter(1, 1)
self.last_crawl_time = defaultdict(float)
self.min_crawl_delay = min_crawl_delay # 1 second delay between requests to the same domain
# For Robots.txt Compliance
self.robot_parsers = {}
self.robot_parsers: Dict[str, RobotFileParser] = {}
self.domain_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
# Robots.txt Parser
def get_robot_parser(self, url: str) -> RobotFileParser:
domain = urlparse(url)
scheme = domain.scheme if domain.scheme else 'http' # Default to 'http' if no scheme provided
netloc = domain.netloc
if netloc not in self.robot_parsers:
rp = RobotFileParser()
rp.set_url(f"{scheme}://{netloc}/robots.txt")
try:
rp.read()
except Exception as e:
# Log the type of error, message, and the URL
logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
return None
self.robot_parsers[netloc] = rp
return self.robot_parsers[netloc]
# Retry with exponential backoff
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
return await crawler.arun(url)
async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
def normalize_url(url: str) -> str:
parsed = urlparse(url)
return urlunparse(parsed._replace(fragment=""))
# URL Validation
async def can_process_url(self, url: str) -> bool:
"""Check if URL can be processed based on robots.txt and filters"""
if not validators.url(url):
logging.warning(f"Invalid URL: {url}")
return None
# Robots.txt Compliance
robot_parser = self.get_robot_parser(url)
if robot_parser is None:
logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
else:
# If robots.txt was fetched, check if crawling is allowed
if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
logging.info(f"Skipping {url} as per robots.txt")
return None
# Crawl Politeness
self.logger.warning(f"Invalid URL: {url}")
return False
robot_parser = await self._get_robot_parser(url)
if robot_parser and not robot_parser.can_fetch("*", url):
self.stats.robots_blocked += 1
self.logger.info(f"Blocked by robots.txt: {url}")
return False
return self.filter_chain.apply(url)
async def _get_robot_parser(self, url: str) -> Optional[RobotFileParser]:
"""Get or create robots.txt parser for domain"""
domain = urlparse(url).netloc
time_since_last_crawl = time.time() - self.last_crawl_time[domain]
if time_since_last_crawl < self.min_crawl_delay:
await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
if domain not in self.robot_parsers:
parser = RobotFileParser()
try:
robots_url = f"{urlparse(url).scheme}://{domain}/robots.txt"
parser.set_url(robots_url)
parser.read()
self.robot_parsers[domain] = parser
except Exception as e:
self.logger.warning(f"Error fetching robots.txt for {domain}: {e}")
return None
return self.robot_parsers[domain]
@retry(stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10))
async def _crawl_with_retry(
self,
crawler: AsyncWebCrawler,
url: str
) -> CrawlResult:
"""Crawl URL with retry logic"""
try:
async with asyncio.timeout(self.timeout):
return await crawler.arun(url)
except asyncio.TimeoutError:
self.logger.error(f"Timeout crawling {url}")
raise
async def process_url(
self,
url: str,
depth: int,
crawler: AsyncWebCrawler,
queue: asyncio.PriorityQueue,
visited: Set[str],
depths: Dict[str, int]
) -> Optional[CrawlResult]:
"""Process a single URL and extract links"""
if self._cancel_event.is_set():
return None
if not await self.can_process_url(url):
self.stats.urls_skipped += 1
return None
# Politeness delay
domain = urlparse(url).netloc
time_since_last = time.time() - self.last_crawl_time[domain]
if time_since_last < self.min_crawl_delay:
await asyncio.sleep(self.min_crawl_delay - time_since_last)
self.last_crawl_time[domain] = time.time()
# Rate Limiting
async with rate_limiter:
# Error Handling
try:
crawl_result = await self.retry_crawl(crawler, url)
except Exception as e:
logging.error(f"Error crawling {url}: {str(e)}")
crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
if not crawl_result.success:
# Logging and Monitoring
logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
return crawl_result
# Crawl with rate limiting
try:
async with self.rate_limiter:
result = await self._crawl_with_retry(crawler, url)
self.stats.urls_processed += 1
except Exception as e:
self.logger.error(f"Error crawling {url}: {e}")
self.stats.urls_failed += 1
return None
# Process links
for link_type in ["internal", "external"]:
for link in crawl_result.links[link_type]:
absolute_link = urljoin(url, link['href'])
normalized_link = normalize_url(absolute_link)
if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
new_depth = depths[url] + 1
if new_depth <= self.max_depth:
# URL Scoring
score = self.url_scorer.score(normalized_link)
await queue.put((score, new_depth, normalized_link))
depths[normalized_link] = new_depth
return crawl_result
await self._process_links(result, url, depth, queue, visited, depths)
return result
async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
async def _process_links(
self,
result: CrawlResult,
source_url: str,
depth: int,
queue: asyncio.PriorityQueue,
visited: Set[str],
depths: Dict[str, int]
):
"""Process extracted links from crawl result"""
for link_type in ["internal", "external"]:
for link in result.links[link_type]:
url = urljoin(source_url, link['href'])
url = urlunparse(urlparse(url)._replace(fragment=""))
if url not in visited and await self.can_process_url(url):
new_depth = depths[source_url] + 1
if new_depth <= self.max_depth:
score = self.url_scorer.score(url)
await queue.put((score, new_depth, url))
depths[url] = new_depth
self.stats.total_depth_reached = max(
self.stats.total_depth_reached,
new_depth
)
async def ascrape(
self,
start_url: str,
crawler: AsyncWebCrawler,
parallel_processing: bool = True
) -> AsyncGenerator[CrawlResult, None]:
"""Implement BFS crawling strategy"""
# Initialize crawl state
queue = asyncio.PriorityQueue()
queue.put_nowait((0, 0, start_url))
visited = set()
await queue.put((0, 0, start_url))
visited: Set[str] = set()
depths = {start_url: 0}
pending_tasks = set()
try:
while (not queue.empty() or pending_tasks) and not self._cancel_event.is_set():
# Start new tasks up to max_concurrent
while not queue.empty() and len(pending_tasks) < self.max_concurrent:
_, depth, url = await queue.get()
if url not in visited:
visited.add(url)
self.stats.current_depth = depth
if parallel_processing:
task = asyncio.create_task(
self.process_url(url, depth, crawler, queue, visited, depths)
)
pending_tasks.add(task)
else:
result = await self.process_url(
url, depth, crawler, queue, visited, depths
)
if result:
yield result
while not queue.empty() or pending_tasks:
while not queue.empty() and len(pending_tasks) < self.max_concurrent:
_, depth, url = await queue.get()
if url not in visited:
# Adding URL to the visited set here itself, (instead of after result generation)
# so that other tasks are not queued for same URL, found at different depth before
# crawling and extraction of this task is completed.
visited.add(url)
if parallel_processing:
task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
pending_tasks.add(task)
else:
result = await self.process_url(url, depth, crawler, queue, visited, depths)
# Process completed tasks
if pending_tasks:
done, pending_tasks = await asyncio.wait(
pending_tasks,
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if result:
yield result
yield result
except Exception as e:
self.logger.error(f"Error in crawl process: {e}")
raise
finally:
# Clean up any remaining tasks
for task in pending_tasks:
task.cancel()
self.stats.end_time = datetime.now()
# Wait for the first task to complete and yield results incrementally as each task is completed
if pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
if result:
yield result
async def shutdown(self):
"""Clean up resources and stop crawling"""
self._cancel_event.set()
# Clear caches and close connections
self.robot_parsers.clear()
self.domain_queues.clear()