Fixed a few bugs and import errors, and switched from `asyncio.timeout` to `asyncio.wait_for` to support Python versions < 3.11

This commit is contained in:
Aravind Karnam
2024-11-23 12:39:25 +05:30
parent 60670b2af6
commit c1797037c0
3 changed files with 30 additions and 33 deletions

View File

@@ -1,3 +1,5 @@
from .async_web_scraper import AsyncWebScraper
from .bfs_scraper_strategy import BFSScraperStrategy
from .filters import URLFilter, FilterChain, URLPatternFilter, ContentTypeFilter
from .filters import URLFilter, FilterChain, URLPatternFilter, ContentTypeFilter, DomainFilter
from .scorers import KeywordRelevanceScorer, PathDepthScorer, FreshnessScorer, CompositeScorer
from .scraper_strategy import ScraperStrategy

View File

@@ -16,6 +16,7 @@ from .models import ScraperResult, CrawlResult
from .filters import FilterChain
from .scorers import URLScorer
from ..async_webcrawler import AsyncWebCrawler
from .scraper_strategy import ScraperStrategy
@dataclass
class CrawlStats:
@@ -28,30 +29,6 @@ class CrawlStats:
current_depth: int = 0
robots_blocked: int = 0
class ScraperStrategy(ABC):
    """Abstract interface that every scraping strategy must implement.

    Concrete strategies (e.g. a BFS crawler) provide the traversal policy;
    callers interact only with the three coroutines declared here.
    """

    @abstractmethod
    async def ascrape(
        self,
        url: str,
        crawler: AsyncWebCrawler,
        parallel_processing: bool = True,
        stream: bool = False,
    ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
        """Scrape starting from *url* using *crawler*.

        Returns an async generator of ``CrawlResult`` when ``stream`` is
        True, otherwise a single aggregated ``ScraperResult``.
        """
        ...

    @abstractmethod
    async def can_process_url(self, url: str) -> bool:
        """Return True when *url* is allowed by this strategy's rules."""
        ...

    @abstractmethod
    async def shutdown(self):
        """Release any resources (sessions, queues, workers) held by the strategy."""
        ...
class BFSScraperStrategy(ScraperStrategy):
"""Breadth-First Search scraping strategy with politeness controls"""
@@ -135,11 +112,15 @@ class BFSScraperStrategy(ScraperStrategy):
) -> CrawlResult:
"""Crawl URL with retry logic"""
try:
async with asyncio.timeout(self.timeout):
return await crawler.arun(url)
return await asyncio.wait_for(crawler.arun(url), timeout=self.timeout)
except asyncio.TimeoutError:
self.logger.error(f"Timeout crawling {url}")
raise
except Exception as e:
# Catch any other exceptions that may cause retries
self.logger.error(f"Error crawling {url}: {e}")
raise
async def process_url(
self,
@@ -181,16 +162,14 @@ class BFSScraperStrategy(ScraperStrategy):
async with self.rate_limiter:
result = await self._crawl_with_retry(crawler, url)
self.stats.urls_processed += 1
# Process links
await self._process_links(result, url, depth, queue, visited, depths)
return result
except Exception as e:
self.logger.error(f"Error crawling {url}: {e}")
self.stats.urls_failed += 1
return None
# Process links
await self._process_links(result, url, depth, queue, visited, depths)
return result
async def _process_links(
self,
result: CrawlResult,

View File

@@ -6,7 +6,13 @@ from typing import Union, AsyncGenerator
class ScraperStrategy(ABC):
@abstractmethod
async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
async def ascrape(
self,
url: str,
crawler: AsyncWebCrawler,
parallel_processing: bool = True,
stream: bool = False
) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Scrape the given URL using the specified crawler.
Args:
@@ -23,4 +29,14 @@ class ScraperStrategy(ABC):
ScraperResult: A summary of the scrape results containing the final extracted data
and the list of crawled URLs if stream is False.
"""
pass
@abstractmethod
async def can_process_url(self, url: str) -> bool:
"""Check if URL can be processed based on strategy rules"""
pass
@abstractmethod
async def shutdown(self):
"""Clean up resources used by the strategy"""
pass