feat: create ScraperPageResult model to attach score and depth attributes to yielded/returned crawl results

Aravind Karnam
2025-01-28 16:47:30 +05:30
parent 60ce8bbf55
commit 78223bc847
5 changed files with 56 additions and 37 deletions
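To illustrate the intent of the change, here is a hedged sketch of how a caller is expected to consume ascrape() after this commit. The crawl4ai.scraper import path mirrors the example file updated below; the pre-built `strategy` argument and the URL are placeholders, not part of the commit.

# Sketch only: assumes `strategy` is an already-configured ScraperStrategy,
# e.g. the BFSScraperStrategy built in the example file changed below.
from crawl4ai.scraper import AsyncWebScraper

async def consume(strategy, start_url: str):
    async with AsyncWebScraper(strategy=strategy) as scraper:
        # stream=True now yields ScraperPageResult wrappers carrying depth and score
        async for page in await scraper.ascrape(start_url, stream=True):
            print(f"depth={page.depth} score={page.score:.2f} {page.result.url}")

        # stream=False still returns a ScraperResult, but extracted_data now maps
        # URL -> ScraperPageResult instead of URL -> CrawlResult
        summary = await scraper.ascrape(start_url, stream=False)
        for url, page in summary.extracted_data.items():
            print(url, page.depth, len(page.result.html))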

View File

@@ -1,6 +1,6 @@
 from typing import Union, AsyncGenerator, Optional
 from .scraper_strategy import ScraperStrategy
-from .models import ScraperResult, CrawlResult
+from .models import ScraperResult, CrawlResult, ScraperPageResult
 from ..async_configs import BrowserConfig, CrawlerRunConfig
 import logging
 from dataclasses import dataclass
@@ -35,17 +35,23 @@ class AsyncWebScraper(AbstractAsyncContextManager):
     def __init__(
         self,
-        crawler_config: CrawlerRunConfig,
-        browser_config: BrowserConfig,
         strategy: ScraperStrategy,
+        crawler_config: Optional[CrawlerRunConfig] = None,
+        browser_config: Optional[BrowserConfig] = None,
         logger: Optional[logging.Logger] = None,
     ):
-        if not isinstance(browser_config, BrowserConfig):
-            raise TypeError("browser_config must be an instance of BrowserConfig")
-        if not isinstance(crawler_config, CrawlerRunConfig):
-            raise TypeError("crawler must be an instance of CrawlerRunConfig")
         if not isinstance(strategy, ScraperStrategy):
             raise TypeError("strategy must be an instance of ScraperStrategy")
+        if browser_config is not None and not isinstance(browser_config, BrowserConfig):
+            raise TypeError(
+                "browser_config must be None or an instance of BrowserConfig"
+            )
+        if crawler_config is not None and not isinstance(
+            crawler_config, CrawlerRunConfig
+        ):
+            raise TypeError(
+                "crawler_config must be None or an instance of CrawlerRunConfig"
+            )
         self.crawler_config = crawler_config
         self.browser_config = browser_config
@@ -70,7 +76,7 @@ class AsyncWebScraper(AbstractAsyncContextManager):
     async def ascrape(
         self, url: str, stream: bool = False
-    ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+    ) -> Union[AsyncGenerator[ScraperPageResult, None], ScraperResult]:
         """
         Scrape a website starting from the given URL.
@@ -82,7 +88,6 @@ class AsyncWebScraper(AbstractAsyncContextManager):
             Either an async generator yielding CrawlResults or a final ScraperResult
         """
         self._progress = ScrapingProgress() # Reset progress
-
         async with self._error_handling_context(url):
             if stream:
                 return self._ascrape_yielding(url)
@@ -91,16 +96,16 @@ class AsyncWebScraper(AbstractAsyncContextManager):
     async def _ascrape_yielding(
         self,
         url: str,
-    ) -> AsyncGenerator[CrawlResult, None]:
+    ) -> AsyncGenerator[ScraperPageResult, None]:
         """Stream scraping results as they become available."""
         try:
             result_generator = self.strategy.ascrape(
                 url, self.crawler_config, self.browser_config
             )
-            async for res in result_generator:
+            async for page_result in result_generator:
                 self._progress.processed_urls += 1
-                self._progress.current_url = res.url
-                yield res
+                self._progress.current_url = page_result.result.url
+                yield page_result
         except Exception as e:
             self.logger.error(f"Error in streaming scrape: {str(e)}")
             raise
@@ -117,9 +122,10 @@ class AsyncWebScraper(AbstractAsyncContextManager):
                 url, self.crawler_config, self.browser_config
             )
             async for res in result_generator:
+                url = res.result.url
                 self._progress.processed_urls += 1
-                self._progress.current_url = res.url
-                extracted_data[res.url] = res
+                self._progress.current_url = url
+                extracted_data[url] = res
             return ScraperResult(
                 url=url,

View File

@@ -7,7 +7,7 @@ from urllib.parse import urlparse
 from ..async_webcrawler import AsyncWebCrawler
 from ..async_configs import BrowserConfig, CrawlerRunConfig
-from .models import CrawlResult
+from .models import CrawlResult, ScraperPageResult
 from .filters import FilterChain
 from .scorers import URLScorer
 from .scraper_strategy import ScraperStrategy
@@ -46,7 +46,6 @@ class BFSScraperStrategy(ScraperStrategy):
         self.stats = CrawlStats(start_time=datetime.now())
         self._cancel_event = asyncio.Event()
         self.process_external_links = process_external_links
-        self._active_crawls_lock = asyncio.Lock()

     async def can_process_url(self, url: str, depth: int) -> bool:
         """Check if URL can be processed based on filters
@@ -117,8 +116,8 @@ class BFSScraperStrategy(ScraperStrategy):
     async def ascrape(
         self,
         start_url: str,
-        crawler_config: CrawlerRunConfig,
-        browser_config: BrowserConfig,
+        crawler_config: Optional[CrawlerRunConfig] = None,
+        browser_config: Optional[BrowserConfig] = None,
     ) -> AsyncGenerator[CrawlResult, None]:
         """Implement BFS crawling strategy"""
@@ -137,6 +136,11 @@ class BFSScraperStrategy(ScraperStrategy):
         visited: Set[str] = set()
         depths = {start_url: 0}
         active_crawls = {} # Track URLs currently being processed with depth and score
+        active_crawls_lock = asyncio.Lock() # Create the lock within the same event loop
+
+        # Update crawler_config to stream back results to scraper
+        crawler_config = crawler_config.clone(stream=True) if crawler_config else CrawlerRunConfig(stream=True)
+
         async with AsyncWebCrawler(
             config=browser_config,
             verbose=True,
@@ -152,7 +156,7 @@ class BFSScraperStrategy(ScraperStrategy):
            - Can be interrupted via cancellation (not self._cancel_event.is_set())
            """
            # Collect batch of URLs into active_crawls to process
-           async with self._active_crawls_lock:
+           async with active_crawls_lock:
                while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty():
                    score, depth, url = await queue.get()
                    active_crawls[url] = {"depth": depth, "score": score}
@@ -170,14 +174,19 @@ class BFSScraperStrategy(ScraperStrategy):
            ):
                source_url = result.url
                depth = active_crawls[source_url]["depth"]
-               async with self._active_crawls_lock:
+               score=active_crawls[source_url]["score"]
+               async with active_crawls_lock:
                    active_crawls.pop(source_url, None)
                if result.success:
                    await self._process_links(
                        result, source_url, queue, visited, depths
                    )
-                   yield result
+                   yield ScraperPageResult(
+                       result = result,
+                       depth=depth,
+                       score=score,
+                   )
                else:
                    self.logger.warning(
                        f"Failed to crawl {result.url}: {result.error_message}"
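For readers following the hunk above, a minimal standalone sketch of the bookkeeping it relies on: URLs sit in a priority queue as (score, depth, url) tuples, and active_crawls remembers depth and score so they can be attached to the ScraperPageResult once the crawl for that URL completes. The values below are illustrative, not taken from the commit.

# Standalone sketch of the (score, depth, url) queue and active_crawls map used above.
import asyncio

async def demo():
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    await queue.put((0.25, 1, "https://example.com/a"))  # (score, depth, url) -- illustrative
    active_crawls = {}
    active_crawls_lock = asyncio.Lock()  # created inside the running event loop

    async with active_crawls_lock:
        score, depth, url = await queue.get()
        active_crawls[url] = {"depth": depth, "score": score}

    # When the crawl result for `url` arrives, its depth and score are looked up
    # and the entry is dropped, mirroring the active_crawls.pop() in the diff.
    meta = active_crawls.pop(url)
    print(url, meta["depth"], meta["score"])

asyncio.run(demo())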

View File

@@ -2,8 +2,11 @@ from pydantic import BaseModel
 from typing import List, Dict
 from ..models import CrawlResult

+class ScraperPageResult(BaseModel):
+    result: CrawlResult
+    depth: int
+    score: float
+
 class ScraperResult(BaseModel):
     url: str
     crawled_urls: List[str]
-    extracted_data: Dict[str, CrawlResult]
+    extracted_data: Dict[str, ScraperPageResult]
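A self-contained sketch of the new models as defined above; the cut-down CrawlResult stand-in is an assumption (the real crawl4ai model has many more fields) and exists only to make the snippet runnable.

# Sketch mirroring the models.py hunk above; CrawlResult here is a simplified stand-in.
from typing import Dict, List
from pydantic import BaseModel

class CrawlResult(BaseModel):        # assumption: trimmed stand-in for crawl4ai's CrawlResult
    url: str
    html: str = ""
    success: bool = True

class ScraperPageResult(BaseModel):  # new in this commit: wraps a CrawlResult with depth and score
    result: CrawlResult
    depth: int
    score: float

class ScraperResult(BaseModel):
    url: str
    crawled_urls: List[str]
    extracted_data: Dict[str, ScraperPageResult]

page = ScraperPageResult(result=CrawlResult(url="https://example.com"), depth=1, score=0.87)
print(page.result.url, page.depth, page.score)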

View File

@@ -1,6 +1,5 @@
 from abc import ABC, abstractmethod
-from .models import ScraperResult, CrawlResult
-from ..models import CrawlResult
+from .models import ScraperResult, ScraperPageResult
 from ..async_configs import BrowserConfig, CrawlerRunConfig
 from typing import Union, AsyncGenerator
 class ScraperStrategy(ABC):
@@ -11,7 +10,7 @@ class ScraperStrategy(ABC):
         crawler_config: CrawlerRunConfig,
         browser_config: BrowserConfig,
         stream: bool = False,
-    ) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
+    ) -> Union[AsyncGenerator[ScraperPageResult, None], ScraperResult]:
         """Scrape the given URL using the specified crawler.

         Args:
@@ -22,7 +21,7 @@ class ScraperStrategy(ABC):
                 if False, accumulates results and returns a final ScraperResult.

         Yields:
-            CrawlResult: Individual crawl results if stream is True.
+            ScraperPageResult: Individual page results if stream is True.

         Returns:
             ScraperResult: A summary of the scrape results containing the final extracted data
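As a rough illustration of the updated contract, a strategy conforming to it might look like the sketch below. The start_url parameter name, the crawl_one() helper, and the absolute import paths are assumptions for illustration only, not part of this commit.

# Illustrative only: a trivial strategy yielding a single ScraperPageResult.
# Import paths and the crawl_one() helper are hypothetical.
from crawl4ai.scraper.models import ScraperPageResult
from crawl4ai.scraper.scraper_strategy import ScraperStrategy

class SinglePageStrategy(ScraperStrategy):
    async def ascrape(self, start_url, crawler_config=None, browser_config=None, stream=False):
        # crawl_one() stands in for whatever fetching the concrete strategy performs
        result = await self.crawl_one(start_url, crawler_config, browser_config)
        yield ScraperPageResult(result=result, depth=0, score=1.0)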

View File

@@ -1,5 +1,6 @@
 # basic_scraper_example.py
 from crawl4ai.async_configs import CrawlerRunConfig
+from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
 from crawl4ai.scraper import (
     AsyncWebScraper,
     BFSScraperStrategy,
@@ -7,7 +8,7 @@ from crawl4ai.scraper import (
     URLPatternFilter,
     ContentTypeFilter,
 )
-from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig
+from crawl4ai.async_webcrawler import BrowserConfig
 import re
 import time
@@ -41,8 +42,6 @@ async def basic_scraper_example():
     # Create the crawler and scraper
     async with AsyncWebScraper(
-        crawler_config=CrawlerRunConfig(bypass_cache=True),
-        browser_config=browser_config,
         strategy=bfs_strategy,
     ) as scraper:
         # Start scraping
@@ -51,8 +50,8 @@ async def basic_scraper_example():
             # Process results
             print(f"Crawled {len(result.crawled_urls)} pages:")
-            for url, data in result.extracted_data.items():
-                print(f"- {url}: {len(data.html)} bytes")
+            for url, page_result in result.extracted_data.items():
+                print(f"- {url}: {len(page_result.result.html)} bytes")
         except Exception as e:
             print(f"Error during scraping: {e}")
@@ -130,9 +129,9 @@ async def advanced_scraper_example():
     # Create crawler and scraper
     async with AsyncWebScraper(
-        crawler_config=CrawlerRunConfig(bypass_cache=True),
-        browser_config=browser_config,
         strategy=bfs_strategy,
+        crawler_config=CrawlerRunConfig(bypass_cache=True, scraping_strategy=LXMLWebScrapingStrategy(),),
+        browser_config=browser_config,
     ) as scraper:
         # Track statistics
@@ -143,12 +142,15 @@ async def advanced_scraper_example():
             result_generator = await scraper.ascrape(
                 "https://techcrunch.com", stream=True
             )
-            async for result in result_generator:
+            async for page_result in result_generator:
+                result = page_result.result
+                score = page_result.score
+                depth = page_result.depth
                 stats["processed"] += 1
                 if result.success:
                     stats["total_size"] += len(result.html)
-                    logger.info(f"Processed: {result.url}")
+                    logger.info(f"Processed at depth: {depth} with score: {score:.3f} : \n {result.url}")
                 else:
                     stats["errors"] += 1
                     logger.error(