Refactor: Removed all scheduling logic from the scraper. The scraper now expects arun_many to handle all scheduling; the scraper itself only does traversal, validation, compliance checks, URL filtering, and scoring. Also reformatted some of the scraper files with the Black code formatter.
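
For context, a minimal sketch of the intended call pattern after this change, assembled from the example file in this commit (imports, class names, and URLs are taken from the diffs below; treat it as an illustration rather than a tested snippet):

    import asyncio
    from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig
    from crawl4ai.scraper import (
        AsyncWebScraper,
        BFSScraperStrategy,
        FilterChain,
        URLPatternFilter,
        ContentTypeFilter,
    )

    async def main():
        # The strategy no longer takes max_concurrent or min_crawl_delay; it only
        # filters, scores and traverses, and hands each batch to crawler.arun_many.
        strategy = BFSScraperStrategy(
            max_depth=2,
            filter_chain=FilterChain(
                [URLPatternFilter("*/tutorial/*"), ContentTypeFilter(["text/html"])]
            ),
            url_scorer=None,
        )
        async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
            scraper = AsyncWebScraper(crawler, strategy)
            result = await scraper.ascrape("https://crawl4ai.com/mkdocs")
            print(f"Crawled {len(result.crawled_urls)} pages")

    asyncio.run(main())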

Aravind Karnam
2025-01-21 17:49:51 +05:30
parent 26d78d8512
commit 67fa06c09b
9 changed files with 316 additions and 243 deletions

View File

@@ -84,3 +84,4 @@ SHOW_DEPRECATION_WARNINGS = True
 SCREENSHOT_HEIGHT_TRESHOLD = 10000
 PAGE_TIMEOUT = 60000
 DOWNLOAD_PAGE_TIMEOUT = 60000
+SCRAPER_BATCH_SIZE = 5
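
SCRAPER_BATCH_SIZE caps how many queued URLs the BFS strategy hands to crawler.arun_many per iteration. A rough sketch of that batching step, simplified from the strategy diff further down (the priority queue of (score, depth, url) tuples and the active_crawls set come from that diff; the helper name here is made up for illustration):

    import asyncio

    SCRAPER_BATCH_SIZE = 5  # default added in this commit

    async def next_batch(queue: asyncio.PriorityQueue, active_crawls: set) -> list:
        """Pull up to SCRAPER_BATCH_SIZE (score, depth, url) jobs not already in flight."""
        jobs = []
        while len(jobs) < SCRAPER_BATCH_SIZE and not queue.empty():
            score, depth, url = await queue.get()
            if url not in active_crawls:  # skip URLs already being crawled
                jobs.append((score, depth, url))
                active_crawls.add(url)
        return jobs

The whole batch is then passed to crawler.arun_many(urls=..., config=...), which owns scheduling and concurrency from that point on.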

View File

@@ -1,5 +1,16 @@
 from .async_web_scraper import AsyncWebScraper
 from .bfs_scraper_strategy import BFSScraperStrategy
-from .filters import URLFilter, FilterChain, URLPatternFilter, ContentTypeFilter, DomainFilter
-from .scorers import KeywordRelevanceScorer, PathDepthScorer, FreshnessScorer, CompositeScorer
+from .filters import (
+    URLFilter,
+    FilterChain,
+    URLPatternFilter,
+    ContentTypeFilter,
+    DomainFilter,
+)
+from .scorers import (
+    KeywordRelevanceScorer,
+    PathDepthScorer,
+    FreshnessScorer,
+    CompositeScorer,
+)
 from .scraper_strategy import ScraperStrategy

View File

@@ -6,34 +6,37 @@ import logging
 from dataclasses import dataclass
 from contextlib import asynccontextmanager

 @dataclass
 class ScrapingProgress:
     """Tracks the progress of a scraping operation."""
     processed_urls: int = 0
     failed_urls: int = 0
     current_url: Optional[str] = None

 class AsyncWebScraper:
     """
     A high-level web scraper that combines an async crawler with a scraping strategy.
     Args:
         crawler (AsyncWebCrawler): The async web crawler implementation
         strategy (ScraperStrategy): The scraping strategy to use
         logger (Optional[logging.Logger]): Custom logger for the scraper
     """
     def __init__(
         self,
         crawler: AsyncWebCrawler,
         strategy: ScraperStrategy,
-        logger: Optional[logging.Logger] = None
+        logger: Optional[logging.Logger] = None,
     ):
         if not isinstance(crawler, AsyncWebCrawler):
             raise TypeError("crawler must be an instance of AsyncWebCrawler")
         if not isinstance(strategy, ScraperStrategy):
             raise TypeError("strategy must be an instance of ScraperStrategy")
         self.crawler = crawler
         self.strategy = strategy
         self.logger = logger or logging.getLogger(__name__)

@@ -55,30 +58,28 @@ class AsyncWebScraper:
             raise

     async def ascrape(
-        self,
-        url: str,
-        stream: bool = False
+        self, url: str, stream: bool = False
     ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
         """
         Scrape a website starting from the given URL.
         Args:
             url: Starting URL for scraping
             stream: If True, yield results as they come; if False, collect all results
         Returns:
             Either an async generator yielding CrawlResults or a final ScraperResult
         """
         self._progress = ScrapingProgress()  # Reset progress
         async with self._error_handling_context(url):
             if stream:
                 return self._ascrape_yielding(url)
             return await self._ascrape_collecting(url)

     async def _ascrape_yielding(
         self,
         url: str,
     ) -> AsyncGenerator[CrawlResult, None]:
         """Stream scraping results as they become available."""
         try:

@@ -92,28 +93,28 @@ class AsyncWebScraper:
             raise

     async def _ascrape_collecting(
         self,
         url: str,
     ) -> ScraperResult:
         """Collect all scraping results before returning."""
         extracted_data = {}
         try:
             result_generator = self.strategy.ascrape(url, self.crawler)
             async for res in result_generator:
                 self._progress.processed_urls += 1
                 self._progress.current_url = res.url
                 extracted_data[res.url] = res
             return ScraperResult(
                 url=url,
                 crawled_urls=list(extracted_data.keys()),
                 extracted_data=extracted_data,
                 stats={
-                    'processed_urls': self._progress.processed_urls,
-                    'failed_urls': self._progress.failed_urls
-                }
+                    "processed_urls": self._progress.processed_urls,
+                    "failed_urls": self._progress.failed_urls,
+                },
             )
         except Exception as e:
             self.logger.error(f"Error in collecting scrape: {str(e)}")
             raise

View File

@@ -7,16 +7,19 @@ from urllib.parse import urlparse
 from urllib.robotparser import RobotFileParser
 import validators
-from crawl4ai.async_configs import CrawlerRunConfig
+from ..async_configs import CrawlerRunConfig
 from .models import CrawlResult
 from .filters import FilterChain
 from .scorers import URLScorer
 from ..async_webcrawler import AsyncWebCrawler
 from .scraper_strategy import ScraperStrategy
+from ..config import SCRAPER_BATCH_SIZE

 @dataclass
 class CrawlStats:
     """Statistics for the crawling process"""
     start_time: datetime
     urls_processed: int = 0
     urls_failed: int = 0

@@ -25,6 +28,7 @@ class CrawlStats:
     current_depth: int = 0
     robots_blocked: int = 0

 class BFSScraperStrategy(ScraperStrategy):
     """Breadth-First Search scraping strategy with politeness controls"""

@@ -34,13 +38,13 @@ class BFSScraperStrategy(ScraperStrategy):
         filter_chain: FilterChain,
         url_scorer: URLScorer,
         process_external_links: bool = False,
-        logger: Optional[logging.Logger] = None
+        logger: Optional[logging.Logger] = None,
     ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
         self.url_scorer = url_scorer
         self.logger = logger or logging.getLogger(__name__)
         # Crawl control
         self.stats = CrawlStats(start_time=datetime.now())
         self._cancel_event = asyncio.Event()

@@ -74,11 +78,11 @@ class BFSScraperStrategy(ScraperStrategy):
     async def _get_robot_parser(self, url: str) -> Optional[RobotFileParser]:
         """Get or create robots.txt parser for domain.
         This is our robots.txt manager that:
         - Uses domain-level caching of robot parsers
         - Creates and caches new parsers as needed
         - Handles failed robots.txt fetches gracefully
         - Returns None if robots.txt can't be fetched, allowing crawling to proceed
         """
         domain = urlparse(url).netloc
         if domain not in self.robot_parsers:

@@ -100,7 +104,7 @@ class BFSScraperStrategy(ScraperStrategy):
         depth: int,
         queue: asyncio.PriorityQueue,
         visited: Set[str],
-        depths: Dict[str, int]
+        depths: Dict[str, int],
     ):
         """Process extracted links from crawl result.
         This is our link processor that:

@@ -116,7 +120,7 @@ class BFSScraperStrategy(ScraperStrategy):
         if self.process_external_links:
             links_to_process += result.links["external"]
         for link in links_to_process:
-            url = link['href']
+            url = link["href"]
             if not await self.can_process_url(url, depth):
                 self.stats.urls_skipped += 1
                 continue

@@ -132,8 +136,7 @@ class BFSScraperStrategy(ScraperStrategy):
                 await queue.put((score, new_depth, url))
                 depths[url] = new_depth
                 self.stats.total_depth_reached = max(
-                    self.stats.total_depth_reached,
-                    new_depth
+                    self.stats.total_depth_reached, new_depth
                 )

     async def ascrape(

@@ -142,7 +145,7 @@ class BFSScraperStrategy(ScraperStrategy):
         crawler: AsyncWebCrawler,
     ) -> AsyncGenerator[CrawlResult, None]:
         """Implement BFS crawling strategy"""
         # Initialize crawl state
         """
         queue: A priority queue where items are tuples of (score, depth, url)

@@ -151,57 +154,76 @@ class BFSScraperStrategy(ScraperStrategy):
             URL: The actual URL to crawl
         visited: Keeps track of URLs we've already seen to avoid cycles
         depths: Maps URLs to their depths from the start URL
-        pending_tasks: Tracks currently running crawl tasks
+        active_crawls: Tracks currently running crawl tasks
         """
         queue = asyncio.PriorityQueue()
         await queue.put((0, 0, start_url))
         visited: Set[str] = set()
         depths = {start_url: 0}
+        active_crawls = set()  # Track URLs currently being processed

         try:
-            while not queue.empty() and not self._cancel_event.is_set():
+            while (
+                not queue.empty() or active_crawls
+            ) and not self._cancel_event.is_set():
                 """
                 This sets up our main control loop which:
                 - Continues while there are URLs to process (not queue.empty())
-                - Or while there are tasks still running (pending_tasks)
+                - Or while there are active crawls still running (arun_many)
                 - Can be interrupted via cancellation (not self._cancel_event.is_set())
                 """
-                n = 3
+                # Collect batch of jobs to process
                 jobs = []
-                for _ in range(n):
-                    if self.queue.empty():
-                        break
-                    jobs.append(await self.queue.get())
-                # Filter jobs directly, ensuring uniqueness and checking against visited
-                filtered_jobs = []
-                for job in jobs:
-                    _, depth, url = job
-                    self.stats.current_depth = depth
-                    if url not in visited:
-                        visited.add(url)
-                        filtered_jobs.append(job)
-                crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
-                async for result in await crawler.arun_many(urls=[url for _, _, url in filtered_jobs],
-                                                            config=crawler_config.clone(stream=True)):
-                    print(f"Received result for: {result.url} - Success: {result.success}")
-                    source_url, depth = next((url, depth) for _, depth, url in filtered_jobs if url == result.source_url)
-                    await self._process_links(result, source_url, depth, queue, visited, depths)
-                    yield result
+                # Fill batch with available jobs
+                while len(jobs) < SCRAPER_BATCH_SIZE and not queue.empty():
+                    score, depth, url = await queue.get()
+                    if url not in active_crawls:  # Only add if not currently processing
+                        jobs.append((score, depth, url))
+                        active_crawls.add(url)
+                        self.stats.current_depth = depth
+                if not jobs:
+                    # If no jobs but active crawls exist, wait a bit and continue
+                    if active_crawls:
+                        await asyncio.sleep(0.1)
+                    continue
+                # Process batch
+                crawler_config = CrawlerRunConfig(cache_mode="BYPASS", stream=True)
+                try:
+                    async for result in await crawler.arun_many(
+                        urls=[url for _, _, url in jobs], config=crawler_config
+                    ):
+                        source_url, depth = next(
+                            (url, depth) for _, depth, url in jobs if url == result.url
+                        )
+                        active_crawls.remove(source_url)  # Remove from active set
+                        if result.success:
+                            await self._process_links(
+                                result, source_url, depth, queue, visited, depths
+                            )
+                            yield result
+                        else:
+                            self.logger.warning(
+                                f"Failed to crawl {result.url}: {result.error_message}"
+                            )
+                except Exception as e:
+                    # Remove failed URLs from active set
+                    for _, _, url in jobs:
+                        active_crawls.discard(url)
+                    self.logger.error(f"Batch processing error: {e}")
+                    # Continue processing other batches
+                    continue
         except Exception as e:
             self.logger.error(f"Error in crawl process: {e}")
             raise
         finally:
-            # Clean up any remaining tasks
-            # for task in pending_tasks:
-            #     task.cancel()
             self.stats.end_time = datetime.now()

     async def shutdown(self):
         """Clean up resources and stop crawling"""
         self._cancel_event.set()
         # Clear caches and close connections
         self.robot_parsers.clear()

View File

@@ -11,16 +11,19 @@ import logging
 from dataclasses import dataclass
 import fnmatch

 @dataclass
 class FilterStats:
     """Statistics for filter applications"""
     total_urls: int = 0
     rejected_urls: int = 0
     passed_urls: int = 0

 class URLFilter(ABC):
     """Base class for URL filters"""
     def __init__(self, name: str = None):
         self.name = name or self.__class__.__name__
         self.stats = FilterStats()

@@ -39,15 +42,16 @@ class URLFilter(ABC):
         else:
             self.stats.rejected_urls += 1

 class FilterChain:
     """Chain of URL filters."""
     def __init__(self, filters: List[URLFilter] = None):
         self.filters = filters or []
         self.stats = FilterStats()
         self.logger = logging.getLogger("urlfilter.chain")

-    def add_filter(self, filter_: URLFilter) -> 'FilterChain':
+    def add_filter(self, filter_: URLFilter) -> "FilterChain":
         """Add a filter to the chain"""
         self.filters.append(filter_)
         return self  # Enable method chaining

@@ -55,19 +59,20 @@ class FilterChain:
     def apply(self, url: str) -> bool:
         """Apply all filters in the chain"""
         self.stats.total_urls += 1
         for filter_ in self.filters:
             if not filter_.apply(url):
                 self.stats.rejected_urls += 1
                 self.logger.debug(f"URL {url} rejected by {filter_.name}")
                 return False
         self.stats.passed_urls += 1
         return True

 class URLPatternFilter(URLFilter):
     """Filter URLs based on glob patterns or regex.
     pattern_filter = URLPatternFilter([
         "*.example.com/*",  # Glob pattern
         "*/article/*",  # Path pattern

@@ -76,21 +81,26 @@ class URLPatternFilter(URLFilter):
     - Supports glob patterns and regex
     - Multiple patterns per filter
     - Pattern pre-compilation for performance
     """
-    def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]],
-                 use_glob: bool = True):
+    def __init__(
+        self,
+        patterns: Union[str, Pattern, List[Union[str, Pattern]]],
+        use_glob: bool = True,
+    ):
         super().__init__()
         self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
         self.use_glob = use_glob
         self._compiled_patterns = []
         for pattern in self.patterns:
             if isinstance(pattern, str) and use_glob:
                 self._compiled_patterns.append(self._glob_to_regex(pattern))
             else:
-                self._compiled_patterns.append(re.compile(pattern) if isinstance(pattern, str) else pattern)
+                self._compiled_patterns.append(
+                    re.compile(pattern) if isinstance(pattern, str) else pattern
+                )

     def _glob_to_regex(self, pattern: str) -> Pattern:
         """Convert glob pattern to regex"""

@@ -102,9 +112,10 @@ class URLPatternFilter(URLFilter):
         self._update_stats(matches)
         return matches

 class ContentTypeFilter(URLFilter):
     """Filter URLs based on expected content type.
     content_filter = ContentTypeFilter([
         "text/html",
         "application/pdf"

@@ -114,11 +125,14 @@ class ContentTypeFilter(URLFilter):
     - Extension checking
     - Support for multiple content types
     """
-    def __init__(self, allowed_types: Union[str, List[str]],
-                 check_extension: bool = True):
+    def __init__(
+        self, allowed_types: Union[str, List[str]], check_extension: bool = True
+    ):
         super().__init__()
-        self.allowed_types = [allowed_types] if isinstance(allowed_types, str) else allowed_types
+        self.allowed_types = (
+            [allowed_types] if isinstance(allowed_types, str) else allowed_types
+        )
         self.check_extension = check_extension
         self._normalize_types()

@@ -128,12 +142,18 @@ class ContentTypeFilter(URLFilter):
     def _check_extension(self, url: str) -> bool:
         """Check URL's file extension"""
-        ext = urlparse(url).path.split('.')[-1].lower() if '.' in urlparse(url).path else ''
+        ext = (
+            urlparse(url).path.split(".")[-1].lower()
+            if "." in urlparse(url).path
+            else ""
+        )
         if not ext:
             return True  # No extension, might be dynamic content
         guessed_type = mimetypes.guess_type(url)[0]
-        return any(allowed in (guessed_type or '').lower() for allowed in self.allowed_types)
+        return any(
+            allowed in (guessed_type or "").lower() for allowed in self.allowed_types
+        )

     def apply(self, url: str) -> bool:
         """Check if URL's content type is allowed"""

@@ -143,9 +163,10 @@ class ContentTypeFilter(URLFilter):
         self._update_stats(result)
         return result

 class DomainFilter(URLFilter):
     """Filter URLs based on allowed/blocked domains.
     domain_filter = DomainFilter(
         allowed_domains=["example.com", "blog.example.com"],
         blocked_domains=["ads.example.com"]

@@ -155,12 +176,19 @@ class DomainFilter(URLFilter):
     - Subdomain support
     - Efficient domain matching
     """
-    def __init__(self, allowed_domains: Union[str, List[str]] = None,
-                 blocked_domains: Union[str, List[str]] = None):
+    def __init__(
+        self,
+        allowed_domains: Union[str, List[str]] = None,
+        blocked_domains: Union[str, List[str]] = None,
+    ):
         super().__init__()
-        self.allowed_domains = set(self._normalize_domains(allowed_domains)) if allowed_domains else None
-        self.blocked_domains = set(self._normalize_domains(blocked_domains)) if blocked_domains else set()
+        self.allowed_domains = (
+            set(self._normalize_domains(allowed_domains)) if allowed_domains else None
+        )
+        self.blocked_domains = (
+            set(self._normalize_domains(blocked_domains)) if blocked_domains else set()
+        )

     def _normalize_domains(self, domains: Union[str, List[str]]) -> List[str]:
         """Normalize domain strings"""

@@ -175,31 +203,33 @@ class DomainFilter(URLFilter):
     def apply(self, url: str) -> bool:
         """Check if URL's domain is allowed"""
         domain = self._extract_domain(url)
         if domain in self.blocked_domains:
             self._update_stats(False)
             return False
         if self.allowed_domains is not None and domain not in self.allowed_domains:
             self._update_stats(False)
             return False
         self._update_stats(True)
         return True

 # Example usage:
 def create_common_filter_chain() -> FilterChain:
     """Create a commonly used filter chain"""
-    return FilterChain([
-        URLPatternFilter([
-            "*.html", "*.htm",  # HTML files
-            "*/article/*", "*/blog/*"  # Common content paths
-        ]),
-        ContentTypeFilter([
-            "text/html",
-            "application/xhtml+xml"
-        ]),
-        DomainFilter(
-            blocked_domains=["ads.*", "analytics.*"]
-        )
-    ])
+    return FilterChain(
+        [
+            URLPatternFilter(
+                [
+                    "*.html",
+                    "*.htm",  # HTML files
+                    "*/article/*",
+                    "*/blog/*",  # Common content paths
+                ]
+            ),
+            ContentTypeFilter(["text/html", "application/xhtml+xml"]),
+            DomainFilter(blocked_domains=["ads.*", "analytics.*"]),
+        ]
+    )

View File

@@ -2,7 +2,8 @@ from pydantic import BaseModel
 from typing import List, Dict
 from ..models import CrawlResult

 class ScraperResult(BaseModel):
     url: str
     crawled_urls: List[str]
-    extracted_data: Dict[str,CrawlResult]
+    extracted_data: Dict[str, CrawlResult]

View File

@@ -10,29 +10,32 @@ from collections import defaultdict
 import math
 import logging

 @dataclass
 class ScoringStats:
     """Statistics for URL scoring"""
     urls_scored: int = 0
     total_score: float = 0.0
-    min_score: float = float('inf')
-    max_score: float = float('-inf')
+    min_score: float = float("inf")
+    max_score: float = float("-inf")

     def update(self, score: float):
         """Update scoring statistics"""
         self.urls_scored += 1
         self.total_score += score
         self.min_score = min(self.min_score, score)
         self.max_score = max(self.max_score, score)

     @property
     def average_score(self) -> float:
         """Calculate average score"""
         return self.total_score / self.urls_scored if self.urls_scored > 0 else 0.0

 class URLScorer(ABC):
     """Base class for URL scoring strategies"""
     def __init__(self, weight: float = 1.0, name: str = None):
         self.weight = weight
         self.name = name or self.__class__.__name__

@@ -51,9 +54,10 @@ class URLScorer(ABC):
         self.stats.update(weighted_score)
         return weighted_score

 class CompositeScorer(URLScorer):
     """Combines multiple scorers with weights"""
     def __init__(self, scorers: List[URLScorer], normalize: bool = True):
         super().__init__(name="CompositeScorer")
         self.scorers = scorers

@@ -62,12 +66,13 @@ class CompositeScorer(URLScorer):
     def _calculate_score(self, url: str) -> float:
         scores = [scorer.score(url) for scorer in self.scorers]
         total_score = sum(scores)
         if self.normalize and scores:
             total_score /= len(scores)
         return total_score

 class KeywordRelevanceScorer(URLScorer):
     """Score URLs based on keyword relevance.

@@ -81,9 +86,10 @@ class KeywordRelevanceScorer(URLScorer):
     - Case sensitivity options
     - Weighted scoring
     """
-    def __init__(self, keywords: List[str], weight: float = 1.0,
-                 case_sensitive: bool = False):
+    def __init__(
+        self, keywords: List[str], weight: float = 1.0, case_sensitive: bool = False
+    ):
         super().__init__(weight=weight)
         self.keywords = keywords
         self.case_sensitive = case_sensitive

@@ -98,15 +104,15 @@ class KeywordRelevanceScorer(URLScorer):
         """Calculate score based on keyword matches"""
         decoded_url = unquote(url)
         total_matches = sum(
-            1 for pattern in self.patterns
-            if pattern.search(decoded_url)
+            1 for pattern in self.patterns if pattern.search(decoded_url)
         )
         # Normalize score between 0 and 1
         return total_matches / len(self.patterns) if self.patterns else 0.0

 class PathDepthScorer(URLScorer):
     """Score URLs based on their path depth.
     path_scorer = PathDepthScorer(
         optimal_depth=3,  # Preferred URL depth
         weight=0.7

@@ -116,7 +122,7 @@ class PathDepthScorer(URLScorer):
     - Configurable optimal depth
     - Diminishing returns for deeper paths
     """
     def __init__(self, optimal_depth: int = 3, weight: float = 1.0):
         super().__init__(weight=weight)
         self.optimal_depth = optimal_depth

@@ -124,15 +130,16 @@ class PathDepthScorer(URLScorer):
     def _calculate_score(self, url: str) -> float:
         """Calculate score based on path depth"""
         path = urlparse(url).path
-        depth = len([x for x in path.split('/') if x])
+        depth = len([x for x in path.split("/") if x])
         # Score decreases as we move away from optimal depth
         distance_from_optimal = abs(depth - self.optimal_depth)
         return 1.0 / (1.0 + distance_from_optimal)

 class ContentTypeScorer(URLScorer):
     """Score URLs based on content type preferences.
     content_scorer = ContentTypeScorer({
         r'\.html$': 1.0,
         r'\.pdf$': 0.8,

@@ -143,7 +150,7 @@ class ContentTypeScorer(URLScorer):
     - Configurable type weights
     - Pattern matching support
     """
     def __init__(self, type_weights: Dict[str, float], weight: float = 1.0):
         super().__init__(weight=weight)
         self.type_weights = type_weights

@@ -152,8 +159,7 @@ class ContentTypeScorer(URLScorer):
     def _compile_patterns(self):
         """Prepare content type patterns"""
         self.patterns = {
-            re.compile(pattern): weight
-            for pattern, weight in self.type_weights.items()
+            re.compile(pattern): weight for pattern, weight in self.type_weights.items()
         }

     def _calculate_score(self, url: str) -> float:

@@ -163,21 +169,22 @@ class ContentTypeScorer(URLScorer):
                 return weight
         return 0.0

 class FreshnessScorer(URLScorer):
     """Score URLs based on freshness indicators.
     freshness_scorer = FreshnessScorer(weight=0.9)
     Score based on date indicators in URLs
     Multiple date format support
     Recency weighting"""
     def __init__(self, weight: float = 1.0):
         super().__init__(weight=weight)
         self.date_patterns = [
-            r'/(\d{4})/(\d{2})/(\d{2})/',  # yyyy/mm/dd
-            r'(\d{4})[-_](\d{2})[-_](\d{2})',  # yyyy-mm-dd
-            r'/(\d{4})/',  # year only
+            r"/(\d{4})/(\d{2})/(\d{2})/",  # yyyy/mm/dd
+            r"(\d{4})[-_](\d{2})[-_](\d{2})",  # yyyy-mm-dd
+            r"/(\d{4})/",  # year only
         ]
         self._compile_patterns()

@@ -194,6 +201,7 @@ class FreshnessScorer(URLScorer):
             return 1.0 - (2024 - year) * 0.1
         return 0.5  # Default score for URLs without dates

 class DomainAuthorityScorer(URLScorer):
     """Score URLs based on domain authority.

@@ -206,9 +214,13 @@ class DomainAuthorityScorer(URLScorer):
     Score based on domain importance
     Configurable domain weights
     Default weight for unknown domains"""
-    def __init__(self, domain_weights: Dict[str, float],
-                 default_weight: float = 0.5, weight: float = 1.0):
+    def __init__(
+        self,
+        domain_weights: Dict[str, float],
+        default_weight: float = 0.5,
+        weight: float = 1.0,
+    ):
         super().__init__(weight=weight)
         self.domain_weights = domain_weights
         self.default_weight = default_weight

@@ -218,29 +230,23 @@ class DomainAuthorityScorer(URLScorer):
         domain = urlparse(url).netloc.lower()
         return self.domain_weights.get(domain, self.default_weight)

 def create_balanced_scorer() -> CompositeScorer:
     """Create a balanced composite scorer"""
-    return CompositeScorer([
-        KeywordRelevanceScorer(
-            keywords=["article", "blog", "news", "research"],
-            weight=1.0
-        ),
-        PathDepthScorer(
-            optimal_depth=3,
-            weight=0.7
-        ),
-        ContentTypeScorer(
-            type_weights={
-                r'\.html?$': 1.0,
-                r'\.pdf$': 0.8,
-                r'\.xml$': 0.6
-            },
-            weight=0.8
-        ),
-        FreshnessScorer(
-            weight=0.9
-        )
-    ])
+    return CompositeScorer(
+        [
+            KeywordRelevanceScorer(
+                keywords=["article", "blog", "news", "research"], weight=1.0
+            ),
+            PathDepthScorer(optimal_depth=3, weight=0.7),
+            ContentTypeScorer(
+                type_weights={r"\.html?$": 1.0, r"\.pdf$": 0.8, r"\.xml$": 0.6},
+                weight=0.8,
+            ),
+            FreshnessScorer(weight=0.9),
+        ]
+    )

 # Example Usage:
 """

@@ -265,4 +271,4 @@ score = scorer.score("https://python.org/article/2024/01/new-features")
 # Access statistics
 print(f"Average score: {scorer.stats.average_score}")
 print(f"URLs scored: {scorer.stats.urls_scored}")
 """

View File

@@ -4,29 +4,28 @@ from ..models import CrawlResult
 from ..async_webcrawler import AsyncWebCrawler
 from typing import Union, AsyncGenerator

 class ScraperStrategy(ABC):
     @abstractmethod
     async def ascrape(
         self,
         url: str,
         crawler: AsyncWebCrawler,
-        parallel_processing: bool = True,
-        stream: bool = False
+        stream: bool = False,
     ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
         """Scrape the given URL using the specified crawler.
         Args:
             url (str): The starting URL for the scrape.
             crawler (AsyncWebCrawler): The web crawler instance.
-            parallel_processing (bool): Whether to use parallel processing. Defaults to True.
             stream (bool): If True, yields individual crawl results as they are ready;
                 if False, accumulates results and returns a final ScraperResult.
         Yields:
             CrawlResult: Individual crawl results if stream is True.
         Returns:
             ScraperResult: A summary of the scrape results containing the final extracted data
                 and the list of crawled URLs if stream is False.
         """
         pass

@@ -39,4 +38,4 @@ class ScraperStrategy(ABC):
     @abstractmethod
     async def shutdown(self):
         """Clean up resources used by the strategy"""
         pass

View File

@@ -4,13 +4,14 @@ from crawl4ai.scraper import (
     BFSScraperStrategy,
     FilterChain,
     URLPatternFilter,
-    ContentTypeFilter
+    ContentTypeFilter,
 )
 from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig
 import re

 browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600)

 async def basic_scraper_example():
     """
     Basic example: Scrape a blog site for articles

@@ -19,37 +20,39 @@ async def basic_scraper_example():
     - Collects all results at once
     """
     # Create a simple filter chain
-    filter_chain = FilterChain([
-        # Only crawl pages within the blog section
-        URLPatternFilter("*/tutorial/*"),
-        # Only process HTML pages
-        ContentTypeFilter(["text/html"])
-    ])
+    filter_chain = FilterChain(
+        [
+            # Only crawl pages within the blog section
+            URLPatternFilter("*/tutorial/*"),
+            # Only process HTML pages
+            ContentTypeFilter(["text/html"]),
+        ]
+    )

     # Initialize the strategy with basic configuration
     strategy = BFSScraperStrategy(
         max_depth=2,  # Only go 2 levels deep
         filter_chain=filter_chain,
         url_scorer=None,  # Use default scoring
-        max_concurrent=3,  # Limit concurrent requests
-        process_external_links=True
+        process_external_links=True,
     )

     # Create the crawler and scraper
-    async with AsyncWebCrawler(config=browser_config,verbose=True) as crawler:
+    async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler:
         scraper = AsyncWebScraper(crawler, strategy)
         # Start scraping
         try:
             result = await scraper.ascrape("https://crawl4ai.com/mkdocs")
             # Process results
             print(f"Crawled {len(result.crawled_urls)} pages:")
             for url, data in result.extracted_data.items():
                 print(f"- {url}: {len(data.html)} bytes")
         except Exception as e:
             print(f"Error during scraping: {e}")

 # advanced_scraper_example.py
 import logging
 from crawl4ai.scraper import (

@@ -62,10 +65,11 @@ from crawl4ai.scraper import (
     KeywordRelevanceScorer,
     PathDepthScorer,
     FreshnessScorer,
-    CompositeScorer
+    CompositeScorer,
 )
 from crawl4ai.async_webcrawler import AsyncWebCrawler

 async def advanced_scraper_example():
     """
     Advanced example: Intelligent news site scraping

@@ -79,49 +83,44 @@ async def advanced_scraper_example():
     logger = logging.getLogger("advanced_scraper")

     # Create sophisticated filter chain
-    filter_chain = FilterChain([
-        # Domain control
-        DomainFilter(
-            allowed_domains=["techcrunch.com"],
-            blocked_domains=["login.techcrunch.com","legal.yahoo.com"]
-        ),
-        # URL patterns
-        URLPatternFilter([
-            "*/article/*",
-            "*/news/*",
-            "*/blog/*",
-            re.compile(r"\d{4}/\d{2}/.*")  # Date-based URLs
-        ]),
-        # Content types
-        ContentTypeFilter([
-            "text/html",
-            "application/xhtml+xml"
-        ])
-    ])
+    filter_chain = FilterChain(
+        [
+            # Domain control
+            DomainFilter(
+                allowed_domains=["techcrunch.com"],
+                blocked_domains=["login.techcrunch.com", "legal.yahoo.com"],
+            ),
+            # URL patterns
+            URLPatternFilter(
+                [
+                    "*/article/*",
+                    "*/news/*",
+                    "*/blog/*",
+                    re.compile(r"\d{4}/\d{2}/.*"),  # Date-based URLs
+                ]
+            ),
+            # Content types
+            ContentTypeFilter(["text/html", "application/xhtml+xml"]),
+        ]
+    )

     # Create composite scorer
-    scorer = CompositeScorer([
-        # Prioritize by keywords
-        KeywordRelevanceScorer(
-            keywords=["news", "breaking", "update", "latest"],
-            weight=1.0
-        ),
-        # Prefer optimal URL structure
-        PathDepthScorer(
-            optimal_depth=3,
-            weight=0.7
-        ),
-        # Prioritize fresh content
-        FreshnessScorer(weight=0.9)
-    ])
+    scorer = CompositeScorer(
+        [
+            # Prioritize by keywords
+            KeywordRelevanceScorer(
+                keywords=["news", "breaking", "update", "latest"], weight=1.0
+            ),
+            # Prefer optimal URL structure
+            PathDepthScorer(optimal_depth=3, weight=0.7),
+            # Prioritize fresh content
+            FreshnessScorer(weight=0.9),
+        ]
+    )

     # Initialize strategy with advanced configuration
     strategy = BFSScraperStrategy(
-        max_depth=2,
-        filter_chain=filter_chain,
-        url_scorer=scorer,
-        max_concurrent=2,
-        min_crawl_delay=1
+        max_depth=2, filter_chain=filter_chain, url_scorer=scorer
     )

     # Create crawler and scraper

@@ -129,57 +128,60 @@ async def advanced_scraper_example():
         scraper = AsyncWebScraper(crawler, strategy)

         # Track statistics
-        stats = {
-            'processed': 0,
-            'errors': 0,
-            'total_size': 0
-        }
+        stats = {"processed": 0, "errors": 0, "total_size": 0}

         try:
             # Use streaming mode
-            result_generator = await scraper.ascrape("https://techcrunch.com", parallel_processing=True, stream=True)
+            result_generator = await scraper.ascrape(
+                "https://techcrunch.com", stream=True
+            )
             async for result in result_generator:
-                stats['processed'] += 1
+                stats["processed"] += 1
                 if result.success:
-                    stats['total_size'] += len(result.html)
+                    stats["total_size"] += len(result.html)
                     logger.info(f"Processed: {result.url}")
                 else:
-                    stats['errors'] += 1
-                    logger.error(f"Failed to process {result.url}: {result.error_message}")
+                    stats["errors"] += 1
+                    logger.error(
+                        f"Failed to process {result.url}: {result.error_message}"
+                    )
                 # Log progress regularly
-                if stats['processed'] % 10 == 0:
+                if stats["processed"] % 10 == 0:
                     logger.info(f"Progress: {stats['processed']} URLs processed")
         except Exception as e:
             logger.error(f"Scraping error: {e}")
         finally:
             # Print final statistics
             logger.info("Scraping completed:")
             logger.info(f"- URLs processed: {stats['processed']}")
             logger.info(f"- Errors: {stats['errors']}")
             logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB")

             # Print filter statistics
             for filter_ in filter_chain.filters:
                 logger.info(f"{filter_.name} stats:")
                 logger.info(f"- Passed: {filter_.stats.passed_urls}")
                 logger.info(f"- Rejected: {filter_.stats.rejected_urls}")

             # Print scorer statistics
             logger.info("Scoring statistics:")
             logger.info(f"- Average score: {scorer.stats.average_score:.2f}")
-            logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}")
+            logger.info(
+                f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}"
+            )

 if __name__ == "__main__":
     import asyncio

     # Run basic example
     print("Running basic scraper example...")
     asyncio.run(basic_scraper_example())

     # Run advanced example
-    print("\nRunning advanced scraper example...")
-    asyncio.run(advanced_scraper_example())
+    # print("\nRunning advanced scraper example...")
+    # asyncio.run(advanced_scraper_example())