diff --git a/crawl4ai/config.py b/crawl4ai/config.py index 3e26514a..48536678 100644 --- a/crawl4ai/config.py +++ b/crawl4ai/config.py @@ -84,3 +84,4 @@ SHOW_DEPRECATION_WARNINGS = True SCREENSHOT_HEIGHT_TRESHOLD = 10000 PAGE_TIMEOUT = 60000 DOWNLOAD_PAGE_TIMEOUT = 60000 +SCRAPER_BATCH_SIZE = 5 diff --git a/crawl4ai/scraper/__init__.py b/crawl4ai/scraper/__init__.py index 5af7ad6b..92ef3539 100644 --- a/crawl4ai/scraper/__init__.py +++ b/crawl4ai/scraper/__init__.py @@ -1,5 +1,16 @@ from .async_web_scraper import AsyncWebScraper from .bfs_scraper_strategy import BFSScraperStrategy -from .filters import URLFilter, FilterChain, URLPatternFilter, ContentTypeFilter, DomainFilter -from .scorers import KeywordRelevanceScorer, PathDepthScorer, FreshnessScorer, CompositeScorer -from .scraper_strategy import ScraperStrategy \ No newline at end of file +from .filters import ( + URLFilter, + FilterChain, + URLPatternFilter, + ContentTypeFilter, + DomainFilter, +) +from .scorers import ( + KeywordRelevanceScorer, + PathDepthScorer, + FreshnessScorer, + CompositeScorer, +) +from .scraper_strategy import ScraperStrategy diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py index b5c68a79..fb0d7286 100644 --- a/crawl4ai/scraper/async_web_scraper.py +++ b/crawl4ai/scraper/async_web_scraper.py @@ -6,34 +6,37 @@ import logging from dataclasses import dataclass from contextlib import asynccontextmanager + @dataclass class ScrapingProgress: """Tracks the progress of a scraping operation.""" + processed_urls: int = 0 failed_urls: int = 0 current_url: Optional[str] = None + class AsyncWebScraper: """ A high-level web scraper that combines an async crawler with a scraping strategy. - + Args: crawler (AsyncWebCrawler): The async web crawler implementation strategy (ScraperStrategy): The scraping strategy to use logger (Optional[logging.Logger]): Custom logger for the scraper """ - + def __init__( - self, - crawler: AsyncWebCrawler, + self, + crawler: AsyncWebCrawler, strategy: ScraperStrategy, - logger: Optional[logging.Logger] = None + logger: Optional[logging.Logger] = None, ): if not isinstance(crawler, AsyncWebCrawler): raise TypeError("crawler must be an instance of AsyncWebCrawler") if not isinstance(strategy, ScraperStrategy): raise TypeError("strategy must be an instance of ScraperStrategy") - + self.crawler = crawler self.strategy = strategy self.logger = logger or logging.getLogger(__name__) @@ -55,30 +58,28 @@ class AsyncWebScraper: raise async def ascrape( - self, - url: str, - stream: bool = False + self, url: str, stream: bool = False ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: """ Scrape a website starting from the given URL. - + Args: url: Starting URL for scraping stream: If True, yield results as they come; if False, collect all results - + Returns: Either an async generator yielding CrawlResults or a final ScraperResult """ self._progress = ScrapingProgress() # Reset progress - + async with self._error_handling_context(url): if stream: return self._ascrape_yielding(url) return await self._ascrape_collecting(url) async def _ascrape_yielding( - self, - url: str, + self, + url: str, ) -> AsyncGenerator[CrawlResult, None]: """Stream scraping results as they become available.""" try: @@ -92,28 +93,28 @@ class AsyncWebScraper: raise async def _ascrape_collecting( - self, - url: str, + self, + url: str, ) -> ScraperResult: """Collect all scraping results before returning.""" extracted_data = {} - + try: result_generator = self.strategy.ascrape(url, self.crawler) async for res in result_generator: self._progress.processed_urls += 1 self._progress.current_url = res.url extracted_data[res.url] = res - + return ScraperResult( url=url, crawled_urls=list(extracted_data.keys()), extracted_data=extracted_data, stats={ - 'processed_urls': self._progress.processed_urls, - 'failed_urls': self._progress.failed_urls - } + "processed_urls": self._progress.processed_urls, + "failed_urls": self._progress.failed_urls, + }, ) except Exception as e: self.logger.error(f"Error in collecting scrape: {str(e)}") - raise \ No newline at end of file + raise diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py index 956afbda..212f71c7 100644 --- a/crawl4ai/scraper/bfs_scraper_strategy.py +++ b/crawl4ai/scraper/bfs_scraper_strategy.py @@ -7,16 +7,19 @@ from urllib.parse import urlparse from urllib.robotparser import RobotFileParser import validators -from crawl4ai.async_configs import CrawlerRunConfig +from ..async_configs import CrawlerRunConfig from .models import CrawlResult from .filters import FilterChain from .scorers import URLScorer from ..async_webcrawler import AsyncWebCrawler from .scraper_strategy import ScraperStrategy +from ..config import SCRAPER_BATCH_SIZE + @dataclass class CrawlStats: """Statistics for the crawling process""" + start_time: datetime urls_processed: int = 0 urls_failed: int = 0 @@ -25,6 +28,7 @@ class CrawlStats: current_depth: int = 0 robots_blocked: int = 0 + class BFSScraperStrategy(ScraperStrategy): """Breadth-First Search scraping strategy with politeness controls""" @@ -34,13 +38,13 @@ class BFSScraperStrategy(ScraperStrategy): filter_chain: FilterChain, url_scorer: URLScorer, process_external_links: bool = False, - logger: Optional[logging.Logger] = None + logger: Optional[logging.Logger] = None, ): self.max_depth = max_depth self.filter_chain = filter_chain self.url_scorer = url_scorer self.logger = logger or logging.getLogger(__name__) - + # Crawl control self.stats = CrawlStats(start_time=datetime.now()) self._cancel_event = asyncio.Event() @@ -74,11 +78,11 @@ class BFSScraperStrategy(ScraperStrategy): async def _get_robot_parser(self, url: str) -> Optional[RobotFileParser]: """Get or create robots.txt parser for domain. - This is our robots.txt manager that: - - Uses domain-level caching of robot parsers - - Creates and caches new parsers as needed - - Handles failed robots.txt fetches gracefully - - Returns None if robots.txt can't be fetched, allowing crawling to proceed + This is our robots.txt manager that: + - Uses domain-level caching of robot parsers + - Creates and caches new parsers as needed + - Handles failed robots.txt fetches gracefully + - Returns None if robots.txt can't be fetched, allowing crawling to proceed """ domain = urlparse(url).netloc if domain not in self.robot_parsers: @@ -100,7 +104,7 @@ class BFSScraperStrategy(ScraperStrategy): depth: int, queue: asyncio.PriorityQueue, visited: Set[str], - depths: Dict[str, int] + depths: Dict[str, int], ): """Process extracted links from crawl result. This is our link processor that: @@ -116,7 +120,7 @@ class BFSScraperStrategy(ScraperStrategy): if self.process_external_links: links_to_process += result.links["external"] for link in links_to_process: - url = link['href'] + url = link["href"] if not await self.can_process_url(url, depth): self.stats.urls_skipped += 1 continue @@ -132,8 +136,7 @@ class BFSScraperStrategy(ScraperStrategy): await queue.put((score, new_depth, url)) depths[url] = new_depth self.stats.total_depth_reached = max( - self.stats.total_depth_reached, - new_depth + self.stats.total_depth_reached, new_depth ) async def ascrape( @@ -142,7 +145,7 @@ class BFSScraperStrategy(ScraperStrategy): crawler: AsyncWebCrawler, ) -> AsyncGenerator[CrawlResult, None]: """Implement BFS crawling strategy""" - + # Initialize crawl state """ queue: A priority queue where items are tuples of (score, depth, url) @@ -151,57 +154,76 @@ class BFSScraperStrategy(ScraperStrategy): URL: The actual URL to crawl visited: Keeps track of URLs we've already seen to avoid cycles depths: Maps URLs to their depths from the start URL - pending_tasks: Tracks currently running crawl tasks + active_crawls: Tracks currently running crawl tasks """ queue = asyncio.PriorityQueue() await queue.put((0, 0, start_url)) visited: Set[str] = set() depths = {start_url: 0} - + active_crawls = set() # Track URLs currently being processed try: - while not queue.empty() and not self._cancel_event.is_set(): + while ( + not queue.empty() or active_crawls + ) and not self._cancel_event.is_set(): """ This sets up our main control loop which: - Continues while there are URLs to process (not queue.empty()) - - Or while there are tasks still running (pending_tasks) + - Or while there are active crawls still running (arun_many) - Can be interrupted via cancellation (not self._cancel_event.is_set()) """ - n = 3 + # Collect batch of jobs to process jobs = [] - for _ in range(n): - if self.queue.empty(): - break - jobs.append(await self.queue.get()) - - # Filter jobs directly, ensuring uniqueness and checking against visited - filtered_jobs = [] - for job in jobs: - _, depth, url = job - self.stats.current_depth = depth - if url not in visited: - visited.add(url) - filtered_jobs.append(job) - - crawler_config = CrawlerRunConfig(cache_mode="BYPASS") - async for result in await crawler.arun_many(urls=[url for _, _, url in filtered_jobs], - config=crawler_config.clone(stream=True)): - print(f"Received result for: {result.url} - Success: {result.success}") - source_url, depth = next((url, depth) for _, depth, url in filtered_jobs if url == result.source_url) - await self._process_links(result, source_url, depth, queue, visited, depths) - yield result - + # Fill batch with available jobs + while len(jobs) < SCRAPER_BATCH_SIZE and not queue.empty(): + score, depth, url = await queue.get() + if url not in active_crawls: # Only add if not currently processing + jobs.append((score, depth, url)) + active_crawls.add(url) + self.stats.current_depth = depth + + if not jobs: + # If no jobs but active crawls exist, wait a bit and continue + if active_crawls: + await asyncio.sleep(0.1) + continue + + # Process batch + crawler_config = CrawlerRunConfig(cache_mode="BYPASS", stream=True) + try: + async for result in await crawler.arun_many( + urls=[url for _, _, url in jobs], config=crawler_config + ): + source_url, depth = next( + (url, depth) for _, depth, url in jobs if url == result.url + ) + active_crawls.remove(source_url) # Remove from active set + + if result.success: + await self._process_links( + result, source_url, depth, queue, visited, depths + ) + yield result + else: + self.logger.warning( + f"Failed to crawl {result.url}: {result.error_message}" + ) + except Exception as e: + # Remove failed URLs from active set + for _, _, url in jobs: + active_crawls.discard(url) + self.logger.error(f"Batch processing error: {e}") + # Continue processing other batches + continue + except Exception as e: self.logger.error(f"Error in crawl process: {e}") raise - + finally: - # Clean up any remaining tasks - # for task in pending_tasks: - # task.cancel() self.stats.end_time = datetime.now() async def shutdown(self): """Clean up resources and stop crawling""" self._cancel_event.set() # Clear caches and close connections - self.robot_parsers.clear() \ No newline at end of file + self.robot_parsers.clear() diff --git a/crawl4ai/scraper/filters.py b/crawl4ai/scraper/filters.py index df5d13aa..f0ec77cd 100644 --- a/crawl4ai/scraper/filters.py +++ b/crawl4ai/scraper/filters.py @@ -11,16 +11,19 @@ import logging from dataclasses import dataclass import fnmatch + @dataclass class FilterStats: """Statistics for filter applications""" + total_urls: int = 0 rejected_urls: int = 0 passed_urls: int = 0 + class URLFilter(ABC): """Base class for URL filters""" - + def __init__(self, name: str = None): self.name = name or self.__class__.__name__ self.stats = FilterStats() @@ -39,15 +42,16 @@ class URLFilter(ABC): else: self.stats.rejected_urls += 1 + class FilterChain: """Chain of URL filters.""" - + def __init__(self, filters: List[URLFilter] = None): self.filters = filters or [] self.stats = FilterStats() self.logger = logging.getLogger("urlfilter.chain") - def add_filter(self, filter_: URLFilter) -> 'FilterChain': + def add_filter(self, filter_: URLFilter) -> "FilterChain": """Add a filter to the chain""" self.filters.append(filter_) return self # Enable method chaining @@ -55,19 +59,20 @@ class FilterChain: def apply(self, url: str) -> bool: """Apply all filters in the chain""" self.stats.total_urls += 1 - + for filter_ in self.filters: if not filter_.apply(url): self.stats.rejected_urls += 1 self.logger.debug(f"URL {url} rejected by {filter_.name}") return False - + self.stats.passed_urls += 1 return True + class URLPatternFilter(URLFilter): """Filter URLs based on glob patterns or regex. - + pattern_filter = URLPatternFilter([ "*.example.com/*", # Glob pattern "*/article/*", # Path pattern @@ -76,21 +81,26 @@ class URLPatternFilter(URLFilter): - Supports glob patterns and regex - Multiple patterns per filter - - Pattern pre-compilation for performance + - Pattern pre-compilation for performance """ - - def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]], - use_glob: bool = True): + + def __init__( + self, + patterns: Union[str, Pattern, List[Union[str, Pattern]]], + use_glob: bool = True, + ): super().__init__() self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns self.use_glob = use_glob self._compiled_patterns = [] - + for pattern in self.patterns: if isinstance(pattern, str) and use_glob: self._compiled_patterns.append(self._glob_to_regex(pattern)) else: - self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern) + self._compiled_patterns.append( + re.compile(pattern) if isinstance(pattern, str) else pattern + ) def _glob_to_regex(self, pattern: str) -> Pattern: """Convert glob pattern to regex""" @@ -102,9 +112,10 @@ class URLPatternFilter(URLFilter): self._update_stats(matches) return matches + class ContentTypeFilter(URLFilter): """Filter URLs based on expected content type. - + content_filter = ContentTypeFilter([ "text/html", "application/pdf" @@ -114,11 +125,14 @@ class ContentTypeFilter(URLFilter): - Extension checking - Support for multiple content types """ - - def __init__(self, allowed_types: Union[str, List[str]], - check_extension: bool = True): + + def __init__( + self, allowed_types: Union[str, List[str]], check_extension: bool = True + ): super().__init__() - self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types + self.allowed_types = ( + [allowed_types] if isinstance(allowed_types, str) else allowed_types + ) self.check_extension = check_extension self._normalize_types() @@ -128,12 +142,18 @@ class ContentTypeFilter(URLFilter): def _check_extension(self, url: str) -> bool: """Check URL's file extension""" - ext = urlparse(url).path.split('.')[-1].lower() if '.' in urlparse(url).path else '' + ext = ( + urlparse(url).path.split(".")[-1].lower() + if "." in urlparse(url).path + else "" + ) if not ext: return True # No extension, might be dynamic content - + guessed_type = mimetypes.guess_type(url)[0] - return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types) + return any( + allowed in (guessed_type or "").lower() for allowed in self.allowed_types + ) def apply(self, url: str) -> bool: """Check if URL's content type is allowed""" @@ -143,9 +163,10 @@ class ContentTypeFilter(URLFilter): self._update_stats(result) return result + class DomainFilter(URLFilter): """Filter URLs based on allowed/blocked domains. - + domain_filter = DomainFilter( allowed_domains=["example.com", "blog.example.com"], blocked_domains=["ads.example.com"] @@ -155,12 +176,19 @@ class DomainFilter(URLFilter): - Subdomain support - Efficient domain matching """ - - def __init__(self, allowed_domains: Union[str, List[str]] = None, - blocked_domains: Union[str, List[str]] = None): + + def __init__( + self, + allowed_domains: Union[str, List[str]] = None, + blocked_domains: Union[str, List[str]] = None, + ): super().__init__() - self.allowed_domains = set(self._normalize_domains(allowed_domains)) if allowed_domains else None - self.blocked_domains = set(self._normalize_domains(blocked_domains)) if blocked_domains else set() + self.allowed_domains = ( + set(self._normalize_domains(allowed_domains)) if allowed_domains else None + ) + self.blocked_domains = ( + set(self._normalize_domains(blocked_domains)) if blocked_domains else set() + ) def _normalize_domains(self, domains: Union[str, List[str]]) -> List[str]: """Normalize domain strings""" @@ -175,31 +203,33 @@ class DomainFilter(URLFilter): def apply(self, url: str) -> bool: """Check if URL's domain is allowed""" domain = self._extract_domain(url) - + if domain in self.blocked_domains: self._update_stats(False) return False - + if self.allowed_domains is not None and domain not in self.allowed_domains: self._update_stats(False) return False - + self._update_stats(True) return True + # Example usage: def create_common_filter_chain() -> FilterChain: """Create a commonly used filter chain""" - return FilterChain([ - URLPatternFilter([ - "*.html", "*.htm", # HTML files - "*/article/*", "*/blog/*" # Common content paths - ]), - ContentTypeFilter([ - "text/html", - "application/xhtml+xml" - ]), - DomainFilter( - blocked_domains=["ads.*", "analytics.*"] - ) - ]) \ No newline at end of file + return FilterChain( + [ + URLPatternFilter( + [ + "*.html", + "*.htm", # HTML files + "*/article/*", + "*/blog/*", # Common content paths + ] + ), + ContentTypeFilter(["text/html", "application/xhtml+xml"]), + DomainFilter(blocked_domains=["ads.*", "analytics.*"]), + ] + ) diff --git a/crawl4ai/scraper/models.py b/crawl4ai/scraper/models.py index 735d1d58..c779f81a 100644 --- a/crawl4ai/scraper/models.py +++ b/crawl4ai/scraper/models.py @@ -2,7 +2,8 @@ from pydantic import BaseModel from typing import List, Dict from ..models import CrawlResult + class ScraperResult(BaseModel): url: str crawled_urls: List[str] - extracted_data: Dict[str,CrawlResult] \ No newline at end of file + extracted_data: Dict[str, CrawlResult] diff --git a/crawl4ai/scraper/scorers.py b/crawl4ai/scraper/scorers.py index 548b80f0..8552aa8e 100644 --- a/crawl4ai/scraper/scorers.py +++ b/crawl4ai/scraper/scorers.py @@ -10,29 +10,32 @@ from collections import defaultdict import math import logging + @dataclass class ScoringStats: """Statistics for URL scoring""" + urls_scored: int = 0 total_score: float = 0.0 - min_score: float = float('inf') - max_score: float = float('-inf') - + min_score: float = float("inf") + max_score: float = float("-inf") + def update(self, score: float): """Update scoring statistics""" self.urls_scored += 1 self.total_score += score self.min_score = min(self.min_score, score) self.max_score = max(self.max_score, score) - + @property def average_score(self) -> float: """Calculate average score""" return self.total_score / self.urls_scored if self.urls_scored > 0 else 0.0 + class URLScorer(ABC): """Base class for URL scoring strategies""" - + def __init__(self, weight: float = 1.0, name: str = None): self.weight = weight self.name = name or self.__class__.__name__ @@ -51,9 +54,10 @@ class URLScorer(ABC): self.stats.update(weighted_score) return weighted_score + class CompositeScorer(URLScorer): """Combines multiple scorers with weights""" - + def __init__(self, scorers: List[URLScorer], normalize: bool = True): super().__init__(name="CompositeScorer") self.scorers = scorers @@ -62,12 +66,13 @@ class CompositeScorer(URLScorer): def _calculate_score(self, url: str) -> float: scores = [scorer.score(url) for scorer in self.scorers] total_score = sum(scores) - + if self.normalize and scores: total_score /= len(scores) - + return total_score + class KeywordRelevanceScorer(URLScorer): """Score URLs based on keyword relevance. @@ -81,9 +86,10 @@ class KeywordRelevanceScorer(URLScorer): - Case sensitivity options - Weighted scoring """ - - def __init__(self, keywords: List[str], weight: float = 1.0, - case_sensitive: bool = False): + + def __init__( + self, keywords: List[str], weight: float = 1.0, case_sensitive: bool = False + ): super().__init__(weight=weight) self.keywords = keywords self.case_sensitive = case_sensitive @@ -98,15 +104,15 @@ class KeywordRelevanceScorer(URLScorer): """Calculate score based on keyword matches""" decoded_url = unquote(url) total_matches = sum( - 1 for pattern in self.patterns - if pattern.search(decoded_url) + 1 for pattern in self.patterns if pattern.search(decoded_url) ) # Normalize score between 0 and 1 return total_matches / len(self.patterns) if self.patterns else 0.0 + class PathDepthScorer(URLScorer): """Score URLs based on their path depth. - + path_scorer = PathDepthScorer( optimal_depth=3, # Preferred URL depth weight=0.7 @@ -116,7 +122,7 @@ class PathDepthScorer(URLScorer): - Configurable optimal depth - Diminishing returns for deeper paths """ - + def __init__(self, optimal_depth: int = 3, weight: float = 1.0): super().__init__(weight=weight) self.optimal_depth = optimal_depth @@ -124,15 +130,16 @@ class PathDepthScorer(URLScorer): def _calculate_score(self, url: str) -> float: """Calculate score based on path depth""" path = urlparse(url).path - depth = len([x for x in path.split('/') if x]) - + depth = len([x for x in path.split("/") if x]) + # Score decreases as we move away from optimal depth distance_from_optimal = abs(depth - self.optimal_depth) return 1.0 / (1.0 + distance_from_optimal) + class ContentTypeScorer(URLScorer): """Score URLs based on content type preferences. - + content_scorer = ContentTypeScorer({ r'\.html$': 1.0, r'\.pdf$': 0.8, @@ -143,7 +150,7 @@ class ContentTypeScorer(URLScorer): - Configurable type weights - Pattern matching support """ - + def __init__(self, type_weights: Dict[str, float], weight: float = 1.0): super().__init__(weight=weight) self.type_weights = type_weights @@ -152,8 +159,7 @@ class ContentTypeScorer(URLScorer): def _compile_patterns(self): """Prepare content type patterns""" self.patterns = { - re.compile(pattern): weight - for pattern, weight in self.type_weights.items() + re.compile(pattern): weight for pattern, weight in self.type_weights.items() } def _calculate_score(self, url: str) -> float: @@ -163,21 +169,22 @@ class ContentTypeScorer(URLScorer): return weight return 0.0 + class FreshnessScorer(URLScorer): """Score URLs based on freshness indicators. - + freshness_scorer = FreshnessScorer(weight=0.9) Score based on date indicators in URLs Multiple date format support Recency weighting""" - + def __init__(self, weight: float = 1.0): super().__init__(weight=weight) self.date_patterns = [ - r'/(\d{4})/(\d{2})/(\d{2})/', # yyyy/mm/dd - r'(\d{4})[-_](\d{2})[-_](\d{2})', # yyyy-mm-dd - r'/(\d{4})/', # year only + r"/(\d{4})/(\d{2})/(\d{2})/", # yyyy/mm/dd + r"(\d{4})[-_](\d{2})[-_](\d{2})", # yyyy-mm-dd + r"/(\d{4})/", # year only ] self._compile_patterns() @@ -194,6 +201,7 @@ class FreshnessScorer(URLScorer): return 1.0 - (2024 - year) * 0.1 return 0.5 # Default score for URLs without dates + class DomainAuthorityScorer(URLScorer): """Score URLs based on domain authority. @@ -206,9 +214,13 @@ class DomainAuthorityScorer(URLScorer): Score based on domain importance Configurable domain weights Default weight for unknown domains""" - - def __init__(self, domain_weights: Dict[str, float], - default_weight: float = 0.5, weight: float = 1.0): + + def __init__( + self, + domain_weights: Dict[str, float], + default_weight: float = 0.5, + weight: float = 1.0, + ): super().__init__(weight=weight) self.domain_weights = domain_weights self.default_weight = default_weight @@ -218,29 +230,23 @@ class DomainAuthorityScorer(URLScorer): domain = urlparse(url).netloc.lower() return self.domain_weights.get(domain, self.default_weight) + def create_balanced_scorer() -> CompositeScorer: """Create a balanced composite scorer""" - return CompositeScorer([ - KeywordRelevanceScorer( - keywords=["article", "blog", "news", "research"], - weight=1.0 - ), - PathDepthScorer( - optimal_depth=3, - weight=0.7 - ), - ContentTypeScorer( - type_weights={ - r'\.html?$': 1.0, - r'\.pdf$': 0.8, - r'\.xml$': 0.6 - }, - weight=0.8 - ), - FreshnessScorer( - weight=0.9 - ) - ]) + return CompositeScorer( + [ + KeywordRelevanceScorer( + keywords=["article", "blog", "news", "research"], weight=1.0 + ), + PathDepthScorer(optimal_depth=3, weight=0.7), + ContentTypeScorer( + type_weights={r"\.html?$": 1.0, r"\.pdf$": 0.8, r"\.xml$": 0.6}, + weight=0.8, + ), + FreshnessScorer(weight=0.9), + ] + ) + # Example Usage: """ @@ -265,4 +271,4 @@ score = scorer.score("https://python.org/article/2024/01/new-features") # Access statistics print(f"Average score: {scorer.stats.average_score}") print(f"URLs scored: {scorer.stats.urls_scored}") -""" \ No newline at end of file +""" diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py index 396ea7c4..66a60130 100644 --- a/crawl4ai/scraper/scraper_strategy.py +++ b/crawl4ai/scraper/scraper_strategy.py @@ -4,29 +4,28 @@ from ..models import CrawlResult from ..async_webcrawler import AsyncWebCrawler from typing import Union, AsyncGenerator + class ScraperStrategy(ABC): @abstractmethod async def ascrape( - self, - url: str, - crawler: AsyncWebCrawler, - parallel_processing: bool = True, - stream: bool = False + self, + url: str, + crawler: AsyncWebCrawler, + stream: bool = False, ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: """Scrape the given URL using the specified crawler. Args: url (str): The starting URL for the scrape. crawler (AsyncWebCrawler): The web crawler instance. - parallel_processing (bool): Whether to use parallel processing. Defaults to True. - stream (bool): If True, yields individual crawl results as they are ready; + stream (bool): If True, yields individual crawl results as they are ready; if False, accumulates results and returns a final ScraperResult. Yields: CrawlResult: Individual crawl results if stream is True. Returns: - ScraperResult: A summary of the scrape results containing the final extracted data + ScraperResult: A summary of the scrape results containing the final extracted data and the list of crawled URLs if stream is False. """ pass @@ -39,4 +38,4 @@ class ScraperStrategy(ABC): @abstractmethod async def shutdown(self): """Clean up resources used by the strategy""" - pass \ No newline at end of file + pass diff --git a/docs/scraper/scraper_quickstart.py b/docs/scraper/scraper_quickstart.py index f6100e51..29afc365 100644 --- a/docs/scraper/scraper_quickstart.py +++ b/docs/scraper/scraper_quickstart.py @@ -4,13 +4,14 @@ from crawl4ai.scraper import ( BFSScraperStrategy, FilterChain, URLPatternFilter, - ContentTypeFilter + ContentTypeFilter, ) from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig import re browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600) + async def basic_scraper_example(): """ Basic example: Scrape a blog site for articles @@ -19,37 +20,39 @@ async def basic_scraper_example(): - Collects all results at once """ # Create a simple filter chain - filter_chain = FilterChain([ - # Only crawl pages within the blog section - URLPatternFilter("*/tutorial/*"), - # Only process HTML pages - ContentTypeFilter(["text/html"]) - ]) + filter_chain = FilterChain( + [ + # Only crawl pages within the blog section + URLPatternFilter("*/tutorial/*"), + # Only process HTML pages + ContentTypeFilter(["text/html"]), + ] + ) # Initialize the strategy with basic configuration strategy = BFSScraperStrategy( max_depth=2, # Only go 2 levels deep filter_chain=filter_chain, url_scorer=None, # Use default scoring - max_concurrent=3, # Limit concurrent requests - process_external_links=True + process_external_links=True, ) # Create the crawler and scraper - async with AsyncWebCrawler(config=browser_config,verbose=True) as crawler: + async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler: scraper = AsyncWebScraper(crawler, strategy) # Start scraping try: result = await scraper.ascrape("https://crawl4ai.com/mkdocs") - + # Process results print(f"Crawled {len(result.crawled_urls)} pages:") for url, data in result.extracted_data.items(): print(f"- {url}: {len(data.html)} bytes") - + except Exception as e: print(f"Error during scraping: {e}") + # advanced_scraper_example.py import logging from crawl4ai.scraper import ( @@ -62,10 +65,11 @@ from crawl4ai.scraper import ( KeywordRelevanceScorer, PathDepthScorer, FreshnessScorer, - CompositeScorer + CompositeScorer, ) from crawl4ai.async_webcrawler import AsyncWebCrawler + async def advanced_scraper_example(): """ Advanced example: Intelligent news site scraping @@ -79,49 +83,44 @@ async def advanced_scraper_example(): logger = logging.getLogger("advanced_scraper") # Create sophisticated filter chain - filter_chain = FilterChain([ - # Domain control - DomainFilter( - allowed_domains=["techcrunch.com"], - blocked_domains=["login.techcrunch.com","legal.yahoo.com"] - ), - # URL patterns - URLPatternFilter([ - "*/article/*", - "*/news/*", - "*/blog/*", - re.compile(r"\d{4}/\d{2}/.*") # Date-based URLs - ]), - # Content types - ContentTypeFilter([ - "text/html", - "application/xhtml+xml" - ]) - ]) + filter_chain = FilterChain( + [ + # Domain control + DomainFilter( + allowed_domains=["techcrunch.com"], + blocked_domains=["login.techcrunch.com", "legal.yahoo.com"], + ), + # URL patterns + URLPatternFilter( + [ + "*/article/*", + "*/news/*", + "*/blog/*", + re.compile(r"\d{4}/\d{2}/.*"), # Date-based URLs + ] + ), + # Content types + ContentTypeFilter(["text/html", "application/xhtml+xml"]), + ] + ) # Create composite scorer - scorer = CompositeScorer([ - # Prioritize by keywords - KeywordRelevanceScorer( - keywords=["news", "breaking", "update", "latest"], - weight=1.0 - ), - # Prefer optimal URL structure - PathDepthScorer( - optimal_depth=3, - weight=0.7 - ), - # Prioritize fresh content - FreshnessScorer(weight=0.9) - ]) + scorer = CompositeScorer( + [ + # Prioritize by keywords + KeywordRelevanceScorer( + keywords=["news", "breaking", "update", "latest"], weight=1.0 + ), + # Prefer optimal URL structure + PathDepthScorer(optimal_depth=3, weight=0.7), + # Prioritize fresh content + FreshnessScorer(weight=0.9), + ] + ) # Initialize strategy with advanced configuration strategy = BFSScraperStrategy( - max_depth=2, - filter_chain=filter_chain, - url_scorer=scorer, - max_concurrent=2, - min_crawl_delay=1 + max_depth=2, filter_chain=filter_chain, url_scorer=scorer ) # Create crawler and scraper @@ -129,57 +128,60 @@ async def advanced_scraper_example(): scraper = AsyncWebScraper(crawler, strategy) # Track statistics - stats = { - 'processed': 0, - 'errors': 0, - 'total_size': 0 - } + stats = {"processed": 0, "errors": 0, "total_size": 0} try: # Use streaming mode - result_generator = await scraper.ascrape("https://techcrunch.com", parallel_processing=True, stream=True) + result_generator = await scraper.ascrape( + "https://techcrunch.com", stream=True + ) async for result in result_generator: - stats['processed'] += 1 - + stats["processed"] += 1 + if result.success: - stats['total_size'] += len(result.html) + stats["total_size"] += len(result.html) logger.info(f"Processed: {result.url}") else: - stats['errors'] += 1 - logger.error(f"Failed to process {result.url}: {result.error_message}") + stats["errors"] += 1 + logger.error( + f"Failed to process {result.url}: {result.error_message}" + ) # Log progress regularly - if stats['processed'] % 10 == 0: + if stats["processed"] % 10 == 0: logger.info(f"Progress: {stats['processed']} URLs processed") except Exception as e: logger.error(f"Scraping error: {e}") - + finally: # Print final statistics logger.info("Scraping completed:") logger.info(f"- URLs processed: {stats['processed']}") logger.info(f"- Errors: {stats['errors']}") logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB") - + # Print filter statistics for filter_ in filter_chain.filters: logger.info(f"{filter_.name} stats:") logger.info(f"- Passed: {filter_.stats.passed_urls}") logger.info(f"- Rejected: {filter_.stats.rejected_urls}") - + # Print scorer statistics logger.info("Scoring statistics:") logger.info(f"- Average score: {scorer.stats.average_score:.2f}") - logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}") + logger.info( + f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}" + ) + if __name__ == "__main__": import asyncio - + # Run basic example print("Running basic scraper example...") asyncio.run(basic_scraper_example()) # Run advanced example - print("\nRunning advanced scraper example...") - asyncio.run(advanced_scraper_example()) \ No newline at end of file + # print("\nRunning advanced scraper example...") + # asyncio.run(advanced_scraper_example())