From 0ff95c83bc9ed259391ab67a7b97a0627d9341cd Mon Sep 17 00:00:00 2001 From: Aravind Karnam Date: Mon, 27 Jan 2025 18:13:33 +0530 Subject: [PATCH] feat: change input params to scraper, Add asynchronous context manager to AsyncWebScraper, Optimise filter application --- crawl4ai/scraper/async_web_scraper.py | 41 ++++++++--- crawl4ai/scraper/bfs_scraper_strategy.py | 94 ++++++++++++------------ crawl4ai/scraper/scraper_strategy.py | 10 +-- docs/scraper/scraper_quickstart.py | 34 +++++---- 4 files changed, 104 insertions(+), 75 deletions(-) diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py index fb0d7286..a5710306 100644 --- a/crawl4ai/scraper/async_web_scraper.py +++ b/crawl4ai/scraper/async_web_scraper.py @@ -1,10 +1,11 @@ from typing import Union, AsyncGenerator, Optional from .scraper_strategy import ScraperStrategy from .models import ScraperResult, CrawlResult -from ..async_webcrawler import AsyncWebCrawler +from ..async_configs import BrowserConfig, CrawlerRunConfig import logging from dataclasses import dataclass from contextlib import asynccontextmanager +from contextlib import AbstractAsyncContextManager @dataclass @@ -16,28 +17,38 @@ class ScrapingProgress: current_url: Optional[str] = None -class AsyncWebScraper: +class AsyncWebScraper(AbstractAsyncContextManager): """ A high-level web scraper that combines an async crawler with a scraping strategy. 
Args: - crawler (AsyncWebCrawler): The async web crawler implementation + crawler_config (CrawlerRunConfig): Configuration for the crawler run + browser_config (BrowserConfig): Configuration for the browser strategy (ScraperStrategy): The scraping strategy to use logger (Optional[logging.Logger]): Custom logger for the scraper """ + async def __aenter__(self): + # Initialize resources, if any + self.logger.info("Starting the async web scraper.") + return self + def __init__( self, - crawler: AsyncWebCrawler, + crawler_config: CrawlerRunConfig, + browser_config: BrowserConfig, strategy: ScraperStrategy, logger: Optional[logging.Logger] = None, ): - if not isinstance(crawler, AsyncWebCrawler): - raise TypeError("crawler must be an instance of AsyncWebCrawler") + if not isinstance(browser_config, BrowserConfig): + raise TypeError("browser_config must be an instance of BrowserConfig") + if not isinstance(crawler_config, CrawlerRunConfig): + raise TypeError("crawler_config must be an instance of CrawlerRunConfig") if not isinstance(strategy, ScraperStrategy): raise TypeError("strategy must be an instance of ScraperStrategy") - self.crawler = crawler + self.crawler_config = crawler_config + self.browser_config = browser_config self.strategy = strategy self.logger = logger or logging.getLogger(__name__) self._progress = ScrapingProgress() @@ -83,7 +94,9 @@ class AsyncWebScraper: ) -> AsyncGenerator[CrawlResult, None]: """Stream scraping results as they become available.""" try: - result_generator = self.strategy.ascrape(url, self.crawler) + result_generator = self.strategy.ascrape( + url, self.crawler_config, self.browser_config + ) async for res in result_generator: self._progress.processed_urls += 1 self._progress.current_url = res.url @@ -100,7 +113,9 @@ class AsyncWebScraper: extracted_data = {} try: - result_generator = self.strategy.ascrape(url, self.crawler) + result_generator = self.strategy.ascrape( + url, self.crawler_config, self.browser_config + ) async for res in 
result_generator: self._progress.processed_urls += 1 self._progress.current_url = res.url @@ -118,3 +133,11 @@ class AsyncWebScraper: except Exception as e: self.logger.error(f"Error in collecting scrape: {str(e)}") raise + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # Cleanup resources or tasks + await self.close() # Assuming you have a close method to cleanup + + async def close(self): + # Perform cleanup tasks + pass diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py index 2a706a28..2c51dd2f 100644 --- a/crawl4ai/scraper/bfs_scraper_strategy.py +++ b/crawl4ai/scraper/bfs_scraper_strategy.py @@ -5,13 +5,11 @@ import asyncio import logging from urllib.parse import urlparse -# import validators - -from ..async_configs import CrawlerRunConfig +from ..async_webcrawler import AsyncWebCrawler +from ..async_configs import BrowserConfig, CrawlerRunConfig from .models import CrawlResult from .filters import FilterChain from .scorers import URLScorer -from ..async_webcrawler import AsyncWebCrawler from .scraper_strategy import ScraperStrategy from ..config import SCRAPER_BATCH_SIZE @@ -99,28 +97,26 @@ class BFSScraperStrategy(ScraperStrategy): links_to_process += result.links["external"] for link in links_to_process: url = link["href"] - if not await self.can_process_url(url, depth): + if url in visited: + continue + new_depth = depths[source_url] + 1 + if new_depth > self.max_depth: + continue + if not await self.can_process_url(url, new_depth): self.stats.urls_skipped += 1 continue - if url not in visited: - new_depth = depths[source_url] + 1 - if new_depth <= self.max_depth: - if self.url_scorer: - score = self.url_scorer.score(url) - else: - # When no url_scorer is provided all urls will have same score of 0. 
- # Therefore will be process in FIFO order as per URL depth - score = 0 - await queue.put((score, new_depth, url)) - depths[url] = new_depth - self.stats.total_depth_reached = max( - self.stats.total_depth_reached, new_depth - ) + score = self.url_scorer.score(url) if self.url_scorer else 0 + await queue.put((score, new_depth, url)) + depths[url] = new_depth + self.stats.total_depth_reached = max( + self.stats.total_depth_reached, new_depth + ) async def ascrape( self, start_url: str, - crawler: AsyncWebCrawler, + crawler_config: CrawlerRunConfig, + browser_config: BrowserConfig, ) -> AsyncGenerator[CrawlResult, None]: """Implement BFS crawling strategy""" @@ -164,34 +160,39 @@ class BFSScraperStrategy(ScraperStrategy): if active_crawls: await asyncio.sleep(0.1) continue - # Process batch - crawler_config = CrawlerRunConfig(cache_mode="BYPASS", stream=True) - try: - async for result in await crawler.arun_many( - urls=[url for _, _, url in jobs], config=crawler_config - ): - source_url, depth = next( - (url, depth) for _, depth, url in jobs if url == result.url - ) - active_crawls.remove(source_url) # Remove from active set + async with AsyncWebCrawler( + config=browser_config, + verbose=True, + ) as crawler: + try: + async for result in await crawler.arun_many( + urls=[url for _, _, url in jobs], + config=crawler_config.clone(stream=True), + ): + source_url, depth = next( + (url, depth) + for _, depth, url in jobs + if url == result.url + ) + active_crawls.remove(source_url) # Remove from active set - if result.success: - await self._process_links( - result, source_url, depth, queue, visited, depths - ) - yield result - else: - self.logger.warning( - f"Failed to crawl {result.url}: {result.error_message}" - ) - except Exception as e: - # Remove failed URLs from active set - for _, _, url in jobs: - active_crawls.discard(url) - self.logger.error(f"Batch processing error: {e}") - # Continue processing other batches - continue + if result.success: + await 
self._process_links( + result, source_url, depth, queue, visited, depths + ) + yield result + else: + self.logger.warning( + f"Failed to crawl {result.url}: {result.error_message}" + ) + except Exception as e: + # Remove failed URLs from active set + for _, _, url in jobs: + active_crawls.discard(url) + self.logger.error(f"Batch processing error: {e}") + # Continue processing other batches + continue except Exception as e: self.logger.error(f"Error in crawl process: {e}") @@ -199,6 +200,7 @@ class BFSScraperStrategy(ScraperStrategy): finally: self.stats.end_time = datetime.now() + await crawler.close() async def shutdown(self): """Clean up resources and stop crawling""" diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py index 66a60130..3015cbcd 100644 --- a/crawl4ai/scraper/scraper_strategy.py +++ b/crawl4ai/scraper/scraper_strategy.py @@ -1,23 +1,23 @@ from abc import ABC, abstractmethod from .models import ScraperResult, CrawlResult from ..models import CrawlResult -from ..async_webcrawler import AsyncWebCrawler +from ..async_configs import BrowserConfig, CrawlerRunConfig from typing import Union, AsyncGenerator - - class ScraperStrategy(ABC): @abstractmethod async def ascrape( self, url: str, - crawler: AsyncWebCrawler, + crawler_config: CrawlerRunConfig, + browser_config: BrowserConfig, stream: bool = False, ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]: """Scrape the given URL using the specified crawler. Args: url (str): The starting URL for the scrape. - crawler (AsyncWebCrawler): The web crawler instance. + crawler_config (CrawlerRunConfig): Configuration for the crawler run. + browser_config (BrowserConfig): Configuration for the browser. stream (bool): If True, yields individual crawl results as they are ready; if False, accumulates results and returns a final ScraperResult. 
diff --git a/docs/scraper/scraper_quickstart.py b/docs/scraper/scraper_quickstart.py index 9bd91051..da4aef7f 100644 --- a/docs/scraper/scraper_quickstart.py +++ b/docs/scraper/scraper_quickstart.py @@ -1,4 +1,5 @@ # basic_scraper_example.py +from crawl4ai.async_configs import CrawlerRunConfig from crawl4ai.scraper import ( AsyncWebScraper, BFSScraperStrategy, @@ -24,14 +25,14 @@ async def basic_scraper_example(): filter_chain = FilterChain( [ # Only crawl pages within the blog section - URLPatternFilter("*/tutorial/*"), + URLPatternFilter("*/basic/*"), # Only process HTML pages ContentTypeFilter(["text/html"]), ] ) # Initialize the strategy with basic configuration - strategy = BFSScraperStrategy( + bfs_strategy = BFSScraperStrategy( max_depth=2, # Only go 2 levels deep filter_chain=filter_chain, url_scorer=None, # Use default scoring @@ -39,8 +40,11 @@ async def basic_scraper_example(): ) # Create the crawler and scraper - async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler: - scraper = AsyncWebScraper(crawler, strategy) + async with AsyncWebScraper( + crawler_config=CrawlerRunConfig(bypass_cache=True), + browser_config=browser_config, + strategy=bfs_strategy, + ) as scraper: # Start scraping try: result = await scraper.ascrape("https://crawl4ai.com/mkdocs") @@ -69,7 +73,6 @@ from crawl4ai.scraper import ( FreshnessScorer, CompositeScorer, ) -from crawl4ai.async_webcrawler import AsyncWebCrawler async def advanced_scraper_example(): @@ -121,13 +124,14 @@ async def advanced_scraper_example(): ) # Initialize strategy with advanced configuration - strategy = BFSScraperStrategy( + bfs_strategy = BFSScraperStrategy( max_depth=2, filter_chain=filter_chain, url_scorer=scorer ) # Create crawler and scraper - async with AsyncWebCrawler(verbose=True, config=browser_config) as crawler: - scraper = AsyncWebScraper(crawler, strategy) + async with AsyncWebScraper(crawler_config=CrawlerRunConfig(bypass_cache=True), + browser_config=browser_config, + 
strategy=bfs_strategy) as scraper: # Track statistics stats = {"processed": 0, "errors": 0, "total_size": 0} @@ -182,12 +186,12 @@ if __name__ == "__main__": import time # Run basic example - start_time = time.perf_counter() - print("Running basic scraper example...") - asyncio.run(basic_scraper_example()) - end_time = time.perf_counter() - print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds") + # start_time = time.perf_counter() + # print("Running basic scraper example...") + # asyncio.run(basic_scraper_example()) + # end_time = time.perf_counter() + # print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds") # # Run advanced example - # print("\nRunning advanced scraper example...") - # asyncio.run(advanced_scraper_example()) + print("\nRunning advanced scraper example...") + asyncio.run(advanced_scraper_example())