Compare commits

...

20 Commits

Author SHA1 Message Date
UncleCode
0d357ab7d2 feat(scraper): Enhance URL filtering and scoring systems
Implement comprehensive URL filtering and scoring capabilities:

Filters:
- Add URLPatternFilter with glob/regex support
- Implement ContentTypeFilter with MIME type checking
- Add DomainFilter for domain control
- Create FilterChain with stats tracking

Scorers:
- Complete KeywordRelevanceScorer implementation
- Add PathDepthScorer for URL structure scoring
- Implement ContentTypeScorer for file type priorities
- Add FreshnessScorer for date-based scoring
- Add DomainAuthorityScorer for domain weighting
- Create CompositeScorer for combined strategies

Features:
- Add statistics tracking for both filters and scorers
- Implement logging support throughout
- Add resource cleanup methods
- Create comprehensive documentation
- Include performance optimizations

Tests and docs included.
Note: Review URL normalization overlap with recent crawler changes.
2024-11-08 19:02:28 +08:00
UncleCode
bae4665949 feat(scraper): Enhance URL filtering and scoring systems
Implement comprehensive URL filtering and scoring capabilities:

Filters:
- Add URLPatternFilter with glob/regex support
- Implement ContentTypeFilter with MIME type checking
- Add DomainFilter for domain control
- Create FilterChain with stats tracking

Scorers:
- Complete KeywordRelevanceScorer implementation
- Add PathDepthScorer for URL structure scoring
- Implement ContentTypeScorer for file type priorities
- Add FreshnessScorer for date-based scoring
- Add DomainAuthorityScorer for domain weighting
- Create CompositeScorer for combined strategies

Features:
- Add statistics tracking for both filters and scorers
- Implement logging support throughout
- Add resource cleanup methods
- Create comprehensive documentation
- Include performance optimizations

Tests and docs included.
Note: Review URL normalization overlap with recent crawler changes.

- Quick Start is created and added
2024-11-08 18:45:12 +08:00
UncleCode
d11c004fbb Enhanced BFS Strategy: Improved monitoring, resource management & configuration
- Added CrawlStats for comprehensive crawl monitoring
- Implemented proper resource cleanup with shutdown mechanism
- Enhanced URL processing with better validation and politeness controls
- Added configuration options (max_concurrent, timeout, external_links)
- Improved error handling with retry logic
- Added domain-specific queues for better performance
- Created comprehensive documentation

Note: URL normalization needs review - potential duplicate processing
with core crawler for internal links. Currently commented out pending
further investigation of edge cases.
2024-11-08 15:57:23 +08:00
UncleCode
3d1c9a8434 Revieweing the BFS strategy. 2024-11-07 18:54:53 +08:00
UncleCode
be472c624c Refactored AsyncWebScraper to include comprehensive error handling and progress tracking capabilities. Introduced a ScrapingProgress data class to monitor processed and failed URLs. Enhanced scraping methods to log errors and track stats throughout the scraping process. 2024-11-06 21:09:47 +08:00
UncleCode
06b21dcc50 Update .gitignore to include new directories for issues and documentation 2024-11-06 18:44:03 +08:00
UncleCode
0f0f60527d Merge pull request #172 from aravindkarnam/scraper
Scraper
2024-11-06 07:00:44 +01:00
Aravind Karnam
8105fd178e Removed stubs for remove_from_future_crawls since the visited set is updated soon as the URL was queued, Removed add_to_retry_queue(url) since retry with exponential backoff with help of tenacity is going to take care of it. 2024-10-17 15:42:43 +05:30
Aravind Karnam
ce7fce4b16 1. Moved to asyncio.wait instead of gather so that results can be yeilded just as they are ready, rather than in batches
2. Moved the visted.add(url), to before the task is put in queue rather than after the crawl is completed. This makes sure that  duplicate crawls doesn't happen when same URL is found at different depth and that get's queued too because the crawl is not yet completed and visted set is not updated.
3. Named the yield_results attribute to stream instead. Since that seems to be popularly used in all other AI libraries for intermediate results.
2024-10-17 12:25:17 +05:30
Aravind Karnam
de28b59aca removed unused imports 2024-10-16 22:36:48 +05:30
Aravind Karnam
04d8b47b92 Exposed min_crawl_delay for BFSScraperStrategy 2024-10-16 22:34:54 +05:30
Aravind Karnam
2943feeecf 1. Added a flag to yield each crawl result,as they become ready along with the final scraper result as another option
2. Removed ascrape_many method, as I'm currently not focusing on it in the first cut of scraper
3. Added some error handling for cases where robots.txt cannot be fetched or parsed.
2024-10-16 22:05:29 +05:30
Aravind Karnam
8a7d29ce85 updated some comments and removed content type checking functionality from core as it's implemented as a filter 2024-10-16 15:59:37 +05:30
aravind
159bd875bd Merge pull request #5 from aravindkarnam/main
Merging 0.3.6
2024-10-16 10:41:22 +05:30
Aravind Karnam
d743adac68 Fixed some bugs in robots.txt processing 2024-10-03 15:58:57 +05:30
Aravind Karnam
7fe220dbd5 1. Introduced a bool flag to ascrape method to switch between sequential and concurrent processing
2. Introduced a dictionary for depth tracking across various tasks
3. Removed redundancy with crawled_urls variable. Instead created a list with visited set variable in returned object.
2024-10-03 11:17:11 +05:30
aravind
65e013d9d1 Merge pull request #3 from aravindkarnam/main
Merging latest changes from main branch
2024-10-03 09:52:12 +05:30
Aravind Karnam
7f3e2e47ed Parallel processing with retry on failure with exponential backoff - Simplified URL validation and normalisation - respecting Robots.txt 2024-09-19 12:34:12 +05:30
aravind
78f26ac263 Merge pull request #2 from aravindkarnam/staging
Staging
2024-09-18 18:16:23 +05:30
Aravind Karnam
44ce12c62c Created scaffolding for Scraper as per the plan. Implemented the ascrape method in bfs_scraper_strategy 2024-09-09 13:13:34 +05:30
14 changed files with 2292 additions and 1 deletions

7
.gitignore vendored
View File

@@ -202,5 +202,10 @@ todo.md
git_changes.py
git_changes.md
pypi_build.sh
git_issues.py
git_issues.md
.tests/
.tests/
.issues/
.docs/
.issues/

View File

@@ -0,0 +1,3 @@
from .async_web_scraper import AsyncWebScraper
from .bfs_scraper_strategy import BFSScraperStrategy
from .filters import URLFilter, FilterChain, URLPatternFilter, ContentTypeFilter

View File

@@ -0,0 +1,123 @@
from typing import Union, AsyncGenerator, Optional
from .scraper_strategy import ScraperStrategy
from .models import ScraperResult, CrawlResult
from ..async_webcrawler import AsyncWebCrawler
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
@dataclass
class ScrapingProgress:
"""Tracks the progress of a scraping operation."""
processed_urls: int = 0
failed_urls: int = 0
current_url: Optional[str] = None
class AsyncWebScraper:
"""
A high-level web scraper that combines an async crawler with a scraping strategy.
Args:
crawler (AsyncWebCrawler): The async web crawler implementation
strategy (ScraperStrategy): The scraping strategy to use
logger (Optional[logging.Logger]): Custom logger for the scraper
"""
def __init__(
self,
crawler: AsyncWebCrawler,
strategy: ScraperStrategy,
logger: Optional[logging.Logger] = None
):
if not isinstance(crawler, AsyncWebCrawler):
raise TypeError("crawler must be an instance of AsyncWebCrawler")
if not isinstance(strategy, ScraperStrategy):
raise TypeError("strategy must be an instance of ScraperStrategy")
self.crawler = crawler
self.strategy = strategy
self.logger = logger or logging.getLogger(__name__)
self._progress = ScrapingProgress()
@property
def progress(self) -> ScrapingProgress:
"""Get current scraping progress."""
return self._progress
@asynccontextmanager
async def _error_handling_context(self, url: str):
"""Context manager for handling errors during scraping."""
try:
yield
except Exception as e:
self.logger.error(f"Error scraping {url}: {str(e)}")
self._progress.failed_urls += 1
raise
async def ascrape(
self,
url: str,
parallel_processing: bool = True,
stream: bool = False
) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""
Scrape a website starting from the given URL.
Args:
url: Starting URL for scraping
parallel_processing: Whether to process URLs in parallel
stream: If True, yield results as they come; if False, collect all results
Returns:
Either an async generator yielding CrawlResults or a final ScraperResult
"""
self._progress = ScrapingProgress() # Reset progress
async with self._error_handling_context(url):
if stream:
return self._ascrape_yielding(url, parallel_processing)
return await self._ascrape_collecting(url, parallel_processing)
async def _ascrape_yielding(
self,
url: str,
parallel_processing: bool
) -> AsyncGenerator[CrawlResult, None]:
"""Stream scraping results as they become available."""
try:
result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
async for res in result_generator:
self._progress.processed_urls += 1
self._progress.current_url = res.url
yield res
except Exception as e:
self.logger.error(f"Error in streaming scrape: {str(e)}")
raise
async def _ascrape_collecting(
self,
url: str,
parallel_processing: bool
) -> ScraperResult:
"""Collect all scraping results before returning."""
extracted_data = {}
try:
result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
async for res in result_generator:
self._progress.processed_urls += 1
self._progress.current_url = res.url
extracted_data[res.url] = res
return ScraperResult(
url=url,
crawled_urls=list(extracted_data.keys()),
extracted_data=extracted_data,
stats={
'processed_urls': self._progress.processed_urls,
'failed_urls': self._progress.failed_urls
}
)
except Exception as e:
self.logger.error(f"Error in collecting scrape: {str(e)}")
raise

View File

@@ -0,0 +1,327 @@
from abc import ABC, abstractmethod
from typing import Union, AsyncGenerator, Optional, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import asyncio
import logging
from urllib.parse import urljoin, urlparse, urlunparse
from urllib.robotparser import RobotFileParser
import validators
import time
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict
from .models import ScraperResult, CrawlResult
from .filters import FilterChain
from .scorers import URLScorer
from ..async_webcrawler import AsyncWebCrawler
@dataclass
class CrawlStats:
"""Statistics for the crawling process"""
start_time: datetime
urls_processed: int = 0
urls_failed: int = 0
urls_skipped: int = 0
total_depth_reached: int = 0
current_depth: int = 0
robots_blocked: int = 0
class ScraperStrategy(ABC):
"""Base class for scraping strategies"""
@abstractmethod
async def ascrape(
self,
url: str,
crawler: AsyncWebCrawler,
parallel_processing: bool = True,
stream: bool = False
) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Abstract method for scraping implementation"""
pass
@abstractmethod
async def can_process_url(self, url: str) -> bool:
"""Check if URL can be processed based on strategy rules"""
pass
@abstractmethod
async def shutdown(self):
"""Clean up resources used by the strategy"""
pass
class BFSScraperStrategy(ScraperStrategy):
"""Breadth-First Search scraping strategy with politeness controls"""
def __init__(
self,
max_depth: int,
filter_chain: FilterChain,
url_scorer: URLScorer,
max_concurrent: int = 5,
min_crawl_delay: int = 1,
timeout: int = 30,
logger: Optional[logging.Logger] = None
):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.max_concurrent = max_concurrent
self.min_crawl_delay = min_crawl_delay
self.timeout = timeout
self.logger = logger or logging.getLogger(__name__)
# Crawl control
self.stats = CrawlStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self.process_external_links = False
# Rate limiting and politeness
self.rate_limiter = AsyncLimiter(1, 1)
self.last_crawl_time = defaultdict(float)
self.robot_parsers: Dict[str, RobotFileParser] = {}
self.domain_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
async def can_process_url(self, url: str) -> bool:
"""Check if URL can be processed based on robots.txt and filters
This is our gatekeeper method that determines if a URL should be processed. It:
- Validates URL format using the validators library
- Checks robots.txt permissions for the domain
- Applies custom filters from the filter chain
- Updates statistics for blocked URLs
- Returns False early if any check fails
"""
if not validators.url(url):
self.logger.warning(f"Invalid URL: {url}")
return False
robot_parser = await self._get_robot_parser(url)
if robot_parser and not robot_parser.can_fetch("*", url):
self.stats.robots_blocked += 1
self.logger.info(f"Blocked by robots.txt: {url}")
return False
return self.filter_chain.apply(url)
async def _get_robot_parser(self, url: str) -> Optional[RobotFileParser]:
"""Get or create robots.txt parser for domain.
This is our robots.txt manager that:
- Uses domain-level caching of robot parsers
- Creates and caches new parsers as needed
- Handles failed robots.txt fetches gracefully
- Returns None if robots.txt can't be fetched, allowing crawling to proceed
"""
domain = urlparse(url).netloc
if domain not in self.robot_parsers:
parser = RobotFileParser()
try:
robots_url = f"{urlparse(url).scheme}://{domain}/robots.txt"
parser.set_url(robots_url)
parser.read()
self.robot_parsers[domain] = parser
except Exception as e:
self.logger.warning(f"Error fetching robots.txt for {domain}: {e}")
return None
return self.robot_parsers[domain]
@retry(stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10))
async def _crawl_with_retry(
self,
crawler: AsyncWebCrawler,
url: str
) -> CrawlResult:
"""Crawl URL with retry logic"""
try:
async with asyncio.timeout(self.timeout):
return await crawler.arun(url)
except asyncio.TimeoutError:
self.logger.error(f"Timeout crawling {url}")
raise
async def process_url(
self,
url: str,
depth: int,
crawler: AsyncWebCrawler,
queue: asyncio.PriorityQueue,
visited: Set[str],
depths: Dict[str, int]
) -> Optional[CrawlResult]:
"""Process a single URL and extract links.
This is our main URL processing workhorse that:
- Checks for cancellation
- Validates URLs through can_process_url
- Implements politeness delays per domain
- Applies rate limiting
- Handles crawling with retries
- Updates various statistics
- Processes extracted links
- Returns the crawl result or None on failure
"""
if self._cancel_event.is_set():
return None
if not await self.can_process_url(url):
self.stats.urls_skipped += 1
return None
# Politeness delay
domain = urlparse(url).netloc
time_since_last = time.time() - self.last_crawl_time[domain]
if time_since_last < self.min_crawl_delay:
await asyncio.sleep(self.min_crawl_delay - time_since_last)
self.last_crawl_time[domain] = time.time()
# Crawl with rate limiting
try:
async with self.rate_limiter:
result = await self._crawl_with_retry(crawler, url)
self.stats.urls_processed += 1
except Exception as e:
self.logger.error(f"Error crawling {url}: {e}")
self.stats.urls_failed += 1
return None
# Process links
await self._process_links(result, url, depth, queue, visited, depths)
return result
async def _process_links(
self,
result: CrawlResult,
source_url: str,
depth: int,
queue: asyncio.PriorityQueue,
visited: Set[str],
depths: Dict[str, int]
):
"""Process extracted links from crawl result.
This is our link processor that:
Handles both internal and external links
Normalizes URLs (removes fragments)
Checks depth limits
Scores URLs for priority
Updates depth tracking
Adds valid URLs to the queue
Updates maximum depth statistics
"""
links_ro_process = result.links["internal"]
if self.process_external_links:
links_ro_process += result.links["external"]
for link_type in links_ro_process:
for link in result.links[link_type]:
url = link['href']
# url = urljoin(source_url, link['href'])
# url = urlunparse(urlparse(url)._replace(fragment=""))
if url not in visited and await self.can_process_url(url):
new_depth = depths[source_url] + 1
if new_depth <= self.max_depth:
score = self.url_scorer.score(url)
await queue.put((score, new_depth, url))
depths[url] = new_depth
self.stats.total_depth_reached = max(
self.stats.total_depth_reached,
new_depth
)
async def ascrape(
self,
start_url: str,
crawler: AsyncWebCrawler,
parallel_processing: bool = True
) -> AsyncGenerator[CrawlResult, None]:
"""Implement BFS crawling strategy"""
# Initialize crawl state
"""
queue: A priority queue where items are tuples of (score, depth, url)
Score: Determines crawling priority (lower = higher priority)
Depth: Current distance from start_url
URL: The actual URL to crawl
visited: Keeps track of URLs we've already seen to avoid cycles
depths: Maps URLs to their depths from the start URL
pending_tasks: Tracks currently running crawl tasks
"""
queue = asyncio.PriorityQueue()
await queue.put((0, 0, start_url))
visited: Set[str] = set()
depths = {start_url: 0}
pending_tasks = set()
try:
while (not queue.empty() or pending_tasks) and not self._cancel_event.is_set():
"""
This sets up our main control loop which:
- Continues while there are URLs to process (not queue.empty())
- Or while there are tasks still running (pending_tasks)
- Can be interrupted via cancellation (not self._cancel_event.is_set())
"""
# Start new tasks up to max_concurrent
while not queue.empty() and len(pending_tasks) < self.max_concurrent:
"""
This section manages task creation:
Checks if we can start more tasks (under max_concurrent limit)
Gets the next URL from the priority queue
Marks URLs as visited immediately to prevent duplicates
Updates current depth in stats
Either:
Creates a new async task (parallel mode)
Processes URL directly (sequential mode)
"""
_, depth, url = await queue.get()
if url not in visited:
visited.add(url)
self.stats.current_depth = depth
if parallel_processing:
task = asyncio.create_task(
self.process_url(url, depth, crawler, queue, visited, depths)
)
pending_tasks.add(task)
else:
result = await self.process_url(
url, depth, crawler, queue, visited, depths
)
if result:
yield result
# Process completed tasks
"""
This section manages completed tasks:
Waits for any task to complete using asyncio.wait
Uses FIRST_COMPLETED to handle results as soon as they're ready
Yields successful results to the caller
Updates pending_tasks to remove completed ones
"""
if pending_tasks:
done, pending_tasks = await asyncio.wait(
pending_tasks,
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if result:
yield result
except Exception as e:
self.logger.error(f"Error in crawl process: {e}")
raise
finally:
# Clean up any remaining tasks
for task in pending_tasks:
task.cancel()
self.stats.end_time = datetime.now()
async def shutdown(self):
"""Clean up resources and stop crawling"""
self._cancel_event.set()
# Clear caches and close connections
self.robot_parsers.clear()
self.domain_queues.clear()

205
crawl4ai/scraper/filters.py Normal file
View File

@@ -0,0 +1,205 @@
# from .url_filter import URLFilter, FilterChain
# from .content_type_filter import ContentTypeFilter
# from .url_pattern_filter import URLPatternFilter
from abc import ABC, abstractmethod
from typing import List, Pattern, Set, Union
import re
from urllib.parse import urlparse
import mimetypes
import logging
from dataclasses import dataclass
import fnmatch
@dataclass
class FilterStats:
"""Statistics for filter applications"""
total_urls: int = 0
rejected_urls: int = 0
passed_urls: int = 0
class URLFilter(ABC):
"""Base class for URL filters"""
def __init__(self, name: str = None):
self.name = name or self.__class__.__name__
self.stats = FilterStats()
self.logger = logging.getLogger(f"urlfilter.{self.name}")
@abstractmethod
def apply(self, url: str) -> bool:
"""Apply the filter to a URL"""
pass
def _update_stats(self, passed: bool):
"""Update filter statistics"""
self.stats.total_urls += 1
if passed:
self.stats.passed_urls += 1
else:
self.stats.rejected_urls += 1
class FilterChain:
"""Chain of URL filters."""
def __init__(self, filters: List[URLFilter] = None):
self.filters = filters or []
self.stats = FilterStats()
self.logger = logging.getLogger("urlfilter.chain")
def add_filter(self, filter_: URLFilter) -> 'FilterChain':
"""Add a filter to the chain"""
self.filters.append(filter_)
return self # Enable method chaining
def apply(self, url: str) -> bool:
"""Apply all filters in the chain"""
self.stats.total_urls += 1
for filter_ in self.filters:
if not filter_.apply(url):
self.stats.rejected_urls += 1
self.logger.debug(f"URL {url} rejected by {filter_.name}")
return False
self.stats.passed_urls += 1
return True
class URLPatternFilter(URLFilter):
"""Filter URLs based on glob patterns or regex.
pattern_filter = URLPatternFilter([
"*.example.com/*", # Glob pattern
"*/article/*", # Path pattern
re.compile(r"blog-\d+") # Regex pattern
])
- Supports glob patterns and regex
- Multiple patterns per filter
- Pattern pre-compilation for performance
"""
def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]],
use_glob: bool = True):
super().__init__()
self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
self.use_glob = use_glob
self._compiled_patterns = []
for pattern in self.patterns:
if isinstance(pattern, str) and use_glob:
self._compiled_patterns.append(self._glob_to_regex(pattern))
else:
self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern)
def _glob_to_regex(self, pattern: str) -> Pattern:
"""Convert glob pattern to regex"""
return re.compile(fnmatch.translate(pattern))
def apply(self, url: str) -> bool:
"""Check if URL matches any of the patterns"""
matches = any(pattern.search(url) for pattern in self._compiled_patterns)
self._update_stats(matches)
return matches
class ContentTypeFilter(URLFilter):
"""Filter URLs based on expected content type.
content_filter = ContentTypeFilter([
"text/html",
"application/pdf"
], check_extension=True)
- Filter by MIME types
- Extension checking
- Support for multiple content types
"""
def __init__(self, allowed_types: Union[str, List[str]],
check_extension: bool = True):
super().__init__()
self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types
self.check_extension = check_extension
self._normalize_types()
def _normalize_types(self):
"""Normalize content type strings"""
self.allowed_types = [t.lower() for t in self.allowed_types]
def _check_extension(self, url: str) -> bool:
"""Check URL's file extension"""
ext = urlparse(url).path.split('.')[-1].lower() if '.' in urlparse(url).path else ''
if not ext:
return True # No extension, might be dynamic content
guessed_type = mimetypes.guess_type(url)[0]
return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types)
def apply(self, url: str) -> bool:
"""Check if URL's content type is allowed"""
result = True
if self.check_extension:
result = self._check_extension(url)
self._update_stats(result)
return result
class DomainFilter(URLFilter):
"""Filter URLs based on allowed/blocked domains.
domain_filter = DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com"]
)
- Allow/block specific domains
- Subdomain support
- Efficient domain matching
"""
def __init__(self, allowed_domains: Union[str, List[str]] = None,
blocked_domains: Union[str, List[str]] = None):
super().__init__()
self.allowed_domains = set(self._normalize_domains(allowed_domains)) if allowed_domains else None
self.blocked_domains = set(self._normalize_domains(blocked_domains)) if blocked_domains else set()
def _normalize_domains(self, domains: Union[str, List[str]]) -> List[str]:
"""Normalize domain strings"""
if isinstance(domains, str):
domains = [domains]
return [d.lower().strip() for d in domains]
def _extract_domain(self, url: str) -> str:
"""Extract domain from URL"""
return urlparse(url).netloc.lower()
def apply(self, url: str) -> bool:
"""Check if URL's domain is allowed"""
domain = self._extract_domain(url)
if domain in self.blocked_domains:
self._update_stats(False)
return False
if self.allowed_domains is not None and domain not in self.allowed_domains:
self._update_stats(False)
return False
self._update_stats(True)
return True
# Example usage:
def create_common_filter_chain() -> FilterChain:
"""Create a commonly used filter chain"""
return FilterChain([
URLPatternFilter([
"*.html", "*.htm", # HTML files
"*/article/*", "*/blog/*" # Common content paths
]),
ContentTypeFilter([
"text/html",
"application/xhtml+xml"
]),
DomainFilter(
blocked_domains=["ads.*", "analytics.*"]
)
])

View File

@@ -0,0 +1,8 @@
from pydantic import BaseModel
from typing import List, Dict
from ..models import CrawlResult
class ScraperResult(BaseModel):
url: str
crawled_urls: List[str]
extracted_data: Dict[str,CrawlResult]

268
crawl4ai/scraper/scorers.py Normal file
View File

@@ -0,0 +1,268 @@
# from .url_scorer import URLScorer
# from .keyword_relevance_scorer import KeywordRelevanceScorer
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Union
from dataclasses import dataclass
from urllib.parse import urlparse, unquote
import re
from collections import defaultdict
import math
import logging
@dataclass
class ScoringStats:
"""Statistics for URL scoring"""
urls_scored: int = 0
total_score: float = 0.0
min_score: float = float('inf')
max_score: float = float('-inf')
def update(self, score: float):
"""Update scoring statistics"""
self.urls_scored += 1
self.total_score += score
self.min_score = min(self.min_score, score)
self.max_score = max(self.max_score, score)
@property
def average_score(self) -> float:
"""Calculate average score"""
return self.total_score / self.urls_scored if self.urls_scored > 0 else 0.0
class URLScorer(ABC):
"""Base class for URL scoring strategies"""
def __init__(self, weight: float = 1.0, name: str = None):
self.weight = weight
self.name = name or self.__class__.__name__
self.stats = ScoringStats()
self.logger = logging.getLogger(f"urlscorer.{self.name}")
@abstractmethod
def _calculate_score(self, url: str) -> float:
"""Calculate the raw score for a URL"""
pass
def score(self, url: str) -> float:
"""Calculate the weighted score for a URL"""
raw_score = self._calculate_score(url)
weighted_score = raw_score * self.weight
self.stats.update(weighted_score)
return weighted_score
class CompositeScorer(URLScorer):
"""Combines multiple scorers with weights"""
def __init__(self, scorers: List[URLScorer], normalize: bool = True):
super().__init__(name="CompositeScorer")
self.scorers = scorers
self.normalize = normalize
def _calculate_score(self, url: str) -> float:
scores = [scorer.score(url) for scorer in self.scorers]
total_score = sum(scores)
if self.normalize and scores:
total_score /= len(scores)
return total_score
class KeywordRelevanceScorer(URLScorer):
"""Score URLs based on keyword relevance.
keyword_scorer = KeywordRelevanceScorer(
keywords=["python", "programming"],
weight=1.0,
case_sensitive=False
)
- Score based on keyword matches
- Case sensitivity options
- Weighted scoring
"""
def __init__(self, keywords: List[str], weight: float = 1.0,
case_sensitive: bool = False):
super().__init__(weight=weight)
self.keywords = keywords
self.case_sensitive = case_sensitive
self._compile_keywords()
def _compile_keywords(self):
"""Prepare keywords for matching"""
flags = 0 if self.case_sensitive else re.IGNORECASE
self.patterns = [re.compile(re.escape(k), flags) for k in self.keywords]
def _calculate_score(self, url: str) -> float:
"""Calculate score based on keyword matches"""
decoded_url = unquote(url)
total_matches = sum(
1 for pattern in self.patterns
if pattern.search(decoded_url)
)
# Normalize score between 0 and 1
return total_matches / len(self.patterns) if self.patterns else 0.0
class PathDepthScorer(URLScorer):
"""Score URLs based on their path depth.
path_scorer = PathDepthScorer(
optimal_depth=3, # Preferred URL depth
weight=0.7
)
- Score based on URL path depth
- Configurable optimal depth
- Diminishing returns for deeper paths
"""
def __init__(self, optimal_depth: int = 3, weight: float = 1.0):
super().__init__(weight=weight)
self.optimal_depth = optimal_depth
def _calculate_score(self, url: str) -> float:
"""Calculate score based on path depth"""
path = urlparse(url).path
depth = len([x for x in path.split('/') if x])
# Score decreases as we move away from optimal depth
distance_from_optimal = abs(depth - self.optimal_depth)
return 1.0 / (1.0 + distance_from_optimal)
class ContentTypeScorer(URLScorer):
"""Score URLs based on content type preferences.
content_scorer = ContentTypeScorer({
r'\.html$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
})
- Score based on file types
- Configurable type weights
- Pattern matching support
"""
def __init__(self, type_weights: Dict[str, float], weight: float = 1.0):
super().__init__(weight=weight)
self.type_weights = type_weights
self._compile_patterns()
def _compile_patterns(self):
"""Prepare content type patterns"""
self.patterns = {
re.compile(pattern): weight
for pattern, weight in self.type_weights.items()
}
def _calculate_score(self, url: str) -> float:
"""Calculate score based on content type matching"""
for pattern, weight in self.patterns.items():
if pattern.search(url):
return weight
return 0.0
class FreshnessScorer(URLScorer):
"""Score URLs based on freshness indicators.
freshness_scorer = FreshnessScorer(weight=0.9)
Score based on date indicators in URLs
Multiple date format support
Recency weighting"""
def __init__(self, weight: float = 1.0):
super().__init__(weight=weight)
self.date_patterns = [
r'/(\d{4})/(\d{2})/(\d{2})/', # yyyy/mm/dd
r'(\d{4})[-_](\d{2})[-_](\d{2})', # yyyy-mm-dd
r'/(\d{4})/', # year only
]
self._compile_patterns()
def _compile_patterns(self):
"""Prepare date patterns"""
self.compiled_patterns = [re.compile(p) for p in self.date_patterns]
def _calculate_score(self, url: str) -> float:
"""Calculate score based on date indicators"""
for pattern in self.compiled_patterns:
if match := pattern.search(url):
year = int(match.group(1))
# Score higher for more recent years
return 1.0 - (2024 - year) * 0.1
return 0.5 # Default score for URLs without dates
class DomainAuthorityScorer(URLScorer):
"""Score URLs based on domain authority.
authority_scorer = DomainAuthorityScorer({
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
})
Score based on domain importance
Configurable domain weights
Default weight for unknown domains"""
def __init__(self, domain_weights: Dict[str, float],
default_weight: float = 0.5, weight: float = 1.0):
super().__init__(weight=weight)
self.domain_weights = domain_weights
self.default_weight = default_weight
def _calculate_score(self, url: str) -> float:
"""Calculate score based on domain authority"""
domain = urlparse(url).netloc.lower()
return self.domain_weights.get(domain, self.default_weight)
def create_balanced_scorer() -> CompositeScorer:
"""Create a balanced composite scorer"""
return CompositeScorer([
KeywordRelevanceScorer(
keywords=["article", "blog", "news", "research"],
weight=1.0
),
PathDepthScorer(
optimal_depth=3,
weight=0.7
),
ContentTypeScorer(
type_weights={
r'\.html?$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
},
weight=0.8
),
FreshnessScorer(
weight=0.9
)
])
# Example Usage:
"""
# Create a composite scorer
scorer = CompositeScorer([
KeywordRelevanceScorer(["python", "programming"], weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.7),
FreshnessScorer(weight=0.8),
DomainAuthorityScorer(
domain_weights={
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
},
weight=0.9
)
])
# Score a URL
score = scorer.score("https://python.org/article/2024/01/new-features")
# Access statistics
print(f"Average score: {scorer.stats.average_score}")
print(f"URLs scored: {scorer.stats.urls_scored}")
"""

View File

@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod
from .models import ScraperResult, CrawlResult
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
from typing import Union, AsyncGenerator
class ScraperStrategy(ABC):
@abstractmethod
async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Scrape the given URL using the specified crawler.
Args:
url (str): The starting URL for the scrape.
crawler (AsyncWebCrawler): The web crawler instance.
parallel_processing (bool): Whether to use parallel processing. Defaults to True.
stream (bool): If True, yields individual crawl results as they are ready;
if False, accumulates results and returns a final ScraperResult.
Yields:
CrawlResult: Individual crawl results if stream is True.
Returns:
ScraperResult: A summary of the scrape results containing the final extracted data
and the list of crawled URLs if stream is False.
"""
pass

View File

@@ -0,0 +1,166 @@
# AsyncWebScraper: Smart Web Crawling Made Easy
AsyncWebScraper is a powerful and flexible web scraping tool that makes it easy to collect data from websites efficiently. Whether you need to scrape a few pages or an entire website, AsyncWebScraper handles the complexity of web crawling while giving you fine-grained control over the process.
## How It Works
```mermaid
flowchart TB
Start([Start]) --> Init[Initialize AsyncWebScraper\nwith Crawler and Strategy]
Init --> InputURL[Receive URL to scrape]
InputURL --> Decision{Stream or\nCollect?}
%% Streaming Path
Decision -->|Stream| StreamInit[Initialize Streaming Mode]
StreamInit --> StreamStrategy[Call Strategy.ascrape]
StreamStrategy --> AsyncGen[Create Async Generator]
AsyncGen --> ProcessURL[Process Next URL]
ProcessURL --> FetchContent[Fetch Page Content]
FetchContent --> Extract[Extract Data]
Extract --> YieldResult[Yield CrawlResult]
YieldResult --> CheckMore{More URLs?}
CheckMore -->|Yes| ProcessURL
CheckMore -->|No| StreamEnd([End Stream])
%% Collecting Path
Decision -->|Collect| CollectInit[Initialize Collection Mode]
CollectInit --> CollectStrategy[Call Strategy.ascrape]
CollectStrategy --> CollectGen[Create Async Generator]
CollectGen --> ProcessURLColl[Process Next URL]
ProcessURLColl --> FetchContentColl[Fetch Page Content]
FetchContentColl --> ExtractColl[Extract Data]
ExtractColl --> StoreColl[Store in Dictionary]
StoreColl --> CheckMoreColl{More URLs?}
CheckMoreColl -->|Yes| ProcessURLColl
CheckMoreColl -->|No| CreateResult[Create ScraperResult]
CreateResult --> ReturnResult([Return Result])
%% Parallel Processing
subgraph Parallel
ProcessURL
FetchContent
Extract
ProcessURLColl
FetchContentColl
ExtractColl
end
%% Error Handling
FetchContent --> ErrorCheck{Error?}
ErrorCheck -->|Yes| LogError[Log Error]
LogError --> UpdateStats[Update Error Stats]
UpdateStats --> CheckMore
ErrorCheck -->|No| Extract
FetchContentColl --> ErrorCheckColl{Error?}
ErrorCheckColl -->|Yes| LogErrorColl[Log Error]
LogErrorColl --> UpdateStatsColl[Update Error Stats]
UpdateStatsColl --> CheckMoreColl
ErrorCheckColl -->|No| ExtractColl
%% Style definitions
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef decision fill:#fff59d,stroke:#000,stroke-width:2px;
classDef error fill:#ef9a9a,stroke:#000,stroke-width:2px;
classDef start fill:#a5d6a7,stroke:#000,stroke-width:2px;
class Start,StreamEnd,ReturnResult start;
class Decision,CheckMore,CheckMoreColl,ErrorCheck,ErrorCheckColl decision;
class LogError,LogErrorColl,UpdateStats,UpdateStatsColl error;
class ProcessURL,FetchContent,Extract,ProcessURLColl,FetchContentColl,ExtractColl process;
```
AsyncWebScraper uses an intelligent crawling system that can navigate through websites following your specified strategy. It supports two main modes of operation:
### 1. Streaming Mode
```python
async for result in scraper.ascrape(url, stream=True):
print(f"Found data on {result.url}")
process_data(result.data)
```
- Perfect for processing large websites
- Memory efficient - handles one page at a time
- Ideal for real-time data processing
- Great for monitoring or continuous scraping tasks
### 2. Collection Mode
```python
result = await scraper.ascrape(url)
print(f"Scraped {len(result.crawled_urls)} pages")
process_all_data(result.extracted_data)
```
- Collects all data before returning
- Best for when you need the complete dataset
- Easier to work with for batch processing
- Includes comprehensive statistics
## Key Features
- **Smart Crawling**: Automatically follows relevant links while avoiding duplicates
- **Parallel Processing**: Scrapes multiple pages simultaneously for better performance
- **Memory Efficient**: Choose between streaming and collecting based on your needs
- **Error Resilient**: Continues working even if some pages fail to load
- **Progress Tracking**: Monitor the scraping progress in real-time
- **Customizable**: Configure crawling strategy, filters, and scoring to match your needs
## Quick Start
```python
from crawl4ai.scraper import AsyncWebScraper, BFSStrategy
from crawl4ai.async_webcrawler import AsyncWebCrawler
# Initialize the scraper
crawler = AsyncWebCrawler()
strategy = BFSStrategy(
max_depth=2, # How deep to crawl
url_pattern="*.example.com/*" # What URLs to follow
)
scraper = AsyncWebScraper(crawler, strategy)
# Start scraping
async def main():
# Collect all results
result = await scraper.ascrape("https://example.com")
print(f"Found {len(result.extracted_data)} pages")
# Or stream results
async for page in scraper.ascrape("https://example.com", stream=True):
print(f"Processing {page.url}")
```
## Best Practices
1. **Choose the Right Mode**
- Use streaming for large websites or real-time processing
- Use collecting for smaller sites or when you need the complete dataset
2. **Configure Depth**
- Start with a small depth (2-3) and increase if needed
- Higher depths mean exponentially more pages to crawl
3. **Set Appropriate Filters**
- Use URL patterns to stay within relevant sections
- Set content type filters to only process useful pages
4. **Handle Resources Responsibly**
- Enable parallel processing for faster results
- Consider the target website's capacity
- Implement appropriate delays between requests
## Common Use Cases
- **Content Aggregation**: Collect articles, blog posts, or news from multiple pages
- **Data Extraction**: Gather product information, prices, or specifications
- **Site Mapping**: Create a complete map of a website's structure
- **Content Monitoring**: Track changes or updates across multiple pages
- **Data Mining**: Extract and analyze patterns across web pages
## Advanced Features
- Custom scoring algorithms for prioritizing important pages
- URL filters for focusing on specific site sections
- Content type filtering for processing only relevant pages
- Progress tracking for monitoring long-running scrapes
Need more help? Check out our [examples repository](https://github.com/example/crawl4ai/examples) or join our [community Discord](https://discord.gg/example).

View File

@@ -0,0 +1,244 @@
# BFS Scraper Strategy: Smart Web Traversal
The BFS (Breadth-First Search) Scraper Strategy provides an intelligent way to traverse websites systematically. It crawls websites level by level, ensuring thorough coverage while respecting web crawling etiquette.
```mermaid
flowchart TB
Start([Start]) --> Init[Initialize BFS Strategy]
Init --> InitStats[Initialize CrawlStats]
InitStats --> InitQueue[Initialize Priority Queue]
InitQueue --> AddStart[Add Start URL to Queue]
AddStart --> CheckState{Queue Empty or\nTasks Pending?}
CheckState -->|No| Cleanup[Cleanup & Stats]
Cleanup --> End([End])
CheckState -->|Yes| CheckCancel{Cancel\nRequested?}
CheckCancel -->|Yes| Cleanup
CheckCancel -->|No| CheckConcurrent{Under Max\nConcurrent?}
CheckConcurrent -->|No| WaitComplete[Wait for Task Completion]
WaitComplete --> YieldResult[Yield Result]
YieldResult --> CheckState
CheckConcurrent -->|Yes| GetNextURL[Get Next URL from Queue]
GetNextURL --> ValidateURL{Already\nVisited?}
ValidateURL -->|Yes| CheckState
ValidateURL -->|No| ProcessURL[Process URL]
subgraph URL_Processing [URL Processing]
ProcessURL --> CheckValid{URL Valid?}
CheckValid -->|No| UpdateStats[Update Skip Stats]
CheckValid -->|Yes| CheckRobots{Allowed by\nrobots.txt?}
CheckRobots -->|No| UpdateRobotStats[Update Robot Stats]
CheckRobots -->|Yes| ApplyDelay[Apply Politeness Delay]
ApplyDelay --> FetchContent[Fetch Content with Rate Limit]
FetchContent --> CheckError{Error?}
CheckError -->|Yes| Retry{Retry\nNeeded?}
Retry -->|Yes| FetchContent
Retry -->|No| UpdateFailStats[Update Fail Stats]
CheckError -->|No| ExtractLinks[Extract & Process Links]
ExtractLinks --> ScoreURLs[Score New URLs]
ScoreURLs --> AddToQueue[Add to Priority Queue]
end
ProcessURL --> CreateTask{Parallel\nProcessing?}
CreateTask -->|Yes| AddTask[Add to Pending Tasks]
CreateTask -->|No| DirectProcess[Process Directly]
AddTask --> CheckState
DirectProcess --> YieldResult
UpdateStats --> CheckState
UpdateRobotStats --> CheckState
UpdateFailStats --> CheckState
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef decision fill:#fff59d,stroke:#000,stroke-width:2px;
classDef error fill:#ef9a9a,stroke:#000,stroke-width:2px;
classDef stats fill:#a5d6a7,stroke:#000,stroke-width:2px;
class Start,End stats;
class CheckState,CheckCancel,CheckConcurrent,ValidateURL,CheckValid,CheckRobots,CheckError,Retry,CreateTask decision;
class UpdateStats,UpdateRobotStats,UpdateFailStats,InitStats,Cleanup stats;
class ProcessURL,FetchContent,ExtractLinks,ScoreURLs process;
```
## How It Works
The BFS strategy crawls a website by:
1. Starting from a root URL
2. Processing all URLs at the current depth
3. Moving to URLs at the next depth level
4. Continuing until maximum depth is reached
This ensures systematic coverage of the website while maintaining control over the crawling process.
## Key Features
### 1. Smart URL Processing
```python
strategy = BFSScraperStrategy(
max_depth=2,
filter_chain=my_filters,
url_scorer=my_scorer,
max_concurrent=5
)
```
- Controls crawl depth
- Filters unwanted URLs
- Scores URLs for priority
- Manages concurrent requests
### 2. Polite Crawling
The strategy automatically implements web crawling best practices:
- Respects robots.txt
- Implements rate limiting
- Adds politeness delays
- Manages concurrent requests
### 3. Link Processing Control
```python
strategy = BFSScraperStrategy(
...,
process_external_links=False # Only process internal links
)
```
- Control whether to follow external links
- Default: internal links only
- Enable external links when needed
## Configuration Options
| Parameter | Description | Default |
|-----------|-------------|---------|
| max_depth | Maximum crawl depth | Required |
| filter_chain | URL filtering rules | Required |
| url_scorer | URL priority scoring | Required |
| max_concurrent | Max parallel requests | 5 |
| min_crawl_delay | Seconds between requests | 1 |
| process_external_links | Follow external links | False |
## Best Practices
1. **Set Appropriate Depth**
- Start with smaller depths (2-3)
- Increase based on needs
- Consider site structure
2. **Configure Filters**
- Use URL patterns
- Filter by content type
- Avoid unwanted sections
3. **Tune Performance**
- Adjust max_concurrent
- Set appropriate delays
- Monitor resource usage
4. **Handle External Links**
- Keep external_links=False for focused crawls
- Enable only when needed
- Consider additional filtering
## Example Usage
```python
from crawl4ai.scraper import BFSScraperStrategy
from crawl4ai.scraper.filters import FilterChain
from crawl4ai.scraper.scorers import BasicURLScorer
# Configure strategy
strategy = BFSScraperStrategy(
max_depth=3,
filter_chain=FilterChain([
URLPatternFilter("*.example.com/*"),
ContentTypeFilter(["text/html"])
]),
url_scorer=BasicURLScorer(),
max_concurrent=5,
min_crawl_delay=1,
process_external_links=False
)
# Use with AsyncWebScraper
scraper = AsyncWebScraper(crawler, strategy)
results = await scraper.ascrape("https://example.com")
```
## Common Use Cases
### 1. Site Mapping
```python
strategy = BFSScraperStrategy(
max_depth=5,
filter_chain=site_filter,
url_scorer=depth_scorer,
process_external_links=False
)
```
Perfect for creating complete site maps or understanding site structure.
### 2. Content Aggregation
```python
strategy = BFSScraperStrategy(
max_depth=2,
filter_chain=content_filter,
url_scorer=relevance_scorer,
max_concurrent=3
)
```
Ideal for collecting specific types of content (articles, products, etc.).
### 3. Link Analysis
```python
strategy = BFSScraperStrategy(
max_depth=1,
filter_chain=link_filter,
url_scorer=link_scorer,
process_external_links=True
)
```
Useful for analyzing both internal and external link structures.
## Advanced Features
### Progress Monitoring
```python
async for result in scraper.ascrape(url):
print(f"Current depth: {strategy.stats.current_depth}")
print(f"Processed URLs: {strategy.stats.urls_processed}")
```
### Custom URL Scoring
```python
class CustomScorer(URLScorer):
def score(self, url: str) -> float:
# Lower scores = higher priority
return score_based_on_criteria(url)
```
## Troubleshooting
1. **Slow Crawling**
- Increase max_concurrent
- Adjust min_crawl_delay
- Check network conditions
2. **Missing Content**
- Verify max_depth
- Check filter settings
- Review URL patterns
3. **High Resource Usage**
- Reduce max_concurrent
- Increase crawl delay
- Add more specific filters

View File

@@ -0,0 +1,342 @@
# URL Filters and Scorers
The crawl4ai library provides powerful URL filtering and scoring capabilities that help you control and prioritize your web crawling. This guide explains how to use these features effectively.
```mermaid
flowchart TB
Start([URL Input]) --> Chain[Filter Chain]
subgraph Chain Process
Chain --> Pattern{URL Pattern\nFilter}
Pattern -->|Match| Content{Content Type\nFilter}
Pattern -->|No Match| Reject1[Reject URL]
Content -->|Allowed| Domain{Domain\nFilter}
Content -->|Not Allowed| Reject2[Reject URL]
Domain -->|Allowed| Accept[Accept URL]
Domain -->|Blocked| Reject3[Reject URL]
end
subgraph Statistics
Pattern --> UpdatePattern[Update Pattern Stats]
Content --> UpdateContent[Update Content Stats]
Domain --> UpdateDomain[Update Domain Stats]
Accept --> UpdateChain[Update Chain Stats]
Reject1 --> UpdateChain
Reject2 --> UpdateChain
Reject3 --> UpdateChain
end
Accept --> End([End])
Reject1 --> End
Reject2 --> End
Reject3 --> End
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef decision fill:#fff59d,stroke:#000,stroke-width:2px;
classDef reject fill:#ef9a9a,stroke:#000,stroke-width:2px;
classDef accept fill:#a5d6a7,stroke:#000,stroke-width:2px;
class Start,End accept;
class Pattern,Content,Domain decision;
class Reject1,Reject2,Reject3 reject;
class Chain,UpdatePattern,UpdateContent,UpdateDomain,UpdateChain process;
```
## URL Filters
URL filters help you control which URLs are crawled. Multiple filters can be chained together to create sophisticated filtering rules.
### Available Filters
1. **URL Pattern Filter**
```python
pattern_filter = URLPatternFilter([
"*.example.com/*", # Glob pattern
"*/article/*", # Path pattern
re.compile(r"blog-\d+") # Regex pattern
])
```
- Supports glob patterns and regex
- Multiple patterns per filter
- Pattern pre-compilation for performance
2. **Content Type Filter**
```python
content_filter = ContentTypeFilter([
"text/html",
"application/pdf"
], check_extension=True)
```
- Filter by MIME types
- Extension checking
- Support for multiple content types
3. **Domain Filter**
```python
domain_filter = DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com"]
)
```
- Allow/block specific domains
- Subdomain support
- Efficient domain matching
### Creating Filter Chains
```python
# Create and configure a filter chain
filter_chain = FilterChain([
URLPatternFilter(["*.example.com/*"]),
ContentTypeFilter(["text/html"]),
DomainFilter(blocked_domains=["ads.*"])
])
# Add more filters
filter_chain.add_filter(
URLPatternFilter(["*/article/*"])
)
```
```mermaid
flowchart TB
Start([URL Input]) --> Composite[Composite Scorer]
subgraph Scoring Process
Composite --> Keywords[Keyword Relevance]
Composite --> Path[Path Depth]
Composite --> Content[Content Type]
Composite --> Fresh[Freshness]
Composite --> Domain[Domain Authority]
Keywords --> KeywordScore[Calculate Score]
Path --> PathScore[Calculate Score]
Content --> ContentScore[Calculate Score]
Fresh --> FreshScore[Calculate Score]
Domain --> DomainScore[Calculate Score]
KeywordScore --> Weight1[Apply Weight]
PathScore --> Weight2[Apply Weight]
ContentScore --> Weight3[Apply Weight]
FreshScore --> Weight4[Apply Weight]
DomainScore --> Weight5[Apply Weight]
end
Weight1 --> Combine[Combine Scores]
Weight2 --> Combine
Weight3 --> Combine
Weight4 --> Combine
Weight5 --> Combine
Combine --> Normalize{Normalize?}
Normalize -->|Yes| NormalizeScore[Normalize Combined Score]
Normalize -->|No| FinalScore[Final Score]
NormalizeScore --> FinalScore
FinalScore --> Stats[Update Statistics]
Stats --> End([End])
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef scorer fill:#fff59d,stroke:#000,stroke-width:2px;
classDef calc fill:#a5d6a7,stroke:#000,stroke-width:2px;
classDef decision fill:#ef9a9a,stroke:#000,stroke-width:2px;
class Start,End calc;
class Keywords,Path,Content,Fresh,Domain scorer;
class KeywordScore,PathScore,ContentScore,FreshScore,DomainScore process;
class Normalize decision;
```
## URL Scorers
URL scorers help prioritize which URLs to crawl first. Higher scores indicate higher priority.
### Available Scorers
1. **Keyword Relevance Scorer**
```python
keyword_scorer = KeywordRelevanceScorer(
keywords=["python", "programming"],
weight=1.0,
case_sensitive=False
)
```
- Score based on keyword matches
- Case sensitivity options
- Weighted scoring
2. **Path Depth Scorer**
```python
path_scorer = PathDepthScorer(
optimal_depth=3, # Preferred URL depth
weight=0.7
)
```
- Score based on URL path depth
- Configurable optimal depth
- Diminishing returns for deeper paths
3. **Content Type Scorer**
```python
content_scorer = ContentTypeScorer({
r'\.html$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
})
```
- Score based on file types
- Configurable type weights
- Pattern matching support
4. **Freshness Scorer**
```python
freshness_scorer = FreshnessScorer(weight=0.9)
```
- Score based on date indicators in URLs
- Multiple date format support
- Recency weighting
5. **Domain Authority Scorer**
```python
authority_scorer = DomainAuthorityScorer({
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
})
```
- Score based on domain importance
- Configurable domain weights
- Default weight for unknown domains
### Combining Scorers
```python
# Create a composite scorer
composite_scorer = CompositeScorer([
KeywordRelevanceScorer(["python"], weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.7),
FreshnessScorer(weight=0.8)
], normalize=True)
```
## Best Practices
### Filter Configuration
1. **Start Restrictive**
```python
# Begin with strict filters
filter_chain = FilterChain([
DomainFilter(allowed_domains=["example.com"]),
ContentTypeFilter(["text/html"])
])
```
2. **Layer Filters**
```python
# Add more specific filters
filter_chain.add_filter(
URLPatternFilter(["*/article/*", "*/blog/*"])
)
```
3. **Monitor Filter Statistics**
```python
# Check filter performance
for filter in filter_chain.filters:
print(f"{filter.name}: {filter.stats.rejected_urls} rejected")
```
### Scorer Configuration
1. **Balance Weights**
```python
# Balanced scoring configuration
scorer = create_balanced_scorer()
```
2. **Customize for Content**
```python
# News site configuration
news_scorer = CompositeScorer([
KeywordRelevanceScorer(["news", "article"], weight=1.0),
FreshnessScorer(weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.5)
])
```
3. **Monitor Scoring Statistics**
```python
# Check scoring distribution
print(f"Average score: {scorer.stats.average_score}")
print(f"Score range: {scorer.stats.min_score} - {scorer.stats.max_score}")
```
## Common Use Cases
### Blog Crawling
```python
blog_config = {
'filters': FilterChain([
URLPatternFilter(["*/blog/*", "*/post/*"]),
ContentTypeFilter(["text/html"])
]),
'scorer': CompositeScorer([
FreshnessScorer(weight=1.0),
KeywordRelevanceScorer(["blog", "article"], weight=0.8)
])
}
```
### Documentation Sites
```python
docs_config = {
'filters': FilterChain([
URLPatternFilter(["*/docs/*", "*/guide/*"]),
ContentTypeFilter(["text/html", "application/pdf"])
]),
'scorer': CompositeScorer([
PathDepthScorer(optimal_depth=3, weight=1.0),
KeywordRelevanceScorer(["guide", "tutorial"], weight=0.9)
])
}
```
### E-commerce Sites
```python
ecommerce_config = {
'filters': FilterChain([
URLPatternFilter(["*/product/*", "*/category/*"]),
DomainFilter(blocked_domains=["ads.*", "tracker.*"])
]),
'scorer': CompositeScorer([
PathDepthScorer(optimal_depth=2, weight=1.0),
ContentTypeScorer({
r'/product/': 1.0,
r'/category/': 0.8
})
])
}
```
## Advanced Topics
### Custom Filters
```python
class CustomFilter(URLFilter):
def apply(self, url: str) -> bool:
# Your custom filtering logic
return True
```
### Custom Scorers
```python
class CustomScorer(URLScorer):
def _calculate_score(self, url: str) -> float:
# Your custom scoring logic
return 1.0
```
For more examples, check our [example repository](https://github.com/example/crawl4ai/examples).

206
docs/scrapper/how_to_use.md Normal file
View File

@@ -0,0 +1,206 @@
# Scraper Examples Guide
This guide provides two complete examples of using the crawl4ai scraper: a basic implementation for simple use cases and an advanced implementation showcasing all features.
## Basic Example
The basic example demonstrates a simple blog scraping scenario:
```python
from crawl4ai.scraper import AsyncWebScraper, BFSScraperStrategy, FilterChain
# Create simple filter chain
filter_chain = FilterChain([
URLPatternFilter("*/blog/*"),
ContentTypeFilter(["text/html"])
])
# Initialize strategy
strategy = BFSScraperStrategy(
max_depth=2,
filter_chain=filter_chain,
url_scorer=None,
max_concurrent=3
)
# Create and run scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
result = await scraper.ascrape("https://example.com/blog/")
```
### Features Demonstrated
- Basic URL filtering
- Simple content type filtering
- Depth control
- Concurrent request limiting
- Result collection
## Advanced Example
The advanced example shows a sophisticated news site scraping setup with all features enabled:
```python
# Create comprehensive filter chain
filter_chain = FilterChain([
DomainFilter(
allowed_domains=["example.com"],
blocked_domains=["ads.example.com"]
),
URLPatternFilter([
"*/article/*",
re.compile(r"\d{4}/\d{2}/.*")
]),
ContentTypeFilter(["text/html"])
])
# Create intelligent scorer
scorer = CompositeScorer([
KeywordRelevanceScorer(
keywords=["news", "breaking"],
weight=1.0
),
PathDepthScorer(optimal_depth=3, weight=0.7),
FreshnessScorer(weight=0.9)
])
# Initialize advanced strategy
strategy = BFSScraperStrategy(
max_depth=4,
filter_chain=filter_chain,
url_scorer=scorer,
max_concurrent=5
)
```
### Features Demonstrated
1. **Advanced Filtering**
- Domain filtering
- Pattern matching
- Content type control
2. **Intelligent Scoring**
- Keyword relevance
- Path optimization
- Freshness priority
3. **Monitoring**
- Progress tracking
- Error handling
- Statistics collection
4. **Resource Management**
- Concurrent processing
- Rate limiting
- Cleanup handling
## Running the Examples
```bash
# Basic usage
python basic_scraper_example.py
# Advanced usage with logging
PYTHONPATH=. python advanced_scraper_example.py
```
## Example Output
### Basic Example
```
Crawled 15 pages:
- https://example.com/blog/post1: 24560 bytes
- https://example.com/blog/post2: 18920 bytes
...
```
### Advanced Example
```
INFO: Starting crawl of https://example.com/news/
INFO: Processed: https://example.com/news/breaking/story1
DEBUG: KeywordScorer: 0.85
DEBUG: FreshnessScorer: 0.95
INFO: Progress: 10 URLs processed
...
INFO: Scraping completed:
INFO: - URLs processed: 50
INFO: - Errors: 2
INFO: - Total content size: 1240.50 KB
```
## Customization
### Adding Custom Filters
```python
class CustomFilter(URLFilter):
def apply(self, url: str) -> bool:
# Your custom filtering logic
return True
filter_chain.add_filter(CustomFilter())
```
### Custom Scoring Logic
```python
class CustomScorer(URLScorer):
def _calculate_score(self, url: str) -> float:
# Your custom scoring logic
return 1.0
scorer = CompositeScorer([
CustomScorer(weight=1.0),
...
])
```
## Best Practices
1. **Start Simple**
- Begin with basic filtering
- Add features incrementally
- Test thoroughly at each step
2. **Monitor Performance**
- Watch memory usage
- Track processing times
- Adjust concurrency as needed
3. **Handle Errors**
- Implement proper error handling
- Log important events
- Track error statistics
4. **Optimize Resources**
- Set appropriate delays
- Limit concurrent requests
- Use streaming for large crawls
## Troubleshooting
Common issues and solutions:
1. **Too Many Requests**
```python
strategy = BFSScraperStrategy(
max_concurrent=3, # Reduce concurrent requests
min_crawl_delay=2 # Increase delay between requests
)
```
2. **Memory Issues**
```python
# Use streaming mode for large crawls
async for result in scraper.ascrape(url, stream=True):
process_result(result)
```
3. **Missing Content**
```python
# Check your filter chain
filter_chain = FilterChain([
URLPatternFilter("*"), # Broaden patterns
ContentTypeFilter(["*"]) # Accept all content
])
```
For more examples and use cases, visit our [GitHub repository](https://github.com/example/crawl4ai/examples).

View File

@@ -0,0 +1,184 @@
# basic_scraper_example.py
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def basic_scraper_example():
"""
Basic example: Scrape a blog site for articles
- Crawls only HTML pages
- Stays within the blog section
- Collects all results at once
"""
# Create a simple filter chain
filter_chain = FilterChain([
# Only crawl pages within the blog section
URLPatternFilter("*/blog/*"),
# Only process HTML pages
ContentTypeFilter(["text/html"])
])
# Initialize the strategy with basic configuration
strategy = BFSScraperStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
max_concurrent=3 # Limit concurrent requests
)
# Create the crawler and scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
# Start scraping
try:
result = await scraper.ascrape("https://example.com/blog/")
# Process results
print(f"Crawled {len(result.crawled_urls)} pages:")
for url, data in result.extracted_data.items():
print(f"- {url}: {len(data.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
# advanced_scraper_example.py
import logging
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def advanced_scraper_example():
"""
Advanced example: Intelligent news site scraping
- Uses all filter types
- Implements sophisticated scoring
- Streams results
- Includes monitoring and logging
"""
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("advanced_scraper")
# Create sophisticated filter chain
filter_chain = FilterChain([
# Domain control
DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com", "tracker.example.com"]
),
# URL patterns
URLPatternFilter([
"*/article/*",
"*/news/*",
"*/blog/*",
re.compile(r"\d{4}/\d{2}/.*") # Date-based URLs
]),
# Content types
ContentTypeFilter([
"text/html",
"application/xhtml+xml"
])
])
# Create composite scorer
scorer = CompositeScorer([
# Prioritize by keywords
KeywordRelevanceScorer(
keywords=["news", "breaking", "update", "latest"],
weight=1.0
),
# Prefer optimal URL structure
PathDepthScorer(
optimal_depth=3,
weight=0.7
),
# Prioritize fresh content
FreshnessScorer(weight=0.9)
])
# Initialize strategy with advanced configuration
strategy = BFSScraperStrategy(
max_depth=4,
filter_chain=filter_chain,
url_scorer=scorer,
max_concurrent=5,
min_crawl_delay=1
)
# Create crawler and scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
# Track statistics
stats = {
'processed': 0,
'errors': 0,
'total_size': 0
}
try:
# Use streaming mode
async for result in scraper.ascrape("https://example.com/news/", stream=True):
stats['processed'] += 1
if result.success:
stats['total_size'] += len(result.html)
logger.info(f"Processed: {result.url}")
# Print scoring information
for scorer_name, score in result.scores.items():
logger.debug(f"{scorer_name}: {score:.2f}")
else:
stats['errors'] += 1
logger.error(f"Failed to process {result.url}: {result.error_message}")
# Log progress regularly
if stats['processed'] % 10 == 0:
logger.info(f"Progress: {stats['processed']} URLs processed")
except Exception as e:
logger.error(f"Scraping error: {e}")
finally:
# Print final statistics
logger.info("Scraping completed:")
logger.info(f"- URLs processed: {stats['processed']}")
logger.info(f"- Errors: {stats['errors']}")
logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB")
# Print filter statistics
for filter_ in filter_chain.filters:
logger.info(f"{filter_.name} stats:")
logger.info(f"- Passed: {filter_.stats.passed_urls}")
logger.info(f"- Rejected: {filter_.stats.rejected_urls}")
# Print scorer statistics
logger.info("Scoring statistics:")
logger.info(f"- Average score: {scorer.stats.average_score:.2f}")
logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}")
if __name__ == "__main__":
import asyncio
# Run basic example
print("Running basic scraper example...")
asyncio.run(basic_scraper_example())
print("\nRunning advanced scraper example...")
asyncio.run(advanced_scraper_example())

184
tests/test_scraper.py Normal file
View File

@@ -0,0 +1,184 @@
# basic_scraper_example.py
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def basic_scraper_example():
"""
Basic example: Scrape a blog site for articles
- Crawls only HTML pages
- Stays within the blog section
- Collects all results at once
"""
# Create a simple filter chain
filter_chain = FilterChain([
# Only crawl pages within the blog section
URLPatternFilter("*/blog/*"),
# Only process HTML pages
ContentTypeFilter(["text/html"])
])
# Initialize the strategy with basic configuration
strategy = BFSScraperStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
max_concurrent=3 # Limit concurrent requests
)
# Create the crawler and scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
# Start scraping
try:
result = await scraper.ascrape("https://example.com/blog/")
# Process results
print(f"Crawled {len(result.crawled_urls)} pages:")
for url, data in result.extracted_data.items():
print(f"- {url}: {len(data.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
# advanced_scraper_example.py
import logging
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def advanced_scraper_example():
"""
Advanced example: Intelligent news site scraping
- Uses all filter types
- Implements sophisticated scoring
- Streams results
- Includes monitoring and logging
"""
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("advanced_scraper")
# Create sophisticated filter chain
filter_chain = FilterChain([
# Domain control
DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com", "tracker.example.com"]
),
# URL patterns
URLPatternFilter([
"*/article/*",
"*/news/*",
"*/blog/*",
re.compile(r"\d{4}/\d{2}/.*") # Date-based URLs
]),
# Content types
ContentTypeFilter([
"text/html",
"application/xhtml+xml"
])
])
# Create composite scorer
scorer = CompositeScorer([
# Prioritize by keywords
KeywordRelevanceScorer(
keywords=["news", "breaking", "update", "latest"],
weight=1.0
),
# Prefer optimal URL structure
PathDepthScorer(
optimal_depth=3,
weight=0.7
),
# Prioritize fresh content
FreshnessScorer(weight=0.9)
])
# Initialize strategy with advanced configuration
strategy = BFSScraperStrategy(
max_depth=4,
filter_chain=filter_chain,
url_scorer=scorer,
max_concurrent=5,
min_crawl_delay=1
)
# Create crawler and scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
# Track statistics
stats = {
'processed': 0,
'errors': 0,
'total_size': 0
}
try:
# Use streaming mode
async for result in scraper.ascrape("https://example.com/news/", stream=True):
stats['processed'] += 1
if result.success:
stats['total_size'] += len(result.html)
logger.info(f"Processed: {result.url}")
# Print scoring information
for scorer_name, score in result.scores.items():
logger.debug(f"{scorer_name}: {score:.2f}")
else:
stats['errors'] += 1
logger.error(f"Failed to process {result.url}: {result.error_message}")
# Log progress regularly
if stats['processed'] % 10 == 0:
logger.info(f"Progress: {stats['processed']} URLs processed")
except Exception as e:
logger.error(f"Scraping error: {e}")
finally:
# Print final statistics
logger.info("Scraping completed:")
logger.info(f"- URLs processed: {stats['processed']}")
logger.info(f"- Errors: {stats['errors']}")
logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB")
# Print filter statistics
for filter_ in filter_chain.filters:
logger.info(f"{filter_.name} stats:")
logger.info(f"- Passed: {filter_.stats.passed_urls}")
logger.info(f"- Rejected: {filter_.stats.rejected_urls}")
# Print scorer statistics
logger.info("Scoring statistics:")
logger.info(f"- Average score: {scorer.stats.average_score:.2f}")
logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}")
if __name__ == "__main__":
import asyncio
# Run basic example
print("Running basic scraper example...")
asyncio.run(basic_scraper_example())
print("\nRunning advanced scraper example...")
asyncio.run(advanced_scraper_example())