from typing import Union, AsyncGenerator, Optional
from .scraper_strategy import ScraperStrategy
from .models import ScraperResult, CrawlResult
from ..async_webcrawler import AsyncWebCrawler
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager


@dataclass
class ScrapingProgress:
    """Tracks the progress of a scraping operation."""

    processed_urls: int = 0            # URLs crawled so far in the current run
    failed_urls: int = 0               # URLs that raised an error while crawling
    current_url: Optional[str] = None  # URL most recently handled


class AsyncWebScraper:
    """
    A high-level web scraper that combines an async crawler with a scraping strategy.

    Args:
        crawler (AsyncWebCrawler): The async web crawler implementation
        strategy (ScraperStrategy): The scraping strategy to use
        logger (Optional[logging.Logger]): Custom logger for the scraper
    """

    def __init__(
        self,
        crawler: AsyncWebCrawler,
        strategy: ScraperStrategy,
        logger: Optional[logging.Logger] = None
    ):
        if not isinstance(crawler, AsyncWebCrawler):
            raise TypeError("crawler must be an instance of AsyncWebCrawler")
        if not isinstance(strategy, ScraperStrategy):
            raise TypeError("strategy must be an instance of ScraperStrategy")

        self.crawler = crawler
        self.strategy = strategy
        self.logger = logger or logging.getLogger(__name__)
        self._progress = ScrapingProgress()

    @property
    def progress(self) -> ScrapingProgress:
        """Get current scraping progress."""
        return self._progress

    @asynccontextmanager
    async def _error_handling_context(self, url: str):
        """Log any error raised in the wrapped body, count it as a failure, re-raise."""
        try:
            yield
        except Exception as e:
            self.logger.error(f"Error scraping {url}: {str(e)}")
            self._progress.failed_urls += 1
            raise

    async def ascrape(
        self,
        url: str,
        parallel_processing: bool = True,
        stream: bool = False
    ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
        """
        Scrape a website starting from the given URL.

        Args:
            url: Starting URL for scraping
            parallel_processing: Whether to process URLs in parallel
            stream: If True, return an async generator yielding results as they
                come; if False, collect all results into a ScraperResult

        Returns:
            Either an async generator yielding CrawlResults (stream=True) or a
            final ScraperResult (stream=False). Note: this is a coroutine, so
            streaming callers must first ``await`` it, then iterate the
            returned generator.
        """
        self._progress = ScrapingProgress()  # Reset progress for this run

        if stream:
            # Fix: the error-handling context used to wrap this branch, but it
            # exited before the generator was ever iterated, so it could not
            # catch streaming errors. Those are handled inside
            # _ascrape_yielding instead.
            return self._ascrape_yielding(url, parallel_processing)

        async with self._error_handling_context(url):
            return await self._ascrape_collecting(url, parallel_processing)

    async def _ascrape_yielding(
        self,
        url: str,
        parallel_processing: bool
    ) -> AsyncGenerator[CrawlResult, None]:
        """Stream scraping results as they become available."""
        try:
            result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
            async for res in result_generator:
                self._progress.processed_urls += 1
                self._progress.current_url = res.url
                yield res
        except Exception as e:
            self.logger.error(f"Error in streaming scrape: {str(e)}")
            # Keep failure stats accurate in stream mode too (previously only
            # the collecting path counted failures).
            self._progress.failed_urls += 1
            raise

    async def _ascrape_collecting(
        self,
        url: str,
        parallel_processing: bool
    ) -> ScraperResult:
        """Collect all scraping results before returning a single summary."""
        extracted_data = {}

        try:
            result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
            async for res in result_generator:
                self._progress.processed_urls += 1
                self._progress.current_url = res.url
                extracted_data[res.url] = res

            return ScraperResult(
                url=url,
                crawled_urls=list(extracted_data.keys()),
                extracted_data=extracted_data,
                stats={
                    'processed_urls': self._progress.processed_urls,
                    'failed_urls': self._progress.failed_urls
                }
            )
        except Exception as e:
            self.logger.error(f"Error in collecting scrape: {str(e)}")
            raise
from typing import AsyncGenerator, Optional, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import asyncio
import logging
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
import validators
import time
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict

from .models import CrawlResult
from .filters import FilterChain
from .scorers import URLScorer
from ..async_webcrawler import AsyncWebCrawler, CrawlerRunConfig
from .scraper_strategy import ScraperStrategy


@dataclass
class CrawlStats:
    """Statistics for the crawling process."""
    start_time: datetime
    urls_processed: int = 0
    urls_failed: int = 0
    urls_skipped: int = 0
    total_depth_reached: int = 0
    current_depth: int = 0
    robots_blocked: int = 0
    # Set when the crawl finishes. Previously assigned dynamically in
    # ascrape()'s finally block without being declared on the dataclass.
    end_time: Optional[datetime] = None


class BFSScraperStrategy(ScraperStrategy):
    """Breadth-First Search scraping strategy with politeness controls.

    URLs are crawled in priority order: the url_scorer's score decides
    priority (higher = more relevant), then depth, then insertion order.
    """

    def __init__(
        self,
        max_depth: int,
        filter_chain: FilterChain,
        url_scorer: URLScorer,
        process_external_links: bool = False,
        max_concurrent: int = 5,
        min_crawl_delay: int = 1,
        timeout: int = 30,
        logger: Optional[logging.Logger] = None
    ):
        self.max_depth = max_depth
        self.filter_chain = filter_chain
        self.url_scorer = url_scorer
        self.max_concurrent = max_concurrent
        self.min_crawl_delay = min_crawl_delay
        self.timeout = timeout
        self.logger = logger or logging.getLogger(__name__)

        # Crawl control
        self.stats = CrawlStats(start_time=datetime.now())
        self._cancel_event = asyncio.Event()
        self.process_external_links = process_external_links

        # Rate limiting and politeness
        self.rate_limiter = AsyncLimiter(1, 1)     # global 1 request/second cap
        self.last_crawl_time = defaultdict(float)  # per-domain last-crawl timestamp
        self.robot_parsers: Dict[str, Optional[RobotFileParser]] = {}
        self.domain_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    async def can_process_url(self, url: str, depth: int) -> bool:
        """Gatekeeper: validate the URL, honour robots.txt, apply filters.

        The filter chain is skipped for the start page (depth 0) so a crawl
        can always begin even if the start URL itself would not pass.
        """
        if not validators.url(url):
            self.logger.warning(f"Invalid URL: {url}")
            return False

        robot_parser = await self._get_robot_parser(url)
        if robot_parser and not robot_parser.can_fetch("*", url):
            self.stats.robots_blocked += 1
            self.logger.info(f"Blocked by robots.txt: {url}")
            return False

        if depth != 0 and not self.filter_chain.apply(url):
            return False

        return True

    async def _get_robot_parser(self, url: str) -> Optional[RobotFileParser]:
        """Get or create the robots.txt parser for the URL's domain.

        Parsers are cached per domain. Fetch failures are cached as None so an
        unreachable robots.txt is not re-fetched for every URL of that domain
        (None means "no restrictions known; allow crawling").
        """
        domain = urlparse(url).netloc
        if domain not in self.robot_parsers:
            parser = RobotFileParser()
            try:
                robots_url = f"{urlparse(url).scheme}://{domain}/robots.txt"
                parser.set_url(robots_url)
                # RobotFileParser.read() does blocking network I/O; run it in
                # a worker thread so the event loop is not stalled.
                await asyncio.to_thread(parser.read)
                self.robot_parsers[domain] = parser
            except Exception as e:
                self.logger.warning(f"Error fetching robots.txt for {domain}: {e}")
                self.robot_parsers[domain] = None  # cache the failure
        return self.robot_parsers[domain]

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=4, max=10))
    async def _crawl_with_retry(
        self,
        crawler: AsyncWebCrawler,
        url: str
    ) -> CrawlResult:
        """Crawl one URL with a timeout; tenacity retries up to 3 times."""
        try:
            crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
            return await asyncio.wait_for(crawler.arun(url, config=crawler_config), timeout=self.timeout)
        except asyncio.TimeoutError:
            self.logger.error(f"Timeout crawling {url}")
            raise
        except Exception as e:
            # Re-raise so the retry decorator can act on it
            self.logger.error(f"Error crawling {url}: {e}")
            raise

    async def process_url(
        self,
        url: str,
        depth: int,
        crawler: AsyncWebCrawler,
        queue: asyncio.PriorityQueue,
        visited: Set[str],
        depths: Dict[str, int]
    ) -> Optional[CrawlResult]:
        """Process a single URL and extract its links.

        Applies cancellation, URL validation, per-domain politeness delay and
        global rate limiting; crawls with retries, updates statistics, feeds
        newly discovered links back into the queue, and returns the crawl
        result (or None on skip/failure).
        """
        if self._cancel_event.is_set():
            return None

        if not await self.can_process_url(url, depth):
            self.stats.urls_skipped += 1
            return None

        # Politeness delay: never hit the same domain faster than min_crawl_delay
        domain = urlparse(url).netloc
        time_since_last = time.time() - self.last_crawl_time[domain]
        if time_since_last < self.min_crawl_delay:
            await asyncio.sleep(self.min_crawl_delay - time_since_last)
        self.last_crawl_time[domain] = time.time()

        try:
            async with self.rate_limiter:
                result = await self._crawl_with_retry(crawler, url)
                self.stats.urls_processed += 1
                await self._process_links(result, url, depth, queue, visited, depths)
                return result
        except Exception as e:
            self.logger.error(f"Error crawling {url}: {e}")
            self.stats.urls_failed += 1
            return None

    async def _process_links(
        self,
        result: CrawlResult,
        source_url: str,
        depth: int,
        queue: asyncio.PriorityQueue,
        visited: Set[str],
        depths: Dict[str, int]
    ):
        """Score and enqueue the links extracted from a crawl result.

        Normalization/dedup happens against `visited`; depth limits are
        enforced before enqueueing; max-depth statistics are updated.
        """
        # Copy: the old `links_to_process += ...` mutated
        # result.links["internal"] in place, corrupting the crawl result.
        links_to_process = list(result.links["internal"])
        if self.process_external_links:
            links_to_process.extend(result.links["external"])

        for link in links_to_process:
            url = link['href']
            if url not in visited:
                new_depth = depths[source_url] + 1
                if new_depth <= self.max_depth:
                    # Without a scorer every URL scores 0 and is processed in
                    # FIFO order within its depth.
                    score = self.url_scorer.score(url) if self.url_scorer else 0
                    # Negate the score: PriorityQueue pops the SMALLEST tuple
                    # first, but a higher score means a more relevant URL.
                    await queue.put((-score, new_depth, url))
                    depths[url] = new_depth
                    self.stats.total_depth_reached = max(
                        self.stats.total_depth_reached,
                        new_depth
                    )

    async def ascrape(
        self,
        start_url: str,
        crawler: AsyncWebCrawler,
        parallel_processing: bool = True
    ) -> AsyncGenerator[CrawlResult, None]:
        """Crawl breadth-first from start_url, yielding results as they complete.

        State:
            queue:   priority queue of (-score, depth, url) — the most
                     relevant URL (highest score) is popped first.
            visited: URLs already dequeued, to avoid cycles and duplicates.
            depths:  URL -> distance from start_url.
            pending_tasks: in-flight crawl tasks (parallel mode only).
        """
        queue = asyncio.PriorityQueue()
        await queue.put((0, 0, start_url))
        visited: Set[str] = set()
        depths = {start_url: 0}
        pending_tasks = set()

        try:
            # Run while URLs remain queued or tasks are in flight, unless cancelled.
            while (not queue.empty() or pending_tasks) and not self._cancel_event.is_set():
                # Start new tasks up to the max_concurrent limit.
                while not queue.empty() and len(pending_tasks) < self.max_concurrent:
                    _, depth, url = await queue.get()
                    if url not in visited:
                        # Mark immediately so duplicate queue entries are dropped.
                        visited.add(url)
                        self.stats.current_depth = depth

                        if parallel_processing:
                            task = asyncio.create_task(
                                self.process_url(url, depth, crawler, queue, visited, depths)
                            )
                            pending_tasks.add(task)
                        else:
                            result = await self.process_url(
                                url, depth, crawler, queue, visited, depths
                            )
                            if result:
                                yield result

                # Harvest finished tasks as soon as any completes, yielding
                # successful results and keeping the rest in flight.
                if pending_tasks:
                    done, pending_tasks = await asyncio.wait(
                        pending_tasks,
                        return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in done:
                        result = await task
                        if result:
                            yield result

        except Exception as e:
            self.logger.error(f"Error in crawl process: {e}")
            raise

        finally:
            # Cancel anything still running and stamp the end of the crawl.
            for task in pending_tasks:
                task.cancel()
            self.stats.end_time = datetime.now()

    async def shutdown(self):
        """Clean up resources and stop crawling."""
        self._cancel_event.set()
        # Clear caches and close connections
        self.robot_parsers.clear()
        self.domain_queues.clear()
from abc import ABC, abstractmethod
from typing import List, Set, Union
import re
from urllib.parse import urlparse
import mimetypes
import logging
from dataclasses import dataclass
import fnmatch


@dataclass
class FilterStats:
    """Statistics for filter applications"""
    total_urls: int = 0
    rejected_urls: int = 0
    passed_urls: int = 0


class URLFilter(ABC):
    """Base class for URL filters"""

    def __init__(self, name: str = None):
        self.name = name or self.__class__.__name__
        self.stats = FilterStats()
        self.logger = logging.getLogger(f"urlfilter.{self.name}")

    @abstractmethod
    def apply(self, url: str) -> bool:
        """Apply the filter to a URL; True means the URL passes."""
        pass

    def _update_stats(self, passed: bool):
        """Update filter statistics"""
        self.stats.total_urls += 1
        if passed:
            self.stats.passed_urls += 1
        else:
            self.stats.rejected_urls += 1


class FilterChain:
    """Chain of URL filters; a URL passes only if every filter accepts it."""

    def __init__(self, filters: List[URLFilter] = None):
        self.filters = filters or []
        self.stats = FilterStats()
        self.logger = logging.getLogger("urlfilter.chain")

    def add_filter(self, filter_: URLFilter) -> 'FilterChain':
        """Add a filter to the chain"""
        self.filters.append(filter_)
        return self  # Enable method chaining

    def apply(self, url: str) -> bool:
        """Apply all filters in the chain (short-circuits on first rejection)."""
        self.stats.total_urls += 1

        for filter_ in self.filters:
            if not filter_.apply(url):
                self.stats.rejected_urls += 1
                self.logger.debug(f"URL {url} rejected by {filter_.name}")
                return False

        self.stats.passed_urls += 1
        return True


class URLPatternFilter(URLFilter):
    """Filter URLs based on glob patterns or regex.

    pattern_filter = URLPatternFilter([
        "*.example.com/*",      # Glob pattern
        "*/article/*",          # Path pattern
        re.compile(r"blog-\d+") # Regex pattern
    ])

    - Supports glob patterns and regex
    - Multiple patterns per filter
    - Pattern pre-compilation for performance
    """

    def __init__(self, patterns: Union[str, re.Pattern, List[Union[str, re.Pattern]]],
                 use_glob: bool = True):
        super().__init__()
        # re.Pattern (not the deprecated/removed typing.Pattern) is the
        # supported runtime class for isinstance checks.
        self.patterns = [patterns] if isinstance(patterns, (str, re.Pattern)) else patterns
        self.use_glob = use_glob
        self._compiled_patterns = []

        for pattern in self.patterns:
            if isinstance(pattern, str) and use_glob:
                self._compiled_patterns.append(self._glob_to_regex(pattern))
            else:
                self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern)

    def _glob_to_regex(self, pattern: str) -> re.Pattern:
        """Convert glob pattern to regex"""
        return re.compile(fnmatch.translate(pattern))

    def apply(self, url: str) -> bool:
        """Check if URL matches any of the patterns"""
        matches = any(pattern.search(url) for pattern in self._compiled_patterns)
        self._update_stats(matches)
        return matches


class ContentTypeFilter(URLFilter):
    """Filter URLs based on expected content type.

    content_filter = ContentTypeFilter([
        "text/html",
        "application/pdf"
    ], check_extension=True)

    - Filter by MIME types
    - Extension checking
    - Support for multiple content types
    """

    def __init__(self, allowed_types: Union[str, List[str]],
                 check_extension: bool = True):
        super().__init__()
        self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types
        self.check_extension = check_extension
        self._normalize_types()

    def _normalize_types(self):
        """Normalize content type strings"""
        self.allowed_types = [t.lower() for t in self.allowed_types]

    def _check_extension(self, url: str) -> bool:
        """Check the URL path's extension against the allowed MIME types.

        Fix: the MIME type is now guessed from the parsed path only, so query
        strings/fragments (e.g. "page.html?q=1") no longer defeat the guess;
        previously the whole URL was passed to mimetypes while the computed
        extension went unused.
        """
        path = urlparse(url).path
        ext = path.rsplit('.', 1)[-1].lower() if '.' in path else ''
        if not ext:
            return True  # No extension, might be dynamic content

        guessed_type = mimetypes.guess_type(path)[0]
        return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types)

    def apply(self, url: str) -> bool:
        """Check if URL's content type is allowed"""
        result = True
        if self.check_extension:
            result = self._check_extension(url)
        self._update_stats(result)
        return result


class DomainFilter(URLFilter):
    """Filter URLs based on allowed/blocked domains.

    domain_filter = DomainFilter(
        allowed_domains=["example.com", "blog.example.com"],
        blocked_domains=["ads.example.com"]
    )

    - Allow/block specific domains
    - Subdomain support: "example.com" also matches "blog.example.com"
    - Blocked domains take precedence over allowed ones
    """

    def __init__(self, allowed_domains: Union[str, List[str]] = None,
                 blocked_domains: Union[str, List[str]] = None):
        super().__init__()
        self.allowed_domains = set(self._normalize_domains(allowed_domains)) if allowed_domains else None
        self.blocked_domains = set(self._normalize_domains(blocked_domains)) if blocked_domains else set()

    def _normalize_domains(self, domains: Union[str, List[str]]) -> List[str]:
        """Normalize domain strings"""
        if isinstance(domains, str):
            domains = [domains]
        return [d.lower().strip() for d in domains]

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        return urlparse(url).netloc.lower()

    @staticmethod
    def _domain_matches(domain: str, candidates) -> bool:
        """True if domain equals, or is a subdomain of, any candidate.

        Fix: the class documents subdomain support but previously performed
        exact matching only, so "blog.example.com" was not covered by an
        "example.com" entry.
        """
        return any(domain == c or domain.endswith('.' + c) for c in candidates)

    def apply(self, url: str) -> bool:
        """Check if URL's domain is allowed (blocked list checked first)."""
        domain = self._extract_domain(url)

        if self._domain_matches(domain, self.blocked_domains):
            self._update_stats(False)
            return False

        if self.allowed_domains is not None and not self._domain_matches(domain, self.allowed_domains):
            self._update_stats(False)
            return False

        self._update_stats(True)
        return True


# Example usage:
def create_common_filter_chain() -> FilterChain:
    """Create a commonly used filter chain."""
    return FilterChain([
        URLPatternFilter([
            "*.html", "*.htm",           # HTML files
            "*/article/*", "*/blog/*"    # Common content paths
        ]),
        ContentTypeFilter([
            "text/html",
            "application/xhtml+xml"
        ]),
        # DomainFilter matches concrete hostnames (and their subdomains), not
        # glob patterns: the previous "ads.*"/"analytics.*" entries could
        # never match any real domain.
        DomainFilter(
            blocked_domains=["ads.example.com", "analytics.example.com"]
        )
    ])
# Scoring primitives shared by the scraper strategies.

from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Union
from dataclasses import dataclass
from urllib.parse import urlparse, unquote
import re
from collections import defaultdict
import math
import logging


@dataclass
class ScoringStats:
    """Running statistics over every score produced by a scorer."""
    urls_scored: int = 0
    total_score: float = 0.0
    min_score: float = float('inf')
    max_score: float = float('-inf')

    def update(self, score: float):
        """Fold one score into the running totals and extrema."""
        self.urls_scored += 1
        self.total_score += score
        if score < self.min_score:
            self.min_score = score
        if score > self.max_score:
            self.max_score = score

    @property
    def average_score(self) -> float:
        """Mean of all scores seen so far (0.0 before any score)."""
        if self.urls_scored == 0:
            return 0.0
        return self.total_score / self.urls_scored


class URLScorer(ABC):
    """Base class for URL scoring strategies.

    Subclasses implement _calculate_score; callers use score(), which applies
    the configured weight and records statistics.
    """

    def __init__(self, weight: float = 1.0, name: str = None):
        self.weight = weight
        self.name = name or self.__class__.__name__
        self.stats = ScoringStats()
        self.logger = logging.getLogger(f"urlscorer.{self.name}")

    @abstractmethod
    def _calculate_score(self, url: str) -> float:
        """Compute the raw (unweighted) score for a URL."""
        pass

    def score(self, url: str) -> float:
        """Return the weighted score for a URL and record it in the stats."""
        weighted = self._calculate_score(url) * self.weight
        self.stats.update(weighted)
        return weighted


class CompositeScorer(URLScorer):
    """Combine several scorers into one (optionally averaging their outputs)."""

    def __init__(self, scorers: List[URLScorer], normalize: bool = True):
        super().__init__(name="CompositeScorer")
        self.scorers = scorers
        self.normalize = normalize

    def _calculate_score(self, url: str) -> float:
        """Sum the child scorers' weighted scores; divide by count when normalizing."""
        parts = [child.score(url) for child in self.scorers]
        combined = sum(parts)
        if self.normalize and parts:
            combined = combined / len(parts)
        return combined


class KeywordRelevanceScorer(URLScorer):
    """Score URLs by the fraction of configured keywords they contain.

    keyword_scorer = KeywordRelevanceScorer(
        keywords=["python", "programming"],
        weight=1.0,
        case_sensitive=False
    )

    - Score based on keyword matches
    - Case sensitivity options
    - Weighted scoring
    """

    def __init__(self, keywords: List[str], weight: float = 1.0,
                 case_sensitive: bool = False):
        super().__init__(weight=weight)
        self.keywords = keywords
        self.case_sensitive = case_sensitive
        self._compile_keywords()

    def _compile_keywords(self):
        """Compile one literal-match regex per keyword."""
        flags = 0 if self.case_sensitive else re.IGNORECASE
        self.patterns = [re.compile(re.escape(word), flags) for word in self.keywords]

    def _calculate_score(self, url: str) -> float:
        """Fraction of keywords found in the percent-decoded URL (0.0..1.0)."""
        if not self.patterns:
            return 0.0
        haystack = unquote(url)
        hits = sum(1 for rx in self.patterns if rx.search(haystack))
        return hits / len(self.patterns)
class PathDepthScorer(URLScorer):
    """Score URLs by how close their path depth is to a preferred depth.

    path_scorer = PathDepthScorer(
        optimal_depth=3,  # Preferred URL depth
        weight=0.7
    )

    The score is 1.0 at the optimal depth and falls off as 1 / (1 + distance)
    for paths that are shallower or deeper.
    """

    def __init__(self, optimal_depth: int = 3, weight: float = 1.0):
        super().__init__(weight=weight)
        self.optimal_depth = optimal_depth

    def _calculate_score(self, url: str) -> float:
        """Return 1 / (1 + |depth - optimal_depth|) for the URL's path."""
        segments = [part for part in urlparse(url).path.split('/') if part]
        gap = abs(len(segments) - self.optimal_depth)
        return 1.0 / (1.0 + gap)


class ContentTypeScorer(URLScorer):
    """Score URLs by matching them against content-type patterns.

    content_scorer = ContentTypeScorer({
        r'\.html$': 1.0,
        r'\.pdf$': 0.8,
        r'\.xml$': 0.6
    })

    The first pattern (in insertion order) that matches the URL decides the
    score; unmatched URLs score 0.0.
    """

    def __init__(self, type_weights: Dict[str, float], weight: float = 1.0):
        super().__init__(weight=weight)
        self.type_weights = type_weights
        self._compile_patterns()

    def _compile_patterns(self):
        """Pre-compile the content-type regexes, keeping their weights."""
        self.patterns = {
            re.compile(raw): value
            for raw, value in self.type_weights.items()
        }

    def _calculate_score(self, url: str) -> float:
        """Return the weight of the first matching pattern, else 0.0."""
        return next(
            (value for rx, value in self.patterns.items() if rx.search(url)),
            0.0
        )
class FreshnessScorer(URLScorer):
    """Score URLs based on freshness indicators (dates embedded in the URL).

    freshness_scorer = FreshnessScorer(weight=0.9)

    - Recognizes several common date formats in URL paths
    - More recent years score higher (0.1 decay per year, clamped to [0, 1])
    - URLs without any date indicator get a neutral 0.5
    """

    def __init__(self, weight: float = 1.0):
        super().__init__(weight=weight)
        self.date_patterns = [
            r'/(\d{4})/(\d{2})/(\d{2})/',      # yyyy/mm/dd
            r'(\d{4})[-_](\d{2})[-_](\d{2})',  # yyyy-mm-dd
            r'/(\d{4})/',                      # year only
        ]
        self._compile_patterns()

    def _compile_patterns(self):
        """Pre-compile the date patterns."""
        self.compiled_patterns = [re.compile(p) for p in self.date_patterns]

    def _calculate_score(self, url: str) -> float:
        """Score by the first year found in the URL, relative to today.

        Fix: the reference year was hard-coded as 2024, so the scorer would
        silently drift as time passes. It now uses the current year, and the
        result is clamped to [0, 1] so very old (or bogus future) years
        cannot produce out-of-range scores.
        """
        from datetime import date  # local import: module header does not import datetime
        current_year = date.today().year
        for pattern in self.compiled_patterns:
            if match := pattern.search(url):
                year = int(match.group(1))
                return max(0.0, min(1.0, 1.0 - (current_year - year) * 0.1))
        return 0.5  # Neutral default for URLs without any date


class DomainAuthorityScorer(URLScorer):
    """Score URLs based on domain authority.

    authority_scorer = DomainAuthorityScorer({
        "python.org": 1.0,
        "github.com": 0.9,
        "medium.com": 0.7
    })

    - Score based on domain importance
    - Configurable domain weights
    - Default weight for unknown domains
    """

    def __init__(self, domain_weights: Dict[str, float],
                 default_weight: float = 0.5, weight: float = 1.0):
        super().__init__(weight=weight)
        self.domain_weights = domain_weights
        self.default_weight = default_weight

    def _calculate_score(self, url: str) -> float:
        """Look up the URL's (lower-cased) domain; fall back to default_weight."""
        domain = urlparse(url).netloc.lower()
        return self.domain_weights.get(domain, self.default_weight)


def create_balanced_scorer() -> CompositeScorer:
    """Create a balanced composite scorer mixing relevance, depth, type and freshness."""
    return CompositeScorer([
        KeywordRelevanceScorer(
            keywords=["article", "blog", "news", "research"],
            weight=1.0
        ),
        PathDepthScorer(
            optimal_depth=3,
            weight=0.7
        ),
        ContentTypeScorer(
            type_weights={
                r'\.html?$': 1.0,
                r'\.pdf$': 0.8,
                r'\.xml$': 0.6
            },
            weight=0.8
        ),
        FreshnessScorer(
            weight=0.9
        )
    ])
from abc import ABC, abstractmethod
from typing import Union, AsyncGenerator

# Fix: CrawlResult was imported twice (from .models and then shadowed by
# ..models); it is now imported once from its canonical location.
from .models import ScraperResult
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler


class ScraperStrategy(ABC):
    """Abstract base class for website scraping strategies."""

    @abstractmethod
    async def ascrape(
        self,
        url: str,
        crawler: AsyncWebCrawler,
        parallel_processing: bool = True,
        stream: bool = False
    ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
        """Scrape the given URL using the specified crawler.

        Args:
            url (str): The starting URL for the scrape.
            crawler (AsyncWebCrawler): The web crawler instance.
            parallel_processing (bool): Whether to use parallel processing. Defaults to True.
            stream (bool): If True, yields individual crawl results as they are ready;
                if False, accumulates results and returns a final ScraperResult.
                NOTE(review): not every concrete strategy accepts this parameter
                (e.g. BFSScraperStrategy.ascrape does not) — confirm before
                relying on it.

        Yields:
            CrawlResult: Individual crawl results if stream is True.

        Returns:
            ScraperResult: A summary of the scrape results containing the final
            extracted data and the list of crawled URLs if stream is False.
        """
        pass

    @abstractmethod
    async def can_process_url(self, url: str, depth: int) -> bool:
        """Check if URL can be processed based on strategy rules"""
        pass

    @abstractmethod
    async def shutdown(self):
        """Clean up resources used by the strategy"""
        pass
+ CheckMoreColl -->|Yes| ProcessURLColl + CheckMoreColl -->|No| CreateResult[Create ScraperResult] + CreateResult --> ReturnResult([Return Result]) + + %% Parallel Processing + subgraph Parallel + ProcessURL + FetchContent + Extract + ProcessURLColl + FetchContentColl + ExtractColl + end + + %% Error Handling + FetchContent --> ErrorCheck{Error?} + ErrorCheck -->|Yes| LogError[Log Error] + LogError --> UpdateStats[Update Error Stats] + UpdateStats --> CheckMore + ErrorCheck -->|No| Extract + + FetchContentColl --> ErrorCheckColl{Error?} + ErrorCheckColl -->|Yes| LogErrorColl[Log Error] + LogErrorColl --> UpdateStatsColl[Update Error Stats] + UpdateStatsColl --> CheckMoreColl + ErrorCheckColl -->|No| ExtractColl + + %% Style definitions + classDef process fill:#90caf9,stroke:#000,stroke-width:2px; + classDef decision fill:#fff59d,stroke:#000,stroke-width:2px; + classDef error fill:#ef9a9a,stroke:#000,stroke-width:2px; + classDef start fill:#a5d6a7,stroke:#000,stroke-width:2px; + + class Start,StreamEnd,ReturnResult start; + class Decision,CheckMore,CheckMoreColl,ErrorCheck,ErrorCheckColl decision; + class LogError,LogErrorColl,UpdateStats,UpdateStatsColl error; + class ProcessURL,FetchContent,Extract,ProcessURLColl,FetchContentColl,ExtractColl process; +``` + +AsyncWebScraper uses an intelligent crawling system that can navigate through websites following your specified strategy. It supports two main modes of operation: + +### 1. Streaming Mode +```python +async for result in scraper.ascrape(url, stream=True): + print(f"Found data on {result.url}") + process_data(result.data) +``` +- Perfect for processing large websites +- Memory efficient - handles one page at a time +- Ideal for real-time data processing +- Great for monitoring or continuous scraping tasks + +### 2. 
Collection Mode +```python +result = await scraper.ascrape(url) +print(f"Scraped {len(result.crawled_urls)} pages") +process_all_data(result.extracted_data) +``` +- Collects all data before returning +- Best for when you need the complete dataset +- Easier to work with for batch processing +- Includes comprehensive statistics + +## Key Features + +- **Smart Crawling**: Automatically follows relevant links while avoiding duplicates +- **Parallel Processing**: Scrapes multiple pages simultaneously for better performance +- **Memory Efficient**: Choose between streaming and collecting based on your needs +- **Error Resilient**: Continues working even if some pages fail to load +- **Progress Tracking**: Monitor the scraping progress in real-time +- **Customizable**: Configure crawling strategy, filters, and scoring to match your needs + +## Quick Start + +```python +from crawl4ai.scraper import AsyncWebScraper, BFSScraperStrategy +from crawl4ai.async_webcrawler import AsyncWebCrawler + +# Initialize the scraper +crawler = AsyncWebCrawler() +strategy = BFSScraperStrategy( + max_depth=2, # How deep to crawl + url_pattern="*.example.com/*" # What URLs to follow +) +scraper = AsyncWebScraper(crawler, strategy) + +# Start scraping +async def main(): + # Collect all results + result = await scraper.ascrape("https://example.com") + print(f"Found {len(result.extracted_data)} pages") + + # Or stream results + async for page in scraper.ascrape("https://example.com", stream=True): + print(f"Processing {page.url}") + +``` + +## Best Practices + +1. **Choose the Right Mode** + - Use streaming for large websites or real-time processing + - Use collecting for smaller sites or when you need the complete dataset + +2. **Configure Depth** + - Start with a small depth (2-3) and increase if needed + - Higher depths mean exponentially more pages to crawl + +3. **Set Appropriate Filters** + - Use URL patterns to stay within relevant sections + - Set content type filters to only process useful pages + +4. 
**Handle Resources Responsibly** + - Enable parallel processing for faster results + - Consider the target website's capacity + - Implement appropriate delays between requests + +## Common Use Cases + +- **Content Aggregation**: Collect articles, blog posts, or news from multiple pages +- **Data Extraction**: Gather product information, prices, or specifications +- **Site Mapping**: Create a complete map of a website's structure +- **Content Monitoring**: Track changes or updates across multiple pages +- **Data Mining**: Extract and analyze patterns across web pages + +## Advanced Features + +- Custom scoring algorithms for prioritizing important pages +- URL filters for focusing on specific site sections +- Content type filtering for processing only relevant pages +- Progress tracking for monitoring long-running scrapes + +Need more help? Check out our [examples repository](https://github.com/example/crawl4ai/examples) or join our [community Discord](https://discord.gg/example). \ No newline at end of file diff --git a/docs/scraper/bfs_scraper_strategy.md b/docs/scraper/bfs_scraper_strategy.md new file mode 100644 index 00000000..7fe1319c --- /dev/null +++ b/docs/scraper/bfs_scraper_strategy.md @@ -0,0 +1,244 @@ +# BFS Scraper Strategy: Smart Web Traversal + +The BFS (Breadth-First Search) Scraper Strategy provides an intelligent way to traverse websites systematically. It crawls websites level by level, ensuring thorough coverage while respecting web crawling etiquette. 
+ +```mermaid +flowchart TB + Start([Start]) --> Init[Initialize BFS Strategy] + Init --> InitStats[Initialize CrawlStats] + InitStats --> InitQueue[Initialize Priority Queue] + InitQueue --> AddStart[Add Start URL to Queue] + + AddStart --> CheckState{Queue Empty or\nTasks Pending?} + CheckState -->|No| Cleanup[Cleanup & Stats] + Cleanup --> End([End]) + + CheckState -->|Yes| CheckCancel{Cancel\nRequested?} + CheckCancel -->|Yes| Cleanup + + CheckCancel -->|No| CheckConcurrent{Under Max\nConcurrent?} + + CheckConcurrent -->|No| WaitComplete[Wait for Task Completion] + WaitComplete --> YieldResult[Yield Result] + YieldResult --> CheckState + + CheckConcurrent -->|Yes| GetNextURL[Get Next URL from Queue] + + GetNextURL --> ValidateURL{Already\nVisited?} + ValidateURL -->|Yes| CheckState + + ValidateURL -->|No| ProcessURL[Process URL] + + subgraph URL_Processing [URL Processing] + ProcessURL --> CheckValid{URL Valid?} + CheckValid -->|No| UpdateStats[Update Skip Stats] + + CheckValid -->|Yes| CheckRobots{Allowed by\nrobots.txt?} + CheckRobots -->|No| UpdateRobotStats[Update Robot Stats] + + CheckRobots -->|Yes| ApplyDelay[Apply Politeness Delay] + ApplyDelay --> FetchContent[Fetch Content with Rate Limit] + + FetchContent --> CheckError{Error?} + CheckError -->|Yes| Retry{Retry\nNeeded?} + Retry -->|Yes| FetchContent + Retry -->|No| UpdateFailStats[Update Fail Stats] + + CheckError -->|No| ExtractLinks[Extract & Process Links] + ExtractLinks --> ScoreURLs[Score New URLs] + ScoreURLs --> AddToQueue[Add to Priority Queue] + end + + ProcessURL --> CreateTask{Parallel\nProcessing?} + CreateTask -->|Yes| AddTask[Add to Pending Tasks] + CreateTask -->|No| DirectProcess[Process Directly] + + AddTask --> CheckState + DirectProcess --> YieldResult + + UpdateStats --> CheckState + UpdateRobotStats --> CheckState + UpdateFailStats --> CheckState + + classDef process fill:#90caf9,stroke:#000,stroke-width:2px; + classDef decision fill:#fff59d,stroke:#000,stroke-width:2px; + 
classDef error fill:#ef9a9a,stroke:#000,stroke-width:2px; + classDef stats fill:#a5d6a7,stroke:#000,stroke-width:2px; + + class Start,End stats; + class CheckState,CheckCancel,CheckConcurrent,ValidateURL,CheckValid,CheckRobots,CheckError,Retry,CreateTask decision; + class UpdateStats,UpdateRobotStats,UpdateFailStats,InitStats,Cleanup stats; + class ProcessURL,FetchContent,ExtractLinks,ScoreURLs process; +``` + +## How It Works + +The BFS strategy crawls a website by: +1. Starting from a root URL +2. Processing all URLs at the current depth +3. Moving to URLs at the next depth level +4. Continuing until maximum depth is reached + +This ensures systematic coverage of the website while maintaining control over the crawling process. + +## Key Features + +### 1. Smart URL Processing +```python +strategy = BFSScraperStrategy( + max_depth=2, + filter_chain=my_filters, + url_scorer=my_scorer, + max_concurrent=5 +) +``` +- Controls crawl depth +- Filters unwanted URLs +- Scores URLs for priority +- Manages concurrent requests + +### 2. Polite Crawling +The strategy automatically implements web crawling best practices: +- Respects robots.txt +- Implements rate limiting +- Adds politeness delays +- Manages concurrent requests + +### 3. Link Processing Control +```python +strategy = BFSScraperStrategy( + ..., + process_external_links=False # Only process internal links +) +``` +- Control whether to follow external links +- Default: internal links only +- Enable external links when needed + +## Configuration Options + +| Parameter | Description | Default | +|-----------|-------------|---------| +| max_depth | Maximum crawl depth | Required | +| filter_chain | URL filtering rules | Required | +| url_scorer | URL priority scoring | Required | +| max_concurrent | Max parallel requests | 5 | +| min_crawl_delay | Seconds between requests | 1 | +| process_external_links | Follow external links | False | + +## Best Practices + +1. 
**Set Appropriate Depth** + - Start with smaller depths (2-3) + - Increase based on needs + - Consider site structure + +2. **Configure Filters** + - Use URL patterns + - Filter by content type + - Avoid unwanted sections + +3. **Tune Performance** + - Adjust max_concurrent + - Set appropriate delays + - Monitor resource usage + +4. **Handle External Links** + - Keep external_links=False for focused crawls + - Enable only when needed + - Consider additional filtering + +## Example Usage + +```python +from crawl4ai.scraper import BFSScraperStrategy +from crawl4ai.scraper.filters import FilterChain +from crawl4ai.scraper.scorers import BasicURLScorer + +# Configure strategy +strategy = BFSScraperStrategy( + max_depth=3, + filter_chain=FilterChain([ + URLPatternFilter("*.example.com/*"), + ContentTypeFilter(["text/html"]) + ]), + url_scorer=BasicURLScorer(), + max_concurrent=5, + min_crawl_delay=1, + process_external_links=False +) + +# Use with AsyncWebScraper +scraper = AsyncWebScraper(crawler, strategy) +results = await scraper.ascrape("https://example.com") +``` + +## Common Use Cases + +### 1. Site Mapping +```python +strategy = BFSScraperStrategy( + max_depth=5, + filter_chain=site_filter, + url_scorer=depth_scorer, + process_external_links=False +) +``` +Perfect for creating complete site maps or understanding site structure. + +### 2. Content Aggregation +```python +strategy = BFSScraperStrategy( + max_depth=2, + filter_chain=content_filter, + url_scorer=relevance_scorer, + max_concurrent=3 +) +``` +Ideal for collecting specific types of content (articles, products, etc.). + +### 3. Link Analysis +```python +strategy = BFSScraperStrategy( + max_depth=1, + filter_chain=link_filter, + url_scorer=link_scorer, + process_external_links=True +) +``` +Useful for analyzing both internal and external link structures. 
+ +## Advanced Features + +### Progress Monitoring +```python +async for result in scraper.ascrape(url, stream=True): + print(f"Current depth: {strategy.stats.current_depth}") + print(f"Processed URLs: {strategy.stats.urls_processed}") +``` + +### Custom URL Scoring +```python +class CustomScorer(URLScorer): + def score(self, url: str) -> float: + # Lower scores = higher priority + return score_based_on_criteria(url) +``` + +## Troubleshooting + +1. **Slow Crawling** + - Increase max_concurrent + - Adjust min_crawl_delay + - Check network conditions + +2. **Missing Content** + - Verify max_depth + - Check filter settings + - Review URL patterns + +3. **High Resource Usage** + - Reduce max_concurrent + - Increase crawl delay + - Add more specific filters + diff --git a/docs/scraper/filters_scrorers.md b/docs/scraper/filters_scrorers.md new file mode 100644 index 00000000..22b846c6 --- /dev/null +++ b/docs/scraper/filters_scrorers.md @@ -0,0 +1,342 @@ +# URL Filters and Scorers + +The crawl4ai library provides powerful URL filtering and scoring capabilities that help you control and prioritize your web crawling. This guide explains how to use these features effectively. 
+ +```mermaid +flowchart TB + Start([URL Input]) --> Chain[Filter Chain] + + subgraph Chain Process + Chain --> Pattern{URL Pattern\nFilter} + Pattern -->|Match| Content{Content Type\nFilter} + Pattern -->|No Match| Reject1[Reject URL] + + Content -->|Allowed| Domain{Domain\nFilter} + Content -->|Not Allowed| Reject2[Reject URL] + + Domain -->|Allowed| Accept[Accept URL] + Domain -->|Blocked| Reject3[Reject URL] + end + + subgraph Statistics + Pattern --> UpdatePattern[Update Pattern Stats] + Content --> UpdateContent[Update Content Stats] + Domain --> UpdateDomain[Update Domain Stats] + Accept --> UpdateChain[Update Chain Stats] + Reject1 --> UpdateChain + Reject2 --> UpdateChain + Reject3 --> UpdateChain + end + + Accept --> End([End]) + Reject1 --> End + Reject2 --> End + Reject3 --> End + + classDef process fill:#90caf9,stroke:#000,stroke-width:2px; + classDef decision fill:#fff59d,stroke:#000,stroke-width:2px; + classDef reject fill:#ef9a9a,stroke:#000,stroke-width:2px; + classDef accept fill:#a5d6a7,stroke:#000,stroke-width:2px; + + class Start,End accept; + class Pattern,Content,Domain decision; + class Reject1,Reject2,Reject3 reject; + class Chain,UpdatePattern,UpdateContent,UpdateDomain,UpdateChain process; +``` + +## URL Filters + +URL filters help you control which URLs are crawled. Multiple filters can be chained together to create sophisticated filtering rules. + +### Available Filters + +1. **URL Pattern Filter** +```python +pattern_filter = URLPatternFilter([ + "*.example.com/*", # Glob pattern + "*/article/*", # Path pattern + re.compile(r"blog-\d+") # Regex pattern +]) +``` +- Supports glob patterns and regex +- Multiple patterns per filter +- Pattern pre-compilation for performance + +2. **Content Type Filter** +```python +content_filter = ContentTypeFilter([ + "text/html", + "application/pdf" +], check_extension=True) +``` +- Filter by MIME types +- Extension checking +- Support for multiple content types + +3. 
**Domain Filter** +```python +domain_filter = DomainFilter( + allowed_domains=["example.com", "blog.example.com"], + blocked_domains=["ads.example.com"] +) +``` +- Allow/block specific domains +- Subdomain support +- Efficient domain matching + +### Creating Filter Chains + +```python +# Create and configure a filter chain +filter_chain = FilterChain([ + URLPatternFilter(["*.example.com/*"]), + ContentTypeFilter(["text/html"]), + DomainFilter(blocked_domains=["ads.*"]) +]) + +# Add more filters +filter_chain.add_filter( + URLPatternFilter(["*/article/*"]) +) +``` + +```mermaid +flowchart TB + Start([URL Input]) --> Composite[Composite Scorer] + + subgraph Scoring Process + Composite --> Keywords[Keyword Relevance] + Composite --> Path[Path Depth] + Composite --> Content[Content Type] + Composite --> Fresh[Freshness] + Composite --> Domain[Domain Authority] + + Keywords --> KeywordScore[Calculate Score] + Path --> PathScore[Calculate Score] + Content --> ContentScore[Calculate Score] + Fresh --> FreshScore[Calculate Score] + Domain --> DomainScore[Calculate Score] + + KeywordScore --> Weight1[Apply Weight] + PathScore --> Weight2[Apply Weight] + ContentScore --> Weight3[Apply Weight] + FreshScore --> Weight4[Apply Weight] + DomainScore --> Weight5[Apply Weight] + end + + Weight1 --> Combine[Combine Scores] + Weight2 --> Combine + Weight3 --> Combine + Weight4 --> Combine + Weight5 --> Combine + + Combine --> Normalize{Normalize?} + Normalize -->|Yes| NormalizeScore[Normalize Combined Score] + Normalize -->|No| FinalScore[Final Score] + NormalizeScore --> FinalScore + + FinalScore --> Stats[Update Statistics] + Stats --> End([End]) + + classDef process fill:#90caf9,stroke:#000,stroke-width:2px; + classDef scorer fill:#fff59d,stroke:#000,stroke-width:2px; + classDef calc fill:#a5d6a7,stroke:#000,stroke-width:2px; + classDef decision fill:#ef9a9a,stroke:#000,stroke-width:2px; + + class Start,End calc; + class Keywords,Path,Content,Fresh,Domain scorer; + class 
KeywordScore,PathScore,ContentScore,FreshScore,DomainScore process; + class Normalize decision; +``` + +## URL Scorers + +URL scorers help prioritize which URLs to crawl first. Higher scores indicate higher priority. + +### Available Scorers + +1. **Keyword Relevance Scorer** +```python +keyword_scorer = KeywordRelevanceScorer( + keywords=["python", "programming"], + weight=1.0, + case_sensitive=False +) +``` +- Score based on keyword matches +- Case sensitivity options +- Weighted scoring + +2. **Path Depth Scorer** +```python +path_scorer = PathDepthScorer( + optimal_depth=3, # Preferred URL depth + weight=0.7 +) +``` +- Score based on URL path depth +- Configurable optimal depth +- Diminishing returns for deeper paths + +3. **Content Type Scorer** +```python +content_scorer = ContentTypeScorer({ + r'\.html$': 1.0, + r'\.pdf$': 0.8, + r'\.xml$': 0.6 +}) +``` +- Score based on file types +- Configurable type weights +- Pattern matching support + +4. **Freshness Scorer** +```python +freshness_scorer = FreshnessScorer(weight=0.9) +``` +- Score based on date indicators in URLs +- Multiple date format support +- Recency weighting + +5. **Domain Authority Scorer** +```python +authority_scorer = DomainAuthorityScorer({ + "python.org": 1.0, + "github.com": 0.9, + "medium.com": 0.7 +}) +``` +- Score based on domain importance +- Configurable domain weights +- Default weight for unknown domains + +### Combining Scorers + +```python +# Create a composite scorer +composite_scorer = CompositeScorer([ + KeywordRelevanceScorer(["python"], weight=1.0), + PathDepthScorer(optimal_depth=2, weight=0.7), + FreshnessScorer(weight=0.8) +], normalize=True) +``` + +## Best Practices + +### Filter Configuration + +1. **Start Restrictive** + ```python + # Begin with strict filters + filter_chain = FilterChain([ + DomainFilter(allowed_domains=["example.com"]), + ContentTypeFilter(["text/html"]) + ]) + ``` + +2. 
**Layer Filters** + ```python + # Add more specific filters + filter_chain.add_filter( + URLPatternFilter(["*/article/*", "*/blog/*"]) + ) + ``` + +3. **Monitor Filter Statistics** + ```python + # Check filter performance + for filter in filter_chain.filters: + print(f"{filter.name}: {filter.stats.rejected_urls} rejected") + ``` + +### Scorer Configuration + +1. **Balance Weights** + ```python + # Balanced scoring configuration + scorer = create_balanced_scorer() + ``` + +2. **Customize for Content** + ```python + # News site configuration + news_scorer = CompositeScorer([ + KeywordRelevanceScorer(["news", "article"], weight=1.0), + FreshnessScorer(weight=1.0), + PathDepthScorer(optimal_depth=2, weight=0.5) + ]) + ``` + +3. **Monitor Scoring Statistics** + ```python + # Check scoring distribution + print(f"Average score: {scorer.stats.average_score}") + print(f"Score range: {scorer.stats.min_score} - {scorer.stats.max_score}") + ``` + +## Common Use Cases + +### Blog Crawling +```python +blog_config = { + 'filters': FilterChain([ + URLPatternFilter(["*/blog/*", "*/post/*"]), + ContentTypeFilter(["text/html"]) + ]), + 'scorer': CompositeScorer([ + FreshnessScorer(weight=1.0), + KeywordRelevanceScorer(["blog", "article"], weight=0.8) + ]) +} +``` + +### Documentation Sites +```python +docs_config = { + 'filters': FilterChain([ + URLPatternFilter(["*/docs/*", "*/guide/*"]), + ContentTypeFilter(["text/html", "application/pdf"]) + ]), + 'scorer': CompositeScorer([ + PathDepthScorer(optimal_depth=3, weight=1.0), + KeywordRelevanceScorer(["guide", "tutorial"], weight=0.9) + ]) +} +``` + +### E-commerce Sites +```python +ecommerce_config = { + 'filters': FilterChain([ + URLPatternFilter(["*/product/*", "*/category/*"]), + DomainFilter(blocked_domains=["ads.*", "tracker.*"]) + ]), + 'scorer': CompositeScorer([ + PathDepthScorer(optimal_depth=2, weight=1.0), + ContentTypeScorer({ + r'/product/': 1.0, + r'/category/': 0.8 + }) + ]) +} +``` + +## Advanced Topics + +### Custom 
Filters +```python +class CustomFilter(URLFilter): + def apply(self, url: str) -> bool: + # Your custom filtering logic + return True +``` + +### Custom Scorers +```python +class CustomScorer(URLScorer): + def _calculate_score(self, url: str) -> float: + # Your custom scoring logic + return 1.0 +``` + +For more examples, check our [example repository](https://github.com/example/crawl4ai/examples). \ No newline at end of file diff --git a/docs/scraper/how_to_use.md b/docs/scraper/how_to_use.md new file mode 100644 index 00000000..79f7912f --- /dev/null +++ b/docs/scraper/how_to_use.md @@ -0,0 +1,206 @@ +# Scraper Examples Guide + +This guide provides two complete examples of using the crawl4ai scraper: a basic implementation for simple use cases and an advanced implementation showcasing all features. + +## Basic Example + +The basic example demonstrates a simple blog scraping scenario: + +```python +from crawl4ai.scraper import AsyncWebScraper, BFSScraperStrategy, FilterChain, URLPatternFilter, ContentTypeFilter + +# Create simple filter chain +filter_chain = FilterChain([ + URLPatternFilter("*/blog/*"), + ContentTypeFilter(["text/html"]) +]) + +# Initialize strategy +strategy = BFSScraperStrategy( + max_depth=2, + filter_chain=filter_chain, + url_scorer=None, + max_concurrent=3 +) + +# Create and run scraper +crawler = AsyncWebCrawler() +scraper = AsyncWebScraper(crawler, strategy) +result = await scraper.ascrape("https://example.com/blog/") +``` + +### Features Demonstrated +- Basic URL filtering +- Simple content type filtering +- Depth control +- Concurrent request limiting +- Result collection + +## Advanced Example + +The advanced example shows a sophisticated news site scraping setup with all features enabled: + +```python +# Create comprehensive filter chain +filter_chain = FilterChain([ + DomainFilter( + allowed_domains=["example.com"], + blocked_domains=["ads.example.com"] + ), + URLPatternFilter([ + "*/article/*", + re.compile(r"\d{4}/\d{2}/.*") + ]), + ContentTypeFilter(["text/html"]) +]) + 
+# Create intelligent scorer +scorer = CompositeScorer([ + KeywordRelevanceScorer( + keywords=["news", "breaking"], + weight=1.0 + ), + PathDepthScorer(optimal_depth=3, weight=0.7), + FreshnessScorer(weight=0.9) +]) + +# Initialize advanced strategy +strategy = BFSScraperStrategy( + max_depth=4, + filter_chain=filter_chain, + url_scorer=scorer, + max_concurrent=5 +) +``` + +### Features Demonstrated +1. **Advanced Filtering** + - Domain filtering + - Pattern matching + - Content type control + +2. **Intelligent Scoring** + - Keyword relevance + - Path optimization + - Freshness priority + +3. **Monitoring** + - Progress tracking + - Error handling + - Statistics collection + +4. **Resource Management** + - Concurrent processing + - Rate limiting + - Cleanup handling + +## Running the Examples + +```bash +# Basic usage +python basic_scraper_example.py + +# Advanced usage with logging +PYTHONPATH=. python advanced_scraper_example.py +``` + +## Example Output + +### Basic Example +``` +Crawled 15 pages: +- https://example.com/blog/post1: 24560 bytes +- https://example.com/blog/post2: 18920 bytes +... +``` + +### Advanced Example +``` +INFO: Starting crawl of https://example.com/news/ +INFO: Processed: https://example.com/news/breaking/story1 +DEBUG: KeywordScorer: 0.85 +DEBUG: FreshnessScorer: 0.95 +INFO: Progress: 10 URLs processed +... +INFO: Scraping completed: +INFO: - URLs processed: 50 +INFO: - Errors: 2 +INFO: - Total content size: 1240.50 KB +``` + +## Customization + +### Adding Custom Filters +```python +class CustomFilter(URLFilter): + def apply(self, url: str) -> bool: + # Your custom filtering logic + return True + +filter_chain.add_filter(CustomFilter()) +``` + +### Custom Scoring Logic +```python +class CustomScorer(URLScorer): + def _calculate_score(self, url: str) -> float: + # Your custom scoring logic + return 1.0 + +scorer = CompositeScorer([ + CustomScorer(weight=1.0), + ... +]) +``` + +## Best Practices + +1. 
**Start Simple** + - Begin with basic filtering + - Add features incrementally + - Test thoroughly at each step + +2. **Monitor Performance** + - Watch memory usage + - Track processing times + - Adjust concurrency as needed + +3. **Handle Errors** + - Implement proper error handling + - Log important events + - Track error statistics + +4. **Optimize Resources** + - Set appropriate delays + - Limit concurrent requests + - Use streaming for large crawls + +## Troubleshooting + +Common issues and solutions: + +1. **Too Many Requests** + ```python + strategy = BFSScraperStrategy( + max_concurrent=3, # Reduce concurrent requests + min_crawl_delay=2 # Increase delay between requests + ) + ``` + +2. **Memory Issues** + ```python + # Use streaming mode for large crawls + async for result in scraper.ascrape(url, stream=True): + process_result(result) + ``` + +3. **Missing Content** + ```python + # Check your filter chain + filter_chain = FilterChain([ + URLPatternFilter("*"), # Broaden patterns + ContentTypeFilter(["*"]) # Accept all content + ]) + ``` + +For more examples and use cases, visit our [GitHub repository](https://github.com/example/crawl4ai/examples). 
\ No newline at end of file diff --git a/docs/scraper/scraper_quickstart.py b/docs/scraper/scraper_quickstart.py new file mode 100644 index 00000000..f6100e51 --- /dev/null +++ b/docs/scraper/scraper_quickstart.py @@ -0,0 +1,185 @@ +# basic_scraper_example.py +from crawl4ai.scraper import ( + AsyncWebScraper, + BFSScraperStrategy, + FilterChain, + URLPatternFilter, + ContentTypeFilter +) +from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig +import re + +browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600) + +async def basic_scraper_example(): + """ + Basic example: Scrape a blog site for articles + - Crawls only HTML pages + - Stays within the blog section + - Collects all results at once + """ + # Create a simple filter chain + filter_chain = FilterChain([ + # Only crawl pages within the blog section + URLPatternFilter("*/tutorial/*"), + # Only process HTML pages + ContentTypeFilter(["text/html"]) + ]) + + # Initialize the strategy with basic configuration + strategy = BFSScraperStrategy( + max_depth=2, # Only go 2 levels deep + filter_chain=filter_chain, + url_scorer=None, # Use default scoring + max_concurrent=3, # Limit concurrent requests + process_external_links=True + ) + + # Create the crawler and scraper + async with AsyncWebCrawler(config=browser_config,verbose=True) as crawler: + scraper = AsyncWebScraper(crawler, strategy) + # Start scraping + try: + result = await scraper.ascrape("https://crawl4ai.com/mkdocs") + + # Process results + print(f"Crawled {len(result.crawled_urls)} pages:") + for url, data in result.extracted_data.items(): + print(f"- {url}: {len(data.html)} bytes") + + except Exception as e: + print(f"Error during scraping: {e}") + +# advanced_scraper_example.py +import logging +from crawl4ai.scraper import ( + AsyncWebScraper, + BFSScraperStrategy, + FilterChain, + URLPatternFilter, + ContentTypeFilter, + DomainFilter, + KeywordRelevanceScorer, + PathDepthScorer, + FreshnessScorer, + 
CompositeScorer +) +from crawl4ai.async_webcrawler import AsyncWebCrawler + +async def advanced_scraper_example(): + """ + Advanced example: Intelligent news site scraping + - Uses all filter types + - Implements sophisticated scoring + - Streams results + - Includes monitoring and logging + """ + # Set up logging + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger("advanced_scraper") + + # Create sophisticated filter chain + filter_chain = FilterChain([ + # Domain control + DomainFilter( + allowed_domains=["techcrunch.com"], + blocked_domains=["login.techcrunch.com","legal.yahoo.com"] + ), + # URL patterns + URLPatternFilter([ + "*/article/*", + "*/news/*", + "*/blog/*", + re.compile(r"\d{4}/\d{2}/.*") # Date-based URLs + ]), + # Content types + ContentTypeFilter([ + "text/html", + "application/xhtml+xml" + ]) + ]) + + # Create composite scorer + scorer = CompositeScorer([ + # Prioritize by keywords + KeywordRelevanceScorer( + keywords=["news", "breaking", "update", "latest"], + weight=1.0 + ), + # Prefer optimal URL structure + PathDepthScorer( + optimal_depth=3, + weight=0.7 + ), + # Prioritize fresh content + FreshnessScorer(weight=0.9) + ]) + + # Initialize strategy with advanced configuration + strategy = BFSScraperStrategy( + max_depth=2, + filter_chain=filter_chain, + url_scorer=scorer, + max_concurrent=2, + min_crawl_delay=1 + ) + + # Create crawler and scraper + async with AsyncWebCrawler(verbose=True, config=browser_config) as crawler: + scraper = AsyncWebScraper(crawler, strategy) + + # Track statistics + stats = { + 'processed': 0, + 'errors': 0, + 'total_size': 0 + } + + try: + # Use streaming mode + result_generator = await scraper.ascrape("https://techcrunch.com", parallel_processing=True, stream=True) + async for result in result_generator: + stats['processed'] += 1 + + if result.success: + stats['total_size'] += len(result.html) + logger.info(f"Processed: {result.url}") + else: + stats['errors'] += 1 + logger.error(f"Failed to 
process {result.url}: {result.error_message}") + + # Log progress regularly + if stats['processed'] % 10 == 0: + logger.info(f"Progress: {stats['processed']} URLs processed") + + except Exception as e: + logger.error(f"Scraping error: {e}") + + finally: + # Print final statistics + logger.info("Scraping completed:") + logger.info(f"- URLs processed: {stats['processed']}") + logger.info(f"- Errors: {stats['errors']}") + logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB") + + # Print filter statistics + for filter_ in filter_chain.filters: + logger.info(f"{filter_.name} stats:") + logger.info(f"- Passed: {filter_.stats.passed_urls}") + logger.info(f"- Rejected: {filter_.stats.rejected_urls}") + + # Print scorer statistics + logger.info("Scoring statistics:") + logger.info(f"- Average score: {scorer.stats.average_score:.2f}") + logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}") + +if __name__ == "__main__": + import asyncio + + # Run basic example + print("Running basic scraper example...") + asyncio.run(basic_scraper_example()) + + # Run advanced example + print("\nRunning advanced scraper example...") + asyncio.run(advanced_scraper_example()) \ No newline at end of file diff --git a/tests/test_scraper.py b/tests/test_scraper.py new file mode 100644 index 00000000..a2c7a239 --- /dev/null +++ b/tests/test_scraper.py @@ -0,0 +1,184 @@ +# basic_scraper_example.py +from crawl4ai.scraper import ( + AsyncWebScraper, + BFSScraperStrategy, + FilterChain, + URLPatternFilter, + ContentTypeFilter +) +from crawl4ai.async_webcrawler import AsyncWebCrawler + +async def basic_scraper_example(): + """ + Basic example: Scrape a blog site for articles + - Crawls only HTML pages + - Stays within the blog section + - Collects all results at once + """ + # Create a simple filter chain + filter_chain = FilterChain([ + # Only crawl pages within the blog section + URLPatternFilter("*/blog/*"), + # Only process HTML pages + 
ContentTypeFilter(["text/html"]) + ]) + + # Initialize the strategy with basic configuration + strategy = BFSScraperStrategy( + max_depth=2, # Only go 2 levels deep + filter_chain=filter_chain, + url_scorer=None, # Use default scoring + max_concurrent=3 # Limit concurrent requests + ) + + # Create the crawler and scraper + crawler = AsyncWebCrawler() + scraper = AsyncWebScraper(crawler, strategy) + + # Start scraping + try: + result = await scraper.ascrape("https://example.com/blog/") + + # Process results + print(f"Crawled {len(result.crawled_urls)} pages:") + for url, data in result.extracted_data.items(): + print(f"- {url}: {len(data.html)} bytes") + + except Exception as e: + print(f"Error during scraping: {e}") + +# advanced_scraper_example.py +import logging +from crawl4ai.scraper import ( + AsyncWebScraper, + BFSScraperStrategy, + FilterChain, + URLPatternFilter, + ContentTypeFilter, + DomainFilter, + KeywordRelevanceScorer, + PathDepthScorer, + FreshnessScorer, + CompositeScorer +) +from crawl4ai.async_webcrawler import AsyncWebCrawler + +async def advanced_scraper_example(): + """ + Advanced example: Intelligent news site scraping + - Uses all filter types + - Implements sophisticated scoring + - Streams results + - Includes monitoring and logging + """ + # Set up logging + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger("advanced_scraper") + + # Create sophisticated filter chain + filter_chain = FilterChain([ + # Domain control + DomainFilter( + allowed_domains=["example.com", "blog.example.com"], + blocked_domains=["ads.example.com", "tracker.example.com"] + ), + # URL patterns + URLPatternFilter([ + "*/article/*", + "*/news/*", + "*/blog/*", + re.compile(r"\d{4}/\d{2}/.*") # Date-based URLs + ]), + # Content types + ContentTypeFilter([ + "text/html", + "application/xhtml+xml" + ]) + ]) + + # Create composite scorer + scorer = CompositeScorer([ + # Prioritize by keywords + KeywordRelevanceScorer( + keywords=["news", "breaking", 
"update", "latest"], + weight=1.0 + ), + # Prefer optimal URL structure + PathDepthScorer( + optimal_depth=3, + weight=0.7 + ), + # Prioritize fresh content + FreshnessScorer(weight=0.9) + ]) + + # Initialize strategy with advanced configuration + strategy = BFSScraperStrategy( + max_depth=4, + filter_chain=filter_chain, + url_scorer=scorer, + max_concurrent=5, + min_crawl_delay=1 + ) + + # Create crawler and scraper + crawler = AsyncWebCrawler() + scraper = AsyncWebScraper(crawler, strategy) + + # Track statistics + stats = { + 'processed': 0, + 'errors': 0, + 'total_size': 0 + } + + try: + # Use streaming mode + async for result in scraper.ascrape("https://example.com/news/", stream=True): + stats['processed'] += 1 + + if result.success: + stats['total_size'] += len(result.html) + logger.info(f"Processed: {result.url}") + + # Print scoring information + for scorer_name, score in result.scores.items(): + logger.debug(f"{scorer_name}: {score:.2f}") + else: + stats['errors'] += 1 + logger.error(f"Failed to process {result.url}: {result.error_message}") + + # Log progress regularly + if stats['processed'] % 10 == 0: + logger.info(f"Progress: {stats['processed']} URLs processed") + + except Exception as e: + logger.error(f"Scraping error: {e}") + + finally: + # Print final statistics + logger.info("Scraping completed:") + logger.info(f"- URLs processed: {stats['processed']}") + logger.info(f"- Errors: {stats['errors']}") + logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB") + + # Print filter statistics + for filter_ in filter_chain.filters: + logger.info(f"{filter_.name} stats:") + logger.info(f"- Passed: {filter_.stats.passed_urls}") + logger.info(f"- Rejected: {filter_.stats.rejected_urls}") + + # Print scorer statistics + logger.info("Scoring statistics:") + logger.info(f"- Average score: {scorer.stats.average_score:.2f}") + logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}") + +if __name__ == 
"__main__": + import asyncio + + # Run basic example + print("Running basic scraper example...") + asyncio.run(basic_scraper_example()) + + print("\nRunning advanced scraper example...") + asyncio.run(advanced_scraper_example()) \ No newline at end of file