import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncGenerator, Optional, Union

from .scraper_strategy import ScraperStrategy
from .models import ScraperResult, CrawlResult
from ..async_webcrawler import AsyncWebCrawler


@dataclass
class ScrapingProgress:
    """Mutable counters describing how far a scraping run has progressed."""

    processed_urls: int = 0  # pages successfully yielded so far
    failed_urls: int = 0  # pages whose processing raised an exception
    current_url: Optional[str] = None  # URL most recently handed back


class AsyncWebScraper:
    """
    A high-level web scraper that combines an async crawler with a scraping strategy.

    Args:
        crawler (AsyncWebCrawler): The async web crawler implementation
        strategy (ScraperStrategy): The scraping strategy to use
        logger (Optional[logging.Logger]): Custom logger for the scraper

    Raises:
        TypeError: if either collaborator is not of the expected type.
    """

    def __init__(
        self,
        crawler: AsyncWebCrawler,
        strategy: ScraperStrategy,
        logger: Optional[logging.Logger] = None,
    ):
        # Validate the collaborators up front so mis-wiring fails here,
        # not deep inside a crawl.
        if not isinstance(crawler, AsyncWebCrawler):
            raise TypeError("crawler must be an instance of AsyncWebCrawler")
        if not isinstance(strategy, ScraperStrategy):
            raise TypeError("strategy must be an instance of ScraperStrategy")

        self.crawler = crawler
        self.strategy = strategy
        # Fall back to a module-level logger when none is supplied.
        self.logger = logger or logging.getLogger(__name__)
        self._progress = ScrapingProgress()

    @property
    def progress(self) -> ScrapingProgress:
        """Get current scraping progress."""
        return self._progress
@asynccontextmanager + async def _error_handling_context(self, url: str): + """Context manager for handling errors during scraping.""" + try: + yield + except Exception as e: + self.logger.error(f"Error scraping {url}: {str(e)}") + self._progress.failed_urls += 1 + raise + + async def ascrape( + self, + url: str, + parallel_processing: bool = True, + stream: bool = False + ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: + """ + Scrape a website starting from the given URL. + + Args: + url: Starting URL for scraping + parallel_processing: Whether to process URLs in parallel + stream: If True, yield results as they come; if False, collect all results + + Returns: + Either an async generator yielding CrawlResults or a final ScraperResult + """ + self._progress = ScrapingProgress() # Reset progress + + async with self._error_handling_context(url): + if stream: + return self._ascrape_yielding(url, parallel_processing) return await self._ascrape_collecting(url, parallel_processing) - async def _ascrape_yielding(self, url: str, parallel_processing: bool) -> AsyncGenerator[CrawlResult, None]: - result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing) - async for res in result_generator: # Consume the async generator - yield res # Yielding individual results + async def _ascrape_yielding( + self, + url: str, + parallel_processing: bool + ) -> AsyncGenerator[CrawlResult, None]: + """Stream scraping results as they become available.""" + try: + result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing) + async for res in result_generator: + self._progress.processed_urls += 1 + self._progress.current_url = res.url + yield res + except Exception as e: + self.logger.error(f"Error in streaming scrape: {str(e)}") + raise - async def _ascrape_collecting(self, url: str, parallel_processing: bool) -> ScraperResult: + async def _ascrape_collecting( + self, + url: str, + parallel_processing: bool + ) -> ScraperResult: + 
"""Collect all scraping results before returning.""" extracted_data = {} - result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing) - async for res in result_generator: # Consume the async generator - extracted_data[res.url] = res - - # Return a final ScraperResult - return ScraperResult( - url=url, - crawled_urls=list(extracted_data.keys()), - extracted_data=extracted_data - ) \ No newline at end of file + + try: + result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing) + async for res in result_generator: + self._progress.processed_urls += 1 + self._progress.current_url = res.url + extracted_data[res.url] = res + + return ScraperResult( + url=url, + crawled_urls=list(extracted_data.keys()), + extracted_data=extracted_data, + stats={ + 'processed_urls': self._progress.processed_urls, + 'failed_urls': self._progress.failed_urls + } + ) + except Exception as e: + self.logger.error(f"Error in collecting scrape: {str(e)}") + raise \ No newline at end of file diff --git a/docs/scrapper/async_web_scraper.md b/docs/scrapper/async_web_scraper.md new file mode 100644 index 00000000..ca5f749f --- /dev/null +++ b/docs/scrapper/async_web_scraper.md @@ -0,0 +1,166 @@ +# AsyncWebScraper: Smart Web Crawling Made Easy + +AsyncWebScraper is a powerful and flexible web scraping tool that makes it easy to collect data from websites efficiently. Whether you need to scrape a few pages or an entire website, AsyncWebScraper handles the complexity of web crawling while giving you fine-grained control over the process. 
+ +## How It Works + +```mermaid +flowchart TB + Start([Start]) --> Init[Initialize AsyncWebScraper\nwith Crawler and Strategy] + Init --> InputURL[Receive URL to scrape] + InputURL --> Decision{Stream or\nCollect?} + + %% Streaming Path + Decision -->|Stream| StreamInit[Initialize Streaming Mode] + StreamInit --> StreamStrategy[Call Strategy.ascrape] + StreamStrategy --> AsyncGen[Create Async Generator] + AsyncGen --> ProcessURL[Process Next URL] + ProcessURL --> FetchContent[Fetch Page Content] + FetchContent --> Extract[Extract Data] + Extract --> YieldResult[Yield CrawlResult] + YieldResult --> CheckMore{More URLs?} + CheckMore -->|Yes| ProcessURL + CheckMore -->|No| StreamEnd([End Stream]) + + %% Collecting Path + Decision -->|Collect| CollectInit[Initialize Collection Mode] + CollectInit --> CollectStrategy[Call Strategy.ascrape] + CollectStrategy --> CollectGen[Create Async Generator] + CollectGen --> ProcessURLColl[Process Next URL] + ProcessURLColl --> FetchContentColl[Fetch Page Content] + FetchContentColl --> ExtractColl[Extract Data] + ExtractColl --> StoreColl[Store in Dictionary] + StoreColl --> CheckMoreColl{More URLs?} + CheckMoreColl -->|Yes| ProcessURLColl + CheckMoreColl -->|No| CreateResult[Create ScraperResult] + CreateResult --> ReturnResult([Return Result]) + + %% Parallel Processing + subgraph Parallel + ProcessURL + FetchContent + Extract + ProcessURLColl + FetchContentColl + ExtractColl + end + + %% Error Handling + FetchContent --> ErrorCheck{Error?} + ErrorCheck -->|Yes| LogError[Log Error] + LogError --> UpdateStats[Update Error Stats] + UpdateStats --> CheckMore + ErrorCheck -->|No| Extract + + FetchContentColl --> ErrorCheckColl{Error?} + ErrorCheckColl -->|Yes| LogErrorColl[Log Error] + LogErrorColl --> UpdateStatsColl[Update Error Stats] + UpdateStatsColl --> CheckMoreColl + ErrorCheckColl -->|No| ExtractColl + + %% Style definitions + classDef process fill:#90caf9,stroke:#000,stroke-width:2px; + classDef decision 
fill:#fff59d,stroke:#000,stroke-width:2px;
    classDef error fill:#ef9a9a,stroke:#000,stroke-width:2px;
    classDef start fill:#a5d6a7,stroke:#000,stroke-width:2px;

    class Start,StreamEnd,ReturnResult start;
    class Decision,CheckMore,CheckMoreColl,ErrorCheck,ErrorCheckColl decision;
    class LogError,LogErrorColl,UpdateStats,UpdateStatsColl error;
    class ProcessURL,FetchContent,Extract,ProcessURLColl,FetchContentColl,ExtractColl process;
```

AsyncWebScraper uses an intelligent crawling system that can navigate through websites following your specified strategy. It supports two main modes of operation:

### 1. Streaming Mode
```python
async for result in await scraper.ascrape(url, stream=True):
    print(f"Found data on {result.url}")
    process_data(result.data)
```
Note that `ascrape` is a coroutine, so it must be awaited first; with `stream=True` it resolves to an async generator you can then iterate.
- Perfect for processing large websites
- Memory efficient - handles one page at a time
- Ideal for real-time data processing
- Great for monitoring or continuous scraping tasks

### 2. Collection Mode
```python
result = await scraper.ascrape(url)
print(f"Scraped {len(result.crawled_urls)} pages")
process_all_data(result.extracted_data)
```
- Collects all data before returning
- Best for when you need the complete dataset
- Easier to work with for batch processing
- Includes comprehensive statistics

## Key Features

- **Smart Crawling**: Automatically follows relevant links while avoiding duplicates
- **Parallel Processing**: Scrapes multiple pages simultaneously for better performance
- **Memory Efficient**: Choose between streaming and collecting based on your needs
- **Error Resilient**: Continues working even if some pages fail to load
- **Progress Tracking**: Monitor the scraping progress in real-time
- **Customizable**: Configure crawling strategy, filters, and scoring to match your needs

## Quick Start

```python
from crawl4ai.scraper import AsyncWebScraper, BFSStrategy
from crawl4ai.async_webcrawler import AsyncWebCrawler

# Initialize the scraper
crawler = AsyncWebCrawler()
strategy = BFSStrategy(
    max_depth=2,  # How deep to crawl
    url_pattern="*.example.com/*"  # What URLs to follow
)
scraper = AsyncWebScraper(crawler, strategy)

# Start scraping
async def main():
    # Collect all results
    result = await scraper.ascrape("https://example.com")
    print(f"Found {len(result.extracted_data)} pages")

    # Or stream results (await first: ascrape returns the async generator)
    async for page in await scraper.ascrape("https://example.com", stream=True):
        print(f"Processing {page.url}")

```

## Best Practices

1. **Choose the Right Mode**
   - Use streaming for large websites or real-time processing
   - Use collecting for smaller sites or when you need the complete dataset

2. **Configure Depth**
   - Start with a small depth (2-3) and increase if needed
   - Higher depths mean exponentially more pages to crawl

3. **Set Appropriate Filters**
   - Use URL patterns to stay within relevant sections
   - Set content type filters to only process useful pages

4. **Handle Resources Responsibly**
   - Enable parallel processing for faster results
   - Consider the target website's capacity
   - Implement appropriate delays between requests

## Common Use Cases

- **Content Aggregation**: Collect articles, blog posts, or news from multiple pages
- **Data Extraction**: Gather product information, prices, or specifications
- **Site Mapping**: Create a complete map of a website's structure
- **Content Monitoring**: Track changes or updates across multiple pages
- **Data Mining**: Extract and analyze patterns across web pages

## Advanced Features

- Custom scoring algorithms for prioritizing important pages
- URL filters for focusing on specific site sections
- Content type filtering for processing only relevant pages
- Progress tracking for monitoring long-running scrapes

Need more help? Check out our [examples repository](https://github.com/example/crawl4ai/examples) or join our [community Discord](https://discord.gg/example).