Refactor: Move adeep_crawl to be a method of the crawler itself. Add attributes to CrawlResult so the crawl tree can be reconstructed once deep crawling is completed

This commit is contained in:
Aravind Karnam
2025-01-29 13:06:09 +05:30
parent 78223bc847
commit 84ffdaab9a
15 changed files with 385 additions and 669 deletions

View File

@@ -10,13 +10,20 @@ import asyncio
# from contextlib import nullcontext, asynccontextmanager
from contextlib import asynccontextmanager
from .models import CrawlResult, MarkdownGenerationResult, CrawlerTaskResult, DispatchResult
from .models import (
CrawlResult,
MarkdownGenerationResult,
CrawlerTaskResult,
DispatchResult,
DeepCrawlingProgress,
)
from .async_database import async_db_manager
from .chunking_strategy import * # noqa: F403
from .chunking_strategy import RegexChunking, ChunkingStrategy, IdentityChunking
from .content_filter_strategy import * # noqa: F403
from .content_filter_strategy import RelevantContentFilter
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import NoExtractionStrategy, ExtractionStrategy
from .async_crawler_strategy import (
AsyncCrawlerStrategy,
@@ -30,8 +37,9 @@ from .markdown_generation_strategy import (
)
from .async_logger import AsyncLogger
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .traversal import TraversalStrategy
from .config import MIN_WORD_THRESHOLD
from .utils import (
@@ -46,7 +54,7 @@ from .utils import (
from typing import Union, AsyncGenerator, List, TypeVar
from collections.abc import AsyncGenerator
CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
CrawlResultT = TypeVar("CrawlResultT", bound=CrawlResult)
RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
from .__version__ import __version__ as crawl4ai_version
@@ -257,7 +265,7 @@ class AsyncWebCrawler:
@asynccontextmanager
async def nullcontext(self):
"""异步空上下文管理器"""
"""Asynchronous null context manager"""
yield
async def arun(
@@ -420,14 +428,18 @@ class AsyncWebCrawler:
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await self.robots_parser.can_fetch(url, self.browser_config.user_agent):
if not await self.robots_parser.can_fetch(
url, self.browser_config.user_agent
):
return CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
response_headers={
"X-Robots-Status": "Blocked by robots.txt"
},
)
# Pass config to crawl method
@@ -449,7 +461,7 @@ class AsyncWebCrawler:
)
# Process the HTML content
crawl_result : CrawlResult = await self.aprocess_html(
crawl_result: CrawlResult = await self.aprocess_html(
url=url,
html=html,
extracted_content=extracted_content,
@@ -717,7 +729,7 @@ class AsyncWebCrawler:
async def arun_many(
self,
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
config: Optional[CrawlerRunConfig] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
word_count_threshold=MIN_WORD_THRESHOLD,
@@ -731,8 +743,8 @@ class AsyncWebCrawler:
pdf: bool = False,
user_agent: str = None,
verbose=True,
**kwargs
) -> RunManyReturn:
**kwargs,
) -> RunManyReturn:
"""
Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.
@@ -786,7 +798,9 @@ class AsyncWebCrawler:
)
transform_result = lambda task_result: (
setattr(task_result.result, 'dispatch_result',
setattr(
task_result.result,
"dispatch_result",
DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
@@ -794,20 +808,59 @@ class AsyncWebCrawler:
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
)
) or task_result.result
),
)
or task_result.result
)
stream = config.stream
if stream:
async def result_transformer():
async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config):
async for task_result in dispatcher.run_urls_stream(
crawler=self, urls=urls, config=config
):
yield transform_result(task_result)
return result_transformer()
else:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
return [transform_result(res) for res in _results]
async def adeep_crawl(
    self,
    url: str,
    strategy: "TraversalStrategy",
    crawler_run_config: "Optional[CrawlerRunConfig]" = None,
    stream: "Optional[bool]" = False,
) -> "Union[AsyncGenerator[CrawlResult, None], List[CrawlResult]]":
    """
    Traverse child URLs starting from the given URL, based on the traversal strategy.

    Args:
        url: Starting URL for the deep crawl.
        strategy: Traversal strategy that decides which links to follow and
            in what order; it calls back into this crawler to fetch pages.
        crawler_run_config: Configuration object controlling crawl behavior.
        stream (bool, optional): Whether to stream the results. Defaults to False.

    Returns:
        An async generator yielding CrawlResult objects when ``stream`` is
        True, otherwise a list of all collected CrawlResult objects.

    Raises:
        Exception: Re-raises any error from the traversal after logging it.
    """
    try:
        # The strategy drives traversal; this crawler instance is handed
        # over so the strategy can fetch pages through it.
        result_generator = strategy.deep_crawl(
            url, crawler=self, crawler_run_config=crawler_run_config
        )
        if stream:
            # Hand the generator straight to the caller for incremental
            # consumption; iteration errors surface at the call site.
            return result_generator
        # Non-streaming mode: drain the generator and return everything.
        results = []
        async for result in result_generator:
            results.append(result)
        return results
    except Exception as e:
        # Message fixed: this handler covers both streaming and
        # non-streaming modes, not only the streaming path.
        self.logger.error(f"Error in Deep Crawl: {str(e)}")
        raise
async def aclear_cache(self):
"""Clear the cache database."""

View File

@@ -1,3 +1,4 @@
from __future__ import annotations
from pydantic import BaseModel, HttpUrl
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from enum import Enum
@@ -5,6 +6,7 @@ from dataclasses import dataclass
from .ssl_certificate import SSLCertificate
from datetime import datetime
from datetime import timedelta
from math import inf
###############################
@@ -95,6 +97,18 @@ class DispatchResult(BaseModel):
error_message: str = ""
@dataclass
class TraversalStats:
    """Statistics for the traversal process.

    A single instance is mutated in place by the traversal strategy while
    the crawl runs, so it reflects live progress.
    """

    # Wall-clock time at which the traversal started (required).
    start_time: datetime
    # Number of URLs successfully processed so far.
    urls_processed: int = 0
    # Number of URLs that failed during crawling.
    urls_failed: int = 0
    # Number of URLs rejected by filters/validation before crawling.
    urls_skipped: int = 0
    # Deepest link distance reached relative to the start URL.
    total_depth_reached: int = 0
    # Depth of the batch currently being processed.
    current_depth: int = 0
    # Wall-clock time at which the traversal finished; the strategy's
    # cleanup path assigns this, so it is declared here (last, to keep
    # positional construction backward-compatible). None while running.
    end_time: Optional[datetime] = None
class CrawlResult(BaseModel):
url: str
html: str
@@ -118,6 +132,12 @@ class CrawlResult(BaseModel):
ssl_certificate: Optional[SSLCertificate] = None
dispatch_result: Optional[DispatchResult] = None
redirected_url: Optional[str] = None
# Attributes for position
depth: Optional[int] = None
score: Optional[float] = -inf
# For referencing children and parents from a flattened list of CrawlResult elements
parent_url: Optional[str] = None
child_urls: Optional[List[str]] = None
class Config:
arbitrary_types_allowed = True
@@ -161,12 +181,12 @@ class Link(BaseModel):
class Media(BaseModel):
images: List[MediaItem] = []
videos: List[
MediaItem
] = [] # Using MediaItem model for now, can be extended with Video model if needed
audios: List[
MediaItem
] = [] # Using MediaItem model for now, can be extended with Audio model if needed
videos: List[MediaItem] = (
[]
) # Using MediaItem model for now, can be extended with Video model if needed
audios: List[MediaItem] = (
[]
) # Using MediaItem model for now, can be extended with Audio model if needed
class Links(BaseModel):

View File

@@ -1,16 +0,0 @@
from .async_web_scraper import AsyncWebScraper
from .bfs_scraper_strategy import BFSScraperStrategy
from .filters import (
URLFilter,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
)
from .scorers import (
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer,
)
from .scraper_strategy import ScraperStrategy

View File

@@ -1,149 +0,0 @@
from typing import Union, AsyncGenerator, Optional
from .scraper_strategy import ScraperStrategy
from .models import ScraperResult, CrawlResult, ScraperPageResult
from ..async_configs import BrowserConfig, CrawlerRunConfig
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from contextlib import AbstractAsyncContextManager
@dataclass
class ScrapingProgress:
"""Tracks the progress of a scraping operation."""
processed_urls: int = 0
failed_urls: int = 0
current_url: Optional[str] = None
class AsyncWebScraper(AbstractAsyncContextManager):
"""
A high-level web scraper that combines an async crawler with a scraping strategy.
Args:
crawler_config (CrawlerRunConfig): Configuration for the crawler run
browser_config (BrowserConfig): Configuration for the browser
strategy (ScraperStrategy): The scraping strategy to use
logger (Optional[logging.Logger]): Custom logger for the scraper
"""
async def __aenter__(self):
# Initialize resources, if any
self.logger.info("Starting the async web scraper.")
return self
def __init__(
self,
strategy: ScraperStrategy,
crawler_config: Optional[CrawlerRunConfig] = None,
browser_config: Optional[BrowserConfig] = None,
logger: Optional[logging.Logger] = None,
):
if not isinstance(strategy, ScraperStrategy):
raise TypeError("strategy must be an instance of ScraperStrategy")
if browser_config is not None and not isinstance(browser_config, BrowserConfig):
raise TypeError(
"browser_config must be None or an instance of BrowserConfig"
)
if crawler_config is not None and not isinstance(
crawler_config, CrawlerRunConfig
):
raise TypeError(
"crawler_config must be None or an instance of CrawlerRunConfig"
)
self.crawler_config = crawler_config
self.browser_config = browser_config
self.strategy = strategy
self.logger = logger or logging.getLogger(__name__)
self._progress = ScrapingProgress()
@property
def progress(self) -> ScrapingProgress:
"""Get current scraping progress."""
return self._progress
@asynccontextmanager
async def _error_handling_context(self, url: str):
"""Context manager for handling errors during scraping."""
try:
yield
except Exception as e:
self.logger.error(f"Error scraping {url}: {str(e)}")
self._progress.failed_urls += 1
raise
async def ascrape(
self, url: str, stream: bool = False
) -> Union[AsyncGenerator[ScraperPageResult, None], ScraperResult]:
"""
Scrape a website starting from the given URL.
Args:
url: Starting URL for scraping
stream: If True, yield results as they come; if False, collect all results
Returns:
Either an async generator yielding CrawlResults or a final ScraperResult
"""
self._progress = ScrapingProgress() # Reset progress
async with self._error_handling_context(url):
if stream:
return self._ascrape_yielding(url)
return await self._ascrape_collecting(url)
async def _ascrape_yielding(
self,
url: str,
) -> AsyncGenerator[ScraperPageResult, None]:
"""Stream scraping results as they become available."""
try:
result_generator = self.strategy.ascrape(
url, self.crawler_config, self.browser_config
)
async for page_result in result_generator:
self._progress.processed_urls += 1
self._progress.current_url = page_result.result.url
yield page_result
except Exception as e:
self.logger.error(f"Error in streaming scrape: {str(e)}")
raise
async def _ascrape_collecting(
self,
url: str,
) -> ScraperResult:
"""Collect all scraping results before returning."""
extracted_data = {}
try:
result_generator = self.strategy.ascrape(
url, self.crawler_config, self.browser_config
)
async for res in result_generator:
url = res.result.url
self._progress.processed_urls += 1
self._progress.current_url = url
extracted_data[url] = res
return ScraperResult(
url=url,
crawled_urls=list(extracted_data.keys()),
extracted_data=extracted_data,
stats={
"processed_urls": self._progress.processed_urls,
"failed_urls": self._progress.failed_urls,
},
)
except Exception as e:
self.logger.error(f"Error in collecting scrape: {str(e)}")
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Cleanup resources or tasks
await self.close() # Assuming you have a close method to cleanup
async def close(self):
# Perform cleanup tasks
pass

View File

@@ -1,209 +0,0 @@
from typing import AsyncGenerator, Optional, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import asyncio
import logging
from urllib.parse import urlparse
from ..async_webcrawler import AsyncWebCrawler
from ..async_configs import BrowserConfig, CrawlerRunConfig
from .models import CrawlResult, ScraperPageResult
from .filters import FilterChain
from .scorers import URLScorer
from .scraper_strategy import ScraperStrategy
from ..config import SCRAPER_BATCH_SIZE
@dataclass
class CrawlStats:
"""Statistics for the crawling process"""
start_time: datetime
urls_processed: int = 0
urls_failed: int = 0
urls_skipped: int = 0
total_depth_reached: int = 0
current_depth: int = 0
class BFSScraperStrategy(ScraperStrategy):
"""Breadth-First Search scraping strategy with politeness controls"""
def __init__(
self,
max_depth: int,
filter_chain: FilterChain,
url_scorer: URLScorer,
process_external_links: bool = False,
logger: Optional[logging.Logger] = None,
):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.logger = logger or logging.getLogger(__name__)
# Crawl control
self.stats = CrawlStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self.process_external_links = process_external_links
async def can_process_url(self, url: str, depth: int) -> bool:
"""Check if URL can be processed based on filters
This is our gatekeeper method that determines if a URL should be processed. It:
- Validates URL format using a robust built-in method
- Applies custom filters from the filter chain
- Updates statistics for blocked URLs
- Returns False early if any check fails
"""
try:
result = urlparse(url)
if not all([result.scheme, result.netloc]):
raise ValueError("Invalid URL")
if result.scheme not in ("http", "https"):
raise ValueError("URL must be HTTP or HTTPS")
if not result.netloc or "." not in result.netloc:
raise ValueError("Invalid domain")
except Exception as e:
self.logger.warning(f"Invalid URL: {url}. Error: {str(e)}")
return False
# Apply the filter chain if it's not start page
if depth != 0 and not self.filter_chain.apply(url):
return False
return True
async def _process_links(
self,
result: CrawlResult,
source_url: str,
queue: asyncio.PriorityQueue,
visited: Set[str],
depths: Dict[str, int],
):
"""Process extracted links from crawl result.
This is our link processor that:
Checks depth limits
Handles both internal and external links
Checks if URL is visited already
Checks if URL can be processed - validates URL, applies Filters with can_process_url
Scores URLs for priority
Updates depth tracking dictionary
Adds valid URLs to the queue
Updates maximum depth statistics
"""
next_depth = depths[source_url] + 1
# If depth limit reached, exit without processing links
if next_depth > self.max_depth:
return
links_to_process = result.links["internal"]
if self.process_external_links:
links_to_process += result.links["external"]
for link in links_to_process:
url = link["href"]
if url in visited:
continue
if not await self.can_process_url(url, next_depth):
self.stats.urls_skipped += 1
continue
score = self.url_scorer.score(url) if self.url_scorer else 0
await queue.put((score, next_depth, url))
depths[url] = next_depth
self.stats.total_depth_reached = max(
self.stats.total_depth_reached, next_depth
)
async def ascrape(
self,
start_url: str,
crawler_config: Optional[CrawlerRunConfig] = None,
browser_config: Optional[BrowserConfig] = None,
) -> AsyncGenerator[CrawlResult, None]:
"""Implement BFS crawling strategy"""
# Initialize crawl state
"""
queue: A priority queue where items are tuples of (score, depth, url)
Score: Determines crawling priority (lower = higher priority)
Depth: Current distance from start_url
URL: The actual URL to crawl
visited: Keeps track of URLs we've already seen to avoid cycles
depths: Maps URLs to their depths from the start URL
active_crawls: Tracks currently running crawl tasks
"""
queue = asyncio.PriorityQueue()
await queue.put((0, 0, start_url))
visited: Set[str] = set()
depths = {start_url: 0}
active_crawls = {} # Track URLs currently being processed with depth and score
active_crawls_lock = asyncio.Lock() # Create the lock within the same event loop
# Update crawler_config to stream back results to scraper
crawler_config = crawler_config.clone(stream=True) if crawler_config else CrawlerRunConfig(stream=True)
async with AsyncWebCrawler(
config=browser_config,
verbose=True,
) as crawler:
try:
while (
not queue.empty() or active_crawls
) and not self._cancel_event.is_set():
"""
This sets up our main control loop which:
- Continues while there are URLs to process (not queue.empty())
- Or while there are active crawls still running (arun_many)
- Can be interrupted via cancellation (not self._cancel_event.is_set())
"""
# Collect batch of URLs into active_crawls to process
async with active_crawls_lock:
while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty():
score, depth, url = await queue.get()
active_crawls[url] = {"depth": depth, "score": score}
self.stats.current_depth = depth
if not active_crawls:
# If no active crawls exist, wait a bit and continue
await asyncio.sleep(0.1)
continue
# Process batch
try:
async for result in await crawler.arun_many(
urls=list(active_crawls.keys()),
config=crawler_config.clone(stream=True),
):
source_url = result.url
depth = active_crawls[source_url]["depth"]
score=active_crawls[source_url]["score"]
async with active_crawls_lock:
active_crawls.pop(source_url, None)
if result.success:
await self._process_links(
result, source_url, queue, visited, depths
)
yield ScraperPageResult(
result = result,
depth=depth,
score=score,
)
else:
self.logger.warning(
f"Failed to crawl {result.url}: {result.error_message}"
)
except Exception as e:
self.logger.error(f"Batch processing error: {e}")
# Continue processing other batches
continue
except Exception as e:
self.logger.error(f"Error in crawl process: {e}")
raise
finally:
self.stats.end_time = datetime.now()
await crawler.close()
async def shutdown(self):
"""Clean up resources and stop crawling"""
self._cancel_event.set()

View File

@@ -1,12 +0,0 @@
from pydantic import BaseModel
from typing import List, Dict
from ..models import CrawlResult
class ScraperPageResult(BaseModel):
result: CrawlResult
depth: int
score: float
class ScraperResult(BaseModel):
url: str
crawled_urls: List[str]
extracted_data: Dict[str, ScraperPageResult]

View File

@@ -1,40 +0,0 @@
from abc import ABC, abstractmethod
from .models import ScraperResult, ScraperPageResult
from ..async_configs import BrowserConfig, CrawlerRunConfig
from typing import Union, AsyncGenerator
class ScraperStrategy(ABC):
@abstractmethod
async def ascrape(
self,
url: str,
crawler_config: CrawlerRunConfig,
browser_config: BrowserConfig,
stream: bool = False,
) -> Union[AsyncGenerator[ScraperPageResult, None], ScraperResult]:
"""Scrape the given URL using the specified crawler.
Args:
url (str): The starting URL for the scrape.
crawler_config (CrawlerRunConfig): Configuration for the crawler run.
browser_config (BrowserConfig): Configuration for the browser.
stream (bool): If True, yields individual crawl results as they are ready;
if False, accumulates results and returns a final ScraperResult.
Yields:
ScraperPageResult: Individual page results if stream is True.
Returns:
ScraperResult: A summary of the scrape results containing the final extracted data
and the list of crawled URLs if stream is False.
"""
pass
@abstractmethod
async def can_process_url(self, url: str, depth: int) -> bool:
"""Check if URL can be processed based on strategy rules"""
pass
@abstractmethod
async def shutdown(self):
"""Clean up resources used by the strategy"""
pass

View File

@@ -0,0 +1,29 @@
from .bfs_traversal_strategy import BFSTraversalStrategy
from .filters import (
URLFilter,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
)
from .scorers import (
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer,
)
from .traversal_strategy import TraversalStrategy
__all__ = [
"BFSTraversalStrategy",
"FilterChain",
"URLFilter",
"URLPatternFilter",
"ContentTypeFilter",
"DomainFilter",
"KeywordRelevanceScorer",
"PathDepthScorer",
"FreshnessScorer",
"CompositeScorer",
"TraversalStrategy",
]

View File

@@ -0,0 +1,197 @@
from typing import AsyncGenerator, Optional, Dict, Set, List
from datetime import datetime
import asyncio
import logging
from urllib.parse import urlparse
from ..async_configs import CrawlerRunConfig
from ..models import CrawlResult, TraversalStats
from .filters import FilterChain
from .scorers import URLScorer
from .traversal_strategy import TraversalStrategy
from ..config import SCRAPER_BATCH_SIZE
class BFSTraversalStrategy(TraversalStrategy):
    """Best-First Search traversal strategy with filtering and scoring.

    Despite the BFS-style name, URLs are drawn from a priority queue ordered
    by (score, depth, url), so lower scores are crawled first; equal scores
    fall back to shallower-first order.
    """

    def __init__(
        self,
        max_depth: int,
        filter_chain: FilterChain,
        url_scorer: URLScorer,
        process_external_links: bool = False,
        logger: Optional[logging.Logger] = None,
    ):
        # Maximum link distance from the start URL that will be followed.
        self.max_depth = max_depth
        # Chain of URL filters applied to every discovered link.
        self.filter_chain = filter_chain
        # Scorer used to order the queue (lower score = crawled sooner).
        self.url_scorer = url_scorer
        self.logger = logger or logging.getLogger(__name__)

        # Crawl control
        self.stats = TraversalStats(start_time=datetime.now())
        # Set by shutdown() to stop the main loop cooperatively.
        self._cancel_event = asyncio.Event()
        self.process_external_links = process_external_links

    async def can_process_url(self, url: str, depth: int) -> bool:
        """Check if URL can be processed based on filters

        This is our gatekeeper method that determines if a URL should be processed. It:
        - Validates URL format using a robust built-in method
        - Applies custom filters from the filter chain
        - Updates statistics for blocked URLs
        - Returns False early if any check fails
        """
        try:
            result = urlparse(url)
            if not all([result.scheme, result.netloc]):
                raise ValueError("Invalid URL")
            if result.scheme not in ("http", "https"):
                raise ValueError("URL must be HTTP or HTTPS")
            if not result.netloc or "." not in result.netloc:
                raise ValueError("Invalid domain")
        except Exception as e:
            self.logger.warning(f"Invalid URL: {url}. Error: {str(e)}")
            return False

        # Apply the filter chain if it's not start page
        if depth != 0 and not self.filter_chain.apply(url):
            return False

        return True

    async def _process_links(
        self,
        result: CrawlResult,
        source_url: str,
        queue: asyncio.PriorityQueue,
        visited: Set[str],
        depths: Dict[str, int],
    ) -> List[str]:
        """Process extracted links from crawl result.

        This is our link processor that:
        Checks depth limits
        Handles both internal and external links
        Checks if URL is visited already
        Checks if URL can be processed - validates URL, applies Filters with can_process_url
        Scores URLs for priority
        Updates depth tracking dictionary
        Adds valid URLs to the queue
        Updates maximum depth statistics

        Returns:
            List of child URLs that were enqueued for this source page, or
            None when the depth limit was reached (see note below).
        """
        next_depth = depths[source_url] + 1
        # If depth limit reached, exit without processing links
        # NOTE(review): this bare `return` yields None although the signature
        # promises List[str]; the caller assigns the value to
        # result.child_urls, so child_urls may end up None at max depth.
        if next_depth > self.max_depth:
            return

        links_to_process = result.links["internal"]
        if self.process_external_links:
            links_to_process += result.links["external"]
        child_urls = []
        for link in links_to_process:
            url = link["href"]
            # NOTE(review): `visited` is checked but never populated anywhere
            # in this class, so the same URL discovered from multiple parents
            # can be enqueued (and crawled) more than once — confirm intent.
            if url in visited:
                continue
            if not await self.can_process_url(url, next_depth):
                self.stats.urls_skipped += 1
                continue

            score = self.url_scorer.score(url) if self.url_scorer else 0
            child_urls.append(url)
            # Queue items are (score, depth, url, parent_url); the url field
            # breaks ties so the optional parent_url is never compared.
            await queue.put((score, next_depth, url, source_url))
            depths[url] = next_depth
            self.stats.total_depth_reached = max(
                self.stats.total_depth_reached, next_depth
            )
        return child_urls

    async def deep_crawl(
        self,
        start_url: str,
        crawler: "AsyncWebCrawler",
        crawler_run_config: Optional[CrawlerRunConfig] = None,
    ) -> AsyncGenerator[CrawlResult, None]:
        """Implement BFS traversal strategy"""
        # Initialize traversal state
        """
        queue: A priority queue where items are tuples of (score, depth, url)
            Score: Determines traversal priority (lower = higher priority)
            Depth: Current distance from start_url
            URL: The actual URL to crawl
        visited: Keeps track of URLs we've already seen to avoid cycles
        depths: Maps URLs to their depths from the start URL
        active_crawls: Tracks currently running crawl tasks
        """
        queue = asyncio.PriorityQueue()
        await queue.put((0, 0, start_url, None))
        visited: Set[str] = set()
        depths = {start_url: 0}
        active_crawls = {}  # Track URLs currently being processed with depth and score
        active_crawls_lock = (
            asyncio.Lock()
        )  # Create the lock within the same event loop

        try:
            while (
                not queue.empty() or active_crawls
            ) and not self._cancel_event.is_set():
                """
                This sets up our main control loop which:
                - Continues while there are URLs to process (not queue.empty())
                - Or while there are active crawls still running (arun_many)
                - Can be interrupted via cancellation (not self._cancel_event.is_set())
                """
                # Collect batch of URLs into active_crawls to process
                async with active_crawls_lock:
                    while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty():
                        score, depth, url, parent_url = await queue.get()
                        active_crawls[url] = {
                            "depth": depth,
                            "score": score,
                            "parent_url": parent_url,
                        }
                        self.stats.current_depth = depth

                if not active_crawls:
                    # If no active crawls exist, wait a bit and continue
                    await asyncio.sleep(0.1)
                    continue

                # Process batch
                try:
                    # Force streaming so results can be yielded as each page
                    # in the batch finishes rather than after the whole batch.
                    stream_config = (
                        crawler_run_config.clone(stream=True)
                        if crawler_run_config
                        else CrawlerRunConfig(stream=True)
                    )
                    async for result in await crawler.arun_many(
                        urls=list(active_crawls.keys()),
                        config=stream_config,
                    ):
                        async with active_crawls_lock:
                            crawl_info = active_crawls.pop(result.url, None)
                        if crawl_info and result.success:
                            child_urls = await self._process_links(
                                result, result.url, queue, visited, depths
                            )
                            # Annotate the result so callers can rebuild the
                            # crawl tree from the flattened result list.
                            result.depth = crawl_info["depth"]
                            result.score = crawl_info["score"]
                            result.parent_url = crawl_info["parent_url"]
                            result.child_urls = child_urls
                            yield result
                        else:
                            # NOTE(review): failures are logged but
                            # stats.urls_failed is never incremented — confirm.
                            self.logger.warning(
                                f"Failed to crawl {result.url}: {result.error_message}"
                            )
                except Exception as e:
                    self.logger.error(f"Batch processing error: {e}")
                    # Continue processing other batches
                    continue

        except Exception as e:
            self.logger.error(f"Error in crawl process: {e}")
            raise

        finally:
            # NOTE(review): end_time is not a declared field of
            # TraversalStats; this relies on dataclasses accepting ad-hoc
            # attributes. Consider declaring it on the dataclass.
            self.stats.end_time = datetime.now()
            # NOTE(review): the crawler is owned by the caller (passed in,
            # typically `self` from AsyncWebCrawler.adeep_crawl), so closing
            # it here tears down the caller's instance — confirm intended.
            await crawler.close()

    async def shutdown(self):
        """Clean up resources and stop crawling"""
        self._cancel_event.set()

View File

@@ -1,9 +1,8 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Union
from typing import List, Dict, Optional
from dataclasses import dataclass
from urllib.parse import urlparse, unquote
import re
from collections import defaultdict
import math
import logging
from functools import lru_cache

View File

@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from typing import AsyncGenerator
from ..async_configs import CrawlerRunConfig
from ..models import CrawlResult
class TraversalStrategy(ABC):
    """Abstract interface for deep-crawl traversal strategies.

    A strategy decides which links to follow and in what order; the crawler
    instance it receives performs the actual page fetching.
    """

    @abstractmethod
    async def deep_crawl(
        self,
        url: str,
        crawler: "AsyncWebCrawler",
        crawler_run_config: CrawlerRunConfig = None,
    ) -> AsyncGenerator[CrawlResult, None]:
        """Traverse the given URL using the specified crawler.

        Args:
            url (str): The starting URL for the traversal.
            crawler (AsyncWebCrawler): The crawler instance to use for traversal.
            crawler_run_config (CrawlerRunConfig, optional): The configuration for the crawler.

        Returns:
            AsyncGenerator[CrawlResult, None]: An async generator yielding crawl results.
        """
        pass

    @abstractmethod
    async def shutdown(self):
        """Clean up resources used by the strategy"""
        pass

View File

@@ -1,20 +1,18 @@
# basic_scraper_example.py
from crawl4ai.async_configs import CrawlerRunConfig
from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
from crawl4ai.traversal import (
BFSTraversalStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
)
from crawl4ai.async_webcrawler import BrowserConfig
from crawl4ai.async_webcrawler import AsyncWebCrawler
import re
import time
browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600)
async def basic_scraper_example():
"""
Basic example: Scrape a blog site for articles
@@ -33,7 +31,7 @@ async def basic_scraper_example():
)
# Initialize the strategy with basic configuration
bfs_strategy = BFSScraperStrategy(
bfs_strategy = BFSTraversalStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
@@ -41,17 +39,18 @@ async def basic_scraper_example():
)
# Create the crawler and scraper
async with AsyncWebScraper(
strategy=bfs_strategy,
) as scraper:
async with AsyncWebCrawler(
config=browser_config,
) as crawler:
# Start scraping
try:
result = await scraper.ascrape("https://crawl4ai.com/mkdocs")
results = await crawler.adeep_crawl(
"https://crawl4ai.com/mkdocs", strategy=bfs_strategy
)
# Process results
print(f"Crawled {len(result.crawled_urls)} pages:")
for url, page_result in result.extracted_data.items():
print(f"- {url}: {len(page_result.result.html)} bytes")
print(f"Crawled {len(results)} pages:")
for result in results:
print(f"- {result.url}: {len(result.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
@@ -60,9 +59,8 @@ async def basic_scraper_example():
# advanced_scraper_example.py
import logging
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
from crawl4ai.traversal import (
BFSTraversalStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
@@ -123,34 +121,36 @@ async def advanced_scraper_example():
)
# Initialize strategy with advanced configuration
bfs_strategy = BFSScraperStrategy(
bfs_strategy = BFSTraversalStrategy(
max_depth=2, filter_chain=filter_chain, url_scorer=scorer
)
# Create crawler and scraper
async with AsyncWebScraper(
strategy=bfs_strategy,
crawler_config=CrawlerRunConfig(bypass_cache=True, scraping_strategy=LXMLWebScrapingStrategy(),),
browser_config=browser_config,
) as scraper:
async with AsyncWebCrawler(
config=browser_config,
) as crawler:
# Track statistics
stats = {"processed": 0, "errors": 0, "total_size": 0}
try:
# Use streaming mode
result_generator = await scraper.ascrape(
"https://techcrunch.com", stream=True
result_generator = await crawler.adeep_crawl(
"https://techcrunch.com",
strategy=bfs_strategy,
crawler_run_config=CrawlerRunConfig(
scraping_strategy=LXMLWebScrapingStrategy()
),
stream=True,
)
async for page_result in result_generator:
result = page_result.result
score = page_result.score
depth = page_result.depth
async for result in result_generator:
stats["processed"] += 1
if result.success:
stats["total_size"] += len(result.html)
logger.info(f"Processed at depth: {depth} with score: {score:.3f} : \n {result.url}")
logger.info(
f"Processed at depth: {result.depth} with score: {result.score:.3f} : \n {result.url}"
)
else:
stats["errors"] += 1
logger.error(

View File

@@ -1,187 +0,0 @@
# basic_scraper_example.py
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
)
from crawl4ai.async_webcrawler import AsyncWebCrawler, BrowserConfig
import re
browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600)
async def basic_scraper_example():
"""
Basic example: Scrape a blog site for articles
- Crawls only HTML pages
- Stays within the blog section
- Collects all results at once
"""
# Create a simple filter chain
filter_chain = FilterChain(
[
# Only crawl pages within the blog section
URLPatternFilter("*/tutorial/*"),
# Only process HTML pages
ContentTypeFilter(["text/html"]),
]
)
# Initialize the strategy with basic configuration
strategy = BFSScraperStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
process_external_links=True,
)
# Create the crawler and scraper
async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler:
scraper = AsyncWebScraper(crawler, strategy)
# Start scraping
try:
result = await scraper.ascrape("https://crawl4ai.com/mkdocs")
# Process results
print(f"Crawled {len(result.crawled_urls)} pages:")
for url, data in result.extracted_data.items():
print(f"- {url}: {len(data.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
# advanced_scraper_example.py
import logging
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer,
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def advanced_scraper_example():
"""
Advanced example: Intelligent news site scraping
- Uses all filter types
- Implements sophisticated scoring
- Streams results
- Includes monitoring and logging
"""
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("advanced_scraper")
# Create sophisticated filter chain
filter_chain = FilterChain(
[
# Domain control
DomainFilter(
allowed_domains=["techcrunch.com"],
blocked_domains=["login.techcrunch.com", "legal.yahoo.com"],
),
# URL patterns
URLPatternFilter(
[
"*/article/*",
"*/news/*",
"*/blog/*",
re.compile(r"\d{4}/\d{2}/.*"), # Date-based URLs
]
),
# Content types
ContentTypeFilter(["text/html", "application/xhtml+xml"]),
]
)
# Create composite scorer
scorer = CompositeScorer(
[
# Prioritize by keywords
KeywordRelevanceScorer(
keywords=["news", "breaking", "update", "latest"], weight=1.0
),
# Prefer optimal URL structure
PathDepthScorer(optimal_depth=3, weight=0.7),
# Prioritize fresh content
FreshnessScorer(weight=0.9),
]
)
# Initialize strategy with advanced configuration
strategy = BFSScraperStrategy(
max_depth=2, filter_chain=filter_chain, url_scorer=scorer
)
# Create crawler and scraper
async with AsyncWebCrawler(verbose=True, config=browser_config) as crawler:
scraper = AsyncWebScraper(crawler, strategy)
# Track statistics
stats = {"processed": 0, "errors": 0, "total_size": 0}
try:
# Use streaming mode
result_generator = await scraper.ascrape(
"https://techcrunch.com", stream=True
)
async for result in result_generator:
stats["processed"] += 1
if result.success:
stats["total_size"] += len(result.html)
logger.info(f"Processed: {result.url}")
else:
stats["errors"] += 1
logger.error(
f"Failed to process {result.url}: {result.error_message}"
)
# Log progress regularly
if stats["processed"] % 10 == 0:
logger.info(f"Progress: {stats['processed']} URLs processed")
except Exception as e:
logger.error(f"Scraping error: {e}")
finally:
# Print final statistics
logger.info("Scraping completed:")
logger.info(f"- URLs processed: {stats['processed']}")
logger.info(f"- Errors: {stats['errors']}")
logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB")
# Print filter statistics
for filter_ in filter_chain.filters:
logger.info(f"{filter_.name} stats:")
logger.info(f"- Passed: {filter_.stats.passed_urls}")
logger.info(f"- Rejected: {filter_.stats.rejected_urls}")
# Print scorer statistics
logger.info("Scoring statistics:")
logger.info(f"- Average score: {scorer.stats.average_score:.2f}")
logger.info(
f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}"
)
if __name__ == "__main__":
import asyncio
# Run basic example
print("Running basic scraper example...")
asyncio.run(basic_scraper_example())
# Run advanced example
print("\nRunning advanced scraper example...")
asyncio.run(advanced_scraper_example())

0
models.py Normal file
View File