feat(crawler): add deep crawling capabilities with BFS strategy

Implements deep crawling functionality with a new BreadthFirstSearch strategy:
- Add DeepCrawlStrategy base class and BFS implementation
- Integrate deep crawling with AsyncWebCrawler via decorator pattern
- Update CrawlerRunConfig to support deep crawling parameters
- Add pagination support for Google Search crawler

BREAKING CHANGE: AsyncWebCrawler.arun and arun_many return types now include deep crawl results
This commit is contained in:
UncleCode
2025-02-04 01:24:49 +08:00
parent 04bc643cec
commit bc7559586f
5 changed files with 216 additions and 11 deletions

View File

@@ -16,7 +16,7 @@ from .extraction_strategy import (
JsonXPathExtractionStrategy
)
from .async_deep_crawl import DeepCrawlStrategy, BreadthFirstSearchStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
from .content_filter_strategy import PruningContentFilter, BM25ContentFilter, LLMContentFilter, RelevantContentFilter
@@ -33,6 +33,8 @@ from .docker_client import Crawl4aiDockerClient
from .hub import CrawlerHub
__all__ = [
"DeepCrawlStrategy",
"BreadthFirstSearchStrategy",
"AsyncWebCrawler",
"CrawlResult",
"CrawlerHub",

View File

@@ -1,4 +1,3 @@
from regex import B
from .config import (
MIN_WORD_THRESHOLD,
IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
@@ -14,11 +13,12 @@ from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import MarkdownGenerationStrategy
from .content_filter_strategy import RelevantContentFilter # , BM25ContentFilter, LLMContentFilter, PruningContentFilter
from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy
from .async_deep_crawl import DeepCrawlStrategy
from typing import Union, List
from .cache_context import CacheMode
import inspect
from typing import Any, Dict
from typing import Any, Dict, Optional
from enum import Enum
def to_serializable_dict(obj: Any) -> Dict:
@@ -373,6 +373,9 @@ class CrawlerRunConfig():
By using this class, you have a single place to understand and adjust the crawling options.
Attributes:
# Deep Crawl Parameters
deep_crawl_strategy (DeepCrawlStrategy or None): Strategy to use for deep crawling.
# Content Processing Parameters
word_count_threshold (int): Minimum word count threshold before processing content.
Default: MIN_WORD_THRESHOLD (typically 200).
@@ -594,6 +597,9 @@ class CrawlerRunConfig():
user_agent: str = None,
user_agent_mode: str = None,
user_agent_generator_config: dict = {},
# Deep Crawl Parameters
deep_crawl_strategy: Optional[DeepCrawlStrategy] = None,
):
self.url = url
@@ -701,6 +707,10 @@ class CrawlerRunConfig():
if self.chunking_strategy is None:
self.chunking_strategy = RegexChunking()
# Deep Crawl Parameters
self.deep_crawl_strategy = deep_crawl_strategy
@staticmethod
def from_kwargs(kwargs: dict) -> "CrawlerRunConfig":
return CrawlerRunConfig(
@@ -785,6 +795,8 @@ class CrawlerRunConfig():
user_agent=kwargs.get("user_agent"),
user_agent_mode=kwargs.get("user_agent_mode"),
user_agent_generator_config=kwargs.get("user_agent_generator_config", {}),
# Deep Crawl Parameters
deep_crawl_strategy=kwargs.get("deep_crawl_strategy"),
)
# Create a function that returns a dict of the object
@@ -862,6 +874,7 @@ class CrawlerRunConfig():
"user_agent": self.user_agent,
"user_agent_mode": self.user_agent_mode,
"user_agent_generator_config": self.user_agent_generator_config,
"deep_crawl_strategy": self.deep_crawl_strategy,
}
def clone(self, **kwargs):

View File

@@ -0,0 +1,181 @@
# crawl4ai/async_deep_crawl.py
"""Remember:
# Update CrawlerRunConfig in async_configs.py (additional field)
class CrawlerRunConfig(BaseModel):
deep_crawl_strategy: Optional[DeepCrawlStrategy] = Field(
default=None,
description="Strategy for deep crawling websites"
)
# ... other existing fields remain unchanged
# In AsyncWebCrawler class (partial implementation)
class AsyncWebCrawler:
def __init__(self, *args, **kwargs):
# Existing initialization
self._deep_handler = DeepCrawlHandler(self)
self.arun = self._deep_handler(self.arun) # Decorate original method
async def arun(self, url: str, config: Optional[CrawlerRunConfig] = None, **kwargs):
# ... existing implementation
"""
import asyncio
from collections import deque
from functools import wraps
from typing import AsyncGenerator, List, Optional, Set, Union, TypeVar
from urllib.parse import urlparse
from pydantic import BaseModel, Field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .async_webcrawler import AsyncWebCrawler, CrawlResult
from .async_configs import CrawlerRunConfig
from .async_dispatcher import MemoryAdaptiveDispatcher
# The bound is given as a string forward reference: CrawlResult is imported
# only under TYPE_CHECKING above, so a bare name here would raise NameError
# the moment this module is imported.
CrawlResultT = TypeVar('CrawlResultT', bound='CrawlResult')
# A single result, a fully materialized batch, or an async stream of results.
RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
class DeepCrawlStrategy(BaseModel):
    """Base class for deep crawling strategies.

    Subclasses implement :meth:`run` to define how links are discovered and
    traversed starting from ``start_url``. A strategy is attached via
    ``CrawlerRunConfig.deep_crawl_strategy`` and invoked by the
    ``DeepCrawlHandler`` decorator wrapped around ``AsyncWebCrawler.arun``.
    """
    # Pages more than this many hops from the start URL are not visited.
    max_depth: int = Field(default=3, description="Maximum crawl depth from initial URL")
    # When False, links leaving the start URL's domain are ignored.
    include_external: bool = Field(default=False, description="Follow links to external domains")

    class Config:
        # Allow non-pydantic types (crawler/config objects) in subclass state.
        arbitrary_types_allowed = True

    async def run(
        self,
        crawler: "AsyncWebCrawler",
        start_url: str,
        config: "CrawlerRunConfig"
    ) -> "RunManyReturn":
        """Execute the crawling strategy.

        Args:
            crawler: Crawler used to fetch pages.
            start_url: URL the traversal begins from.
            config: Run configuration applied to each request.

        Returns:
            A list of crawl results, or an async generator when streaming.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError
class BreadthFirstSearchStrategy(DeepCrawlStrategy):
    """Breadth-first search implementation for deep crawling.

    Pages are visited level by level: the start URL at depth 0, every link
    discovered on it at depth 1, and so on up to ``max_depth``. Results are
    streamed as they arrive when ``config.stream`` is set, otherwise they
    are collected into a list.
    """

    async def run(
        self,
        crawler: "AsyncWebCrawler",
        start_url: str,
        config: "CrawlerRunConfig"
    ) -> "RunManyReturn":
        """BFS traversal using ``arun_many`` for page fetching.

        Args:
            crawler: Crawler used to fetch each page.
            start_url: Root of the traversal; also defines the base domain
                used for internal-link filtering.
            config: Run configuration; ``config.stream`` selects streaming
                vs. batch output.

        Returns:
            An async generator of results when ``config.stream`` is true,
            otherwise a list of all results.
        """
        # Imported at runtime because the module-level import above lives
        # under TYPE_CHECKING only; referencing the bare name here would
        # raise NameError.
        from .async_dispatcher import MemoryAdaptiveDispatcher

        async def stream_results():
            """Inner async generator performing the actual BFS walk."""
            base_domain = urlparse(start_url).netloc
            frontier = deque([(start_url, 0)])
            visited: Set[str] = set()
            # Child requests must not re-trigger deep crawling, and levels
            # are processed sequentially. CrawlerRunConfig is a plain class
            # exposing clone(**kwargs) — not a pydantic model — so clone()
            # is the supported copy mechanism (the original pydantic-style
            # config.copy(update=...) would raise AttributeError).
            child_config = config.clone(
                deep_crawl_strategy=None,
                stream=False,
            )
            while frontier:
                current_url, depth = frontier.popleft()
                if depth > self.max_depth or current_url in visited:
                    continue
                visited.add(current_url)
                # Fetch the current page through the batch API.
                batch_results = await crawler.arun_many(
                    urls=[current_url],
                    config=child_config,
                    dispatcher=MemoryAdaptiveDispatcher(),
                )
                for result in batch_results:
                    yield result
                    # Queue the next level only while under the depth limit;
                    # duplicates are filtered again when popped.
                    if depth < self.max_depth:
                        for link in self._extract_links(result, base_domain):
                            if link not in visited:
                                frontier.append((link, depth + 1))

        if config.stream:
            return stream_results()
        # Non-streaming: drain the generator into a list before returning.
        collected: List["CrawlResultT"] = []
        async for result in stream_results():
            collected.append(result)
        return collected

    def _extract_links(self, result: "CrawlResult", base_domain: str) -> List[str]:
        """Extract candidate links from a crawl result.

        With ``include_external`` set, every internal and external link is
        returned; otherwise only internal links on ``base_domain`` survive.
        NOTE(review): assumes link entries are URL strings — confirm against
        the actual shape of ``CrawlResult.links``.
        """
        internal = result.links.get('internal', [])
        if self.include_external:
            return internal + result.links.get('external', [])
        return [url for url in internal if self._same_domain(url, base_domain)]

    def _same_domain(self, url: str, base_domain: str) -> bool:
        """True when ``url``'s network location matches ``base_domain``."""
        return urlparse(url).netloc == base_domain
class DeepCrawlHandler:
    """Decorator that adds deep crawling capabilities to ``arun``."""

    def __init__(self, crawler: "AsyncWebCrawler"):
        # Crawler instance handed to strategies for issuing child requests.
        self.crawler = crawler

    def __call__(self, original_arun):
        """Wrap ``original_arun`` with deep-crawl dispatch."""
        @wraps(original_arun)
        async def wrapped_arun(url: str, config: Optional["CrawlerRunConfig"] = None, **kwargs):
            # When a deep-crawl strategy is configured, delegate to it
            # directly. The strategy crawls `url` itself (it seeds its queue
            # with the start URL), so awaiting original_arun first — as the
            # previous version did — fetched the start page twice and threw
            # the first result away.
            if config and config.deep_crawl_strategy:
                return await config.deep_crawl_strategy.run(
                    crawler=self.crawler,
                    start_url=url,
                    config=config,
                )
            # No deep crawling requested: behave exactly like the original.
            return await original_arun(url, config=config, **kwargs)

        return wrapped_arun
async def main():
    """Example deep crawl of a documentation site, in both output modes."""
    # Imported lazily: AsyncWebCrawler and CrawlerRunConfig exist only under
    # TYPE_CHECKING at module level (their modules import this one, so a
    # top-level import would be circular). Without these runtime imports the
    # demo raised NameError.
    from .async_webcrawler import AsyncWebCrawler
    from .async_configs import CrawlerRunConfig

    config = CrawlerRunConfig(
        deep_crawl_strategy=BreadthFirstSearchStrategy(
            max_depth=2,
            include_external=False,
        ),
        stream=True,
        verbose=True,
    )

    async with AsyncWebCrawler() as crawler:
        # Streaming mode: results are yielded as each page completes.
        print("Starting deep crawl in streaming mode:")
        async for result in await crawler.arun(
            url="https://docs.crawl4ai.com",
            config=config,
        ):
            print(f"{result.url} (Depth: {result.metadata.get('depth', 0)})")

        # Batch mode: all results are collected before returning.
        print("\nStarting deep crawl in batch mode:")
        config.stream = False
        results = await crawler.arun(
            url="https://docs.crawl4ai.com",
            config=config,
        )
        print(f"Crawled {len(results)} pages")
        print(f"Example page: {results[0].url}")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -29,6 +29,7 @@ from .markdown_generation_strategy import (
DefaultMarkdownGenerator,
MarkdownGenerationStrategy,
)
from .async_deep_crawl import DeepCrawlHandler
from .async_logger import AsyncLogger
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import * # noqa: F403
@@ -47,7 +48,7 @@ from .utils import (
from typing import Union, AsyncGenerator, TypeVar
CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
DeepCrawlSingleReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
DeepCrawlManyReturn = Union[
@@ -215,6 +216,10 @@ class AsyncWebCrawler:
self.ready = False
# Decorate arun method with deep crawling capabilities
self._deep_handler = DeepCrawlHandler(self)
self.arun = self._deep_handler(self.arun)
async def start(self):
"""
Start the crawler explicitly without using context manager.
@@ -288,7 +293,7 @@ class AsyncWebCrawler:
user_agent: str = None,
verbose=True,
**kwargs,
) -> Union[CrawlResult, DeepCrawlSingleReturn]:
) -> RunManyReturn:
"""
Runs the crawler for a single source: URL (web, local file, or raw HTML).
@@ -715,7 +720,7 @@ class AsyncWebCrawler:
user_agent: str = None,
verbose=True,
**kwargs
) -> Union[RunManyReturn, DeepCrawlManyReturn]:
) -> RunManyReturn:
"""
Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.

View File

@@ -5,8 +5,7 @@ from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from pathlib import Path
import json
import os
import asyncio
from typing import Dict, Any
from typing import Dict
class GoogleSearchCrawler(BaseCrawler):
@@ -25,6 +24,11 @@ class GoogleSearchCrawler(BaseCrawler):
async def run(self, url="", query: str = "", search_type: str = "text", schema_cache_path = None, **kwargs) -> str:
"""Crawl Google Search results for a query"""
url = f"https://www.google.com/search?q={query}&gl=sg&hl=en" if search_type == "text" else f"https://www.google.com/search?q={query}&gl=sg&hl=en&tbs=qdr:d&udm=2"
if kwargs.get("page_start", 1) > 1:
url = f"{url}&start={kwargs['page_start'] * 10}"
if kwargs.get("page_length", 1) > 1:
url = f"{url}&num={kwargs['page_length']}"
browser_config = BrowserConfig(headless=True, verbose=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
@@ -70,7 +74,7 @@ class GoogleSearchCrawler(BaseCrawler):
organic_schema = json.load(f)
else:
organic_schema = JsonCssExtractionStrategy.generate_schema(
html=_html,
html=cleaned_html,
target_json_example="""{
"title": "...",
"link": "...",
@@ -89,7 +93,7 @@ class GoogleSearchCrawler(BaseCrawler):
top_stories_schema = json.load(f)
else:
top_stories_schema = JsonCssExtractionStrategy.generate_schema(
html=_html,
html=cleaned_html,
target_json_example="""{
"title": "...",
"link": "...",
@@ -109,7 +113,7 @@ class GoogleSearchCrawler(BaseCrawler):
suggested_query_schema = json.load(f)
else:
suggested_query_schema = JsonCssExtractionStrategy.generate_schema(
html=_html,
html=cleaned_html,
target_json_example="""{
"query": "A for Apple",
}""",