feat(scraper): Enhance URL filtering and scoring systems

Implement comprehensive URL filtering and scoring capabilities:

Filters:
- Add URLPatternFilter with glob/regex support
- Implement ContentTypeFilter with MIME type checking
- Add DomainFilter for domain control
- Create FilterChain with stats tracking

Scorers:
- Complete KeywordRelevanceScorer implementation
- Add PathDepthScorer for URL structure scoring
- Implement ContentTypeScorer for file type priorities
- Add FreshnessScorer for date-based scoring
- Add DomainAuthorityScorer for domain weighting
- Create CompositeScorer for combined strategies

Features:
- Add statistics tracking for both filters and scorers
- Implement logging support throughout
- Add resource cleanup methods
- Create comprehensive documentation
- Include performance optimizations

Tests and docs included.
Note: Review URL normalization overlap with recent crawler changes.

- Quick Start is created and added
This commit is contained in:
UncleCode
2024-11-08 18:45:12 +08:00
parent d11c004fbb
commit bae4665949
10 changed files with 1451 additions and 157 deletions

View File

@@ -1,138 +0,0 @@
from .scraper_strategy import ScraperStrategy
from .filters import FilterChain
from .scorers import URLScorer
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
import asyncio
import validators
from urllib.parse import urljoin,urlparse,urlunparse
from urllib.robotparser import RobotFileParser
import time
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict
import logging
from typing import Dict, AsyncGenerator
logging.basicConfig(level=logging.DEBUG)
rate_limiter = AsyncLimiter(1, 1) # 1 request per second
class BFSScraperStrategy(ScraperStrategy):
def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5, min_crawl_delay: int=1):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.max_concurrent = max_concurrent
# For Crawl Politeness
self.last_crawl_time = defaultdict(float)
self.min_crawl_delay = min_crawl_delay # 1 second delay between requests to the same domain
# For Robots.txt Compliance
self.robot_parsers = {}
# Robots.txt Parser
def get_robot_parser(self, url: str) -> RobotFileParser:
domain = urlparse(url)
scheme = domain.scheme if domain.scheme else 'http' # Default to 'http' if no scheme provided
netloc = domain.netloc
if netloc not in self.robot_parsers:
rp = RobotFileParser()
rp.set_url(f"{scheme}://{netloc}/robots.txt")
try:
rp.read()
except Exception as e:
# Log the type of error, message, and the URL
logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
return None
self.robot_parsers[netloc] = rp
return self.robot_parsers[netloc]
# Retry with exponential backoff
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
return await crawler.arun(url)
async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
def normalize_url(url: str) -> str:
parsed = urlparse(url)
return urlunparse(parsed._replace(fragment=""))
# URL Validation
if not validators.url(url):
logging.warning(f"Invalid URL: {url}")
return None
# Robots.txt Compliance
robot_parser = self.get_robot_parser(url)
if robot_parser is None:
logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
else:
# If robots.txt was fetched, check if crawling is allowed
if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
logging.info(f"Skipping {url} as per robots.txt")
return None
# Crawl Politeness
domain = urlparse(url).netloc
time_since_last_crawl = time.time() - self.last_crawl_time[domain]
if time_since_last_crawl < self.min_crawl_delay:
await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
self.last_crawl_time[domain] = time.time()
# Rate Limiting
async with rate_limiter:
# Error Handling
try:
crawl_result = await self.retry_crawl(crawler, url)
except Exception as e:
logging.error(f"Error crawling {url}: {str(e)}")
crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
if not crawl_result.success:
# Logging and Monitoring
logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
return crawl_result
# Process links
for link_type in ["internal", "external"]:
for link in crawl_result.links[link_type]:
absolute_link = urljoin(url, link['href'])
normalized_link = normalize_url(absolute_link)
if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
new_depth = depths[url] + 1
if new_depth <= self.max_depth:
# URL Scoring
score = self.url_scorer.score(normalized_link)
await queue.put((score, new_depth, normalized_link))
depths[normalized_link] = new_depth
return crawl_result
async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
queue = asyncio.PriorityQueue()
queue.put_nowait((0, 0, start_url))
visited = set()
depths = {start_url: 0}
pending_tasks = set()
while not queue.empty() or pending_tasks:
while not queue.empty() and len(pending_tasks) < self.max_concurrent:
_, depth, url = await queue.get()
if url not in visited:
# Adding URL to the visited set here itself, (instead of after result generation)
# so that other tasks are not queued for same URL, found at different depth before
# crawling and extraction of this task is completed.
visited.add(url)
if parallel_processing:
task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
pending_tasks.add(task)
else:
result = await self.process_url(url, depth, crawler, queue, visited, depths)
if result:
yield result
# Wait for the first task to complete and yield results incrementally as each task is completed
if pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
if result:
yield result

View File

@@ -1,3 +1,205 @@
from .url_filter import URLFilter, FilterChain
from .content_type_filter import ContentTypeFilter
from .url_pattern_filter import URLPatternFilter
# from .url_filter import URLFilter, FilterChain
# from .content_type_filter import ContentTypeFilter
# from .url_pattern_filter import URLPatternFilter
from abc import ABC, abstractmethod
from typing import List, Pattern, Set, Union
import re
from urllib.parse import urlparse
import mimetypes
import logging
from dataclasses import dataclass
import fnmatch
@dataclass
class FilterStats:
"""Statistics for filter applications"""
total_urls: int = 0
rejected_urls: int = 0
passed_urls: int = 0
class URLFilter(ABC):
"""Base class for URL filters"""
def __init__(self, name: str = None):
self.name = name or self.__class__.__name__
self.stats = FilterStats()
self.logger = logging.getLogger(f"urlfilter.{self.name}")
@abstractmethod
def apply(self, url: str) -> bool:
"""Apply the filter to a URL"""
pass
def _update_stats(self, passed: bool):
"""Update filter statistics"""
self.stats.total_urls += 1
if passed:
self.stats.passed_urls += 1
else:
self.stats.rejected_urls += 1
class FilterChain:
"""Chain of URL filters."""
def __init__(self, filters: List[URLFilter] = None):
self.filters = filters or []
self.stats = FilterStats()
self.logger = logging.getLogger("urlfilter.chain")
def add_filter(self, filter_: URLFilter) -> 'FilterChain':
"""Add a filter to the chain"""
self.filters.append(filter_)
return self # Enable method chaining
def apply(self, url: str) -> bool:
"""Apply all filters in the chain"""
self.stats.total_urls += 1
for filter_ in self.filters:
if not filter_.apply(url):
self.stats.rejected_urls += 1
self.logger.debug(f"URL {url} rejected by {filter_.name}")
return False
self.stats.passed_urls += 1
return True
class URLPatternFilter(URLFilter):
"""Filter URLs based on glob patterns or regex.
pattern_filter = URLPatternFilter([
"*.example.com/*", # Glob pattern
"*/article/*", # Path pattern
re.compile(r"blog-\d+") # Regex pattern
])
- Supports glob patterns and regex
- Multiple patterns per filter
- Pattern pre-compilation for performance
"""
def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]],
use_glob: bool = True):
super().__init__()
self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
self.use_glob = use_glob
self._compiled_patterns = []
for pattern in self.patterns:
if isinstance(pattern, str) and use_glob:
self._compiled_patterns.append(self._glob_to_regex(pattern))
else:
self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern)
def _glob_to_regex(self, pattern: str) -> Pattern:
"""Convert glob pattern to regex"""
return re.compile(fnmatch.translate(pattern))
def apply(self, url: str) -> bool:
"""Check if URL matches any of the patterns"""
matches = any(pattern.search(url) for pattern in self._compiled_patterns)
self._update_stats(matches)
return matches
class ContentTypeFilter(URLFilter):
"""Filter URLs based on expected content type.
content_filter = ContentTypeFilter([
"text/html",
"application/pdf"
], check_extension=True)
- Filter by MIME types
- Extension checking
- Support for multiple content types
"""
def __init__(self, allowed_types: Union[str, List[str]],
check_extension: bool = True):
super().__init__()
self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types
self.check_extension = check_extension
self._normalize_types()
def _normalize_types(self):
"""Normalize content type strings"""
self.allowed_types = [t.lower() for t in self.allowed_types]
def _check_extension(self, url: str) -> bool:
"""Check URL's file extension"""
ext = urlparse(url).path.split('.')[-1].lower() if '.' in urlparse(url).path else ''
if not ext:
return True # No extension, might be dynamic content
guessed_type = mimetypes.guess_type(url)[0]
return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types)
def apply(self, url: str) -> bool:
"""Check if URL's content type is allowed"""
result = True
if self.check_extension:
result = self._check_extension(url)
self._update_stats(result)
return result
class DomainFilter(URLFilter):
"""Filter URLs based on allowed/blocked domains.
domain_filter = DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com"]
)
- Allow/block specific domains
- Subdomain support
- Efficient domain matching
"""
def __init__(self, allowed_domains: Union[str, List[str]] = None,
blocked_domains: Union[str, List[str]] = None):
super().__init__()
self.allowed_domains = set(self._normalize_domains(allowed_domains)) if allowed_domains else None
self.blocked_domains = set(self._normalize_domains(blocked_domains)) if blocked_domains else set()
def _normalize_domains(self, domains: Union[str, List[str]]) -> List[str]:
"""Normalize domain strings"""
if isinstance(domains, str):
domains = [domains]
return [d.lower().strip() for d in domains]
def _extract_domain(self, url: str) -> str:
"""Extract domain from URL"""
return urlparse(url).netloc.lower()
def apply(self, url: str) -> bool:
"""Check if URL's domain is allowed"""
domain = self._extract_domain(url)
if domain in self.blocked_domains:
self._update_stats(False)
return False
if self.allowed_domains is not None and domain not in self.allowed_domains:
self._update_stats(False)
return False
self._update_stats(True)
return True
# Example usage:
def create_common_filter_chain() -> FilterChain:
"""Create a commonly used filter chain"""
return FilterChain([
URLPatternFilter([
"*.html", "*.htm", # HTML files
"*/article/*", "*/blog/*" # Common content paths
]),
ContentTypeFilter([
"text/html",
"application/xhtml+xml"
]),
DomainFilter(
blocked_domains=["ads.*", "analytics.*"]
)
])

View File

@@ -1,8 +1,43 @@
from .url_filter import URLFilter
from typing import List, Union
from urllib.parse import urlparse
import mimetypes
class ContentTypeFilter(URLFilter):
def __init__(self, contentType: str):
self.contentType = contentType
"""Filter URLs based on expected content type"""
def __init__(self, allowed_types: Union[str, List[str]],
check_extension: bool = True):
super().__init__()
self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types
self.check_extension = check_extension
self._normalize_types()
def _normalize_types(self):
"""Normalize content type strings"""
self.allowed_types = [t.lower() for t in self.allowed_types]
def _check_extension(self, url: str) -> bool:
"""Check URL's file extension"""
ext = urlparse(url).path.split('.')[-1].lower() if '.' in urlparse(url).path else ''
if not ext:
return True # No extension, might be dynamic content
guessed_type = mimetypes.guess_type(url)[0]
return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types)
def apply(self, url: str) -> bool:
#TODO: This is a stub. Will implement this later
return True
"""Check if URL's content type is allowed"""
result = True
if self.check_extension:
result = self._check_extension(url)
self._update_stats(result)
return result
# class ContentTypeFilter(URLFilter):
# def __init__(self, contentType: str):
# self.contentType = contentType
# def apply(self, url: str) -> bool:
# #TODO: This is a stub. Will implement this later
# return True

View File

@@ -1,16 +1,72 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
from typing import List
@dataclass
class FilterStats:
"""Statistics for filter applications"""
total_urls: int = 0
rejected_urls: int = 0
passed_urls: int = 0
class URLFilter(ABC):
"""Base class for URL filters"""
def __init__(self, name: str = None):
self.name = name or self.__class__.__name__
self.stats = FilterStats()
self.logger = logging.getLogger(f"urlfilter.{self.name}")
@abstractmethod
def apply(self, url: str) -> bool:
"""Apply the filter to a URL"""
pass
class FilterChain:
def __init__(self):
self.filters = []
def _update_stats(self, passed: bool):
"""Update filter statistics"""
self.stats.total_urls += 1
if passed:
self.stats.passed_urls += 1
else:
self.stats.rejected_urls += 1
def add_filter(self, filter: URLFilter):
self.filters.append(filter)
class FilterChain:
"""Chain of URL filters"""
def __init__(self, filters: List[URLFilter] = None):
self.filters = filters or []
self.stats = FilterStats()
self.logger = logging.getLogger("urlfilter.chain")
def add_filter(self, filter_: URLFilter) -> 'FilterChain':
"""Add a filter to the chain"""
self.filters.append(filter_)
return self # Enable method chaining
def apply(self, url: str) -> bool:
return all(filter.apply(url) for filter in self.filters)
"""Apply all filters in the chain"""
self.stats.total_urls += 1
for filter_ in self.filters:
if not filter_.apply(url):
self.stats.rejected_urls += 1
self.logger.debug(f"URL {url} rejected by {filter_.name}")
return False
self.stats.passed_urls += 1
return True
# class URLFilter(ABC):
# @abstractmethod
# def apply(self, url: str) -> bool:
# pass
# class FilterChain:
# def __init__(self):
# self.filters = []
# def add_filter(self, filter: URLFilter):
# self.filters.append(filter)
# def apply(self, url: str) -> bool:
# return all(filter.apply(url) for filter in self.filters)

View File

@@ -1,9 +1,39 @@
from .url_filter import URLFilter
from re import Pattern
from typing import List, Union
import re
import fnmatch
class URLPatternFilter(URLFilter):
def __init__(self, pattern: Pattern):
self.pattern = pattern
"""Filter URLs based on glob patterns or regex"""
def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]],
use_glob: bool = True):
super().__init__()
self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
self.use_glob = use_glob
self._compiled_patterns = []
for pattern in self.patterns:
if isinstance(pattern, str) and use_glob:
self._compiled_patterns.append(self._glob_to_regex(pattern))
else:
self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern)
def _glob_to_regex(self, pattern: str) -> Pattern:
"""Convert glob pattern to regex"""
return re.compile(fnmatch.translate(pattern))
def apply(self, url: str) -> bool:
#TODO: This is a stub. Will implement this later.
return True
"""Check if URL matches any of the patterns"""
matches = any(pattern.search(url) for pattern in self._compiled_patterns)
self._update_stats(matches)
return matches
# class URLPatternFilter(URLFilter):
# def __init__(self, pattern: Pattern):
# self.pattern = pattern
# def apply(self, url: str) -> bool:
# #TODO: This is a stub. Will implement this later.
# return True

View File

@@ -1,2 +1,268 @@
from .url_scorer import URLScorer
from .keyword_relevance_scorer import KeywordRelevanceScorer
# from .url_scorer import URLScorer
# from .keyword_relevance_scorer import KeywordRelevanceScorer
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Union
from dataclasses import dataclass
from urllib.parse import urlparse, unquote
import re
from collections import defaultdict
import math
import logging
@dataclass
class ScoringStats:
"""Statistics for URL scoring"""
urls_scored: int = 0
total_score: float = 0.0
min_score: float = float('inf')
max_score: float = float('-inf')
def update(self, score: float):
"""Update scoring statistics"""
self.urls_scored += 1
self.total_score += score
self.min_score = min(self.min_score, score)
self.max_score = max(self.max_score, score)
@property
def average_score(self) -> float:
"""Calculate average score"""
return self.total_score / self.urls_scored if self.urls_scored > 0 else 0.0
class URLScorer(ABC):
"""Base class for URL scoring strategies"""
def __init__(self, weight: float = 1.0, name: str = None):
self.weight = weight
self.name = name or self.__class__.__name__
self.stats = ScoringStats()
self.logger = logging.getLogger(f"urlscorer.{self.name}")
@abstractmethod
def _calculate_score(self, url: str) -> float:
"""Calculate the raw score for a URL"""
pass
def score(self, url: str) -> float:
"""Calculate the weighted score for a URL"""
raw_score = self._calculate_score(url)
weighted_score = raw_score * self.weight
self.stats.update(weighted_score)
return weighted_score
class CompositeScorer(URLScorer):
"""Combines multiple scorers with weights"""
def __init__(self, scorers: List[URLScorer], normalize: bool = True):
super().__init__(name="CompositeScorer")
self.scorers = scorers
self.normalize = normalize
def _calculate_score(self, url: str) -> float:
scores = [scorer.score(url) for scorer in self.scorers]
total_score = sum(scores)
if self.normalize and scores:
total_score /= len(scores)
return total_score
class KeywordRelevanceScorer(URLScorer):
"""Score URLs based on keyword relevance.
keyword_scorer = KeywordRelevanceScorer(
keywords=["python", "programming"],
weight=1.0,
case_sensitive=False
)
- Score based on keyword matches
- Case sensitivity options
- Weighted scoring
"""
def __init__(self, keywords: List[str], weight: float = 1.0,
case_sensitive: bool = False):
super().__init__(weight=weight)
self.keywords = keywords
self.case_sensitive = case_sensitive
self._compile_keywords()
def _compile_keywords(self):
"""Prepare keywords for matching"""
flags = 0 if self.case_sensitive else re.IGNORECASE
self.patterns = [re.compile(re.escape(k), flags) for k in self.keywords]
def _calculate_score(self, url: str) -> float:
"""Calculate score based on keyword matches"""
decoded_url = unquote(url)
total_matches = sum(
1 for pattern in self.patterns
if pattern.search(decoded_url)
)
# Normalize score between 0 and 1
return total_matches / len(self.patterns) if self.patterns else 0.0
class PathDepthScorer(URLScorer):
"""Score URLs based on their path depth.
path_scorer = PathDepthScorer(
optimal_depth=3, # Preferred URL depth
weight=0.7
)
- Score based on URL path depth
- Configurable optimal depth
- Diminishing returns for deeper paths
"""
def __init__(self, optimal_depth: int = 3, weight: float = 1.0):
super().__init__(weight=weight)
self.optimal_depth = optimal_depth
def _calculate_score(self, url: str) -> float:
"""Calculate score based on path depth"""
path = urlparse(url).path
depth = len([x for x in path.split('/') if x])
# Score decreases as we move away from optimal depth
distance_from_optimal = abs(depth - self.optimal_depth)
return 1.0 / (1.0 + distance_from_optimal)
class ContentTypeScorer(URLScorer):
"""Score URLs based on content type preferences.
content_scorer = ContentTypeScorer({
r'\.html$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
})
- Score based on file types
- Configurable type weights
- Pattern matching support
"""
def __init__(self, type_weights: Dict[str, float], weight: float = 1.0):
super().__init__(weight=weight)
self.type_weights = type_weights
self._compile_patterns()
def _compile_patterns(self):
"""Prepare content type patterns"""
self.patterns = {
re.compile(pattern): weight
for pattern, weight in self.type_weights.items()
}
def _calculate_score(self, url: str) -> float:
"""Calculate score based on content type matching"""
for pattern, weight in self.patterns.items():
if pattern.search(url):
return weight
return 0.0
class FreshnessScorer(URLScorer):
"""Score URLs based on freshness indicators.
freshness_scorer = FreshnessScorer(weight=0.9)
Score based on date indicators in URLs
Multiple date format support
Recency weighting"""
def __init__(self, weight: float = 1.0):
super().__init__(weight=weight)
self.date_patterns = [
r'/(\d{4})/(\d{2})/(\d{2})/', # yyyy/mm/dd
r'(\d{4})[-_](\d{2})[-_](\d{2})', # yyyy-mm-dd
r'/(\d{4})/', # year only
]
self._compile_patterns()
def _compile_patterns(self):
"""Prepare date patterns"""
self.compiled_patterns = [re.compile(p) for p in self.date_patterns]
def _calculate_score(self, url: str) -> float:
"""Calculate score based on date indicators"""
for pattern in self.compiled_patterns:
if match := pattern.search(url):
year = int(match.group(1))
# Score higher for more recent years
return 1.0 - (2024 - year) * 0.1
return 0.5 # Default score for URLs without dates
class DomainAuthorityScorer(URLScorer):
"""Score URLs based on domain authority.
authority_scorer = DomainAuthorityScorer({
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
})
Score based on domain importance
Configurable domain weights
Default weight for unknown domains"""
def __init__(self, domain_weights: Dict[str, float],
default_weight: float = 0.5, weight: float = 1.0):
super().__init__(weight=weight)
self.domain_weights = domain_weights
self.default_weight = default_weight
def _calculate_score(self, url: str) -> float:
"""Calculate score based on domain authority"""
domain = urlparse(url).netloc.lower()
return self.domain_weights.get(domain, self.default_weight)
def create_balanced_scorer() -> CompositeScorer:
"""Create a balanced composite scorer"""
return CompositeScorer([
KeywordRelevanceScorer(
keywords=["article", "blog", "news", "research"],
weight=1.0
),
PathDepthScorer(
optimal_depth=3,
weight=0.7
),
ContentTypeScorer(
type_weights={
r'\.html?$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
},
weight=0.8
),
FreshnessScorer(
weight=0.9
)
])
# Example Usage:
"""
# Create a composite scorer
scorer = CompositeScorer([
KeywordRelevanceScorer(["python", "programming"], weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.7),
FreshnessScorer(weight=0.8),
DomainAuthorityScorer(
domain_weights={
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
},
weight=0.9
)
])
# Score a URL
score = scorer.score("https://python.org/article/2024/01/new-features")
# Access statistics
print(f"Average score: {scorer.stats.average_score}")
print(f"URLs scored: {scorer.stats.urls_scored}")
"""