feat(scraper): Enhance URL filtering and scoring systems

Implement comprehensive URL filtering and scoring capabilities:

Filters:
- Add URLPatternFilter with glob/regex support
- Implement ContentTypeFilter with MIME type checking
- Add DomainFilter for domain control
- Create FilterChain with stats tracking
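For example, these compose through FilterChain; a minimal sketch, assuming the exports added to the package __init__ in this commit (URLs illustrative):

from .filters import FilterChain, URLPatternFilter, ContentTypeFilter

chain = FilterChain([
    URLPatternFilter("*/blog/*"),    # glob pattern, compiled to regex internally
    ContentTypeFilter("text/html"),  # single allowed MIME type
])
chain.apply("https://example.com/blog/post.html")  # True only if every filter passes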

Scorers:
- Complete KeywordRelevanceScorer implementation
- Add PathDepthScorer for URL structure scoring
- Implement ContentTypeScorer for file type priorities
- Add FreshnessScorer for date-based scoring
- Add DomainAuthorityScorer for domain weighting
- Create CompositeScorer for combined strategies
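CompositeScorer is meant to blend the individual signals. A hedged sketch; the constructors below (argument lists, whether weights are supported) are assumptions, since this diff only shows the old stubs:

scorer = CompositeScorer([
    KeywordRelevanceScorer(["python", "async"]),  # keyword hits in the URL
    PathDepthScorer(),                            # assumed no-arg constructor
])
priority = scorer.score("https://example.com/docs/python/async")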

Features:
- Add statistics tracking for both filters and scorers (sketched after this list)
- Implement logging support throughout
- Add resource cleanup methods
- Create comprehensive documentation
- Include performance optimizations
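Continuing the FilterChain sketch above, and based on the FilterStats dataclass in this commit (total_urls, passed_urls, rejected_urls), the stats should be inspectable after a run:

stats = chain.stats
rejection_rate = stats.rejected_urls / max(stats.total_urls, 1)
print(f"{stats.passed_urls} passed, {stats.rejected_urls} rejected ({rejection_rate:.1%})")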

Tests and docs included.
Note: Review URL normalization overlap with recent crawler changes.
UncleCode
2024-11-08 19:02:28 +08:00
parent bae4665949
commit 0d357ab7d2
10 changed files with 186 additions and 281 deletions

@@ -1,2 +1,3 @@
from .async_web_scraper import AsyncWebScraper
from .bfs_scraper_strategy import BFSScraperStrategy
from .filters import URLFilter, FilterChain, URLPatternFilter, ContentTypeFilter

@@ -1,43 +0,0 @@
from .url_filter import URLFilter
from typing import List, Union
from urllib.parse import urlparse
import mimetypes
class ContentTypeFilter(URLFilter):
"""Filter URLs based on expected content type"""
def __init__(self, allowed_types: Union[str, List[str]],
check_extension: bool = True):
super().__init__()
self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types
self.check_extension = check_extension
self._normalize_types()
def _normalize_types(self):
"""Normalize content type strings"""
self.allowed_types = [t.lower() for t in self.allowed_types]
    def _check_extension(self, url: str) -> bool:
        """Check the URL's file extension against the allowed MIME types"""
        path = urlparse(url).path
        ext = path.split('.')[-1].lower() if '.' in path else ''
        if not ext:
            return True  # No extension, might be dynamic content
        guessed_type = mimetypes.guess_type(url)[0]
        return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types)
def apply(self, url: str) -> bool:
"""Check if URL's content type is allowed"""
result = True
if self.check_extension:
result = self._check_extension(url)
self._update_stats(result)
return result
# class ContentTypeFilter(URLFilter):
# def __init__(self, contentType: str):
# self.contentType = contentType
# def apply(self, url: str) -> bool:
# #TODO: This is a stub. Will implement this later
# return True
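A quick usage sketch against the implementation above (URLs illustrative):

f = ContentTypeFilter(["text/html", "application/pdf"])
f.apply("https://example.com/report.pdf")  # True: guessed type is application/pdf
f.apply("https://example.com/logo.png")    # False: image/png is not in the allow-list
f.apply("https://example.com/api/users")   # True: no extension, may be dynamic content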

@@ -1,72 +0,0 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
from typing import List, Optional
@dataclass
class FilterStats:
"""Statistics for filter applications"""
total_urls: int = 0
rejected_urls: int = 0
passed_urls: int = 0
class URLFilter(ABC):
"""Base class for URL filters"""
    def __init__(self, name: Optional[str] = None):
self.name = name or self.__class__.__name__
self.stats = FilterStats()
self.logger = logging.getLogger(f"urlfilter.{self.name}")
@abstractmethod
def apply(self, url: str) -> bool:
"""Apply the filter to a URL"""
pass
def _update_stats(self, passed: bool):
"""Update filter statistics"""
self.stats.total_urls += 1
if passed:
self.stats.passed_urls += 1
else:
self.stats.rejected_urls += 1
class FilterChain:
"""Chain of URL filters"""
    def __init__(self, filters: Optional[List[URLFilter]] = None):
self.filters = filters or []
self.stats = FilterStats()
self.logger = logging.getLogger("urlfilter.chain")
def add_filter(self, filter_: URLFilter) -> 'FilterChain':
"""Add a filter to the chain"""
self.filters.append(filter_)
return self # Enable method chaining
def apply(self, url: str) -> bool:
"""Apply all filters in the chain"""
self.stats.total_urls += 1
for filter_ in self.filters:
if not filter_.apply(url):
self.stats.rejected_urls += 1
self.logger.debug(f"URL {url} rejected by {filter_.name}")
return False
self.stats.passed_urls += 1
return True
# class URLFilter(ABC):
# @abstractmethod
# def apply(self, url: str) -> bool:
# pass
# class FilterChain:
# def __init__(self):
# self.filters = []
# def add_filter(self, filter: URLFilter):
# self.filters.append(filter)
# def apply(self, url: str) -> bool:
# return all(filter.apply(url) for filter in self.filters)
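Subclasses only implement apply() and report outcomes through _update_stats(). The DomainFilter named in the commit message is not shown in this hunk; a hypothetical version built on this base class might read:

from urllib.parse import urlparse

class DomainFilter(URLFilter):
    """Allow only URLs whose host is on an allow-list (illustrative sketch)"""
    def __init__(self, allowed_domains: List[str]):
        super().__init__()
        self.allowed_domains = set(allowed_domains)

    def apply(self, url: str) -> bool:
        passed = urlparse(url).netloc in self.allowed_domains
        self._update_stats(passed)
        return passed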

@@ -1,39 +0,0 @@
from .url_filter import URLFilter
from re import Pattern
from typing import List, Union
import re
import fnmatch
class URLPatternFilter(URLFilter):
"""Filter URLs based on glob patterns or regex"""
def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]],
use_glob: bool = True):
super().__init__()
self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
self.use_glob = use_glob
self._compiled_patterns = []
for pattern in self.patterns:
if isinstance(pattern, str) and use_glob:
self._compiled_patterns.append(self._glob_to_regex(pattern))
else:
self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern)
def _glob_to_regex(self, pattern: str) -> Pattern:
"""Convert glob pattern to regex"""
return re.compile(fnmatch.translate(pattern))
def apply(self, url: str) -> bool:
"""Check if URL matches any of the patterns"""
matches = any(pattern.search(url) for pattern in self._compiled_patterns)
self._update_stats(matches)
return matches
# class URLPatternFilter(URLFilter):
# def __init__(self, pattern: Pattern):
# self.pattern = pattern
# def apply(self, url: str) -> bool:
# #TODO: This is a stub. Will implement this later.
# return True
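Both glob strings and pre-compiled regexes are accepted; a short sketch:

import re

globs = URLPatternFilter(["*/docs/*", "*.html"])              # glob mode (default)
rx = URLPatternFilter(re.compile(r"/v\d+/"), use_glob=False)  # explicit regex
globs.apply("https://example.com/docs/intro.html")  # True
rx.apply("https://example.com/v2/users")            # True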

@@ -1,9 +0,0 @@
from .url_scorer import URLScorer
from typing import List
class KeywordRelevanceScorer(URLScorer):
    def __init__(self, keywords: List[str]):
        self.keywords = keywords

    def score(self, url: str) -> float:
        # TODO: This is a stub. Will implement this later.
        return 1.0
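The commit message says this stub was completed; the finished code is not in this hunk. A plausible keyword-fraction implementation, reusing the imports above, would be:

class KeywordRelevanceScorer(URLScorer):
    """Fraction of keywords that appear in the URL (illustrative sketch)"""
    def __init__(self, keywords: List[str]):
        self.keywords = [k.lower() for k in keywords]

    def score(self, url: str) -> float:
        if not self.keywords:
            return 0.0
        hits = sum(1 for kw in self.keywords if kw in url.lower())
        return hits / len(self.keywords)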

@@ -1,6 +0,0 @@
from abc import ABC, abstractmethod
class URLScorer(ABC):
@abstractmethod
def score(self, url: str) -> float:
pass
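Every concrete scorer subclasses this base. As an illustration, the PathDepthScorer named in the commit message (implementation not in this diff) could favor URLs near a target depth:

from urllib.parse import urlparse

class PathDepthScorer(URLScorer):
    """Score peaks at an assumed optimal path depth (illustrative sketch)"""
    def __init__(self, optimal_depth: int = 3):  # optimal_depth is an assumed parameter
        self.optimal_depth = optimal_depth

    def score(self, url: str) -> float:
        depth = len([seg for seg in urlparse(url).path.split('/') if seg])
        return 1.0 / (1.0 + abs(depth - self.optimal_depth))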