from typing import AsyncGenerator, Optional, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import asyncio
import logging
from urllib.parse import urlparse

from ..async_webcrawler import AsyncWebCrawler
from ..async_configs import BrowserConfig, CrawlerRunConfig
from .models import CrawlResult
from .filters import FilterChain
from .scorers import URLScorer
from .scraper_strategy import ScraperStrategy
from ..config import SCRAPER_BATCH_SIZE


@dataclass
class CrawlStats:
    """Statistics for the crawling process"""

    start_time: datetime
    end_time: Optional[datetime] = None  # Recorded when the crawl finishes
    urls_processed: int = 0
    urls_failed: int = 0
    urls_skipped: int = 0
    total_depth_reached: int = 0
    current_depth: int = 0


class BFSScraperStrategy(ScraperStrategy):
    """Breadth-First Search scraping strategy with politeness controls"""

    def __init__(
        self,
        max_depth: int,
        filter_chain: FilterChain,
        url_scorer: Optional[URLScorer],
        process_external_links: bool = False,
        logger: Optional[logging.Logger] = None,
    ):
        self.max_depth = max_depth
        self.filter_chain = filter_chain
        self.url_scorer = url_scorer  # May be None; unscored URLs default to 0
        self.logger = logger or logging.getLogger(__name__)

        # Crawl control
        self.stats = CrawlStats(start_time=datetime.now())
        self._cancel_event = asyncio.Event()
        self.process_external_links = process_external_links

    async def can_process_url(self, url: str, depth: int) -> bool:
        """Check whether a URL can be processed.

        This is the gatekeeper method that decides if a URL should be crawled. It:
        - Validates the URL structure using urllib.parse
        - Rejects schemes other than HTTP/HTTPS and malformed domains
        - Applies the custom filter chain to every URL except the start page
        - Returns False as soon as any check fails
        """
        try:
            result = urlparse(url)
            if not all([result.scheme, result.netloc]):
                raise ValueError("Invalid URL")
            if result.scheme not in ("http", "https"):
                raise ValueError("URL must be HTTP or HTTPS")
            if "." not in result.netloc:
                raise ValueError("Invalid domain")
        except Exception as e:
            self.logger.warning(f"Invalid URL: {url}. Error: {e}")
            return False

        # Apply the filter chain unless this is the start page (depth 0)
        if depth != 0 and not self.filter_chain.apply(url):
            return False

        return True

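    # For illustration: "https://example.com/docs" passes the structural
    # checks above, while "ftp://example.com" (bad scheme), "not-a-url"
    # (no scheme or netloc), and "http://localhost" (no dot in the domain)
    # are all rejected before the filter chain is even consulted.
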
    async def _process_links(
        self,
        result: CrawlResult,
        source_url: str,
        depth: int,
        queue: asyncio.PriorityQueue,
        visited: Set[str],
        depths: Dict[str, int],
    ):
        """Process links extracted from a crawl result.

        This is the link processor. It:
        - Handles internal links, and external links when enabled
        - Skips URLs that have already been seen
        - Enforces the max_depth limit
        - Validates and filters each URL via can_process_url
        - Scores URLs to set their queue priority
        - Adds valid URLs to the queue and records their depth
        - Updates the maximum-depth statistic
        """
        # Copy the list so that appending external links does not mutate the
        # links stored on the CrawlResult itself.
        links_to_process = list(result.links["internal"])
        if self.process_external_links:
            links_to_process += result.links["external"]
        for link in links_to_process:
            url = link["href"]
            if url in visited:
                continue
            new_depth = depths[source_url] + 1
            if new_depth > self.max_depth:
                continue
            if not await self.can_process_url(url, new_depth):
                self.stats.urls_skipped += 1
                continue
            score = self.url_scorer.score(url) if self.url_scorer else 0
            await queue.put((score, new_depth, url))
            visited.add(url)  # Mark as seen so the URL is queued only once
            depths[url] = new_depth
            self.stats.total_depth_reached = max(
                self.stats.total_depth_reached, new_depth
            )

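    # Note on ordering: asyncio.PriorityQueue pops the smallest tuple first,
    # so lower scores are crawled earlier and ties fall back to depth. If the
    # configured URLScorer follows a higher-is-better convention, its scores
    # should be negated before they are enqueued above.
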
    async def ascrape(
        self,
        start_url: str,
        crawler_config: CrawlerRunConfig,
        browser_config: BrowserConfig,
    ) -> AsyncGenerator[CrawlResult, None]:
        """Implement the BFS crawling strategy."""

        # Initialize crawl state:
        # - queue: priority queue of (score, depth, url) tuples; a lower score
        #   means higher priority, and depth is the distance from start_url
        # - visited: URLs already seen, to avoid cycles
        # - depths: maps each URL to its depth from the start URL
        # - active_crawls: URLs whose crawl tasks are currently running
        queue = asyncio.PriorityQueue()
        await queue.put((0, 0, start_url))
        visited: Set[str] = {start_url}  # Seed so the start page is not re-queued
        depths = {start_url: 0}
        active_crawls = set()  # Track URLs currently being processed

        try:
            # Main control loop:
            # - Continues while there are URLs to process (not queue.empty())
            #   or while crawls are still running (active_crawls)
            # - Can be interrupted via cancellation (self._cancel_event)
            while (
                not queue.empty() or active_crawls
            ) and not self._cancel_event.is_set():
                # Fill a batch with available jobs
                jobs = []
                while len(jobs) < SCRAPER_BATCH_SIZE and not queue.empty():
                    score, depth, url = await queue.get()
                    if url not in active_crawls:  # Only add if not currently processing
                        jobs.append((score, depth, url))
                        active_crawls.add(url)
                        self.stats.current_depth = depth

                if not jobs:
                    if active_crawls:
                        # No new jobs yet, but crawls are still in flight;
                        # wait briefly so they can finish and enqueue links.
                        await asyncio.sleep(0.1)
                    # Re-check the loop condition either way rather than
                    # dispatching an empty batch.
                    continue

                # Process the batch
                async with AsyncWebCrawler(
                    config=browser_config,
                    verbose=True,
                ) as crawler:
                    try:
                        async for result in await crawler.arun_many(
                            urls=[url for _, _, url in jobs],
                            config=crawler_config.clone(stream=True),
                        ):
                            # Recover the depth recorded for this result's URL
                            source_url, depth = next(
                                (url, depth)
                                for _, depth, url in jobs
                                if url == result.url
                            )
                            active_crawls.remove(source_url)  # Remove from active set

                            if result.success:
                                self.stats.urls_processed += 1
                                await self._process_links(
                                    result, source_url, depth, queue, visited, depths
                                )
                                yield result
                            else:
                                self.stats.urls_failed += 1
                                self.logger.warning(
                                    f"Failed to crawl {result.url}: {result.error_message}"
                                )
                    except Exception as e:
                        # Drop this batch's URLs from the active set so the
                        # loop can still terminate, then move on to the next batch.
                        for _, _, url in jobs:
                            active_crawls.discard(url)
                        self.logger.error(f"Batch processing error: {e}")
                        continue

        except Exception as e:
            self.logger.error(f"Error in crawl process: {e}")
            raise

        finally:
            # The crawler is closed by its own context manager; only the
            # final statistics need to be recorded here.
            self.stats.end_time = datetime.now()

    async def shutdown(self):
        """Clean up resources and stop crawling"""
        self._cancel_event.set()
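
# Usage sketch (illustrative only, not part of the strategy): how this class
# might be driven from application code. The empty filter list and the None
# scorer are assumptions; substitute whatever filters and scorers your
# project defines.
#
#     async def main():
#         strategy = BFSScraperStrategy(
#             max_depth=2,
#             filter_chain=FilterChain([]),  # assumed: no filters, accept all
#             url_scorer=None,               # all URLs score 0, ordered by depth
#         )
#         async for result in strategy.ascrape(
#             "https://example.com",
#             crawler_config=CrawlerRunConfig(),
#             browser_config=BrowserConfig(headless=True),
#         ):
#             print(result.url)
#
#     asyncio.run(main())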