From bc7559586f9e1e17af1d40a3d8a243c3c6926fcc Mon Sep 17 00:00:00 2001
From: UncleCode
Date: Tue, 4 Feb 2025 01:24:49 +0800
Subject: [PATCH] feat(crawler): add deep crawling capabilities with BFS strategy

Implements deep crawling functionality with a new BreadthFirstSearch strategy:

- Add DeepCrawlStrategy base class and BFS implementation
- Integrate deep crawling with AsyncWebCrawler via decorator pattern
- Update CrawlerRunConfig to support deep crawling parameters
- Add pagination support for Google Search crawler

BREAKING CHANGE: AsyncWebCrawler.arun and arun_many return types now include
deep crawl results
---
 crawl4ai/__init__.py                       |   4 +-
 crawl4ai/async_configs.py                  |  17 +-
 crawl4ai/async_deep_crawl.py               | 181 +++++++++++++++++++++
 crawl4ai/async_webcrawler.py               |  11 +-
 crawl4ai/crawlers/google_search/crawler.py |  14 +-
 5 files changed, 216 insertions(+), 11 deletions(-)
 create mode 100644 crawl4ai/async_deep_crawl.py

diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py
index 2de100c9..443d6ec9 100644
--- a/crawl4ai/__init__.py
+++ b/crawl4ai/__init__.py
@@ -16,7 +16,7 @@ from .extraction_strategy import (
     JsonXPathExtractionStrategy
 )
-
+from .async_deep_crawl import DeepCrawlStrategy, BreadthFirstSearchStrategy
 from .chunking_strategy import ChunkingStrategy, RegexChunking
 from .markdown_generation_strategy import DefaultMarkdownGenerator
 from .content_filter_strategy import PruningContentFilter, BM25ContentFilter, LLMContentFilter, RelevantContentFilter
@@ -33,6 +33,8 @@ from .docker_client import Crawl4aiDockerClient
 from .hub import CrawlerHub

 __all__ = [
+    "DeepCrawlStrategy",
+    "BreadthFirstSearchStrategy",
     "AsyncWebCrawler",
     "CrawlResult",
     "CrawlerHub",
diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py
index 3e9f582f..d8ee675a 100644
--- a/crawl4ai/async_configs.py
+++ b/crawl4ai/async_configs.py
@@ -1,4 +1,3 @@
-from regex import B
 from .config import (
     MIN_WORD_THRESHOLD,
     IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
@@ -14,11 +13,12 @@ from .chunking_strategy import ChunkingStrategy, RegexChunking
 from .markdown_generation_strategy import MarkdownGenerationStrategy
 from .content_filter_strategy import RelevantContentFilter  # , BM25ContentFilter, LLMContentFilter, PruningContentFilter
 from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy
+from .async_deep_crawl import DeepCrawlStrategy

 from typing import Union, List
 from .cache_context import CacheMode
 import inspect
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 from enum import Enum

 def to_serializable_dict(obj: Any) -> Dict:
@@ -373,6 +373,9 @@ class CrawlerRunConfig():
     By using this class, you have a single place to understand and adjust the crawling options.

     Attributes:
+        # Deep Crawl Parameters
+        deep_crawl_strategy (DeepCrawlStrategy or None): Strategy to use for deep crawling.
+
         # Content Processing Parameters
         word_count_threshold (int): Minimum word count threshold before processing content.
                                     Default: MIN_WORD_THRESHOLD (typically 200).
@@ -594,6 +597,9 @@ class CrawlerRunConfig():
         user_agent: str = None,
         user_agent_mode: str = None,
         user_agent_generator_config: dict = {},
+        # Deep Crawl Parameters
+        deep_crawl_strategy: Optional[DeepCrawlStrategy] = None,
+
     ):

         self.url = url
@@ -701,6 +707,10 @@ class CrawlerRunConfig():
         if self.chunking_strategy is None:
             self.chunking_strategy = RegexChunking()
+
+        # Deep Crawl Parameters
+        self.deep_crawl_strategy = deep_crawl_strategy
+
     @staticmethod
     def from_kwargs(kwargs: dict) -> "CrawlerRunConfig":
         return CrawlerRunConfig(
@@ -785,6 +795,8 @@ class CrawlerRunConfig():
             user_agent=kwargs.get("user_agent"),
             user_agent_mode=kwargs.get("user_agent_mode"),
             user_agent_generator_config=kwargs.get("user_agent_generator_config", {}),
+            # Deep Crawl Parameters
+            deep_crawl_strategy=kwargs.get("deep_crawl_strategy"),
         )

     # Create a funciton returns dict of the object
@@ -862,6 +874,7 @@ class CrawlerRunConfig():
             "user_agent": self.user_agent,
             "user_agent_mode": self.user_agent_mode,
             "user_agent_generator_config": self.user_agent_generator_config,
+            "deep_crawl_strategy": self.deep_crawl_strategy,
         }

     def clone(self, **kwargs):
diff --git a/crawl4ai/async_deep_crawl.py b/crawl4ai/async_deep_crawl.py
new file mode 100644
index 00000000..86b716be
--- /dev/null
+++ b/crawl4ai/async_deep_crawl.py
@@ -0,0 +1,181 @@
+# crawl4ai/async_deep_crawl.py
+
+"""Remember:
+# Update CrawlerRunConfig in async_configs.py (additional field)
+class CrawlerRunConfig(BaseModel):
+    deep_crawl_strategy: Optional[DeepCrawlStrategy] = Field(
+        default=None,
+        description="Strategy for deep crawling websites"
+    )
+    # ... other existing fields remain unchanged
+
+# In AsyncWebCrawler class (partial implementation)
+class AsyncWebCrawler:
+    def __init__(self, *args, **kwargs):
+        # Existing initialization
+        self._deep_handler = DeepCrawlHandler(self)
+        self.arun = self._deep_handler(self.arun)  # Decorate original method
+
+    async def arun(self, url: str, config: Optional[CrawlerRunConfig] = None, **kwargs):
+        # ... existing implementation
+"""
+
+import asyncio
+from collections import deque
+from functools import wraps
+from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Set, TypeVar, Union
+from urllib.parse import urlparse
+
+from pydantic import BaseModel, Field
+
+if TYPE_CHECKING:
+    from .async_webcrawler import AsyncWebCrawler, CrawlResult
+    from .async_configs import CrawlerRunConfig
+
+    CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
+    RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
+
+
+class DeepCrawlStrategy(BaseModel):
+    """Base class for deep crawling strategies."""
+    max_depth: int = Field(default=3, description="Maximum crawl depth from initial URL")
+    include_external: bool = Field(default=False, description="Follow links to external domains")
+
+    class Config:
+        arbitrary_types_allowed = True
+
+    async def run(
+        self,
+        crawler: "AsyncWebCrawler",
+        start_url: str,
+        config: "CrawlerRunConfig"
+    ) -> "RunManyReturn":
+        """Execute the crawling strategy."""
+        raise NotImplementedError
+
+
+class BreadthFirstSearchStrategy(DeepCrawlStrategy):
+    """Breadth-first search implementation for deep crawling."""
+
+    async def run(
+        self,
+        crawler: "AsyncWebCrawler",
+        start_url: str,
+        config: "CrawlerRunConfig"
+    ) -> "RunManyReturn":
+        """Breadth-first crawl driven by a FIFO queue; each URL is fetched via arun_many."""
+        async def stream_results():
+            """Inner async generator for streaming results."""
+            # Imported here to avoid a circular import: async_dispatcher pulls
+            # in async_configs, which in turn imports this module.
+            from .async_dispatcher import MemoryAdaptiveDispatcher
+
+            base_domain = urlparse(start_url).netloc
+            queue = deque([(start_url, 0)])
+            visited: Set[str] = set()
+            dispatcher = MemoryAdaptiveDispatcher()
+
+            # Child requests must not carry the deep strategy, otherwise each
+            # crawl would recurse; levels are processed sequentially.
+            child_config = config.clone(deep_crawl_strategy=None, stream=False)
+
+            while queue:
+                current_url, depth = queue.popleft()
+
+                if depth > self.max_depth or current_url in visited:
+                    continue
+
+                visited.add(current_url)
+
+                # Crawl the current URL through arun_many so the dispatcher
+                # handles memory-adaptive scheduling.
+                batch_results = await crawler.arun_many(
+                    urls=[current_url],
+                    config=child_config,
+                    dispatcher=dispatcher
+                )
+
+                for result in batch_results:
+                    # Record the BFS depth so callers can inspect it.
+                    result.metadata = result.metadata or {}
+                    result.metadata["depth"] = depth
+                    yield result
+
+                    # Queue the next level if still within the depth limit.
+                    if depth < self.max_depth:
+                        new_urls = self._extract_links(result, base_domain)
+                        for url in new_urls:
+                            if url not in visited:
+                                queue.append((url, depth + 1))
+
+        # Handle streaming vs non-streaming callers.
+        if config.stream:
+            return stream_results()
+        else:
+            results: List["CrawlResultT"] = []
+            async for result in stream_results():
+                results.append(result)
+            return results
+
+    def _extract_links(self, result: "CrawlResult", base_domain: str) -> List[str]:
+        """Extract links from a crawl result with domain filtering."""
+        # Link entries are treated as plain URL strings here.
+        internal = result.links.get('internal', [])
+        external = result.links.get('external', []) if self.include_external else []
+
+        return [
+            url for url in internal + external
+            if self._same_domain(url, base_domain) or self.include_external
+        ]
+
+    def _same_domain(self, url: str, base_domain: str) -> bool:
+        """Check if a URL belongs to the base domain."""
+        return urlparse(url).netloc == base_domain
+
+
+class DeepCrawlHandler:
+    """Decorator that adds deep crawling capabilities to arun."""
+
+    def __init__(self, crawler: "AsyncWebCrawler"):
+        self.crawler = crawler
+
+    def __call__(self, original_arun):
+        @wraps(original_arun)
+        async def wrapped_arun(url: str, config: Optional["CrawlerRunConfig"] = None, **kwargs):
+            # Delegate to the deep crawl strategy when one is configured; the
+            # strategy crawls the start URL itself, so the original arun is
+            # only invoked for plain single-page requests.
+            if config and config.deep_crawl_strategy:
+                return await config.deep_crawl_strategy.run(
+                    crawler=self.crawler,
+                    start_url=url,
+                    config=config
+                )
+
+            return await original_arun(url, config=config, **kwargs)
+
+        return wrapped_arun
+
+
+async def main():
+    """Example deep crawl of a documentation site."""
+    # Imported here rather than at module level to avoid a circular import
+    # (async_configs and async_webcrawler both import this module).
+    from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
+
+    config = CrawlerRunConfig(
+        deep_crawl_strategy=BreadthFirstSearchStrategy(
+            max_depth=2,
+            include_external=False
+        ),
+        stream=True,
+        verbose=True
+    )
+
+    async with AsyncWebCrawler() as crawler:
+        print("Starting deep crawl in streaming mode:")
+        async for result in await crawler.arun(
+            url="https://docs.crawl4ai.com",
+            config=config
+        ):
+            print(f"→ {result.url} (Depth: {result.metadata.get('depth', 0)})")
+
+        print("\nStarting deep crawl in batch mode:")
+        config.stream = False
+        results = await crawler.arun(
+            url="https://docs.crawl4ai.com",
+            config=config
+        )
+        print(f"Crawled {len(results)} pages")
+        print(f"Example page: {results[0].url}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index e103742c..ce233e67 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -29,6 +29,7 @@ from .markdown_generation_strategy import (
     DefaultMarkdownGenerator,
     MarkdownGenerationStrategy,
 )
+from .async_deep_crawl import DeepCrawlHandler
 from .async_logger import AsyncLogger
 from .async_configs import BrowserConfig, CrawlerRunConfig
 from .async_dispatcher import *  # noqa: F403
@@ -47,7 +48,7 @@ from .utils import (
 from typing import Union, AsyncGenerator, TypeVar

 CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
-RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
+RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]

 DeepCrawlSingleReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
 DeepCrawlManyReturn = Union[
@@ -215,6 +216,10 @@ class AsyncWebCrawler:

         self.ready = False

+        # Decorate arun method with deep crawling capabilities
+        self._deep_handler = DeepCrawlHandler(self)
+        self.arun = self._deep_handler(self.arun)
+
     async def start(self):
         """
         Start the crawler explicitly without using context manager.
@@ -288,7 +293,7 @@ class AsyncWebCrawler:
         user_agent: str = None,
         verbose=True,
         **kwargs,
-    ) -> Union[CrawlResult, DeepCrawlSingleReturn]:
+    ) -> RunManyReturn:
         """
         Runs the crawler for a single source: URL (web, local file, or raw HTML).
@@ -715,7 +720,7 @@ class AsyncWebCrawler:
         user_agent: str = None,
         verbose=True,
         **kwargs
-    ) -> Union[RunManyReturn, DeepCrawlManyReturn]:
+    ) -> RunManyReturn:
         """
         Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.
diff --git a/crawl4ai/crawlers/google_search/crawler.py b/crawl4ai/crawlers/google_search/crawler.py
index b1f7d725..dcf60d79 100644
--- a/crawl4ai/crawlers/google_search/crawler.py
+++ b/crawl4ai/crawlers/google_search/crawler.py
@@ -5,8 +5,7 @@ from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
 from pathlib import Path
 import json
 import os
-import asyncio
-from typing import Dict, Any
+from typing import Dict


 class GoogleSearchCrawler(BaseCrawler):
@@ -25,6 +24,11 @@ class GoogleSearchCrawler(BaseCrawler):
     async def run(self, url="", query: str = "", search_type: str = "text", schema_cache_path = None, **kwargs) -> str:
         """Crawl Google Search results for a query"""
         url = f"https://www.google.com/search?q={query}&gl=sg&hl=en" if search_type == "text" else f"https://www.google.com/search?q={query}&gl=sg&hl=en&tbs=qdr:d&udm=2"
+        if kwargs.get("page_start", 1) > 1:
+            url = f"{url}&start={kwargs['page_start'] * 10}"
+        if kwargs.get("page_length", 1) > 1:
+            url = f"{url}&num={kwargs['page_length']}"
+
         browser_config = BrowserConfig(headless=True, verbose=True)
         async with AsyncWebCrawler(config=browser_config) as crawler:
             config = CrawlerRunConfig(
@@ -70,7 +74,7 @@ class GoogleSearchCrawler(BaseCrawler):
                 organic_schema = json.load(f)
             else:
                 organic_schema = JsonCssExtractionStrategy.generate_schema(
-                    html=_html,
+                    html=cleaned_html,
                     target_json_example="""{
                         "title": "...",
                         "link": "...",
@@ -89,7 +93,7 @@ class GoogleSearchCrawler(BaseCrawler):
                 top_stories_schema = json.load(f)
             else:
                 top_stories_schema = JsonCssExtractionStrategy.generate_schema(
-                    html=_html,
+                    html=cleaned_html,
                     target_json_example="""{
                         "title": "...",
                         "link": "...",
@@ -109,7 +113,7 @@ class GoogleSearchCrawler(BaseCrawler):
                 suggested_query_schema = json.load(f)
             else:
                 suggested_query_schema = JsonCssExtractionStrategy.generate_schema(
-                    html=_html,
+                    html=cleaned_html,
                     target_json_example="""{
                         "query": "A for Apple",
                     }""",
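
Usage sketch (illustrative, not part of the patch): the new deep_crawl_strategy hook is meant to accept any DeepCrawlStrategy subclass, not just the BFS shipped here. Assuming the clone() helper and the URL-string link structure used in the diff above, a hypothetical depth-first variant could plug in like this; the DepthFirstSearchStrategy name and its internals are examples only, and arun_many is assumed to fall back to a default dispatcher when none is passed:

import asyncio
from typing import List, Set
from urllib.parse import urlparse

from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, DeepCrawlStrategy


class DepthFirstSearchStrategy(DeepCrawlStrategy):
    """Hypothetical DFS counterpart to BreadthFirstSearchStrategy (sketch only)."""

    async def run(self, crawler, start_url, config):
        base_domain = urlparse(start_url).netloc
        # Drop the deep strategy on child requests so they do not recurse.
        child_config = config.clone(deep_crawl_strategy=None, stream=False)
        visited: Set[str] = set()
        results: List = []

        async def visit(url: str, depth: int) -> None:
            if depth > self.max_depth or url in visited:
                return
            visited.add(url)
            for result in await crawler.arun_many(urls=[url], config=child_config):
                results.append(result)
                # Follow same-domain links depth-first (link entries assumed to be
                # URL strings, matching the BFS implementation above).
                for link in result.links.get("internal", []):
                    if urlparse(link).netloc == base_domain:
                        await visit(link, depth + 1)

        await visit(start_url, 0)
        return results


async def demo():
    config = CrawlerRunConfig(
        deep_crawl_strategy=DepthFirstSearchStrategy(max_depth=1),
        stream=False,
    )
    async with AsyncWebCrawler() as crawler:
        pages = await crawler.arun(url="https://docs.crawl4ai.com", config=config)
        print(f"Crawled {len(pages)} pages")


if __name__ == "__main__":
    asyncio.run(demo())

Keeping the traversal policy on the config object, rather than on arun itself, is what lets the decorator leave the single-page call signature untouched.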
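
And a minimal sketch of exercising the Google Search pagination kwargs added above; the no-argument construction of GoogleSearchCrawler and the import path are assumptions based only on the file layout in this diff:

import asyncio

from crawl4ai.crawlers.google_search.crawler import GoogleSearchCrawler


async def demo_search():
    crawler = GoogleSearchCrawler()  # assumed no-arg construction
    # page_start > 1 appends &start=page_start*10 and page_length > 1 appends
    # &num=page_length to the generated search URL (see the diff above).
    results_json = await crawler.run(
        query="async web crawling",
        search_type="text",
        page_start=2,
        page_length=20,
    )
    print(results_json)


if __name__ == "__main__":
    asyncio.run(demo_search())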