diff --git a/crawl4ai/deep_crawling/__init__.py b/crawl4ai/deep_crawling/__init__.py
index c6c10f81..bbe27eff 100644
--- a/crawl4ai/deep_crawling/__init__.py
+++ b/crawl4ai/deep_crawling/__init__.py
@@ -1,172 +1,11 @@
-# deep_crawl_strategy.py
-from abc import ABC, abstractmethod
-from typing import AsyncGenerator, Optional, Set, List, Dict, TypeVar, Union
-
-from ..models import CrawlResult
-from typing import TYPE_CHECKING
-from functools import wraps
-from contextvars import ContextVar
-
-if TYPE_CHECKING:
-    from ..async_configs import CrawlerRunConfig
-    from ..async_webcrawler import AsyncWebCrawler
-    from .bfs_strategy import BFSDeepCrawlStrategy
-
-CrawlResultT = TypeVar("CrawlResultT", bound=CrawlResult)
-# In batch mode we return List[CrawlResult] and in stream mode an AsyncGenerator.
-RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
-
-
-class DeepCrawlDecorator:
-    """Decorator that adds deep crawling capability to arun method."""
-    deep_crawl_active = ContextVar("deep_crawl_active", default=False)
-
-    def __init__(self, crawler: "AsyncWebCrawler"):
-        self.crawler = crawler
-
-    def __call__(self, original_arun):
-        @wraps(original_arun)
-        async def wrapped_arun(url: str, config: Optional["CrawlerRunConfig"] = None, **kwargs):
-            # If deep crawling is already active, call the original method to avoid recursion.
-            if config and config.deep_crawl_strategy and not self.deep_crawl_active.get():
-                token = self.deep_crawl_active.set(True)
-                # Await the arun call to get the actual result object.
-                result_obj = await config.deep_crawl_strategy.arun(
-                    crawler=self.crawler,
-                    start_url=url,
-                    config=config
-                )
-                if config.stream:
-                    async def result_wrapper():
-                        try:
-                            async for result in result_obj:
-                                yield result
-                        finally:
-                            self.deep_crawl_active.reset(token)
-                    return result_wrapper()
-                else:
-                    try:
-                        return result_obj
-                    finally:
-                        self.deep_crawl_active.reset(token)
-            return await original_arun(url, config=config, **kwargs)
-        return wrapped_arun
-
-class DeepCrawlStrategy(ABC):
-    """
-    Abstract base class for deep crawling strategies.
-
-    Core functions:
-      - arun: Main entry point that returns an async generator of CrawlResults.
-      - shutdown: Clean up resources.
-      - can_process_url: Validate a URL and decide whether to process it.
-      - _process_links: Extract and process links from a CrawlResult.
-    """
-
-    @abstractmethod
-    async def _arun_batch(
-        self,
-        start_url: str,
-        crawler: "AsyncWebCrawler",
-        config: "CrawlerRunConfig",
-    ) -> List[CrawlResult]:
-        """
-        Batch (non-streaming) mode:
-        Processes one BFS level at a time, then yields all the results.
-        """
-        pass
-
-    @abstractmethod
-    async def _arun_stream(
-        self,
-        start_url: str,
-        crawler: "AsyncWebCrawler",
-        config: "CrawlerRunConfig",
-    ) -> AsyncGenerator[CrawlResult, None]:
-        """
-        Streaming mode:
-        Processes one BFS level at a time and yields results immediately as they arrive.
-        """
-        pass
-
-    async def arun(
-        self,
-        start_url: str,
-        crawler: "AsyncWebCrawler",
-        config: Optional["CrawlerRunConfig"] = None,
-    ) -> RunManyReturn:
-        """
-        Traverse the given URL using the specified crawler.
-
-        Args:
-            start_url (str): The URL from which to start crawling.
-            crawler (AsyncWebCrawler): The crawler instance to use.
-            crawler_run_config (Optional[CrawlerRunConfig]): Crawler configuration.
-
-        Returns:
-            AsyncGenerator[CrawlResult, None]: An async generator yielding crawl results.
- """ - if config is None: - raise ValueError("CrawlerRunConfig must be provided") - - if config.stream: - return self._arun_stream(start_url, crawler, config) - else: - return await self._arun_batch(start_url, crawler, config) - - @abstractmethod - async def shutdown(self) -> None: - """ - Clean up resources used by the deep crawl strategy. - """ - pass - - @abstractmethod - async def can_process_url(self, url: str, depth: int) -> bool: - """ - Validate the URL format and apply custom filtering logic. - - Args: - url (str): The URL to validate. - depth (int): The current depth in the crawl. - - Returns: - bool: True if the URL should be processed, False otherwise. - """ - pass - - @abstractmethod - async def link_discovery( - self, - result: CrawlResult, - source_url: str, - current_depth: int, - visited: Set[str], - next_level: List[tuple], - depths: Dict[str, int], - ) -> None: - """ - Extract and process links from the given crawl result. - - This method should: - - Validate each extracted URL using can_process_url. - - Optionally score URLs. - - Append valid URLs (and their parent references) to the next_level list. - - Update the depths dictionary with the new depth for each URL. - - Args: - result (CrawlResult): The result from a crawl operation. - source_url (str): The URL from which this result was obtained. - current_depth (int): The depth at which the source URL was processed. - visited (Set[str]): Set of already visited URLs. - next_level (List[tuple]): List of tuples (url, parent_url) for the next BFS level. - depths (Dict[str, int]): Mapping of URLs to their current depth. - """ - pass - +# deep_crawling/__init__.py +from .base_strategy import DeepCrawlDecorator, DeepCrawlStrategy +from .bfs_strategy import BFSDeepCrawlStrategy +from .bff_strategy import BestFirstCrawlingStrategy __all__ = [ "DeepCrawlDecorator", "DeepCrawlStrategy", - "BFSDeepCrawlStrategy" + "BFSDeepCrawlStrategy", + "BestFirstCrawlingStrategy", ] \ No newline at end of file diff --git a/crawl4ai/deep_crawling/base_strategy.py b/crawl4ai/deep_crawling/base_strategy.py new file mode 100644 index 00000000..b7564bcd --- /dev/null +++ b/crawl4ai/deep_crawling/base_strategy.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import AsyncGenerator, Optional, Set, List, Dict +from functools import wraps +from contextvars import ContextVar +from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn + + +class DeepCrawlDecorator: + """Decorator that adds deep crawling capability to arun method.""" + deep_crawl_active = ContextVar("deep_crawl_active", default=False) + + def __init__(self, crawler: AsyncWebCrawler): + self.crawler = crawler + + def __call__(self, original_arun): + @wraps(original_arun) + async def wrapped_arun(url: str, config: Optional[CrawlerRunConfig] = None, **kwargs): + # If deep crawling is already active, call the original method to avoid recursion. + if config and config.deep_crawl_strategy and not self.deep_crawl_active.get(): + token = self.deep_crawl_active.set(True) + # Await the arun call to get the actual result object. 
+                result_obj = await config.deep_crawl_strategy.arun(
+                    crawler=self.crawler,
+                    start_url=url,
+                    config=config
+                )
+                if config.stream:
+                    async def result_wrapper():
+                        try:
+                            async for result in result_obj:
+                                yield result
+                        finally:
+                            self.deep_crawl_active.reset(token)
+                    return result_wrapper()
+                else:
+                    try:
+                        return result_obj
+                    finally:
+                        self.deep_crawl_active.reset(token)
+            return await original_arun(url, config=config, **kwargs)
+        return wrapped_arun
+
+class DeepCrawlStrategy(ABC):
+    """
+    Abstract base class for deep crawling strategies.
+
+    Core functions:
+      - arun: Main entry point that returns an async generator of CrawlResults.
+      - shutdown: Clean up resources.
+      - can_process_url: Validate a URL and decide whether to process it.
+      - _process_links: Extract and process links from a CrawlResult.
+    """
+
+    @abstractmethod
+    async def _arun_batch(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: CrawlerRunConfig,
+    ) -> List[CrawlResult]:
+        """
+        Batch (non-streaming) mode:
+        Processes one BFS level at a time, then yields all the results.
+        """
+        pass
+
+    @abstractmethod
+    async def _arun_stream(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: CrawlerRunConfig,
+    ) -> AsyncGenerator[CrawlResult, None]:
+        """
+        Streaming mode:
+        Processes one BFS level at a time and yields results immediately as they arrive.
+        """
+        pass
+
+    async def arun(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: Optional[CrawlerRunConfig] = None,
+    ) -> RunManyReturn:
+        """
+        Traverse the given URL using the specified crawler.
+
+        Args:
+            start_url (str): The URL from which to start crawling.
+            crawler (AsyncWebCrawler): The crawler instance to use.
+            crawler_run_config (Optional[CrawlerRunConfig]): Crawler configuration.
+
+        Returns:
+            AsyncGenerator[CrawlResult, None]: An async generator yielding crawl results.
+        """
+        if config is None:
+            raise ValueError("CrawlerRunConfig must be provided")
+
+        if config.stream:
+            return self._arun_stream(start_url, crawler, config)
+        else:
+            return await self._arun_batch(start_url, crawler, config)
+
+    @abstractmethod
+    async def shutdown(self) -> None:
+        """
+        Clean up resources used by the deep crawl strategy.
+        """
+        pass
+
+    @abstractmethod
+    async def can_process_url(self, url: str, depth: int) -> bool:
+        """
+        Validate the URL format and apply custom filtering logic.
+
+        Args:
+            url (str): The URL to validate.
+            depth (int): The current depth in the crawl.
+
+        Returns:
+            bool: True if the URL should be processed, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    async def link_discovery(
+        self,
+        result: CrawlResult,
+        source_url: str,
+        current_depth: int,
+        visited: Set[str],
+        next_level: List[tuple],
+        depths: Dict[str, int],
+    ) -> None:
+        """
+        Extract and process links from the given crawl result.
+
+        This method should:
+          - Validate each extracted URL using can_process_url.
+          - Optionally score URLs.
+          - Append valid URLs (and their parent references) to the next_level list.
+          - Update the depths dictionary with the new depth for each URL.
+
+        Args:
+            result (CrawlResult): The result from a crawl operation.
+            source_url (str): The URL from which this result was obtained.
+            current_depth (int): The depth at which the source URL was processed.
+            visited (Set[str]): Set of already visited URLs.
+            next_level (List[tuple]): List of tuples (url, parent_url) for the next BFS level.
+            depths (Dict[str, int]): Mapping of URLs to their current depth.
+ """ + pass + diff --git a/crawl4ai/deep_crawling/bff_strategy.py b/crawl4ai/deep_crawling/bff_strategy.py new file mode 100644 index 00000000..aef38881 --- /dev/null +++ b/crawl4ai/deep_crawling/bff_strategy.py @@ -0,0 +1,221 @@ +# best_first_crawling_strategy.py +import asyncio +import logging +from datetime import datetime +from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple +from urllib.parse import urlparse + +from ..models import TraversalStats +from .filters import FastFilterChain +from .scorers import FastURLScorer +from . import DeepCrawlStrategy + +from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn + + +# Configurable batch size for processing items from the priority queue +BATCH_SIZE = 10 + + +class BestFirstCrawlingStrategy(DeepCrawlStrategy): + """ + Best-First Crawling Strategy using a priority queue. + + This strategy prioritizes URLs based on their score, ensuring that higher-value + pages are crawled first. It reimplements the core traversal loop to use a priority + queue while keeping URL validation and link discovery consistent with our design. + + Core methods: + - arun: Returns either a list (batch mode) or an async generator (stream mode). + - _arun_best_first: Core generator that uses a priority queue to yield CrawlResults. + - can_process_url: Validates URLs and applies filtering (inherited behavior). + - link_discovery: Extracts and validates links from a CrawlResult. + """ + def __init__( + self, + max_depth: int, + filter_chain: FastFilterChain = FastFilterChain(), + url_scorer: Optional[FastURLScorer] = None, + include_external: bool = False, + logger: Optional[logging.Logger] = None, + ): + self.max_depth = max_depth + self.filter_chain = filter_chain + self.url_scorer = url_scorer + self.include_external = include_external + self.logger = logger or logging.getLogger(__name__) + self.stats = TraversalStats(start_time=datetime.now()) + self._cancel_event = asyncio.Event() + + async def can_process_url(self, url: str, depth: int) -> bool: + """ + Validate the URL format and apply filtering. + For the starting URL (depth 0), filtering is bypassed. + """ + try: + parsed = urlparse(url) + if not parsed.scheme or not parsed.netloc: + raise ValueError("Missing scheme or netloc") + if parsed.scheme not in ("http", "https"): + raise ValueError("Invalid scheme") + if "." not in parsed.netloc: + raise ValueError("Invalid domain") + except Exception as e: + self.logger.warning(f"Invalid URL: {url}, error: {e}") + return False + + if depth != 0 and not self.filter_chain.apply(url): + return False + + return True + + async def link_discovery( + self, + result: CrawlResult, + source_url: str, + current_depth: int, + visited: Set[str], + next_links: List[Tuple[str, Optional[str]]], + depths: Dict[str, int], + ) -> None: + """ + Extract links from the crawl result, validate them, and append new URLs + (with their parent references) to next_links. + Also updates the depths dictionary. + """ + new_depth = current_depth + 1 + if new_depth > self.max_depth: + return + + # Retrieve internal links; include external links if enabled. + links = result.links.get("internal", []) + if self.include_external: + links += result.links.get("external", []) + + for link in links: + url = link.get("href") + if url in visited: + continue + if not await self.can_process_url(url, new_depth): + self.stats.urls_skipped += 1 + continue + + # Record the new depth. 
+            depths[url] = new_depth
+            next_links.append((url, source_url))
+
+    async def _arun_best_first(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: CrawlerRunConfig,
+    ) -> AsyncGenerator[CrawlResult, None]:
+        """
+        Core best-first crawl method using a priority queue.
+
+        The queue items are tuples of (score, depth, url, parent_url). Lower scores
+        are treated as higher priority. URLs are processed in batches for efficiency.
+        """
+        queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
+        # Push the initial URL with score 0 and depth 0.
+        await queue.put((0, 0, start_url, None))
+        visited: Set[str] = set()
+        depths: Dict[str, int] = {start_url: 0}
+
+        while not queue.empty() and not self._cancel_event.is_set():
+            batch: List[Tuple[float, int, str, Optional[str]]] = []
+            # Retrieve up to BATCH_SIZE items from the priority queue.
+            for _ in range(BATCH_SIZE):
+                if queue.empty():
+                    break
+                item = await queue.get()
+                score, depth, url, parent_url = item
+                if url in visited:
+                    continue
+                visited.add(url)
+                batch.append(item)
+
+            if not batch:
+                continue
+
+            # Process the current batch of URLs.
+            urls = [item[2] for item in batch]
+            batch_config = config.clone(deep_crawl_strategy=None, stream=True)
+            stream_gen = await crawler.arun_many(urls=urls, config=batch_config)
+            async for result in stream_gen:
+                result_url = result.url
+                # Find the corresponding tuple from the batch.
+                corresponding = next((item for item in batch if item[2] == result_url), None)
+                if not corresponding:
+                    continue
+                score, depth, url, parent_url = corresponding
+                result.metadata = result.metadata or {}
+                result.metadata["depth"] = depth
+                result.metadata["parent_url"] = parent_url
+                result.metadata["score"] = score
+                yield result
+                # Discover new links from this result.
+                new_links: List[Tuple[str, Optional[str]]] = []
+                await self.link_discovery(result, result_url, depth, visited, new_links, depths)
+                for new_url, new_parent in new_links:
+                    new_depth = depths.get(new_url, depth + 1)
+                    new_score = self.url_scorer.score(new_url) if self.url_scorer else 0
+                    await queue.put((new_score, new_depth, new_url, new_parent))
+
+        # End of crawl.
+
+    async def _arun_batch(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: CrawlerRunConfig,
+    ) -> List[CrawlResult]:
+        """
+        Best-first crawl in batch mode.
+
+        Aggregates all CrawlResults into a list.
+        """
+        results: List[CrawlResult] = []
+        async for result in self._arun_best_first(start_url, crawler, config):
+            results.append(result)
+        return results
+
+    async def _arun_stream(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: CrawlerRunConfig,
+    ) -> AsyncGenerator[CrawlResult, None]:
+        """
+        Best-first crawl in streaming mode.
+
+        Yields CrawlResults as they become available.
+        """
+        async for result in self._arun_best_first(start_url, crawler, config):
+            yield result
+
+    async def arun(
+        self,
+        start_url: str,
+        crawler: AsyncWebCrawler,
+        config: Optional[CrawlerRunConfig] = None,
+    ) -> "RunManyReturn":
+        """
+        Main entry point for best-first crawling.
+
+        Returns either a list (batch mode) or an async generator (stream mode)
+        of CrawlResults.
+        """
+        if config is None:
+            raise ValueError("CrawlerRunConfig must be provided")
+        if config.stream:
+            return self._arun_stream(start_url, crawler, config)
+        else:
+            return await self._arun_batch(start_url, crawler, config)
+
+    async def shutdown(self) -> None:
+        """
+        Signal cancellation and clean up resources.
+ """ + self._cancel_event.set() + self.stats.end_time = datetime.now() diff --git a/crawl4ai/deep_crawling/bfs_strategy.py b/crawl4ai/deep_crawling/bfs_strategy.py index 5d45145e..fd2a7aae 100644 --- a/crawl4ai/deep_crawling/bfs_strategy.py +++ b/crawl4ai/deep_crawling/bfs_strategy.py @@ -5,15 +5,11 @@ from datetime import datetime from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple from urllib.parse import urlparse -from ..models import CrawlResult, TraversalStats +from ..models import TraversalStats from .filters import FastFilterChain from .scorers import FastURLScorer - -from typing import TYPE_CHECKING from . import DeepCrawlStrategy -if TYPE_CHECKING: - from ..async_configs import CrawlerRunConfig - from ..async_webcrawler import AsyncWebCrawler +from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult class BFSDeepCrawlStrategy(DeepCrawlStrategy): """ @@ -107,8 +103,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): async def _arun_batch( self, start_url: str, - crawler: "AsyncWebCrawler", - config: "CrawlerRunConfig", + crawler: AsyncWebCrawler, + config: CrawlerRunConfig, ) -> List[CrawlResult]: """ Batch (non-streaming) mode: @@ -148,8 +144,8 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy): async def _arun_stream( self, start_url: str, - crawler: "AsyncWebCrawler", - config: "CrawlerRunConfig", + crawler: AsyncWebCrawler, + config: CrawlerRunConfig, ) -> AsyncGenerator[CrawlResult, None]: """ Streaming mode: diff --git a/crawl4ai/deep_crawling/crazy.py b/crawl4ai/deep_crawling/crazy.py index 4b94739f..4ba94d65 100644 --- a/crawl4ai/deep_crawling/crazy.py +++ b/crawl4ai/deep_crawling/crazy.py @@ -1,12 +1,12 @@ from __future__ import annotations # I just got crazy, trying to wrute K&R C but in Python. Right now I feel like I'm in a quantum state. +# I probably won't use this; I just want to leave it here. A century later, the future human race will be like, "WTF?" -# from typing import TYPE_CHECKING +# ------ Imports That Will Make You Question Reality ------ # from functools import wraps from contextvars import ContextVar import inspect -from httpx import get from crawl4ai import CacheMode from crawl4ai.async_configs import CrawlerRunConfig from crawl4ai.models import CrawlResult, TraversalStats diff --git a/crawl4ai/deep_crawling/dfs_strategy.py b/crawl4ai/deep_crawling/dfs_strategy.py new file mode 100644 index 00000000..20a6e466 --- /dev/null +++ b/crawl4ai/deep_crawling/dfs_strategy.py @@ -0,0 +1,95 @@ +# dfs_deep_crawl_strategy.py +from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple + +from ..models import CrawlResult +from .bfs_strategy import BFSDeepCrawlStrategy # Inherit common logic: can_process_url, link_discovery, etc. + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from ..async_configs import CrawlerRunConfig + from ..async_webcrawler import AsyncWebCrawler + + +class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy): + """ + Depth-First Search (DFS) deep crawling strategy. + + Inherits URL validation and link discovery from BFSDeepCrawlStrategy. + Overrides _arun_batch and _arun_stream to use a stack (LIFO) for DFS traversal. + """ + async def _arun_batch( + self, + start_url: str, + crawler: "AsyncWebCrawler", + config: "CrawlerRunConfig", + ) -> List[CrawlResult]: + """ + Batch (non-streaming) DFS mode. + Uses a stack to traverse URLs in DFS order, aggregating CrawlResults into a list. 
+ """ + visited: Set[str] = set() + # Stack items: (url, parent_url, depth) + stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] + depths: Dict[str, int] = {start_url: 0} + results: List[CrawlResult] = [] + + while stack and not self._cancel_event.is_set(): + url, parent, depth = stack.pop() + if url in visited or depth > self.max_depth: + continue + visited.add(url) + + # Clone config to disable recursive deep crawling. + batch_config = config.clone(deep_crawl_strategy=None, stream=False) + url_results = await crawler.arun_many(urls=[url], config=batch_config) + for result in url_results: + result.metadata = result.metadata or {} + result.metadata["depth"] = depth + result.metadata["parent_url"] = parent + if self.url_scorer: + result.metadata["score"] = self.url_scorer.score(url) + results.append(result) + + new_links: List[Tuple[str, Optional[str]]] = [] + await self.link_discovery(result, url, depth, visited, new_links, depths) + # Push new links in reverse order so the first discovered is processed next. + for new_url, new_parent in reversed(new_links): + new_depth = depths.get(new_url, depth + 1) + stack.append((new_url, new_parent, new_depth)) + return results + + async def _arun_stream( + self, + start_url: str, + crawler: "AsyncWebCrawler", + config: "CrawlerRunConfig", + ) -> AsyncGenerator[CrawlResult, None]: + """ + Streaming DFS mode. + Uses a stack to traverse URLs in DFS order and yields CrawlResults as they become available. + """ + visited: Set[str] = set() + stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] + depths: Dict[str, int] = {start_url: 0} + + while stack and not self._cancel_event.is_set(): + url, parent, depth = stack.pop() + if url in visited or depth > self.max_depth: + continue + visited.add(url) + + stream_config = config.clone(deep_crawl_strategy=None, stream=True) + stream_gen = await crawler.arun_many(urls=[url], config=stream_config) + async for result in stream_gen: + result.metadata = result.metadata or {} + result.metadata["depth"] = depth + result.metadata["parent_url"] = parent + if self.url_scorer: + result.metadata["score"] = self.url_scorer.score(url) + yield result + + new_links: List[Tuple[str, Optional[str]]] = [] + await self.link_discovery(result, url, depth, visited, new_links, depths) + for new_url, new_parent in reversed(new_links): + new_depth = depths.get(new_url, depth + 1) + stack.append((new_url, new_parent, new_depth)) diff --git a/crawl4ai/deep_crawling/filters.py b/crawl4ai/deep_crawling/filters.py index 8a78990c..f74b1c61 100644 --- a/crawl4ai/deep_crawling/filters.py +++ b/crawl4ai/deep_crawling/filters.py @@ -1,16 +1,18 @@ from abc import ABC, abstractmethod -from typing import List, Pattern, Set, Union, FrozenSet -import re, time +from typing import List, Pattern, Set, Union from urllib.parse import urlparse from array import array +import re import logging from functools import lru_cache import fnmatch from dataclasses import dataclass -from typing import ClassVar import weakref import mimetypes - +import math +from collections import defaultdict +from typing import Dict +from ..utils import HeadPeekr @dataclass class FilterStats: @@ -624,6 +626,163 @@ class FastDomainFilter(FastURLFilter): return result + +class ContentRelevanceFilter(URLFilter): + """BM25-based relevance filter using head section content""" + + __slots__ = ('query_terms', 'threshold', 'k1', 'b', 'avgdl') + + def __init__(self, query: str, threshold: float, + k1: float = 1.2, b: float = 0.75, avgdl: int = 1000): + 
+        super().__init__(name="BM25RelevanceFilter")
+        self.query_terms = self._tokenize(query)
+        self.threshold = threshold
+        self.k1 = k1  # TF saturation parameter
+        self.b = b  # Length normalization parameter
+        self.avgdl = avgdl  # Average document length (empirical value)
+
+    async def apply(self, url: str) -> bool:
+        head_content = await HeadPeekr.peek_html(url)
+        if not head_content:
+            self._update_stats(False)
+            return False
+
+        # Field extraction with weighting
+        fields = {
+            'title': HeadPeekr.get_title(head_content) or "",
+            'meta': HeadPeekr.extract_meta_tags(head_content)
+        }
+        doc_text = self._build_document(fields)
+
+        score = self._bm25(doc_text)
+        decision = score >= self.threshold
+        self._update_stats(decision)
+        return decision
+
+    def _build_document(self, fields: Dict) -> str:
+        """Weighted document construction"""
+        return ' '.join([
+            fields['title'] * 3,  # Title weight
+            fields['meta'].get('description', '') * 2,
+            fields['meta'].get('keywords', ''),
+            ' '.join(fields['meta'].values())
+        ])
+
+    def _tokenize(self, text: str) -> List[str]:
+        """Fast case-insensitive tokenization"""
+        return text.lower().split()
+
+    def _bm25(self, document: str) -> float:
+        """Optimized BM25 implementation for head sections"""
+        doc_terms = self._tokenize(document)
+        doc_len = len(doc_terms)
+        tf = defaultdict(int)
+
+        for term in doc_terms:
+            tf[term] += 1
+
+        score = 0.0
+        for term in set(self.query_terms):
+            term_freq = tf[term]
+            idf = math.log((1 + 1) / (term_freq + 0.5) + 1)  # Simplified IDF
+            numerator = term_freq * (self.k1 + 1)
+            denominator = term_freq + self.k1 * (1 - self.b +
+                          self.b * (doc_len / self.avgdl))
+            score += idf * (numerator / denominator)
+
+        return score
+
+
+class SEOFilter(URLFilter):
+    """Quantitative SEO quality assessment filter using head section analysis"""
+
+    __slots__ = ('threshold', '_weights', '_kw_patterns')
+
+    # Based on SEMrush/Google ranking factors research
+    DEFAULT_WEIGHTS = {
+        'title_length': 0.15,
+        'title_kw': 0.18,
+        'meta_description': 0.12,
+        'canonical': 0.10,
+        'robot_ok': 0.20,  # Most critical factor
+        'schema_org': 0.10,
+        'url_quality': 0.15
+    }
+
+    def __init__(self, threshold: float = 0.65,
+                 keywords: List[str] = None,
+                 weights: Dict[str, float] = None):
+        super().__init__(name="SEOFilter")
+        self.threshold = threshold
+        self._weights = weights or self.DEFAULT_WEIGHTS
+        self._kw_patterns = re.compile(
+            r'\b({})\b'.format('|'.join(map(re.escape, keywords or []))),
+            re.I
+        ) if keywords else None
+
+    async def apply(self, url: str) -> bool:
+        head_content = await HeadPeekr.peek_html(url)
+        if not head_content:
+            self._update_stats(False)
+            return False
+
+        meta = HeadPeekr.extract_meta_tags(head_content)
+        title = HeadPeekr.get_title(head_content) or ''
+        parsed_url = urlparse(url)
+
+        scores = {
+            'title_length': self._score_title_length(title),
+            'title_kw': self._score_keyword_presence(title),
+            'meta_description': self._score_meta_description(meta.get('description', '')),
+            'canonical': self._score_canonical(meta.get('canonical'), url),
+            'robot_ok': 1.0 if 'noindex' not in meta.get('robots', '') else 0.0,
+            'schema_org': self._score_schema_org(head_content),
+            'url_quality': self._score_url_quality(parsed_url)
+        }
+
+        total_score = sum(weight * scores[factor]
+                          for factor, weight in self._weights.items())
+
+        decision = total_score >= self.threshold
+        self._update_stats(decision)
+        return decision
+
+    def _score_title_length(self, title: str) -> float:
+        length = len(title)
+        if 50 <= length <= 60: return 1.0
+        if 40 <= length < 50 or 60 < length <= 70: return 0.7
+        return 0.3  # Poor length
+
+    def _score_keyword_presence(self, text: str) -> float:
+        if not self._kw_patterns: return 0.0
+        matches = len(self._kw_patterns.findall(text))
+        return min(matches * 0.3, 1.0)  # Max 3 matches
+
+    def _score_meta_description(self, desc: str) -> float:
+        length = len(desc)
+        if 140 <= length <= 160: return 1.0
+        return 0.5 if 120 <= length <= 200 else 0.2
+
+    def _score_canonical(self, canonical: str, original: str) -> float:
+        if not canonical: return 0.5  # Neutral score
+        return 1.0 if canonical == original else 0.2
+
+    def _score_schema_org(self, html: str) -> float:
+        # Detect any schema.org markup in head
+        return 1.0 if re.search(r'<script[^>]+type=["\']application/ld\+json', html) else 0.0
+
+    def _score_url_quality(self, parsed_url) -> float:
+        score = 1.0
+        path = parsed_url.path.lower()
+
+        # Penalty factors
+        if len(path) > 80: score *= 0.7
+        if re.search(r'\d{4}', path): score *= 0.8  # Numbers in path
+        if parsed_url.query: score *= 0.6  # URL parameters
+        if '_' in path: score *= 0.9  # Underscores vs hyphens
+
+        return score
+
 def create_fast_filter_chain() -> FastFilterChain:
     """Create an optimized filter chain with filters ordered by rejection rate"""
     return FastFilterChain(
@@ -647,8 +806,6 @@ def create_fast_filter_chain() -> FastFilterChain:
 
 def run_performance_test():
     import time
-    import random
-    from itertools import cycle
 
     # Generate test URLs
     base_urls = [
@@ -863,6 +1020,20 @@ def test_pattern_filter():
     else:
         print("\nāŒ Some accuracy tests failed!")
 
+async def test_content_relevancy_filter():
+    # Initialize with query and threshold (tune based on your corpus)
+    relevance_filter = ContentRelevanceFilter(
+        query="machine learning",
+        threshold=2.5
+    )
+
+    # In your crawler loop
+    for url in ["https://example.com", "https://example.com/blog/post-123"]:
+        if await relevance_filter.apply(url):
+            print(f"āœ… Relevant: {url}")
+        else:
+            print(f"āŒ Not Relevant: {url}")
+
 if __name__ == "__main__":
     run_performance_test()
     # test_pattern_filter()
\ No newline at end of file
diff --git a/crawl4ai/types.py b/crawl4ai/types.py
new file mode 100644
index 00000000..7c2586a3
--- /dev/null
+++ b/crawl4ai/types.py
@@ -0,0 +1,14 @@
+from typing import TYPE_CHECKING, Union
+
+AsyncWebCrawler = Union['AsyncWebCrawlerType']  # Note the string literal
+CrawlerRunConfig = Union['CrawlerRunConfigType']
+CrawlResult = Union['CrawlResultType']
+RunManyReturn = Union['RunManyReturnType']
+
+if TYPE_CHECKING:
+    from . import (
+        AsyncWebCrawler as AsyncWebCrawlerType,
+        CrawlerRunConfig as CrawlerRunConfigType,
+        CrawlResult as CrawlResultType,
+        RunManyReturn as RunManyReturnType,
+    )
\ No newline at end of file
diff --git a/crawl4ai/utils.py b/crawl4ai/utils.py
index 41deffb0..5f3e05ee 100644
--- a/crawl4ai/utils.py
+++ b/crawl4ai/utils.py
@@ -1,22 +1,25 @@
-from ast import Call
 import time
 from urllib.parse import urlparse
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from bs4 import BeautifulSoup, Comment, element, Tag, NavigableString
 import json
 import html
+import lxml
 import re
 import os
 import platform
 from .prompts import PROMPT_EXTRACT_BLOCKS
 from array import array
-from .config import *
+from .html2text import html2text, CustomHTML2Text
+# from .config import *
+from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, IMAGE_SCORE_THRESHOLD, DEFAULT_PROVIDER, PROVIDER_MODELS
+import httpx
+from socket import gaierror
 from pathlib import Path
-from typing import Dict, Any, List, Tuple, Union, Optional, Callable
+from typing import Dict, Any, List, Optional, Callable
 from urllib.parse import urljoin
 import requests
 from requests.exceptions import InvalidSchema
-from typing import Dict, Any
 import xxhash
 from colorama import Fore, Style, init
 import textwrap
@@ -24,20 +27,20 @@ import cProfile
 import pstats
 from functools import wraps
 import asyncio
-from lxml import html, etree
+
 import sqlite3
 import hashlib
-from urllib.parse import urljoin, urlparse
+
 from urllib.robotparser import RobotFileParser
 import aiohttp
-from pathlib import Path
+
 from packaging import version
 from . import __version__
-from typing import Sequence, List
-from array import array
+from typing import Sequence
+
 from itertools import chain
 from collections import deque
-from typing import Callable, Generator, Iterable, List, Optional
+from typing import Generator, Iterable
 
 def chunk_documents(
     documents: Iterable[str],
@@ -194,7 +197,7 @@ class VersionManager:
             return None
         try:
             return version.parse(self.version_file.read_text().strip())
-        except:
+        except Exception as _ex:
             return None
 
     def update_version(self):
@@ -286,7 +289,7 @@ class RobotsParser:
             domain = parsed.netloc
             if not domain:
                 return True
-        except:
+        except Exception as _ex:
            return True
 
         # Fast path - check cache first
@@ -306,7 +309,7 @@ class RobotsParser:
                 self._cache_rules(domain, rules)
             else:
                 return True
-        except:
+        except Exception as _ex:
            # On any error (timeout, connection failed, etc), allow access
            return True
 
@@ -1374,7 +1377,7 @@ def get_content_of_website_optimized(
                 src = img.get("src", "")
                 if base64_pattern.match(src):
                     img["src"] = base64_pattern.sub("", src)
-            except:
+            except Exception as _ex:
                 pass
 
     cleaned_html = str(body).replace("\n\n", "\n").replace("  ", " ")
@@ -1412,7 +1415,7 @@ def extract_metadata_using_lxml(html, doc=None):
 
     if doc is None:
         try:
-            doc = lhtml.document_fromstring(html)
+            doc = lxml.html.document_fromstring(html)
         except Exception:
             return {}
 
@@ -1728,10 +1731,10 @@ def extract_blocks_batch(batch_data, provider="groq/llama3-70b-8192", api_token=
 
     messages = []
 
-    for url, html in batch_data:
+    for url, _html in batch_data:
         variable_values = {
             "URL": url,
-            "HTML": html,
+            "HTML": _html,
         }
 
         prompt_with_variables = PROMPT_EXTRACT_BLOCKS
@@ -1913,7 +1916,7 @@ def fast_format_html(html_string):
     indent = 0
     indent_str = "  "  # Two spaces for indentation
     formatted = []
-    in_content = False
+    # in_content = False
 
     # Split by < and > to separate tags and content
     parts = html_string.replace(">", ">\n").replace("<", "\n<").split("\n")
">\n").replace("<", "\n<").split("\n") @@ -2466,17 +2469,74 @@ def truncate(value, threshold): def optimize_html(html_str, threshold=200): root = html.fromstring(html_str) - for element in root.iter(): + for _element in root.iter(): # Process attributes - for attr in list(element.attrib): - element.attrib[attr] = truncate(element.attrib[attr], threshold) + for attr in list(_element.attrib): + _element.attrib[attr] = truncate(_element.attrib[attr], threshold) # Process text content - if element.text and len(element.text) > threshold: - element.text = truncate(element.text, threshold) + if _element.text and len(_element.text) > threshold: + _element.text = truncate(_element.text, threshold) # Process tail text - if element.tail and len(element.tail) > threshold: - element.tail = truncate(element.tail, threshold) + if _element.tail and len(_element.tail) > threshold: + _element.tail = truncate(_element.tail, threshold) - return html.tostring(root, encoding='unicode', pretty_print=False) \ No newline at end of file + return html.tostring(root, encoding='unicode', pretty_print=False) + +class HeadPeekr: + @staticmethod + async def fetch_head_section(url, timeout=0.3): + headers = { + "User-Agent": "Mozilla/5.0 (compatible; CrawlBot/1.0)", + "Accept": "text/html", + "Connection": "close" # Force close after response + } + try: + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get(url, headers=headers, follow_redirects=True) + + # Handle redirects explicitly by using the final URL + if response.url != url: + url = str(response.url) + response = await client.get(url, headers=headers) + + content = b"" + async for chunk in response.aiter_bytes(): + content += chunk + if b"" in content: + break # Stop after detecting + return content.split(b"")[0] + b"" + except (httpx.HTTPError, gaierror) : + return None + + @staticmethod + async def peek_html(url, timeout=0.3): + head_section = await HeadPeekr.fetch_head_section(url, timeout=timeout) + if head_section: + return head_section.decode("utf-8", errors="ignore") + return None + + @staticmethod + def extract_meta_tags(head_content: str): + meta_tags = {} + + # Find all meta tags + meta_pattern = r']+>' + for meta_tag in re.finditer(meta_pattern, head_content): + tag = meta_tag.group(0) + + # Extract name/property and content + name_match = re.search(r'name=["\'](.*?)["\']', tag) + property_match = re.search(r'property=["\'](.*?)["\']', tag) + content_match = re.search(r'content=["\'](.*?)["\']', tag) + + if content_match and (name_match or property_match): + key = name_match.group(1) if name_match else property_match.group(1) + meta_tags[key] = content_match.group(1) + + return meta_tags + + def get_title(head_content: str): + title_match = re.search(r'(.*?)', head_content, re.IGNORECASE | re.DOTALL) + return title_match.group(1) if title_match else None diff --git a/tests/20241401/test_deep_crawl.py b/tests/20241401/test_deep_crawl.py index 1facbb7c..2f533cc5 100644 --- a/tests/20241401/test_deep_crawl.py +++ b/tests/20241401/test_deep_crawl.py @@ -2,8 +2,10 @@ import asyncio import time -from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, CacheMode -from crawl4ai.deep_crawling.bfs_strategy import BFSDeepCrawlStrategy +from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, CacheMode +from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy +from crawl4ai.deep_crawling import BFSDeepCrawlStrategy +# from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, BestFirstCrawlingStrategy async def main(): @@ 
@@ -15,7 +17,8 @@ async def main():
         ),
         stream=False,
         verbose=True,
-        cache_mode=CacheMode.BYPASS
+        cache_mode=CacheMode.BYPASS,
+        scraping_strategy=LXMLWebScrapingStrategy()
     )
 
     async with AsyncWebCrawler() as crawler: