refactor: Remove the URL processing logic out of scraper

Aravind Karnam
2025-01-21 12:16:59 +05:30
parent a677c2b61d
commit 1079965453
2 changed files with 35 additions and 144 deletions


@@ -57,7 +57,6 @@ class AsyncWebScraper:
     async def ascrape(
         self,
         url: str,
-        parallel_processing: bool = True,
         stream: bool = False
     ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
         """
@@ -65,7 +64,6 @@ class AsyncWebScraper:
         Args:
             url: Starting URL for scraping
-            parallel_processing: Whether to process URLs in parallel
             stream: If True, yield results as they come; if False, collect all results

         Returns:
@@ -75,17 +73,16 @@ class AsyncWebScraper:
         async with self._error_handling_context(url):
             if stream:
-                return self._ascrape_yielding(url, parallel_processing)
-            return await self._ascrape_collecting(url, parallel_processing)
+                return self._ascrape_yielding(url)
+            return await self._ascrape_collecting(url)

     async def _ascrape_yielding(
         self,
         url: str,
-        parallel_processing: bool
     ) -> AsyncGenerator[CrawlResult, None]:
         """Stream scraping results as they become available."""
         try:
-            result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
+            result_generator = self.strategy.ascrape(url, self.crawler)
             async for res in result_generator:
                 self._progress.processed_urls += 1
                 self._progress.current_url = res.url
@@ -97,13 +94,12 @@ class AsyncWebScraper:
     async def _ascrape_collecting(
         self,
         url: str,
-        parallel_processing: bool
     ) -> ScraperResult:
         """Collect all scraping results before returning."""
         extracted_data = {}
         try:
-            result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
+            result_generator = self.strategy.ascrape(url, self.crawler)
             async for res in result_generator:
                 self._progress.processed_urls += 1
                 self._progress.current_url = res.url
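For reference, a minimal usage sketch of the simplified API after this change. It is not part of the commit: only the ascrape(url, stream) signature comes from the diff above; how the scraper is constructed, and the example URL, are assumptions.

```python
async def run_scrape(scraper: "AsyncWebScraper") -> None:
    # `scraper` is an already-constructed AsyncWebScraper; building it
    # (crawler + strategy) is outside this sketch.

    # Collecting mode (default): await the full crawl, get a single ScraperResult.
    collected = await scraper.ascrape("https://example.com")
    print(collected)

    # Streaming mode: ascrape returns an async generator of CrawlResult objects,
    # so results can be consumed as each URL finishes.
    async for crawl_result in await scraper.ascrape("https://example.com", stream=True):
        print(crawl_result.url)
```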


@@ -6,15 +6,12 @@ import logging
 from urllib.parse import urlparse
 from urllib.robotparser import RobotFileParser
 import validators
-import time
-from aiolimiter import AsyncLimiter
-from tenacity import retry, stop_after_attempt, wait_exponential
-from collections import defaultdict
+from crawl4ai.async_configs import CrawlerRunConfig

 from .models import CrawlResult
 from .filters import FilterChain
 from .scorers import URLScorer
-from ..async_webcrawler import AsyncWebCrawler, CrawlerRunConfig
+from ..async_webcrawler import AsyncWebCrawler
 from .scraper_strategy import ScraperStrategy

 @dataclass
@@ -37,29 +34,18 @@ class BFSScraperStrategy(ScraperStrategy):
         filter_chain: FilterChain,
         url_scorer: URLScorer,
         process_external_links: bool = False,
-        max_concurrent: int = 5,
-        min_crawl_delay: int = 1,
-        timeout: int = 30,
         logger: Optional[logging.Logger] = None
     ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
         self.url_scorer = url_scorer
-        self.max_concurrent = max_concurrent
-        self.min_crawl_delay = min_crawl_delay
-        self.timeout = timeout
         self.logger = logger or logging.getLogger(__name__)

         # Crawl control
         self.stats = CrawlStats(start_time=datetime.now())
         self._cancel_event = asyncio.Event()
         self.process_external_links = process_external_links

-        # Rate limiting and politeness
-        self.rate_limiter = AsyncLimiter(1, 1)
-        self.last_crawl_time = defaultdict(float)
         self.robot_parsers: Dict[str, RobotFileParser] = {}
-        self.domain_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

     async def can_process_url(self, url: str, depth: int) -> bool:
         """Check if URL can be processed based on robots.txt and filters
@@ -107,74 +93,6 @@ class BFSScraperStrategy(ScraperStrategy):
             return None
         return self.robot_parsers[domain]

-    @retry(stop=stop_after_attempt(3),
-           wait=wait_exponential(multiplier=1, min=4, max=10))
-    async def _crawl_with_retry(
-        self,
-        crawler: AsyncWebCrawler,
-        url: str
-    ) -> CrawlResult:
-        """Crawl URL with retry logic"""
-        try:
-            crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
-            return await asyncio.wait_for(crawler.arun(url, config=crawler_config), timeout=self.timeout)
-        except asyncio.TimeoutError:
-            self.logger.error(f"Timeout crawling {url}")
-            raise
-        except Exception as e:
-            # Catch any other exceptions that may cause retries
-            self.logger.error(f"Error crawling {url}: {e}")
-            raise
-
-    async def process_url(
-        self,
-        url: str,
-        depth: int,
-        crawler: AsyncWebCrawler,
-        queue: asyncio.PriorityQueue,
-        visited: Set[str],
-        depths: Dict[str, int]
-    ) -> Optional[CrawlResult]:
-        """Process a single URL and extract links.
-
-        This is our main URL processing workhorse that:
-        - Checks for cancellation
-        - Validates URLs through can_process_url
-        - Implements politeness delays per domain
-        - Applies rate limiting
-        - Handles crawling with retries
-        - Updates various statistics
-        - Processes extracted links
-        - Returns the crawl result or None on failure
-        """
-        if self._cancel_event.is_set():
-            return None
-
-        if not await self.can_process_url(url, depth):
-            self.stats.urls_skipped += 1
-            return None
-
-        # Politeness delay
-        domain = urlparse(url).netloc
-        time_since_last = time.time() - self.last_crawl_time[domain]
-        if time_since_last < self.min_crawl_delay:
-            await asyncio.sleep(self.min_crawl_delay - time_since_last)
-        self.last_crawl_time[domain] = time.time()
-
-        # Crawl with rate limiting
-        try:
-            async with self.rate_limiter:
-                result = await self._crawl_with_retry(crawler, url)
-                self.stats.urls_processed += 1
-                # Process links
-                await self._process_links(result, url, depth, queue, visited, depths)
-                return result
-        except Exception as e:
-            self.logger.error(f"Error crawling {url}: {e}")
-            self.stats.urls_failed += 1
-            return None
-
     async def _process_links(
         self,
         result: CrawlResult,
@@ -187,7 +105,7 @@ class BFSScraperStrategy(ScraperStrategy):
         """Process extracted links from crawl result.
         This is our link processor that:
         Handles both internal and external links
-        Normalizes URLs (removes fragments)
+        Checks if URL can be processed - validates URL, applies Filters and tests Robots.txt compliance with can_process_url
        Checks depth limits
         Scores URLs for priority
         Updates depth tracking
@@ -199,6 +117,9 @@ class BFSScraperStrategy(ScraperStrategy):
             links_to_process += result.links["external"]
         for link in links_to_process:
             url = link['href']
+            if not await self.can_process_url(url, depth):
+                self.stats.urls_skipped += 1
+                continue
             if url not in visited:
                 new_depth = depths[source_url] + 1
                 if new_depth <= self.max_depth:
@@ -219,7 +140,6 @@ class BFSScraperStrategy(ScraperStrategy):
     async def ascrape(
         self,
         start_url: str,
         crawler: AsyncWebCrawler,
-        parallel_processing: bool = True
     ) -> AsyncGenerator[CrawlResult, None]:
         """Implement BFS crawling strategy"""
@@ -237,62 +157,38 @@ class BFSScraperStrategy(ScraperStrategy):
         await queue.put((0, 0, start_url))
         visited: Set[str] = set()
         depths = {start_url: 0}
-        pending_tasks = set()

         try:
-            while (not queue.empty() or pending_tasks) and not self._cancel_event.is_set():
+            while not queue.empty() and not self._cancel_event.is_set():
                 """
                 This sets up our main control loop which:
                 - Continues while there are URLs to process (not queue.empty())
                 - Or while there are tasks still running (pending_tasks)
                 - Can be interrupted via cancellation (not self._cancel_event.is_set())
                 """
-                # Start new tasks up to max_concurrent
-                while not queue.empty() and len(pending_tasks) < self.max_concurrent:
-                    """
-                    This section manages task creation:
-                    Checks if we can start more tasks (under max_concurrent limit)
-                    Gets the next URL from the priority queue
-                    Marks URLs as visited immediately to prevent duplicates
-                    Updates current depth in stats
-                    Either:
-                        Creates a new async task (parallel mode)
-                        Processes URL directly (sequential mode)
-                    """
-                    _, depth, url = await queue.get()
-                    if url not in visited:
-                        visited.add(url)
-                        self.stats.current_depth = depth
-                        if parallel_processing:
-                            task = asyncio.create_task(
-                                self.process_url(url, depth, crawler, queue, visited, depths)
-                            )
-                            pending_tasks.add(task)
-                        else:
-                            result = await self.process_url(
-                                url, depth, crawler, queue, visited, depths
-                            )
-                            if result:
-                                yield result
-
-                # Process completed tasks
-                """
-                This section manages completed tasks:
-                Waits for any task to complete using asyncio.wait
-                Uses FIRST_COMPLETED to handle results as soon as they're ready
-                Yields successful results to the caller
-                Updates pending_tasks to remove completed ones
-                """
-                if pending_tasks:
-                    done, pending_tasks = await asyncio.wait(
-                        pending_tasks,
-                        return_when=asyncio.FIRST_COMPLETED
-                    )
-                    for task in done:
-                        result = await task
-                        if result:
-                            yield result
+                n = 3
+                jobs = []
+                for _ in range(n):
+                    if self.queue.empty():
+                        break
+                    jobs.append(await self.queue.get())
+
+                # Filter jobs directly, ensuring uniqueness and checking against visited
+                filtered_jobs = []
+                for job in jobs:
+                    _, depth, url = job
+                    self.stats.current_depth = depth
+                    if url not in visited:
+                        visited.add(url)
+                        filtered_jobs.append(job)
+                crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
+                async for result in await crawler.arun_many(urls=[url for _, _, url in filtered_jobs],
+                                                            config=crawler_config.clone(stream=True)):
+                    print(f"Received result for: {result.url} - Success: {result.success}")
+                    source_url, depth = next((url, depth) for _, depth, url in filtered_jobs if url == result.source_url)
+                    await self._process_links(result, source_url, depth, queue, visited, depths)
+                    yield result

         except Exception as e:
             self.logger.error(f"Error in crawl process: {e}")
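The batch-and-stream loop above leans on crawl4ai's arun_many with a streamed run config instead of per-URL tasks. Below is a standalone sketch of that call pattern, mirroring the calls exactly as they appear in this commit (including cache_mode passed as the string "BYPASS"); the URLs and the context-manager setup are illustrative, not from the diff.

```python
import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig

async def main():
    urls = ["https://example.com/a", "https://example.com/b"]  # illustrative batch
    crawler_config = CrawlerRunConfig(cache_mode="BYPASS")

    async with AsyncWebCrawler() as crawler:
        # With stream=True cloned onto the config, arun_many yields each CrawlResult
        # as it completes instead of returning the whole batch at once.
        async for result in await crawler.arun_many(
            urls=urls,
            config=crawler_config.clone(stream=True),
        ):
            print(f"Received result for: {result.url} - Success: {result.success}")

asyncio.run(main())
```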
@@ -300,13 +196,12 @@ class BFSScraperStrategy(ScraperStrategy):
         finally:
             # Clean up any remaining tasks
-            for task in pending_tasks:
-                task.cancel()
+            # for task in pending_tasks:
+            #     task.cancel()
             self.stats.end_time = datetime.now()

     async def shutdown(self):
         """Clean up resources and stop crawling"""
         self._cancel_event.set()
         # Clear caches and close connections
         self.robot_parsers.clear()
-        self.domain_queues.clear()
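With the rate limiting, retries, and per-domain queues gone, the strategy's constructor is down to the crawl shape itself. A minimal construction sketch follows; the import path, the assumption that max_depth is the first parameter (inferred from self.max_depth = max_depth above), and the placeholder filter/scorer values are not from this commit.

```python
import logging

def build_strategy(filter_chain: "FilterChain", url_scorer: "URLScorer") -> "BFSScraperStrategy":
    # The keyword arguments below are exactly the ones that remain after this commit;
    # BFSScraperStrategy would need to be imported from the scraper package (path assumed).
    return BFSScraperStrategy(
        max_depth=2,
        filter_chain=filter_chain,
        url_scorer=url_scorer,
        process_external_links=False,
        logger=logging.getLogger("bfs_scraper"),
    )
```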