Merge pull request #1607 from unclecode/fix/dfs_deep_crawling

Fix/dfs deep crawling
This commit is contained in:
Nasrin
2025-11-13 16:43:47 +08:00
committed by GitHub
2 changed files with 156 additions and 7 deletions

View File

@@ -4,14 +4,26 @@ from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple
from ..models import CrawlResult from ..models import CrawlResult
from .bfs_strategy import BFSDeepCrawlStrategy # noqa from .bfs_strategy import BFSDeepCrawlStrategy # noqa
from ..types import AsyncWebCrawler, CrawlerRunConfig from ..types import AsyncWebCrawler, CrawlerRunConfig
from ..utils import normalize_url_for_deep_crawl
class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy): class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
""" """
Depth-First Search (DFS) deep crawling strategy. Depth-first deep crawling with familiar BFS rules.
Inherits URL validation and link discovery from BFSDeepCrawlStrategy. We reuse the same filters, scoring, and page limits from :class:`BFSDeepCrawlStrategy`,
Overrides _arun_batch and _arun_stream to use a stack (LIFO) for DFS traversal. but walk the graph with a stack so we fully explore one branch before hopping to the
next. DFS also keeps its own ``_dfs_seen`` set so we can drop duplicate links at
discovery time without accidentally marking them as “already crawled”.
""" """
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._dfs_seen: Set[str] = set()
def _reset_seen(self, start_url: str) -> None:
"""Start each crawl with a clean dedupe set seeded with the root URL."""
self._dfs_seen = {start_url}
async def _arun_batch( async def _arun_batch(
self, self,
start_url: str, start_url: str,
@@ -19,14 +31,19 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
config: CrawlerRunConfig, config: CrawlerRunConfig,
) -> List[CrawlResult]: ) -> List[CrawlResult]:
""" """
Batch (non-streaming) DFS mode. Crawl level-by-level but emit results at the end.
Uses a stack to traverse URLs in DFS order, aggregating CrawlResults into a list.
We keep a stack of ``(url, parent, depth)`` tuples, pop one at a time, and
hand it to ``crawler.arun_many`` with deep crawling disabled so we remain
in control of traversal. Every successful page bumps ``_pages_crawled`` and
seeds new stack items discovered via :meth:`link_discovery`.
""" """
visited: Set[str] = set() visited: Set[str] = set()
# Stack items: (url, parent_url, depth) # Stack items: (url, parent_url, depth)
stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)]
depths: Dict[str, int] = {start_url: 0} depths: Dict[str, int] = {start_url: 0}
results: List[CrawlResult] = [] results: List[CrawlResult] = []
self._reset_seen(start_url)
while stack and not self._cancel_event.is_set(): while stack and not self._cancel_event.is_set():
url, parent, depth = stack.pop() url, parent, depth = stack.pop()
@@ -71,12 +88,16 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
config: CrawlerRunConfig, config: CrawlerRunConfig,
) -> AsyncGenerator[CrawlResult, None]: ) -> AsyncGenerator[CrawlResult, None]:
""" """
Streaming DFS mode. Same traversal as :meth:`_arun_batch`, but yield pages immediately.
Uses a stack to traverse URLs in DFS order and yields CrawlResults as they become available.
Each popped URL is crawled, its metadata annotated, then the result gets
yielded before we even look at the next stack entry. Successful crawls
still feed :meth:`link_discovery`, keeping DFS order intact.
""" """
visited: Set[str] = set() visited: Set[str] = set()
stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)] stack: List[Tuple[str, Optional[str], int]] = [(start_url, None, 0)]
depths: Dict[str, int] = {start_url: 0} depths: Dict[str, int] = {start_url: 0}
self._reset_seen(start_url)
while stack and not self._cancel_event.is_set(): while stack and not self._cancel_event.is_set():
url, parent, depth = stack.pop() url, parent, depth = stack.pop()
@@ -108,3 +129,92 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
for new_url, new_parent in reversed(new_links): for new_url, new_parent in reversed(new_links):
new_depth = depths.get(new_url, depth + 1) new_depth = depths.get(new_url, depth + 1)
stack.append((new_url, new_parent, new_depth)) stack.append((new_url, new_parent, new_depth))
async def link_discovery(
self,
result: CrawlResult,
source_url: str,
current_depth: int,
_visited: Set[str],
next_level: List[Tuple[str, Optional[str]]],
depths: Dict[str, int],
) -> None:
"""
Find the next URLs we should push onto the DFS stack.
Parameters
----------
result : CrawlResult
Output of the page we just crawled; its ``links`` block is our raw material.
source_url : str
URL of the parent page; stored so callers can track ancestry.
current_depth : int
Depth of the parent; children naturally sit at ``current_depth + 1``.
_visited : Set[str]
Present to match the BFS signature, but we rely on ``_dfs_seen`` instead.
next_level : list of tuples
The stack buffer supplied by the caller; we append new ``(url, parent)`` items here.
depths : dict
Shared depth map so future metadata tagging knows how deep each URL lives.
Notes
-----
- ``_dfs_seen`` keeps us from pushing duplicates without touching the traversal guard.
- Validation, scoring, and capacity trimming mirror the BFS version so behaviour stays consistent.
"""
next_depth = current_depth + 1
if next_depth > self.max_depth:
return
remaining_capacity = self.max_pages - self._pages_crawled
if remaining_capacity <= 0:
self.logger.info(
f"Max pages limit ({self.max_pages}) reached, stopping link discovery"
)
return
links = result.links.get("internal", [])
if self.include_external:
links += result.links.get("external", [])
seen = self._dfs_seen
valid_links: List[Tuple[str, float]] = []
for link in links:
raw_url = link.get("href")
if not raw_url:
continue
normalized_url = normalize_url_for_deep_crawl(raw_url, source_url)
if not normalized_url or normalized_url in seen:
continue
if not await self.can_process_url(raw_url, next_depth):
self.stats.urls_skipped += 1
continue
score = self.url_scorer.score(normalized_url) if self.url_scorer else 0
if score < self.score_threshold:
self.logger.debug(
f"URL {normalized_url} skipped: score {score} below threshold {self.score_threshold}"
)
self.stats.urls_skipped += 1
continue
seen.add(normalized_url)
valid_links.append((normalized_url, score))
if len(valid_links) > remaining_capacity:
if self.url_scorer:
valid_links.sort(key=lambda x: x[1], reverse=True)
valid_links = valid_links[:remaining_capacity]
self.logger.info(
f"Limiting to {remaining_capacity} URLs due to max_pages limit"
)
for url, score in valid_links:
if score:
result.metadata = result.metadata or {}
result.metadata["score"] = score
next_level.append((url, source_url))
depths[url] = next_depth

View File

@@ -0,0 +1,39 @@
"""
Simple demonstration of the DFS deep crawler visiting multiple pages.
Run with: python docs/examples/dfs_crawl_demo.py
"""
import asyncio
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.cache_context import CacheMode
from crawl4ai.deep_crawling.dfs_strategy import DFSDeepCrawlStrategy
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
async def main() -> None:
dfs_strategy = DFSDeepCrawlStrategy(
max_depth=3,
max_pages=50,
include_external=False,
)
config = CrawlerRunConfig(
deep_crawl_strategy=dfs_strategy,
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(),
stream=True,
)
seed_url = "https://docs.python.org/3/" # Plenty of internal links
async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
async for result in await crawler.arun(url=seed_url, config=config):
depth = result.metadata.get("depth")
status = "SUCCESS" if result.success else "FAILED"
print(f"[{status}] depth={depth} url={result.url}")
if __name__ == "__main__":
asyncio.run(main())