diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py index 617b6901..c722235b 100644 --- a/crawl4ai/async_webcrawler.py +++ b/crawl4ai/async_webcrawler.py @@ -10,13 +10,20 @@ import asyncio # from contextlib import nullcontext, asynccontextmanager from contextlib import asynccontextmanager -from .models import CrawlResult, MarkdownGenerationResult, CrawlerTaskResult, DispatchResult + +from .models import ( + CrawlResult, + MarkdownGenerationResult, + CrawlerTaskResult, + DispatchResult, + DeepCrawlingProgress, +) from .async_database import async_db_manager from .chunking_strategy import * # noqa: F403 from .chunking_strategy import RegexChunking, ChunkingStrategy, IdentityChunking from .content_filter_strategy import * # noqa: F403 from .content_filter_strategy import RelevantContentFilter -from .extraction_strategy import * # noqa: F403 +from .extraction_strategy import * # noqa: F403 from .extraction_strategy import NoExtractionStrategy, ExtractionStrategy from .async_crawler_strategy import ( AsyncCrawlerStrategy, @@ -30,8 +37,9 @@ from .markdown_generation_strategy import ( ) from .async_logger import AsyncLogger from .async_configs import BrowserConfig, CrawlerRunConfig -from .async_dispatcher import * # noqa: F403 +from .async_dispatcher import * # noqa: F403 from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter +from .traversal import TraversalStrategy from .config import MIN_WORD_THRESHOLD from .utils import ( @@ -46,7 +54,7 @@ from .utils import ( from typing import Union, AsyncGenerator, List, TypeVar from collections.abc import AsyncGenerator -CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult) +CrawlResultT = TypeVar("CrawlResultT", bound=CrawlResult) RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]] from .__version__ import __version__ as crawl4ai_version @@ -257,7 +265,7 @@ class AsyncWebCrawler: @asynccontextmanager async def nullcontext(self): - """异步空上下文管理器""" + """Asynchronous null context manager""" yield async def arun( @@ -420,14 +428,18 @@ class AsyncWebCrawler: # Check robots.txt if enabled if config and config.check_robots_txt: - if not await self.robots_parser.can_fetch(url, self.browser_config.user_agent): + if not await self.robots_parser.can_fetch( + url, self.browser_config.user_agent + ): return CrawlResult( url=url, html="", success=False, status_code=403, error_message="Access denied by robots.txt", - response_headers={"X-Robots-Status": "Blocked by robots.txt"} + response_headers={ + "X-Robots-Status": "Blocked by robots.txt" + }, ) # Pass config to crawl method @@ -449,7 +461,7 @@ class AsyncWebCrawler: ) # Process the HTML content - crawl_result : CrawlResult = await self.aprocess_html( + crawl_result: CrawlResult = await self.aprocess_html( url=url, html=html, extracted_content=extracted_content, @@ -717,7 +729,7 @@ class AsyncWebCrawler: async def arun_many( self, urls: List[str], - config: Optional[CrawlerRunConfig] = None, + config: Optional[CrawlerRunConfig] = None, dispatcher: Optional[BaseDispatcher] = None, # Legacy parameters maintained for backwards compatibility word_count_threshold=MIN_WORD_THRESHOLD, @@ -731,8 +743,8 @@ class AsyncWebCrawler: pdf: bool = False, user_agent: str = None, verbose=True, - **kwargs - ) -> RunManyReturn: + **kwargs, + ) -> RunManyReturn: """ Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy. @@ -786,7 +798,9 @@ class AsyncWebCrawler: ) transform_result = lambda task_result: ( - setattr(task_result.result, 'dispatch_result', + setattr( + task_result.result, + "dispatch_result", DispatchResult( task_id=task_result.task_id, memory_usage=task_result.memory_usage, @@ -794,20 +808,59 @@ class AsyncWebCrawler: start_time=task_result.start_time, end_time=task_result.end_time, error_message=task_result.error_message, - ) - ) or task_result.result + ), + ) + or task_result.result ) stream = config.stream - + if stream: + async def result_transformer(): - async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config): + async for task_result in dispatcher.run_urls_stream( + crawler=self, urls=urls, config=config + ): yield transform_result(task_result) + return result_transformer() else: _results = await dispatcher.run_urls(crawler=self, urls=urls, config=config) - return [transform_result(res) for res in _results] + return [transform_result(res) for res in _results] + + async def adeep_crawl( + self, + url: str, + strategy: TraversalStrategy, + crawler_run_config: Optional[CrawlerRunConfig] = None, + stream: Optional[bool] = False, + ) -> Union[AsyncGenerator[CrawlResult,None],List[CrawlResult]]: + """ + Traverse child URLs starting from the given URL, based on Traversal strategy + + Args: + url: Starting URL for scraping + strategy: Traversal strategy to use + crawler_config: Configuration object controlling crawl behavior + stream (bool, optional): Whether to stream the results. Defaults to False. + + Returns: + List of CrawlResults + """ + try: + result_generator = strategy.deep_crawl( + url, crawler=self, crawler_run_config=crawler_run_config + ) + if stream: + return result_generator + else: + results = [] + async for result in result_generator: + results.append(result) + return results + except Exception as e: + self.logger.error(f"Error in streaming Deep Crawl: {str(e)}") + raise async def aclear_cache(self): """Clear the cache database.""" diff --git a/crawl4ai/models.py b/crawl4ai/models.py index 57edacd7..8bb1402c 100644 --- a/crawl4ai/models.py +++ b/crawl4ai/models.py @@ -1,3 +1,4 @@ +from __future__ import annotations from pydantic import BaseModel, HttpUrl from typing import List, Dict, Optional, Callable, Awaitable, Union, Any from enum import Enum @@ -5,6 +6,7 @@ from dataclasses import dataclass from .ssl_certificate import SSLCertificate from datetime import datetime from datetime import timedelta +from math import inf ############################### @@ -95,6 +97,18 @@ class DispatchResult(BaseModel): error_message: str = "" +@dataclass +class TraversalStats: + """Statistics for the traversal process""" + + start_time: datetime + urls_processed: int = 0 + urls_failed: int = 0 + urls_skipped: int = 0 + total_depth_reached: int = 0 + current_depth: int = 0 + + class CrawlResult(BaseModel): url: str html: str @@ -118,6 +132,12 @@ class CrawlResult(BaseModel): ssl_certificate: Optional[SSLCertificate] = None dispatch_result: Optional[DispatchResult] = None redirected_url: Optional[str] = None + # Attributes for position + depth: Optional[int] = None + score: Optional[float] = -inf + # For referencing children and parents from a flattened list of CrawlResult elements + parent_url: Optional[str] = None + child_urls: Optional[List[str]] = None class Config: arbitrary_types_allowed = True @@ -161,12 +181,12 @@ class Link(BaseModel): class Media(BaseModel): images: List[MediaItem] = [] - videos: List[ - MediaItem - ] = [] # Using MediaItem model for now, can be extended with Video model if needed - audios: List[ - MediaItem - ] = [] # Using MediaItem model for now, can be extended with Audio model if needed + videos: List[MediaItem] = ( + [] + ) # Using MediaItem model for now, can be extended with Video model if needed + audios: List[MediaItem] = ( + [] + ) # Using MediaItem model for now, can be extended with Audio model if needed class Links(BaseModel): diff --git a/crawl4ai/scraper/__init__.py b/crawl4ai/scraper/__init__.py deleted file mode 100644 index 92ef3539..00000000 --- a/crawl4ai/scraper/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -from .async_web_scraper import AsyncWebScraper -from .bfs_scraper_strategy import BFSScraperStrategy -from .filters import ( - URLFilter, - FilterChain, - URLPatternFilter, - ContentTypeFilter, - DomainFilter, -) -from .scorers import ( - KeywordRelevanceScorer, - PathDepthScorer, - FreshnessScorer, - CompositeScorer, -) -from .scraper_strategy import ScraperStrategy diff --git a/crawl4ai/scraper/async_web_scraper.py b/crawl4ai/scraper/async_web_scraper.py deleted file mode 100644 index 9e21a4e7..00000000 --- a/crawl4ai/scraper/async_web_scraper.py +++ /dev/null @@ -1,149 +0,0 @@ -from typing import Union, AsyncGenerator, Optional -from .scraper_strategy import ScraperStrategy -from .models import ScraperResult, CrawlResult, ScraperPageResult -from ..async_configs import BrowserConfig, CrawlerRunConfig -import logging -from dataclasses import dataclass -from contextlib import asynccontextmanager -from contextlib import AbstractAsyncContextManager - - -@dataclass -class ScrapingProgress: - """Tracks the progress of a scraping operation.""" - - processed_urls: int = 0 - failed_urls: int = 0 - current_url: Optional[str] = None - - -class AsyncWebScraper(AbstractAsyncContextManager): - """ - A high-level web scraper that combines an async crawler with a scraping strategy. - - Args: - crawler_config (CrawlerRunConfig): Configuration for the crawler run - browser_config (BrowserConfig): Configuration for the browser - strategy (ScraperStrategy): The scraping strategy to use - logger (Optional[logging.Logger]): Custom logger for the scraper - """ - - async def __aenter__(self): - # Initialize resources, if any - self.logger.info("Starting the async web scraper.") - return self - - def __init__( - self, - strategy: ScraperStrategy, - crawler_config: Optional[CrawlerRunConfig] = None, - browser_config: Optional[BrowserConfig] = None, - logger: Optional[logging.Logger] = None, - ): - if not isinstance(strategy, ScraperStrategy): - raise TypeError("strategy must be an instance of ScraperStrategy") - if browser_config is not None and not isinstance(browser_config, BrowserConfig): - raise TypeError( - "browser_config must be None or an instance of BrowserConfig" - ) - if crawler_config is not None and not isinstance( - crawler_config, CrawlerRunConfig - ): - raise TypeError( - "crawler_config must be None or an instance of CrawlerRunConfig" - ) - - self.crawler_config = crawler_config - self.browser_config = browser_config - self.strategy = strategy - self.logger = logger or logging.getLogger(__name__) - self._progress = ScrapingProgress() - - @property - def progress(self) -> ScrapingProgress: - """Get current scraping progress.""" - return self._progress - - @asynccontextmanager - async def _error_handling_context(self, url: str): - """Context manager for handling errors during scraping.""" - try: - yield - except Exception as e: - self.logger.error(f"Error scraping {url}: {str(e)}") - self._progress.failed_urls += 1 - raise - - async def ascrape( - self, url: str, stream: bool = False - ) -> Union[AsyncGenerator[ScraperPageResult, None], ScraperResult]: - """ - Scrape a website starting from the given URL. - - Args: - url: Starting URL for scraping - stream: If True, yield results as they come; if False, collect all results - - Returns: - Either an async generator yielding CrawlResults or a final ScraperResult - """ - self._progress = ScrapingProgress() # Reset progress - async with self._error_handling_context(url): - if stream: - return self._ascrape_yielding(url) - return await self._ascrape_collecting(url) - - async def _ascrape_yielding( - self, - url: str, - ) -> AsyncGenerator[ScraperPageResult, None]: - """Stream scraping results as they become available.""" - try: - result_generator = self.strategy.ascrape( - url, self.crawler_config, self.browser_config - ) - async for page_result in result_generator: - self._progress.processed_urls += 1 - self._progress.current_url = page_result.result.url - yield page_result - except Exception as e: - self.logger.error(f"Error in streaming scrape: {str(e)}") - raise - - async def _ascrape_collecting( - self, - url: str, - ) -> ScraperResult: - """Collect all scraping results before returning.""" - extracted_data = {} - - try: - result_generator = self.strategy.ascrape( - url, self.crawler_config, self.browser_config - ) - async for res in result_generator: - url = res.result.url - self._progress.processed_urls += 1 - self._progress.current_url = url - extracted_data[url] = res - - return ScraperResult( - url=url, - crawled_urls=list(extracted_data.keys()), - extracted_data=extracted_data, - stats={ - "processed_urls": self._progress.processed_urls, - "failed_urls": self._progress.failed_urls, - }, - ) - except Exception as e: - self.logger.error(f"Error in collecting scrape: {str(e)}") - raise - - async def __aexit__(self, exc_type, exc_val, exc_tb): - # Cleanup resources or tasks - await self.close() # Assuming you have a close method to cleanup - - async def close(self): - # Perform cleanup tasks - pass diff --git a/crawl4ai/scraper/bfs_scraper_strategy.py b/crawl4ai/scraper/bfs_scraper_strategy.py deleted file mode 100644 index 068583b9..00000000 --- a/crawl4ai/scraper/bfs_scraper_strategy.py +++ /dev/null @@ -1,209 +0,0 @@ -from typing import AsyncGenerator, Optional, Dict, Set -from dataclasses import dataclass -from datetime import datetime -import asyncio -import logging -from urllib.parse import urlparse - -from ..async_webcrawler import AsyncWebCrawler -from ..async_configs import BrowserConfig, CrawlerRunConfig -from .models import CrawlResult, ScraperPageResult -from .filters import FilterChain -from .scorers import URLScorer -from .scraper_strategy import ScraperStrategy -from ..config import SCRAPER_BATCH_SIZE - - -@dataclass -class CrawlStats: - """Statistics for the crawling process""" - - start_time: datetime - urls_processed: int = 0 - urls_failed: int = 0 - urls_skipped: int = 0 - total_depth_reached: int = 0 - current_depth: int = 0 - - -class BFSScraperStrategy(ScraperStrategy): - """Breadth-First Search scraping strategy with politeness controls""" - - def __init__( - self, - max_depth: int, - filter_chain: FilterChain, - url_scorer: URLScorer, - process_external_links: bool = False, - logger: Optional[logging.Logger] = None, - ): - self.max_depth = max_depth - self.filter_chain = filter_chain - self.url_scorer = url_scorer - self.logger = logger or logging.getLogger(__name__) - - # Crawl control - self.stats = CrawlStats(start_time=datetime.now()) - self._cancel_event = asyncio.Event() - self.process_external_links = process_external_links - - async def can_process_url(self, url: str, depth: int) -> bool: - """Check if URL can be processed based on filters - This is our gatekeeper method that determines if a URL should be processed. It: - - Validates URL format using a robust built-in method - - Applies custom filters from the filter chain - - Updates statistics for blocked URLs - - Returns False early if any check fails - """ - try: - result = urlparse(url) - if not all([result.scheme, result.netloc]): - raise ValueError("Invalid URL") - if result.scheme not in ("http", "https"): - raise ValueError("URL must be HTTP or HTTPS") - if not result.netloc or "." not in result.netloc: - raise ValueError("Invalid domain") - except Exception as e: - self.logger.warning(f"Invalid URL: {url}. Error: {str(e)}") - return False - - # Apply the filter chain if it's not start page - if depth != 0 and not self.filter_chain.apply(url): - return False - - return True - - async def _process_links( - self, - result: CrawlResult, - source_url: str, - queue: asyncio.PriorityQueue, - visited: Set[str], - depths: Dict[str, int], - ): - """Process extracted links from crawl result. - This is our link processor that: - Checks depth limits - Handles both internal and external links - Checks if URL is visited already - Checks if URL can be processed - validates URL, applies Filters with can_process_url - Scores URLs for priority - Updates depth tracking dictionary - Adds valid URLs to the queue - Updates maximum depth statistics - """ - next_depth = depths[source_url] + 1 - # If depth limit reached, exit without processing links - if next_depth > self.max_depth: - return - links_to_process = result.links["internal"] - if self.process_external_links: - links_to_process += result.links["external"] - for link in links_to_process: - url = link["href"] - if url in visited: - continue - if not await self.can_process_url(url, next_depth): - self.stats.urls_skipped += 1 - continue - score = self.url_scorer.score(url) if self.url_scorer else 0 - await queue.put((score, next_depth, url)) - depths[url] = next_depth - self.stats.total_depth_reached = max( - self.stats.total_depth_reached, next_depth - ) - - async def ascrape( - self, - start_url: str, - crawler_config: Optional[CrawlerRunConfig] = None, - browser_config: Optional[BrowserConfig] = None, - ) -> AsyncGenerator[CrawlResult, None]: - """Implement BFS crawling strategy""" - - # Initialize crawl state - """ - queue: A priority queue where items are tuples of (score, depth, url) - Score: Determines crawling priority (lower = higher priority) - Depth: Current distance from start_url - URL: The actual URL to crawl - visited: Keeps track of URLs we've already seen to avoid cycles - depths: Maps URLs to their depths from the start URL - active_crawls: Tracks currently running crawl tasks - """ - queue = asyncio.PriorityQueue() - await queue.put((0, 0, start_url)) - visited: Set[str] = set() - depths = {start_url: 0} - active_crawls = {} # Track URLs currently being processed with depth and score - active_crawls_lock = asyncio.Lock() # Create the lock within the same event loop - - # Update crawler_config to stream back results to scraper - crawler_config = crawler_config.clone(stream=True) if crawler_config else CrawlerRunConfig(stream=True) - - async with AsyncWebCrawler( - config=browser_config, - verbose=True, - ) as crawler: - try: - while ( - not queue.empty() or active_crawls - ) and not self._cancel_event.is_set(): - """ - This sets up our main control loop which: - - Continues while there are URLs to process (not queue.empty()) - - Or while there are active crawls still running (arun_many) - - Can be interrupted via cancellation (not self._cancel_event.is_set()) - """ - # Collect batch of URLs into active_crawls to process - async with active_crawls_lock: - while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty(): - score, depth, url = await queue.get() - active_crawls[url] = {"depth": depth, "score": score} - self.stats.current_depth = depth - - if not active_crawls: - # If no active crawls exist, wait a bit and continue - await asyncio.sleep(0.1) - continue - # Process batch - try: - async for result in await crawler.arun_many( - urls=list(active_crawls.keys()), - config=crawler_config.clone(stream=True), - ): - source_url = result.url - depth = active_crawls[source_url]["depth"] - score=active_crawls[source_url]["score"] - async with active_crawls_lock: - active_crawls.pop(source_url, None) - - if result.success: - await self._process_links( - result, source_url, queue, visited, depths - ) - yield ScraperPageResult( - result = result, - depth=depth, - score=score, - ) - else: - self.logger.warning( - f"Failed to crawl {result.url}: {result.error_message}" - ) - except Exception as e: - self.logger.error(f"Batch processing error: {e}") - # Continue processing other batches - continue - - except Exception as e: - self.logger.error(f"Error in crawl process: {e}") - raise - - finally: - self.stats.end_time = datetime.now() - await crawler.close() - - async def shutdown(self): - """Clean up resources and stop crawling""" - self._cancel_event.set() diff --git a/crawl4ai/scraper/models.py b/crawl4ai/scraper/models.py deleted file mode 100644 index 7b5137a4..00000000 --- a/crawl4ai/scraper/models.py +++ /dev/null @@ -1,12 +0,0 @@ -from pydantic import BaseModel -from typing import List, Dict -from ..models import CrawlResult - -class ScraperPageResult(BaseModel): - result: CrawlResult - depth: int - score: float -class ScraperResult(BaseModel): - url: str - crawled_urls: List[str] - extracted_data: Dict[str, ScraperPageResult] diff --git a/crawl4ai/scraper/scraper_strategy.py b/crawl4ai/scraper/scraper_strategy.py deleted file mode 100644 index 74d82a3b..00000000 --- a/crawl4ai/scraper/scraper_strategy.py +++ /dev/null @@ -1,40 +0,0 @@ -from abc import ABC, abstractmethod -from .models import ScraperResult, ScraperPageResult -from ..async_configs import BrowserConfig, CrawlerRunConfig -from typing import Union, AsyncGenerator -class ScraperStrategy(ABC): - @abstractmethod - async def ascrape( - self, - url: str, - crawler_config: CrawlerRunConfig, - browser_config: BrowserConfig, - stream: bool = False, - ) -> Union[AsyncGenerator[ScraperPageResult, None], ScraperResult]: - """Scrape the given URL using the specified crawler. - - Args: - url (str): The starting URL for the scrape. - crawler_config (CrawlerRunConfig): Configuration for the crawler run. - browser_config (BrowserConfig): Configuration for the browser. - stream (bool): If True, yields individual crawl results as they are ready; - if False, accumulates results and returns a final ScraperResult. - - Yields: - ScraperPageResult: Individual page results if stream is True. - - Returns: - ScraperResult: A summary of the scrape results containing the final extracted data - and the list of crawled URLs if stream is False. - """ - pass - - @abstractmethod - async def can_process_url(self, url: str, depth: int) -> bool: - """Check if URL can be processed based on strategy rules""" - pass - - @abstractmethod - async def shutdown(self): - """Clean up resources used by the strategy""" - pass diff --git a/crawl4ai/traversal/__init__.py b/crawl4ai/traversal/__init__.py new file mode 100644 index 00000000..cf2fc0d4 --- /dev/null +++ b/crawl4ai/traversal/__init__.py @@ -0,0 +1,29 @@ +from .bfs_traversal_strategy import BFSTraversalStrategy +from .filters import ( + URLFilter, + FilterChain, + URLPatternFilter, + ContentTypeFilter, + DomainFilter, +) +from .scorers import ( + KeywordRelevanceScorer, + PathDepthScorer, + FreshnessScorer, + CompositeScorer, +) +from .traversal_strategy import TraversalStrategy + +__all__ = [ + "BFSTraversalStrategy", + "FilterChain", + "URLFilter", + "URLPatternFilter", + "ContentTypeFilter", + "DomainFilter", + "KeywordRelevanceScorer", + "PathDepthScorer", + "FreshnessScorer", + "CompositeScorer", + "TraversalStrategy", +] diff --git a/crawl4ai/traversal/bfs_traversal_strategy.py b/crawl4ai/traversal/bfs_traversal_strategy.py new file mode 100644 index 00000000..eee10c7f --- /dev/null +++ b/crawl4ai/traversal/bfs_traversal_strategy.py @@ -0,0 +1,197 @@ +from typing import AsyncGenerator, Optional, Dict, Set, List +from datetime import datetime +import asyncio +import logging +from urllib.parse import urlparse +from ..async_configs import CrawlerRunConfig +from ..models import CrawlResult, TraversalStats +from .filters import FilterChain +from .scorers import URLScorer +from .traversal_strategy import TraversalStrategy +from ..config import SCRAPER_BATCH_SIZE + + +class BFSTraversalStrategy(TraversalStrategy): + """Best-First Search traversal strategy with filtering and scoring.""" + + def __init__( + self, + max_depth: int, + filter_chain: FilterChain, + url_scorer: URLScorer, + process_external_links: bool = False, + logger: Optional[logging.Logger] = None, + ): + self.max_depth = max_depth + self.filter_chain = filter_chain + self.url_scorer = url_scorer + self.logger = logger or logging.getLogger(__name__) + + # Crawl control + self.stats = TraversalStats(start_time=datetime.now()) + self._cancel_event = asyncio.Event() + self.process_external_links = process_external_links + + async def can_process_url(self, url: str, depth: int) -> bool: + """Check if URL can be processed based on filters + This is our gatekeeper method that determines if a URL should be processed. It: + - Validates URL format using a robust built-in method + - Applies custom filters from the filter chain + - Updates statistics for blocked URLs + - Returns False early if any check fails + """ + try: + result = urlparse(url) + if not all([result.scheme, result.netloc]): + raise ValueError("Invalid URL") + if result.scheme not in ("http", "https"): + raise ValueError("URL must be HTTP or HTTPS") + if not result.netloc or "." not in result.netloc: + raise ValueError("Invalid domain") + except Exception as e: + self.logger.warning(f"Invalid URL: {url}. Error: {str(e)}") + return False + + # Apply the filter chain if it's not start page + if depth != 0 and not self.filter_chain.apply(url): + return False + + return True + + async def _process_links( + self, + result: CrawlResult, + source_url: str, + queue: asyncio.PriorityQueue, + visited: Set[str], + depths: Dict[str, int], + ) -> List[str]: + """Process extracted links from crawl result. + This is our link processor that: + Checks depth limits + Handles both internal and external links + Checks if URL is visited already + Checks if URL can be processed - validates URL, applies Filters with can_process_url + Scores URLs for priority + Updates depth tracking dictionary + Adds valid URLs to the queue + Updates maximum depth statistics + """ + next_depth = depths[source_url] + 1 + # If depth limit reached, exit without processing links + if next_depth > self.max_depth: + return + links_to_process = result.links["internal"] + if self.process_external_links: + links_to_process += result.links["external"] + child_urls = [] + for link in links_to_process: + url = link["href"] + if url in visited: + continue + if not await self.can_process_url(url, next_depth): + self.stats.urls_skipped += 1 + continue + score = self.url_scorer.score(url) if self.url_scorer else 0 + child_urls.append(url) + await queue.put((score, next_depth, url, source_url)) + depths[url] = next_depth + self.stats.total_depth_reached = max( + self.stats.total_depth_reached, next_depth + ) + return child_urls + + async def deep_crawl( + self, + start_url: str, + crawler: "AsyncWebCrawler", + crawler_run_config: Optional[CrawlerRunConfig] = None, + ) -> AsyncGenerator[CrawlResult, None]: + """Implement BFS traversal strategy""" + + # Initialize traversal state + """ + queue: A priority queue where items are tuples of (score, depth, url) + Score: Determines traversal priority (lower = higher priority) + Depth: Current distance from start_url + URL: The actual URL to crawl + visited: Keeps track of URLs we've already seen to avoid cycles + depths: Maps URLs to their depths from the start URL + active_crawls: Tracks currently running crawl tasks + """ + queue = asyncio.PriorityQueue() + await queue.put((0, 0, start_url, None)) + visited: Set[str] = set() + depths = {start_url: 0} + active_crawls = {} # Track URLs currently being processed with depth and score + active_crawls_lock = ( + asyncio.Lock() + ) # Create the lock within the same event loop + try: + while ( + not queue.empty() or active_crawls + ) and not self._cancel_event.is_set(): + """ + This sets up our main control loop which: + - Continues while there are URLs to process (not queue.empty()) + - Or while there are active crawls still running (arun_many) + - Can be interrupted via cancellation (not self._cancel_event.is_set()) + """ + # Collect batch of URLs into active_crawls to process + async with active_crawls_lock: + while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty(): + score, depth, url, parent_url = await queue.get() + active_crawls[url] = { + "depth": depth, + "score": score, + "parent_url": parent_url, + } + self.stats.current_depth = depth + + if not active_crawls: + # If no active crawls exist, wait a bit and continue + await asyncio.sleep(0.1) + continue + # Process batch + try: + stream_config = ( + crawler_run_config.clone(stream=True) + if crawler_run_config + else CrawlerRunConfig(stream=True) + ) + async for result in await crawler.arun_many( + urls=list(active_crawls.keys()), + config=stream_config, + ): + async with active_crawls_lock: + crawl_info = active_crawls.pop(result.url, None) + + if crawl_info and result.success: + child_urls = await self._process_links( + result, result.url, queue, visited, depths + ) + result.depth = crawl_info["depth"] + result.score = crawl_info["score"] + result.parent_url = crawl_info["parent_url"] + result.child_urls = child_urls + yield result + else: + self.logger.warning( + f"Failed to crawl {result.url}: {result.error_message}" + ) + except Exception as e: + self.logger.error(f"Batch processing error: {e}") + # Continue processing other batches + continue + + except Exception as e: + self.logger.error(f"Error in crawl process: {e}") + raise + + finally: + self.stats.end_time = datetime.now() + await crawler.close() + + async def shutdown(self): + """Clean up resources and stop crawling""" + self._cancel_event.set() diff --git a/crawl4ai/scraper/filters.py b/crawl4ai/traversal/filters.py similarity index 100% rename from crawl4ai/scraper/filters.py rename to crawl4ai/traversal/filters.py diff --git a/crawl4ai/scraper/scorers.py b/crawl4ai/traversal/scorers.py similarity index 99% rename from crawl4ai/scraper/scorers.py rename to crawl4ai/traversal/scorers.py index 4ef90dcd..1c50b51a 100644 --- a/crawl4ai/scraper/scorers.py +++ b/crawl4ai/traversal/scorers.py @@ -1,9 +1,8 @@ from abc import ABC, abstractmethod -from typing import List, Dict, Optional, Union +from typing import List, Dict, Optional from dataclasses import dataclass from urllib.parse import urlparse, unquote import re -from collections import defaultdict import math import logging from functools import lru_cache diff --git a/crawl4ai/traversal/traversal_strategy.py b/crawl4ai/traversal/traversal_strategy.py new file mode 100644 index 00000000..dc067317 --- /dev/null +++ b/crawl4ai/traversal/traversal_strategy.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from typing import AsyncGenerator + +from ..async_configs import CrawlerRunConfig +from ..models import CrawlResult + + +class TraversalStrategy(ABC): + @abstractmethod + async def deep_crawl( + self, + url: str, + crawler: "AsyncWebCrawler", + crawler_run_config: CrawlerRunConfig = None, + ) -> AsyncGenerator[CrawlResult, None]: + """Traverse the given URL using the specified crawler. + + Args: + url (str): The starting URL for the traversal. + crawler (AsyncWebCrawler): The crawler instance to use for traversal. + crawler_run_config (CrawlerRunConfig, optional): The configuration for the crawler. + + Returns: + AsyncGenerator[CrawlResult, None]: An async generator yielding crawl results. + """ + pass + + @abstractmethod + async def shutdown(self): + """Clean up resources used by the strategy""" + pass diff --git a/docs/scraper/scraper_quickstart.py b/docs/scraper/deep_crawl_quickstart.py similarity index 79% rename from docs/scraper/scraper_quickstart.py rename to docs/scraper/deep_crawl_quickstart.py index e9394e3c..1e97714e 100644 --- a/docs/scraper/scraper_quickstart.py +++ b/docs/scraper/deep_crawl_quickstart.py @@ -1,20 +1,18 @@ # basic_scraper_example.py -from crawl4ai.async_configs import CrawlerRunConfig +from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy -from crawl4ai.scraper import ( - AsyncWebScraper, - BFSScraperStrategy, +from crawl4ai.traversal import ( + BFSTraversalStrategy, FilterChain, URLPatternFilter, ContentTypeFilter, ) -from crawl4ai.async_webcrawler import BrowserConfig +from crawl4ai.async_webcrawler import AsyncWebCrawler import re import time browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600) - async def basic_scraper_example(): """ Basic example: Scrape a blog site for articles @@ -33,7 +31,7 @@ async def basic_scraper_example(): ) # Initialize the strategy with basic configuration - bfs_strategy = BFSScraperStrategy( + bfs_strategy = BFSTraversalStrategy( max_depth=2, # Only go 2 levels deep filter_chain=filter_chain, url_scorer=None, # Use default scoring @@ -41,17 +39,18 @@ async def basic_scraper_example(): ) # Create the crawler and scraper - async with AsyncWebScraper( - strategy=bfs_strategy, - ) as scraper: + async with AsyncWebCrawler( + config=browser_config, + ) as crawler: # Start scraping try: - result = await scraper.ascrape("https://crawl4ai.com/mkdocs") - + results = await crawler.adeep_crawl( + "https://crawl4ai.com/mkdocs", strategy=bfs_strategy + ) # Process results - print(f"Crawled {len(result.crawled_urls)} pages:") - for url, page_result in result.extracted_data.items(): - print(f"- {url}: {len(page_result.result.html)} bytes") + print(f"Crawled {len(results)} pages:") + for result in results: + print(f"- {result.url}: {len(result.html)} bytes") except Exception as e: print(f"Error during scraping: {e}") @@ -60,9 +59,8 @@ async def basic_scraper_example(): # advanced_scraper_example.py import logging -from crawl4ai.scraper import ( - AsyncWebScraper, - BFSScraperStrategy, +from crawl4ai.traversal import ( + BFSTraversalStrategy, FilterChain, URLPatternFilter, ContentTypeFilter, @@ -123,34 +121,36 @@ async def advanced_scraper_example(): ) # Initialize strategy with advanced configuration - bfs_strategy = BFSScraperStrategy( + bfs_strategy = BFSTraversalStrategy( max_depth=2, filter_chain=filter_chain, url_scorer=scorer ) # Create crawler and scraper - async with AsyncWebScraper( - strategy=bfs_strategy, - crawler_config=CrawlerRunConfig(bypass_cache=True, scraping_strategy=LXMLWebScrapingStrategy(),), - browser_config=browser_config, - ) as scraper: + async with AsyncWebCrawler( + config=browser_config, + ) as crawler: # Track statistics stats = {"processed": 0, "errors": 0, "total_size": 0} try: # Use streaming mode - result_generator = await scraper.ascrape( - "https://techcrunch.com", stream=True + result_generator = await crawler.adeep_crawl( + "https://techcrunch.com", + strategy=bfs_strategy, + crawler_run_config=CrawlerRunConfig( + scraping_strategy=LXMLWebScrapingStrategy() + ), + stream=True, ) - async for page_result in result_generator: - result = page_result.result - score = page_result.score - depth = page_result.depth + async for result in result_generator: stats["processed"] += 1 if result.success: stats["total_size"] += len(result.html) - logger.info(f"Processed at depth: {depth} with score: {score:.3f} : \n {result.url}") + logger.info( + f"Processed at depth: {result.depth} with score: {result.score:.3f} : \n {result.url}" + ) else: stats["errors"] += 1 logger.error( diff --git a/docs/scraper/scraper_quickstart_review.py b/docs/scraper/scraper_quickstart_review.py deleted file mode 100644 index 6f3c253f..00000000 --- a/docs/scraper/scraper_quickstart_review.py +++ /dev/null @@ -1,187 +0,0 @@ -# basic_scraper_example.py -from crawl4ai.scraper import ( - AsyncWebScraper, - BFSScraperStrategy, - FilterChain, - URLPatternFilter, - ContentTypeFilter, -) -from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig -import re - -browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600) - - -async def basic_scraper_example(): - """ - Basic example: Scrape a blog site for articles - - Crawls only HTML pages - - Stays within the blog section - - Collects all results at once - """ - # Create a simple filter chain - filter_chain = FilterChain( - [ - # Only crawl pages within the blog section - URLPatternFilter("*/tutorial/*"), - # Only process HTML pages - ContentTypeFilter(["text/html"]), - ] - ) - - # Initialize the strategy with basic configuration - strategy = BFSScraperStrategy( - max_depth=2, # Only go 2 levels deep - filter_chain=filter_chain, - url_scorer=None, # Use default scoring - process_external_links=True, - ) - - # Create the crawler and scraper - async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler: - scraper = AsyncWebScraper(crawler, strategy) - # Start scraping - try: - result = await scraper.ascrape("https://crawl4ai.com/mkdocs") - - # Process results - print(f"Crawled {len(result.crawled_urls)} pages:") - for url, data in result.extracted_data.items(): - print(f"- {url}: {len(data.html)} bytes") - - except Exception as e: - print(f"Error during scraping: {e}") - - -# advanced_scraper_example.py -import logging -from crawl4ai.scraper import ( - AsyncWebScraper, - BFSScraperStrategy, - FilterChain, - URLPatternFilter, - ContentTypeFilter, - DomainFilter, - KeywordRelevanceScorer, - PathDepthScorer, - FreshnessScorer, - CompositeScorer, -) -from crawl4ai.async_webcrawler import AsyncWebCrawler - - -async def advanced_scraper_example(): - """ - Advanced example: Intelligent news site scraping - - Uses all filter types - - Implements sophisticated scoring - - Streams results - - Includes monitoring and logging - """ - # Set up logging - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger("advanced_scraper") - - # Create sophisticated filter chain - filter_chain = FilterChain( - [ - # Domain control - DomainFilter( - allowed_domains=["techcrunch.com"], - blocked_domains=["login.techcrunch.com", "legal.yahoo.com"], - ), - # URL patterns - URLPatternFilter( - [ - "*/article/*", - "*/news/*", - "*/blog/*", - re.compile(r"\d{4}/\d{2}/.*"), # Date-based URLs - ] - ), - # Content types - ContentTypeFilter(["text/html", "application/xhtml+xml"]), - ] - ) - - # Create composite scorer - scorer = CompositeScorer( - [ - # Prioritize by keywords - KeywordRelevanceScorer( - keywords=["news", "breaking", "update", "latest"], weight=1.0 - ), - # Prefer optimal URL structure - PathDepthScorer(optimal_depth=3, weight=0.7), - # Prioritize fresh content - FreshnessScorer(weight=0.9), - ] - ) - - # Initialize strategy with advanced configuration - strategy = BFSScraperStrategy( - max_depth=2, filter_chain=filter_chain, url_scorer=scorer - ) - - # Create crawler and scraper - async with AsyncWebCrawler(verbose=True, config=browser_config) as crawler: - scraper = AsyncWebScraper(crawler, strategy) - - # Track statistics - stats = {"processed": 0, "errors": 0, "total_size": 0} - - try: - # Use streaming mode - result_generator = await scraper.ascrape( - "https://techcrunch.com", stream=True - ) - async for result in result_generator: - stats["processed"] += 1 - - if result.success: - stats["total_size"] += len(result.html) - logger.info(f"Processed: {result.url}") - else: - stats["errors"] += 1 - logger.error( - f"Failed to process {result.url}: {result.error_message}" - ) - - # Log progress regularly - if stats["processed"] % 10 == 0: - logger.info(f"Progress: {stats['processed']} URLs processed") - - except Exception as e: - logger.error(f"Scraping error: {e}") - - finally: - # Print final statistics - logger.info("Scraping completed:") - logger.info(f"- URLs processed: {stats['processed']}") - logger.info(f"- Errors: {stats['errors']}") - logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB") - - # Print filter statistics - for filter_ in filter_chain.filters: - logger.info(f"{filter_.name} stats:") - logger.info(f"- Passed: {filter_.stats.passed_urls}") - logger.info(f"- Rejected: {filter_.stats.rejected_urls}") - - # Print scorer statistics - logger.info("Scoring statistics:") - logger.info(f"- Average score: {scorer.stats.average_score:.2f}") - logger.info( - f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}" - ) - - -if __name__ == "__main__": - import asyncio - - # Run basic example - print("Running basic scraper example...") - asyncio.run(basic_scraper_example()) - - # Run advanced example - print("\nRunning advanced scraper example...") - asyncio.run(advanced_scraper_example()) diff --git a/models.py b/models.py new file mode 100644 index 00000000..e69de29b