2025 feb alpha 1 (#685)

* spelling change in prompt

* gpt-4o-mini support

* Remove leading Y before here

* prompt spell correction

* (Docs) Fix numbered list end-of-line formatting

Added the missing "two spaces" to add a line break

* fix: access downloads_path through browser_config in _handle_download method - Fixes #585

* crawl

* fix: https://github.com/unclecode/crawl4ai/issues/592

* fix: https://github.com/unclecode/crawl4ai/issues/583

* Docs update: https://github.com/unclecode/crawl4ai/issues/649

* fix: https://github.com/unclecode/crawl4ai/issues/570

* Docs: updated example for content-selection to reflect new changes in yc newsfeed css

* Refactor: Removed old filters and replaced with optimised filters

* fix:Fixed imports as per the new names of filters

* Tests: For deep crawl filters

* Refactor: Remove old scorers and replace with optimised ones: Fix imports forall filters and scorers.

* fix: awaiting on filters that are async in nature eg: content relevance and seo filters

* fix: https://github.com/unclecode/crawl4ai/issues/592

* fix: https://github.com/unclecode/crawl4ai/issues/715

---------

Co-authored-by: DarshanTank <darshan.tank@gnani.ai>
Co-authored-by: Tuhin Mallick <tuhin.mllk@gmail.com>
Co-authored-by: Serhat Soydan <ssoydan@gmail.com>
Co-authored-by: cardit1 <maneesh@cardit.in>
Co-authored-by: Tautik Agrahari <tautikagrahari@gmail.com>
This commit is contained in:
Aravind
2025-02-19 11:43:17 +05:30
committed by GitHub
parent c171891999
commit dad592c801
19 changed files with 833 additions and 1350 deletions

View File

@@ -17,11 +17,16 @@ from .extraction_strategy import (
LLMExtractionStrategy,
CosineStrategy,
JsonCssExtractionStrategy,
JsonXPathExtractionStrategy
JsonXPathExtractionStrategy,
)
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
from .content_filter_strategy import PruningContentFilter, BM25ContentFilter, LLMContentFilter, RelevantContentFilter
from .content_filter_strategy import (
PruningContentFilter,
BM25ContentFilter,
LLMContentFilter,
RelevantContentFilter,
)
from .models import CrawlResult, MarkdownGenerationResult
from .async_dispatcher import (
MemoryAdaptiveDispatcher,
@@ -29,20 +34,25 @@ from .async_dispatcher import (
RateLimiter,
CrawlerMonitor,
DisplayMode,
BaseDispatcher
BaseDispatcher,
)
from .docker_client import Crawl4aiDockerClient
from .hub import CrawlerHub
from .deep_crawling import (
DeepCrawlStrategy,
BFSDeepCrawlStrategy,
FastFilterChain,
FastContentTypeFilter,
FastDomainFilter,
FastURLFilter,
FastFilterStats,
FastKeywordRelevanceScorer,
FastURLScorer,
FilterChain,
ContentTypeFilter,
DomainFilter,
URLFilter,
FilterStats,
SEOFilter,
KeywordRelevanceScorer,
URLScorer,
CompositeScorer,
DomainAuthorityScorer,
FreshnessScorer,
PathDepthScorer,
BestFirstCrawlingStrategy,
DFSDeepCrawlStrategy,
DeepCrawlDecorator,
@@ -54,13 +64,18 @@ __all__ = [
"BFSDeepCrawlStrategy",
"BestFirstCrawlingStrategy",
"DFSDeepCrawlStrategy",
"FastFilterChain",
"FastContentTypeFilter",
"FastDomainFilter",
"FastFilterStats",
"FastURLFilter",
"FastKeywordRelevanceScorer",
"FastURLScorer",
"FilterChain",
"ContentTypeFilter",
"DomainFilter",
"FilterStats",
"URLFilter",
"SEOFilter",
"KeywordRelevanceScorer",
"URLScorer",
"CompositeScorer",
"DomainAuthorityScorer",
"FreshnessScorer",
"PathDepthScorer",
"DeepCrawlDecorator",
"CrawlResult",
"CrawlerHub",

View File

@@ -886,7 +886,14 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
try:
viewport_height = page.viewport_size.get(
viewport_size = page.viewport_size
if viewport_size is None:
await page.set_viewport_size(
{"width": self.browser_config.viewport_width, "height": self.browser_config.viewport_height}
)
viewport_size = page.viewport_size
viewport_height = viewport_size.get(
"height", self.browser_config.viewport_height
)
current_position = viewport_height
@@ -946,7 +953,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
try:
suggested_filename = download.suggested_filename
download_path = os.path.join(self.downloads_path, suggested_filename)
download_path = os.path.join(self.browser_config.downloads_path, suggested_filename)
self.logger.info(
message="Downloading {filename} to {path}",

View File

@@ -166,7 +166,7 @@ class AsyncWebCrawler:
)
# Initialize crawler strategy
params = {k: v for k, v in kwargs.items() if k in ["browser_congig", "logger"]}
params = {k: v for k, v in kwargs.items() if k in ["browser_config", "logger"]}
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
logger=self.logger,

View File

@@ -4,15 +4,22 @@ from .bfs_strategy import BFSDeepCrawlStrategy
from .bff_strategy import BestFirstCrawlingStrategy
from .dfs_strategy import DFSDeepCrawlStrategy
from .filters import (
FastFilterChain,
FastContentTypeFilter,
FastDomainFilter,
FastURLFilter,
FastFilterStats,
FilterChain,
ContentTypeFilter,
DomainFilter,
URLFilter,
FilterStats,
ContentRelevanceFilter,
SEOFilter
)
from .scorers import (
FastKeywordRelevanceScorer,
FastURLScorer,
KeywordRelevanceScorer,
URLScorer,
CompositeScorer,
DomainAuthorityScorer,
FreshnessScorer,
PathDepthScorer,
ContentTypeScorer
)
__all__ = [
@@ -21,11 +28,18 @@ __all__ = [
"BFSDeepCrawlStrategy",
"BestFirstCrawlingStrategy",
"DFSDeepCrawlStrategy",
"FastFilterChain",
"FastContentTypeFilter",
"FastDomainFilter",
"FastURLFilter",
"FastFilterStats",
"FastKeywordRelevanceScorer",
"FastURLScorer",
]
"FilterChain",
"ContentTypeFilter",
"DomainFilter",
"URLFilter",
"FilterStats",
"ContentRelevanceFilter",
"SEOFilter",
"KeywordRelevanceScorer",
"URLScorer",
"CompositeScorer",
"DomainAuthorityScorer",
"FreshnessScorer",
"PathDepthScorer",
"ContentTypeScorer",
]

View File

@@ -6,8 +6,8 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
from urllib.parse import urlparse
from ..models import TraversalStats
from .filters import FastFilterChain
from .scorers import FastURLScorer
from .filters import FilterChain
from .scorers import URLScorer
from . import DeepCrawlStrategy
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
@@ -34,8 +34,8 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
def __init__(
self,
max_depth: int,
filter_chain: FastFilterChain = FastFilterChain(),
url_scorer: Optional[FastURLScorer] = None,
filter_chain: FilterChain = FilterChain(),
url_scorer: Optional[URLScorer] = None,
include_external: bool = False,
logger: Optional[logging.Logger] = None,
):
@@ -64,7 +64,7 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
self.logger.warning(f"Invalid URL: {url}, error: {e}")
return False
if depth != 0 and not self.filter_chain.apply(url):
if depth != 0 and not await self.filter_chain.apply(url):
return False
return True

View File

@@ -6,8 +6,8 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
from urllib.parse import urlparse
from ..models import TraversalStats
from .filters import FastFilterChain
from .scorers import FastURLScorer
from .filters import FilterChain
from .scorers import URLScorer
from . import DeepCrawlStrategy
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
@@ -23,8 +23,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
def __init__(
self,
max_depth: int,
filter_chain: FastFilterChain = FastFilterChain(),
url_scorer: Optional[FastURLScorer] = None,
filter_chain: FilterChain = FilterChain(),
url_scorer: Optional[URLScorer] = None,
include_external: bool = False,
logger: Optional[logging.Logger] = None,
):
@@ -53,7 +53,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self.logger.warning(f"Invalid URL: {url}, error: {e}")
return False
if depth != 0 and not self.filter_chain.apply(url):
if depth != 0 and not await self.filter_chain.apply(url):
return False
return True

View File

@@ -374,7 +374,7 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
parsed = urlparse(url)
return (parsed.scheme in {'http', 'https'}
and '.' in parsed.netloc
and self.filter_chain.apply(url))
and await self.filter_chain.apply(url))
except Exception:
return False

File diff suppressed because it is too large Load Diff

View File

@@ -23,35 +23,7 @@ _FRESHNESS_SCORES = [
0.5, # 5 years ago
]
# Pre-computed normalization factors for powers of 2
_POW2_NORM = [1.0, 0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625]
@dataclass
class ScoringStats:
# PERF: Dataclass introduces overhead with property access and __init__
# PERF: Float operations and comparisons are expensive for high-frequency updates
# PERF: Property calculation on every access is inefficient
# PERF: Storing min/max adds memory overhead and comparison costs
# PERF: Using inf/-inf creates unnecessary float objects
urls_scored: int = 0
total_score: float = 0.0
min_score: float = float("inf") # Expensive object creation
max_score: float = float("-inf")
def update(self, score: float):
"""Update scoring statistics"""
self.urls_scored += 1
self.total_score += score
self.min_score = min(self.min_score, score)
self.max_score = max(self.max_score, score)
@property
def average_score(self) -> float:
"""Calculate average score"""
return self.total_score / self.urls_scored if self.urls_scored > 0 else 0.0
class FastScoringStats:
__slots__ = ('_urls_scored', '_total_score', '_min_score', '_max_score')
def __init__(self):
@@ -88,32 +60,7 @@ class FastScoringStats:
if self._max_score is None:
self._max_score = self._total_score / self._urls_scored if self._urls_scored else 0.0
return self._max_score
class URLScorer(ABC):
# PERF: Property access overhead for weight
# PERF: Unnecessary name attribute
# PERF: Stats object creation overhead
# PERF: Logger creation for each instance
# PERF: Abstract method overhead
def __init__(self, weight: float = 1.0, name: str = None):
self.weight = weight
self.name = name or self.__class__.__name__
self.stats = ScoringStats()
self.logger = logging.getLogger(f"urlscorer.{self.name}")
@abstractmethod
def _calculate_score(self, url: str) -> float:
pass
def score(self, url: str) -> float:
raw_score = self._calculate_score(url)
weighted_score = raw_score * self.weight
self.stats.update(weighted_score)
return weighted_score
# Optimized base class
class FastURLScorer(ABC):
__slots__ = ('_weight', '_stats')
def __init__(self, weight: float = 1.0):
@@ -142,31 +89,6 @@ class FastURLScorer(ABC):
return self._weight
class CompositeScorer(URLScorer):
# PERF: Unnecessary list iteration for each score
# PERF: Creates new list for scores
# PERF: Division on every normalization
# PERF: No parallelization for independent scorers
# PERF: No short circuit for zero scores
# PERF: No weighting optimization
# PERF: No caching of combined scores
# PERF: List allocation for scores storag
"""Combines multiple scorers with weights"""
def __init__(self, scorers: List[URLScorer], normalize: bool = True):
super().__init__(name="CompositeScorer")
self.scorers = scorers
self.normalize = normalize
def _calculate_score(self, url: str) -> float:
scores = [scorer.score(url) for scorer in self.scorers]
total_score = sum(scores)
if self.normalize and scores:
total_score /= len(scores)
return total_score
class FastCompositeScorer(FastURLScorer):
__slots__ = ('_scorers', '_normalize', '_weights_array', '_score_array')
def __init__(self, scorers: List[URLScorer], normalize: bool = True):
@@ -235,51 +157,7 @@ class FastCompositeScorer(FastURLScorer):
self.stats.update(score)
return score
class KeywordRelevanceScorer(URLScorer):
# PERF: Regex compilation and pattern matching is expensive
# PERF: List comprehension with pattern search has high overhead
# PERF: URL decoding on every calculation
# PERF: Division operation for normalization is costly
# PERF: Case insensitive regex adds overhead
# PERF: No pattern caching or reuse
# PERF: Using inheritance adds method lookup overhead
"""Score URLs based on keyword relevance.
keyword_scorer = KeywordRelevanceScorer(
keywords=["python", "programming"],
weight=1.0,
case_sensitive=False
)
- Score based on keyword matches
- Case sensitivity options
- Weighted scoring
"""
def __init__(
self, keywords: List[str], weight: float = 1.0, case_sensitive: bool = False
):
super().__init__(weight=weight)
self.keywords = keywords
self.case_sensitive = case_sensitive
self._compile_keywords()
def _compile_keywords(self):
"""Prepare keywords for matching"""
flags = 0 if self.case_sensitive else re.IGNORECASE
self.patterns = [re.compile(re.escape(k), flags) for k in self.keywords]
def _calculate_score(self, url: str) -> float:
"""Calculate score based on keyword matches"""
decoded_url = unquote(url)
total_matches = sum(
1 for pattern in self.patterns if pattern.search(decoded_url)
)
# Normalize score between 0 and 1
return total_matches / len(self.patterns) if self.patterns else 0.0
class FastKeywordRelevanceScorer(FastURLScorer):
class KeywordRelevanceScorer(URLScorer):
__slots__ = ('_weight', '_stats', '_keywords', '_case_sensitive')
def __init__(self, keywords: List[str], weight: float = 1.0, case_sensitive: bool = False):
@@ -310,39 +188,6 @@ class FastKeywordRelevanceScorer(FastURLScorer):
return matches / len(self._keywords)
class PathDepthScorer(URLScorer):
# PERF: URL parsing on every call is expensive
# PERF: Split and list comprehension creates temporary lists
# PERF: abs() call adds function overhead
# PERF: Division and addition in score calculation are expensive for high frequency
# PERF: Path parts filtering creates extra list
# PERF: Inherits URLScorer adding method lookup overhead
# PERF: No caching of parsed URLs or calculated depths
"""Score URLs based on their path depth.
path_scorer = PathDepthScorer(
optimal_depth=3, # Preferred URL depth
weight=0.7
)
- Score based on URL path depth
- Configurable optimal depth
- Diminishing returns for deeper paths
"""
def __init__(self, optimal_depth: int = 3, weight: float = 1.0):
super().__init__(weight=weight)
self.optimal_depth = optimal_depth
def _calculate_score(self, url: str) -> float:
"""Calculate score based on path depth"""
path = urlparse(url).path
depth = len([x for x in path.split("/") if x])
# Score decreases as we move away from optimal depth
distance_from_optimal = abs(depth - self.optimal_depth)
return 1.0 / (1.0 + distance_from_optimal)
class FastPathDepthScorer(FastURLScorer):
__slots__ = ('_weight', '_stats', '_optimal_depth') # Remove _url_cache
def __init__(self, optimal_depth: int = 3, weight: float = 1.0):
@@ -400,45 +245,6 @@ class FastPathDepthScorer(FastURLScorer):
return 1.0 / (1.0 + distance)
class ContentTypeScorer(URLScorer):
# PERF: Regex compilation on every initialization
# PERF: Dict lookup and regex search for every URL
# PERF: Pattern iteration adds loop overhead
# PERF: No pattern priority or short-circuit
# PERF: Dict storage has lookup overhead
# PERF: Missing extension fast path check
# PERF: Unnecessary regex for simple extensions
"""Score URLs based on content type preferences.
content_scorer = ContentTypeScorer({
r'\.html$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
})
- Score based on file types
- Configurable type weights
- Pattern matching support
"""
def __init__(self, type_weights: Dict[str, float], weight: float = 1.0):
super().__init__(weight=weight)
self.type_weights = type_weights
self._compile_patterns()
def _compile_patterns(self):
"""Prepare content type patterns"""
self.patterns = {
re.compile(pattern): weight for pattern, weight in self.type_weights.items()
}
def _calculate_score(self, url: str) -> float:
"""Calculate score based on content type matching"""
for pattern, weight in self.patterns.items():
if pattern.search(url):
return weight
return 0.0
class FastContentTypeScorer(FastURLScorer):
__slots__ = ('_weight', '_exact_types', '_regex_types')
def __init__(self, type_weights: Dict[str, float], weight: float = 1.0):
@@ -524,45 +330,6 @@ class FastContentTypeScorer(FastURLScorer):
return 0.0
class FreshnessScorer(URLScorer):
# PERF: Multiple regex compilations for each pattern
# PERF: Tries all patterns sequentially
# PERF: Regex pattern matching is expensive
# PERF: Int conversion and arithmetic for every match
# PERF: Repeated constant value (2024) hardcoded
# PERF: No URL caching
# PERF: Complex patterns with redundant groups
# PERF: Unnecessary list of patterns when could combine
"""Score URLs based on freshness indicators.
freshness_scorer = FreshnessScorer(weight=0.9)
Score based on date indicators in URLs
Multiple date format support
Recency weighting"""
def __init__(self, weight: float = 1.0):
super().__init__(weight=weight)
self.date_patterns = [
r"/(\d{4})/(\d{2})/(\d{2})/", # yyyy/mm/dd
r"(\d{4})[-_](\d{2})[-_](\d{2})", # yyyy-mm-dd
r"/(\d{4})/", # year only
]
self._compile_patterns()
def _compile_patterns(self):
"""Prepare date patterns"""
self.compiled_patterns = [re.compile(p) for p in self.date_patterns]
def _calculate_score(self, url: str) -> float:
"""Calculate score based on date indicators"""
for pattern in self.compiled_patterns:
if match := pattern.search(url):
year = int(match.group(1))
# Score higher for more recent years
return 1.0 - (2024 - year) * 0.1
return 0.5 # Default score for URLs without dates
class FastFreshnessScorer(FastURLScorer):
__slots__ = ('_weight', '_date_pattern', '_current_year')
def __init__(self, weight: float = 1.0, current_year: int = 2024):
@@ -645,41 +412,6 @@ class FastFreshnessScorer(FastURLScorer):
return max(0.1, 1.0 - year_diff * 0.1)
class DomainAuthorityScorer(URLScorer):
# PERF: URL parsing on every score calculation
# PERF: Repeated domain extraction
# PERF: Case conversion on every lookup
# PERF: Dict lookup without caching
# PERF: Processes full URL when only needs domain
# PERF: No fast path for common domains
# PERF: Netloc includes port which requires extra processing
"""Score URLs based on domain authority.
authority_scorer = DomainAuthorityScorer({
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
})
Score based on domain importance
Configurable domain weights
Default weight for unknown domains"""
def __init__(
self,
domain_weights: Dict[str, float],
default_weight: float = 0.5,
weight: float = 1.0,
):
super().__init__(weight=weight)
self.domain_weights = domain_weights
self.default_weight = default_weight
def _calculate_score(self, url: str) -> float:
"""Calculate score based on domain authority"""
domain = urlparse(url).netloc.lower()
return self.domain_weights.get(domain, self.default_weight)
class FastDomainAuthorityScorer(FastURLScorer):
__slots__ = ('_weight', '_domain_weights', '_default_weight', '_top_domains')
def __init__(
@@ -784,419 +516,4 @@ class FastDomainAuthorityScorer(FastURLScorer):
return score
# Regular path: check all domains
return self._domain_weights.get(domain, self._default_weight)
def create_balanced_scorer() -> CompositeScorer:
"""Create a balanced composite scorer"""
return CompositeScorer(
[
KeywordRelevanceScorer(
keywords=["article", "blog", "news", "research"], weight=1.0
),
PathDepthScorer(optimal_depth=3, weight=0.7),
ContentTypeScorer(
type_weights={r"\.html?$": 1.0, r"\.pdf$": 0.8, r"\.xml$": 0.6},
weight=0.8,
),
FreshnessScorer(weight=0.9),
]
)
def create_balanced_fast_freshness_scorer() -> CompositeScorer:
"""Create a balanced composite scorer with fast freshness scorer"""
return FastCompositeScorer(
[
FastKeywordRelevanceScorer(
keywords=["article", "blog", "news", "research"], weight=1.0
),
FastPathDepthScorer(optimal_depth=3, weight=0.7),
FastContentTypeScorer(
type_weights={r"\.html?$": 1.0, r"\.pdf$": 0.8, r"\.xml$": 0.6},
weight=0.8,
),
FastFreshnessScorer(weight=0.9),
]
)
# Example Usage:
"""
# Create a composite scorer
scorer = CompositeScorer([
KeywordRelevanceScorer(["python", "programming"], weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.7),
FreshnessScorer(weight=0.8),
DomainAuthorityScorer(
domain_weights={
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
},
weight=0.9
)
])
# Score a URL
score = scorer.score("https://python.org/article/2024/01/new-features")
# Access statistics
print(f"Average score: {scorer.stats.average_score}")
print(f"URLs scored: {scorer.stats.urls_scored}")
"""
def run_scorer_performance_test():
import time
import random
from itertools import cycle
import sys
# Generate varied test URLs
base_urls = [
# News/blog articles with dates
"https://example.com/2024/01/article-123",
"https://news.com/2023-12-31/breaking-news",
"https://blog.site.com/2022_11_15/tech-update",
# Different content types
"https://docs.example.com/report.pdf",
"https://site.com/page.html?q=test",
"https://api.service.com/data.json",
# Various domain authorities
"https://python.org/downloads",
"https://github.com/repo/code",
"https://medium.com/@user/post",
# Different path depths
"https://site.com/category/subcategory/product/detail",
"https://shop.com/items",
"https://edu.org/courses/cs/intro/lecture1",
]
# Create variations
test_urls = []
years = list(range(2020, 2025))
domains = ["example.com", "python.org", "github.com", "medium.com"]
extensions = ["html", "pdf", "php", "jsx"]
for base in base_urls:
test_urls.append(base)
# Add year variations
for year in years:
test_urls.append(f"https://blog.com/{year}/post-{random.randint(1,999)}")
# Add domain variations
for domain in domains:
test_urls.append(f"https://{domain}/article-{random.randint(1,999)}")
# Add extension variations
for ext in extensions:
test_urls.append(f"https://site.com/doc-{random.randint(1,999)}.{ext}")
# Multiply dataset
test_urls = test_urls * 5000 # Creates ~300k URLs
def benchmark(name: str, scorer, urls, warmup=True):
if warmup:
for url in urls[:100]: # Warmup with subset
scorer.score(url)
start = time.perf_counter_ns()
for url in urls:
scorer.score(url)
elapsed = (time.perf_counter_ns() - start) / 1_000_000 # Convert to ms
print(
f"{name:<35} {elapsed:>8.3f} ms ({len(urls)/elapsed*1000:,.0f} URLs/sec)"
)
return elapsed
print("\nBenchmarking original vs optimized scorers...")
print("-" * 75)
# Initialize test data
domain_weights = {"python.org": 1.0, "github.com": 0.9, "medium.com": 0.7}
type_weights = {".html$": 1.0, ".pdf$": 0.8, ".php$": 0.6}
keywords = ["python", "article", "blog", "docs"]
# Original implementations
keyword_scorer = KeywordRelevanceScorer(keywords=keywords, weight=1.0)
path_scorer = PathDepthScorer(optimal_depth=3, weight=0.7)
content_scorer = ContentTypeScorer(type_weights=type_weights, weight=0.8)
freshness_scorer = FreshnessScorer(weight=0.9)
domain_scorer = DomainAuthorityScorer(domain_weights=domain_weights, weight=1.0)
# Fast implementations
fast_keyword_scorer = FastKeywordRelevanceScorer(keywords=keywords, weight=1.0)
fast_path_scorer = FastPathDepthScorer(optimal_depth=3, weight=0.7)
fast_content_scorer = FastContentTypeScorer(type_weights=type_weights, weight=0.8)
fast_freshness_scorer = FastFreshnessScorer(weight=0.9)
fast_domain_scorer = FastDomainAuthorityScorer(domain_weights=domain_weights, weight=1.0)
# Test subset for individual scorers
test_subset = test_urls[:1000]
print("\nIndividual Scorer Performance (first 1000 URLs):")
print("\nKeyword Relevance Scorers:")
benchmark("Original Keyword Scorer", keyword_scorer, test_subset)
benchmark("Optimized Keyword Scorer", fast_keyword_scorer, test_subset)
print("\nPath Depth Scorers:")
benchmark("Original Path Scorer", path_scorer, test_subset)
benchmark("Optimized Path Scorer", fast_path_scorer, test_subset)
print("\nContent Type Scorers:")
benchmark("Original Content Scorer", content_scorer, test_subset)
benchmark("Optimized Content Scorer", fast_content_scorer, test_subset)
print("\nFreshness Scorers:")
benchmark("Original Freshness Scorer", freshness_scorer, test_subset)
benchmark("Optimized Freshness Scorer", fast_freshness_scorer, test_subset)
print("\nDomain Authority Scorers:")
benchmark("Original Domain Scorer", domain_scorer, test_subset)
benchmark("Optimized Domain Scorer", fast_domain_scorer, test_subset)
# Test composite scorers
print("\nComposite Scorer Performance (all URLs):")
original_composite = CompositeScorer([
keyword_scorer, path_scorer, content_scorer,
freshness_scorer, domain_scorer
])
fast_composite = FastCompositeScorer([
fast_keyword_scorer, fast_path_scorer, fast_content_scorer,
fast_freshness_scorer, fast_domain_scorer
])
benchmark("Original Composite Scorer", original_composite, test_urls)
benchmark("Optimized Composite Scorer", fast_composite, test_urls)
# Memory usage
print("\nMemory Usage per Scorer:")
print(f"Original Keyword Scorer: {sys.getsizeof(keyword_scorer):,} bytes")
print(f"Optimized Keyword Scorer: {sys.getsizeof(fast_keyword_scorer):,} bytes")
print(f"Original Path Scorer: {sys.getsizeof(path_scorer):,} bytes")
print(f"Optimized Path Scorer: {sys.getsizeof(fast_path_scorer):,} bytes")
print(f"Original Content Scorer: {sys.getsizeof(content_scorer):,} bytes")
print(f"Optimized Content Scorer: {sys.getsizeof(fast_content_scorer):,} bytes")
print(f"Original Freshness Scorer: {sys.getsizeof(freshness_scorer):,} bytes")
print(f"Optimized Freshness Scorer: {sys.getsizeof(fast_freshness_scorer):,} bytes")
print(f"Original Domain Scorer: {sys.getsizeof(domain_scorer):,} bytes")
print(f"Optimized Domain Scorer: {sys.getsizeof(fast_domain_scorer):,} bytes")
print(f"Original Composite: {sys.getsizeof(original_composite):,} bytes")
print(f"Optimized Composite: {sys.getsizeof(fast_composite):,} bytes")
def test_scorers():
import time
from itertools import chain
test_cases = [
# Keyword Scorer Tests
{
"scorer_type": "keyword",
"config": {
"keywords": ["python", "blog"],
"weight": 1.0,
"case_sensitive": False
},
"urls": {
"https://example.com/python-blog": 1.0,
"https://example.com/PYTHON-BLOG": 1.0,
"https://example.com/python-only": 0.5,
"https://example.com/other": 0.0
}
},
# Path Depth Scorer Tests
{
"scorer_type": "path_depth",
"config": {
"optimal_depth": 2,
"weight": 1.0
},
"urls": {
"https://example.com/a/b": 1.0,
"https://example.com/a": 0.5,
"https://example.com/a/b/c": 0.5,
"https://example.com": 0.33333333
}
},
# Content Type Scorer Tests
{
"scorer_type": "content_type",
"config": {
"type_weights": {
".html$": 1.0,
".pdf$": 0.8,
".jpg$": 0.6
},
"weight": 1.0
},
"urls": {
"https://example.com/doc.html": 1.0,
"https://example.com/doc.pdf": 0.8,
"https://example.com/img.jpg": 0.6,
"https://example.com/other.txt": 0.0
}
},
# Freshness Scorer Tests
{
"scorer_type": "freshness",
"config": {
"weight": 1.0, # Remove current_year since original doesn't support it
},
"urls": {
"https://example.com/2024/01/post": 1.0,
"https://example.com/2023/12/post": 0.9,
"https://example.com/2022/post": 0.8,
"https://example.com/no-date": 0.5
}
},
# Domain Authority Scorer Tests
{
"scorer_type": "domain",
"config": {
"domain_weights": {
"python.org": 1.0,
"github.com": 0.8,
"medium.com": 0.6
},
"default_weight": 0.3,
"weight": 1.0
},
"urls": {
"https://python.org/about": 1.0,
"https://github.com/repo": 0.8,
"https://medium.com/post": 0.6,
"https://unknown.com": 0.3
}
}
]
def create_scorer(scorer_type, config):
if scorer_type == "keyword":
return (
KeywordRelevanceScorer(**config),
FastKeywordRelevanceScorer(**config)
)
elif scorer_type == "path_depth":
return (
PathDepthScorer(**config),
FastPathDepthScorer(**config)
)
elif scorer_type == "content_type":
return (
ContentTypeScorer(**config),
FastContentTypeScorer(**config)
)
elif scorer_type == "freshness":
return (
FreshnessScorer(**config),
FastFreshnessScorer(**config, current_year=2024)
)
elif scorer_type == "domain":
return (
DomainAuthorityScorer(**config),
FastDomainAuthorityScorer(**config)
)
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for test_case in test_cases:
print(f"\nTesting {test_case['scorer_type']} scorer:")
original, fast = create_scorer(
test_case['scorer_type'],
test_case['config']
)
for url, expected in test_case['urls'].items():
orig_score = round(original.score(url), 8)
fast_score = round(fast.score(url), 8)
expected = round(expected, 8)
if abs(orig_score - expected) > 0.00001:
print(f"❌ Original Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {orig_score}")
all_passed = False
else:
print(f"✅ Original Passed: URL '{url}'")
if abs(fast_score - expected) > 0.00001:
print(f"❌ Fast Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {fast_score}")
all_passed = False
else:
print(f"✅ Fast Passed: URL '{url}'")
return all_passed
def run_composite_test():
print("\nTesting Composite Scorer:")
print("-" * 50)
# Create test data
test_urls = {
"https://python.org/blog/2024/01/new-release.html":0.86666667,
"https://github.com/repo/old-code.pdf": 0.62,
"https://unknown.com/random": 0.26
}
# Create composite scorers with all types
original_scorers = []
fast_scorers = []
for test_case in test_cases:
orig, fast = create_scorer(
test_case['scorer_type'],
test_case['config']
)
original_scorers.append(orig)
fast_scorers.append(fast)
original_composite = CompositeScorer(original_scorers, normalize=True)
fast_composite = FastCompositeScorer(fast_scorers, normalize=True)
all_passed = True
for url, expected in test_urls.items():
orig_score = round(original_composite.score(url), 8)
fast_score = round(fast_composite.score(url), 8)
if abs(orig_score - expected) > 0.00001:
print(f"❌ Original Composite Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {orig_score}")
all_passed = False
else:
print(f"✅ Original Composite Passed: URL '{url}'")
if abs(fast_score - expected) > 0.00001:
print(f"❌ Fast Composite Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {fast_score}")
all_passed = False
else:
print(f"✅ Fast Composite Passed: URL '{url}'")
return all_passed
# Run tests
print("Running Scorer Tests...")
accuracy_passed = run_accuracy_test()
composite_passed = run_composite_test()
if accuracy_passed and composite_passed:
print("\n✨ All tests passed!")
# Note: Already have performance tests in run_scorer_performance_test()
else:
print("\n❌ Some tests failed!")
if __name__ == "__main__":
run_scorer_performance_test()
# test_scorers()
return self._domain_weights.get(domain, self._default_weight)

View File

@@ -510,6 +510,7 @@ class HTML2Text(html.parser.HTMLParser):
if tag == "a" and not self.ignore_links:
if start:
self.inside_link = True
if (
"href" in attrs
and attrs["href"] is not None
@@ -526,6 +527,7 @@ class HTML2Text(html.parser.HTMLParser):
else:
self.astack.append(None)
else:
self.inside_link = False
if self.astack:
a = self.astack.pop()
if self.maybe_automatic_link and not self.empty_link:
@@ -610,13 +612,22 @@ class HTML2Text(html.parser.HTMLParser):
self.o("[" + str(a_props.count) + "]")
if tag == "dl" and start:
self.p()
if tag == "dt" and not start:
self.pbr()
if tag == "dd" and start:
self.o(" ")
if tag == "dd" and not start:
self.pbr()
self.p() # Add paragraph break before list starts
self.p_p = 0 # Reset paragraph state
elif tag == "dt" and start:
if self.p_p == 0: # If not first term
self.o("\n\n") # Add spacing before new term-definition pair
self.p_p = 0 # Reset paragraph state
elif tag == "dt" and not start:
self.o("\n") # Single newline between term and definition
elif tag == "dd" and start:
self.o(" ") # Indent definition
elif tag == "dd" and not start:
self.p_p = 0
if tag in ["ol", "ul"]:
# Google Docs create sub lists as top level lists
@@ -1026,6 +1037,7 @@ class CustomHTML2Text(HTML2Text):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.inside_link = False
self.preserve_tags = set() # Set of tags to preserve
self.current_preserved_tag = None
self.preserved_content = []
@@ -1105,11 +1117,17 @@ class CustomHTML2Text(HTML2Text):
# Ignore code tags inside pre blocks if handle_code_in_pre is False
return
if start:
self.o("`") # Markdown inline code start
if not self.inside_link:
self.o("`") # Only output backtick if not inside a link
self.inside_code = True
else:
self.o("`") # Markdown inline code end
if not self.inside_link:
self.o("`") # Only output backtick if not inside a link
self.inside_code = False
# If inside a link, let the parent class handle the content
if self.inside_link:
super().handle_tag(tag, attrs, start)
else:
super().handle_tag(tag, attrs, start)

View File

@@ -179,7 +179,7 @@ class DefaultMarkdownGenerator(MarkdownGenerationStrategy):
"ignore_emphasis": False,
"ignore_links": False,
"ignore_images": False,
"protect_links": True,
"protect_links": False,
"single_line_break": True,
"mark_code": True,
"escape_snob": False,

View File

@@ -198,7 +198,7 @@ Avoid Common Mistakes:
- Do NOT add any comments using "//" or "#" in the JSON output. It causes parsing errors.
- Make sure the JSON is properly formatted with curly braces, square brackets, and commas in the right places.
- Do not miss closing </blocks> tag at the end of the JSON output.
- Do not generate the Python coee show me how to do the task, this is your task to extract the information and return it in JSON format.
- Do not generate the Python code show me how to do the task, this is your task to extract the information and return it in JSON format.
Result
Output the final list of JSON objects, wrapped in <blocks>...</blocks> XML tags. Make sure to close the tag properly."""

View File

@@ -7,8 +7,8 @@ Crawl4AI offers multiple power-user features that go beyond simple crawling. Thi
2. **Capturing PDFs & Screenshots**
3. **Handling SSL Certificates**
4. **Custom Headers**
5. **Session Persistence & Local Storage**
6. **Robots.txt Compliance**
5. **Session Persistence & Local Storage**
6. **Robots.txt Compliance**
> **Prerequisites**
> - You have a basic grasp of [AsyncWebCrawler Basics](../core/simple-crawling.md)

View File

@@ -168,10 +168,10 @@ async def main():
"name": "News Items",
"baseSelector": "tr.athing",
"fields": [
{"name": "title", "selector": "a.storylink", "type": "text"},
{"name": "title", "selector": "span.titleline a", "type": "text"},
{
"name": "link",
"selector": "a.storylink",
"selector": "span.titleline a",
"type": "attribute",
"attribute": "href"
}

View File

@@ -135,14 +135,14 @@ html = "<div class='product'><h2>Gaming Laptop</h2><span class='price'>$999.99</
# Using OpenAI (requires API token)
schema = JsonCssExtractionStrategy.generate_schema(
html,
llm_provider="openai/gpt-4o", # Default provider
provider="openai/gpt-4o", # Default provider
api_token="your-openai-token" # Required for OpenAI
)
# Or using Ollama (open source, no token needed)
schema = JsonCssExtractionStrategy.generate_schema(
html,
llm_provider="ollama/llama3.3", # Open source alternative
provider="ollama/llama3.3", # Open source alternative
api_token=None # Not needed for Ollama
)

View File

@@ -434,7 +434,7 @@ html = """
css_schema = JsonCssExtractionStrategy.generate_schema(
html,
schema_type="css", # This is the default
llm_provider="openai/gpt-4o", # Default provider
provider="openai/gpt-4o", # Default provider
api_token="your-openai-token" # Required for OpenAI
)
@@ -442,7 +442,7 @@ css_schema = JsonCssExtractionStrategy.generate_schema(
xpath_schema = JsonXPathExtractionStrategy.generate_schema(
html,
schema_type="xpath",
llm_provider="ollama/llama3.3", # Open source alternative
provider="ollama/llama3.3", # Open source alternative
api_token=None # Not needed for Ollama
)

View File

@@ -0,0 +1,46 @@
import asyncio
import time
from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, CacheMode
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, BestFirstCrawlingStrategy
from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter, DomainFilter, ContentTypeFilter, ContentRelevanceFilter
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
# from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, BestFirstCrawlingStrategy
async def main():
"""Example deep crawl of documentation site."""
filter_chain = FilterChain([
URLPatternFilter(patterns=["*2025*"]),
DomainFilter(allowed_domains=["techcrunch.com"]),
ContentRelevanceFilter(query="Use of artificial intelligence in Defence applications", threshold=1),
ContentTypeFilter(allowed_types=["text/html","application/javascript"])
])
config = CrawlerRunConfig(
deep_crawl_strategy = BestFirstCrawlingStrategy(
max_depth=2,
include_external=False,
filter_chain=filter_chain,
url_scorer=KeywordRelevanceScorer(keywords=["anduril", "defence", "AI"]),
),
stream=False,
verbose=True,
cache_mode=CacheMode.BYPASS,
scraping_strategy=LXMLWebScrapingStrategy()
)
async with AsyncWebCrawler() as crawler:
print("Starting deep crawl in streaming mode:")
config.stream = True
start_time = time.perf_counter()
async for result in await crawler.arun(
url="https://techcrunch.com",
config=config
):
print(f"{result.url} (Depth: {result.metadata.get('depth', 0)})")
print(f"Duration: {time.perf_counter() - start_time:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,279 @@
from crawl4ai.deep_crawling.filters import ContentRelevanceFilter, URLPatternFilter, DomainFilter, ContentTypeFilter, SEOFilter
async def test_pattern_filter():
# Test cases as list of tuples instead of dict for multiple patterns
test_cases = [
# Simple suffix patterns (*.html)
("*.html", {
"https://example.com/page.html": True,
"https://example.com/path/doc.html": True,
"https://example.com/page.htm": False,
"https://example.com/page.html?param=1": True,
}),
# Path prefix patterns (/foo/*)
("*/article/*", {
"https://example.com/article/123": True,
"https://example.com/blog/article/456": True,
"https://example.com/articles/789": False,
"https://example.com/article": False,
}),
# Complex patterns
("blog-*-[0-9]", {
"https://example.com/blog-post-1": True,
"https://example.com/blog-test-9": True,
"https://example.com/blog-post": False,
"https://example.com/blog-post-x": False,
}),
# Multiple patterns case
(["*.pdf", "*/download/*"], {
"https://example.com/doc.pdf": True,
"https://example.com/download/file.txt": True,
"https://example.com/path/download/doc": True,
"https://example.com/uploads/file.txt": False,
}),
# Edge cases
("*", {
"https://example.com": True,
"": True,
"http://test.com/path": True,
}),
# Complex regex
(r"^https?://.*\.example\.com/\d+", {
"https://sub.example.com/123": True,
"http://test.example.com/456": True,
"https://example.com/789": False,
"https://sub.example.com/abc": False,
})
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for patterns, test_urls in test_cases:
filter_obj = URLPatternFilter(patterns)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"❌ Failed: Pattern '{patterns}' with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"✅ Passed: Pattern '{patterns}' with URL '{url}'")
return all_passed
# Run tests
print("Running Pattern Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n✨ All accuracy tests passed!")
else:
print("\n❌ Some accuracy tests failed!")
async def test_domain_filter():
from itertools import chain
# Test cases
test_cases = [
# Allowed domains
({"allowed": "example.com"}, {
"https://example.com/page": True,
"http://example.com": True,
"https://sub.example.com": False,
"https://other.com": False,
}),
({"allowed": ["example.com", "test.com"]}, {
"https://example.com/page": True,
"https://test.com/home": True,
"https://other.com": False,
}),
# Blocked domains
({"blocked": "malicious.com"}, {
"https://malicious.com": False,
"https://safe.com": True,
"http://malicious.com/login": False,
}),
({"blocked": ["spam.com", "ads.com"]}, {
"https://spam.com": False,
"https://ads.com/banner": False,
"https://example.com": True,
}),
# Allowed and Blocked combination
({"allowed": "example.com", "blocked": "sub.example.com"}, {
"https://example.com": True,
"https://sub.example.com": False,
"https://other.com": False,
}),
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for params, test_urls in test_cases:
filter_obj = DomainFilter(
allowed_domains=params.get("allowed"),
blocked_domains=params.get("blocked"),
)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"\u274C Failed: Params {params} with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: Params {params} with URL '{url}'")
return all_passed
# Run tests
print("Running Domain Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n\u2728 All accuracy tests passed!")
else:
print("\n\u274C Some accuracy tests failed!")
async def test_content_relevance_filter():
relevance_filter = ContentRelevanceFilter(
query="What was the cause of american civil war?",
threshold=1
)
test_cases = {
"https://en.wikipedia.org/wiki/Cricket": False,
"https://en.wikipedia.org/wiki/American_Civil_War": True,
}
print("\nRunning Content Relevance Filter Tests...")
print("-" * 50)
all_passed = True
for url, expected in test_cases.items():
result = await relevance_filter.apply(url)
if result != expected:
print(f"\u274C Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: URL '{url}'")
if all_passed:
print("\n\u2728 All content relevance tests passed!")
else:
print("\n\u274C Some content relevance tests failed!")
async def test_content_type_filter():
from itertools import chain
# Test cases
test_cases = [
# Allowed single type
({"allowed": "image/png"}, {
"https://example.com/image.png": True,
"https://example.com/photo.jpg": False,
"https://example.com/document.pdf": False,
}),
# Multiple allowed types
({"allowed": ["image/jpeg", "application/pdf"]}, {
"https://example.com/photo.jpg": True,
"https://example.com/document.pdf": True,
"https://example.com/script.js": False,
}),
# No extension should be allowed
({"allowed": "application/json"}, {
"https://example.com/api/data": True,
"https://example.com/data.json": True,
"https://example.com/page.html": False,
}),
# Unknown extensions should not be allowed
({"allowed": "application/octet-stream"}, {
"https://example.com/file.unknown": True,
"https://example.com/archive.zip": False,
"https://example.com/software.exe": False,
}),
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for params, test_urls in test_cases:
filter_obj = ContentTypeFilter(
allowed_types=params.get("allowed"),
)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"\u274C Failed: Params {params} with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: Params {params} with URL '{url}'")
return all_passed
# Run tests
print("Running Content Type Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n\u2728 All accuracy tests passed!")
else:
print("\n\u274C Some accuracy tests failed!")
async def test_seo_filter():
seo_filter = SEOFilter(threshold=0.5, keywords=["SEO", "search engines", "Optimization"])
test_cases = {
"https://en.wikipedia.org/wiki/Search_engine_optimization": True,
"https://en.wikipedia.org/wiki/Randomness": False,
}
print("\nRunning SEO Filter Tests...")
print("-" * 50)
all_passed = True
for url, expected in test_cases.items():
result = await seo_filter.apply(url)
if result != expected:
print(f"\u274C Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: URL '{url}'")
if all_passed:
print("\n\u2728 All SEO filter tests passed!")
else:
print("\n\u274C Some SEO filter tests failed!")
import asyncio
if __name__ == "__main__":
asyncio.run(test_pattern_filter())
asyncio.run(test_domain_filter())
asyncio.run(test_content_type_filter())
asyncio.run(test_content_relevance_filter())
asyncio.run(test_seo_filter())

View File

@@ -0,0 +1,179 @@
from crawl4ai.deep_crawling.scorers import CompositeScorer, ContentTypeScorer, DomainAuthorityScorer, FreshnessScorer, KeywordRelevanceScorer, PathDepthScorer
def test_scorers():
test_cases = [
# Keyword Scorer Tests
{
"scorer_type": "keyword",
"config": {
"keywords": ["python", "blog"],
"weight": 1.0,
"case_sensitive": False
},
"urls": {
"https://example.com/python-blog": 1.0,
"https://example.com/PYTHON-BLOG": 1.0,
"https://example.com/python-only": 0.5,
"https://example.com/other": 0.0
}
},
# Path Depth Scorer Tests
{
"scorer_type": "path_depth",
"config": {
"optimal_depth": 2,
"weight": 1.0
},
"urls": {
"https://example.com/a/b": 1.0,
"https://example.com/a": 0.5,
"https://example.com/a/b/c": 0.5,
"https://example.com": 0.33333333
}
},
# Content Type Scorer Tests
{
"scorer_type": "content_type",
"config": {
"type_weights": {
".html$": 1.0,
".pdf$": 0.8,
".jpg$": 0.6
},
"weight": 1.0
},
"urls": {
"https://example.com/doc.html": 1.0,
"https://example.com/doc.pdf": 0.8,
"https://example.com/img.jpg": 0.6,
"https://example.com/other.txt": 0.0
}
},
# Freshness Scorer Tests
{
"scorer_type": "freshness",
"config": {
"weight": 1.0, # Remove current_year since original doesn't support it
},
"urls": {
"https://example.com/2024/01/post": 1.0,
"https://example.com/2023/12/post": 0.9,
"https://example.com/2022/post": 0.8,
"https://example.com/no-date": 0.5
}
},
# Domain Authority Scorer Tests
{
"scorer_type": "domain",
"config": {
"domain_weights": {
"python.org": 1.0,
"github.com": 0.8,
"medium.com": 0.6
},
"default_weight": 0.3,
"weight": 1.0
},
"urls": {
"https://python.org/about": 1.0,
"https://github.com/repo": 0.8,
"https://medium.com/post": 0.6,
"https://unknown.com": 0.3
}
}
]
def create_scorer(scorer_type, config):
if scorer_type == "keyword":
return KeywordRelevanceScorer(**config)
elif scorer_type == "path_depth":
return PathDepthScorer(**config)
elif scorer_type == "content_type":
return ContentTypeScorer(**config)
elif scorer_type == "freshness":
return FreshnessScorer(**config,current_year=2024)
elif scorer_type == "domain":
return DomainAuthorityScorer(**config)
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for test_case in test_cases:
print(f"\nTesting {test_case['scorer_type']} scorer:")
scorer = create_scorer(
test_case['scorer_type'],
test_case['config']
)
for url, expected in test_case['urls'].items():
score = round(scorer.score(url), 8)
expected = round(expected, 8)
if abs(score - expected) > 0.00001:
print(f"❌ Scorer Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {score}")
all_passed = False
else:
print(f"✅ Scorer Passed: URL '{url}'")
return all_passed
def run_composite_test():
print("\nTesting Composite Scorer:")
print("-" * 50)
# Create test data
test_urls = {
"https://python.org/blog/2024/01/new-release.html":0.86666667,
"https://github.com/repo/old-code.pdf": 0.62,
"https://unknown.com/random": 0.26
}
# Create composite scorers with all types
scorers = []
for test_case in test_cases:
scorer = create_scorer(
test_case['scorer_type'],
test_case['config']
)
scorers.append(scorer)
composite = CompositeScorer(scorers, normalize=True)
all_passed = True
for url, expected in test_urls.items():
score = round(composite.score(url), 8)
if abs(score - expected) > 0.00001:
print(f"❌ Composite Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {score}")
all_passed = False
else:
print(f"✅ Composite Passed: URL '{url}'")
return all_passed
# Run tests
print("Running Scorer Tests...")
accuracy_passed = run_accuracy_test()
composite_passed = run_composite_test()
if accuracy_passed and composite_passed:
print("\n✨ All tests passed!")
# Note: Already have performance tests in run_scorer_performance_test()
else:
print("\n❌ Some tests failed!")
if __name__ == "__main__":
test_scorers()