Compare commits

...

5 Commits
v0.7.7 ... next

Author SHA1 Message Date
UncleCode
8a906fcad0 fix(dependencies): Update and clean up package versions in pyproject.toml, the bundle size will be much smaller. 2025-07-29 19:56:27 +08:00
UncleCode
54ae10d957 feat(extraction_strategy): Enhance schema generation with improved validation and task description handling
fix(prompts): Update GENERATE_SCRIPT_PROMPT to raw string for better formatting

docs: Add missing import for GENERATE_SCRIPT_PROMPT in hello_world example
2025-07-29 19:33:36 +08:00
UncleCode
843457a9cb Refactor adaptive crawling state management
- Renamed `CrawlState` to `AdaptiveCrawlResult` to better reflect its purpose.
- Updated all references to `CrawlState` in the codebase, including method signatures and documentation.
- Modified the `AdaptiveCrawler` class to initialize and manage the new `AdaptiveCrawlResult` state.
- Adjusted example strategies and documentation to align with the new state class.
- Ensured all tests are updated to use `AdaptiveCrawlResult` instead of `CrawlState`.
2025-07-24 20:11:43 +08:00
UncleCode
d1de82a332 feat(crawl4ai): Implement SMART cache mode
This commit introduces a new cache mode, SMART, to the crawl4ai library. The SMART mode intelligently validates cached content using HEAD requests before using it, saving significant bandwidth while ensuring fresh content. The changes include modifications to the async_webcrawler.py, cache_context.py, and utils.py files in the crawl4ai directory. The async_webcrawler.py file now includes a check for the SMART cache mode and performs a HEAD check to see if the content has changed. If the content has changed, the url is re-crawled; otherwise, the cached result is used. The cache_context.py and utils.py files have been updated to support these changes.

The documentation has also been updated to reflect these changes. The cache-modes.md file now includes a detailed explanation of the SMART mode, its logs, limitations, and an advanced example. The examples.md file now includes a link to the SMART Cache Mode example. The quickstart.md file now mentions the SMART mode in the note about cache modes.

These changes improve the efficiency of the crawl4ai library by reducing unnecessary re-crawling and bandwidth usage.

BREAKING CHANGE: The introduction of the SMART cache mode may affect existing code that uses the crawl4ai library and does not expect this new mode. Users should review the updated documentation to understand how to use this new mode.
2025-07-21 21:19:37 +08:00
UncleCode
8a04351406 feat(crawl4ai): Update to version 0.7.1 with improvements and new tests
This commit includes several updates to the crawl4ai package, including changes to the browser manager and content scraping strategy. The version number has been updated to 0.7.1. Significant modifications have been made to the documentation, including updates to the release notes for version 0.7.0 and the addition of release notes for version 0.7.1. Examples and core documentation have also been updated to reflect the changes in this version. Additionally, a new simple API test has been added to the Docker tests.

These changes were made to improve the functionality of the crawl4ai package and to provide clearer, more up-to-date documentation for users. The new test will help ensure the API is working as expected.

BREAKING CHANGE: The updates to the browser manager and content scraping strategy may affect how these components interact with the rest of the package. Users should review the updated documentation for details on these changes.
2025-07-18 16:27:19 +08:00
44 changed files with 2047 additions and 2525 deletions

View File

@@ -216,7 +216,7 @@ Under certain assumptions about link preview accuracy:
### 8.1 Core Components
1. **CrawlState**: Maintains crawl history and metrics
1. **AdaptiveCrawlResult**: Maintains crawl history and metrics
2. **AdaptiveConfig**: Configuration parameters
3. **CrawlStrategy**: Pluggable strategy interface
4. **AdaptiveCrawler**: Main orchestrator

View File

@@ -523,15 +523,18 @@ async def test_news_crawl():
- **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically:
```python
config = AdaptiveConfig(
confidence_threshold=0.7,
max_history=100,
learning_rate=0.2
confidence_threshold=0.7, # Min confidence to stop crawling
max_depth=5, # Maximum crawl depth
max_pages=20, # Maximum number of pages to crawl
strategy="statistical"
)
result = await crawler.arun(
"https://news.example.com",
config=CrawlerRunConfig(adaptive_config=config)
)
async with AsyncWebCrawler() as crawler:
adaptive_crawler = AdaptiveCrawler(crawler, config)
state = await adaptive_crawler.digest(
start_url="https://news.example.com",
query="latest news content"
)
# Crawler learns patterns and improves extraction over time
```

View File

@@ -3,7 +3,7 @@ import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
# MODIFIED: Add SeedingConfig and VirtualScrollConfig here
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig, LinkPreviewConfig
from .content_scraping_strategy import (
ContentScrapingStrategy,
@@ -73,7 +73,7 @@ from .async_url_seeder import AsyncUrlSeeder
from .adaptive_crawler import (
AdaptiveCrawler,
AdaptiveConfig,
CrawlState,
AdaptiveCrawlResult,
CrawlStrategy,
StatisticalStrategy
)
@@ -108,7 +108,7 @@ __all__ = [
# Adaptive Crawler
"AdaptiveCrawler",
"AdaptiveConfig",
"CrawlState",
"AdaptiveCrawlResult",
"CrawlStrategy",
"StatisticalStrategy",
"DeepCrawlStrategy",
@@ -173,6 +173,7 @@ __all__ = [
"CompilationResult",
"ValidationResult",
"ErrorDetail",
"LinkPreviewConfig"
]

View File

@@ -1,7 +1,7 @@
# crawl4ai/__version__.py
# This is the version that will be used for stable releases
__version__ = "0.7.0"
__version__ = "0.7.1"
# For nightly builds, this gets set during build process
__nightly_version__ = None

File diff suppressed because it is too large Load Diff

View File

@@ -24,7 +24,7 @@ from crawl4ai.models import Link, CrawlResult
import numpy as np
@dataclass
class CrawlState:
class AdaptiveCrawlResult:
"""Tracks the current state of adaptive crawling"""
crawled_urls: Set[str] = field(default_factory=set)
knowledge_base: List[CrawlResult] = field(default_factory=list)
@@ -80,7 +80,7 @@ class CrawlState:
json.dump(state_dict, f, indent=2)
@classmethod
def load(cls, path: Union[str, Path]) -> 'CrawlState':
def load(cls, path: Union[str, Path]) -> 'AdaptiveCrawlResult':
"""Load state from disk"""
path = Path(path)
with open(path, 'r') as f:
@@ -256,22 +256,22 @@ class CrawlStrategy(ABC):
"""Abstract base class for crawling strategies"""
@abstractmethod
async def calculate_confidence(self, state: CrawlState) -> float:
async def calculate_confidence(self, state: AdaptiveCrawlResult) -> float:
"""Calculate overall confidence that we have sufficient information"""
pass
@abstractmethod
async def rank_links(self, state: CrawlState, config: AdaptiveConfig) -> List[Tuple[Link, float]]:
async def rank_links(self, state: AdaptiveCrawlResult, config: AdaptiveConfig) -> List[Tuple[Link, float]]:
"""Rank pending links by expected information gain"""
pass
@abstractmethod
async def should_stop(self, state: CrawlState, config: AdaptiveConfig) -> bool:
async def should_stop(self, state: AdaptiveCrawlResult, config: AdaptiveConfig) -> bool:
"""Determine if crawling should stop"""
pass
@abstractmethod
async def update_state(self, state: CrawlState, new_results: List[CrawlResult]) -> None:
async def update_state(self, state: AdaptiveCrawlResult, new_results: List[CrawlResult]) -> None:
"""Update state with new crawl results"""
pass
@@ -284,7 +284,7 @@ class StatisticalStrategy(CrawlStrategy):
self.bm25_k1 = 1.2 # BM25 parameter
self.bm25_b = 0.75 # BM25 parameter
async def calculate_confidence(self, state: CrawlState) -> float:
async def calculate_confidence(self, state: AdaptiveCrawlResult) -> float:
"""Calculate confidence using coverage, consistency, and saturation"""
if not state.knowledge_base:
return 0.0
@@ -303,7 +303,7 @@ class StatisticalStrategy(CrawlStrategy):
return confidence
def _calculate_coverage(self, state: CrawlState) -> float:
def _calculate_coverage(self, state: AdaptiveCrawlResult) -> float:
"""Coverage scoring - measures query term presence across knowledge base
Returns a score between 0 and 1, where:
@@ -344,7 +344,7 @@ class StatisticalStrategy(CrawlStrategy):
# This helps differentiate between partial and good coverage
return min(1.0, math.sqrt(coverage))
def _calculate_consistency(self, state: CrawlState) -> float:
def _calculate_consistency(self, state: AdaptiveCrawlResult) -> float:
"""Information overlap between pages - high overlap suggests coherent topic coverage"""
if len(state.knowledge_base) < 2:
return 1.0 # Single or no documents are perfectly consistent
@@ -371,7 +371,7 @@ class StatisticalStrategy(CrawlStrategy):
return consistency
def _calculate_saturation(self, state: CrawlState) -> float:
def _calculate_saturation(self, state: AdaptiveCrawlResult) -> float:
"""Diminishing returns indicator - are we still discovering new information?"""
if not state.new_terms_history:
return 0.0
@@ -388,7 +388,7 @@ class StatisticalStrategy(CrawlStrategy):
return max(0.0, min(saturation, 1.0))
async def rank_links(self, state: CrawlState, config: AdaptiveConfig) -> List[Tuple[Link, float]]:
async def rank_links(self, state: AdaptiveCrawlResult, config: AdaptiveConfig) -> List[Tuple[Link, float]]:
"""Rank links by expected information gain"""
scored_links = []
@@ -415,7 +415,7 @@ class StatisticalStrategy(CrawlStrategy):
return scored_links
def _calculate_relevance(self, link: Link, state: CrawlState) -> float:
def _calculate_relevance(self, link: Link, state: AdaptiveCrawlResult) -> float:
"""BM25 relevance score between link preview and query"""
if not state.query or not link:
return 0.0
@@ -447,7 +447,7 @@ class StatisticalStrategy(CrawlStrategy):
overlap = len(query_terms & link_terms) / len(query_terms)
return overlap
def _calculate_novelty(self, link: Link, state: CrawlState) -> float:
def _calculate_novelty(self, link: Link, state: AdaptiveCrawlResult) -> float:
"""Estimate how much new information this link might provide"""
if not state.knowledge_base:
return 1.0 # First links are maximally novel
@@ -502,7 +502,7 @@ class StatisticalStrategy(CrawlStrategy):
return min(score, 1.0)
async def should_stop(self, state: CrawlState, config: AdaptiveConfig) -> bool:
async def should_stop(self, state: AdaptiveCrawlResult, config: AdaptiveConfig) -> bool:
"""Determine if crawling should stop"""
# Check confidence threshold
confidence = state.metrics.get('confidence', 0.0)
@@ -523,7 +523,7 @@ class StatisticalStrategy(CrawlStrategy):
return False
async def update_state(self, state: CrawlState, new_results: List[CrawlResult]) -> None:
async def update_state(self, state: AdaptiveCrawlResult, new_results: List[CrawlResult]) -> None:
"""Update state with new crawl results"""
for result in new_results:
# Track new terms
@@ -921,7 +921,7 @@ class EmbeddingStrategy(CrawlStrategy):
return sorted(scored_links, key=lambda x: x[1], reverse=True)
async def calculate_confidence(self, state: CrawlState) -> float:
async def calculate_confidence(self, state: AdaptiveCrawlResult) -> float:
"""Coverage-based learning score (01)."""
# Guard clauses
if state.kb_embeddings is None or state.query_embeddings is None:
@@ -951,7 +951,7 @@ class EmbeddingStrategy(CrawlStrategy):
# async def calculate_confidence(self, state: CrawlState) -> float:
# async def calculate_confidence(self, state: AdaptiveCrawlResult) -> float:
# """Calculate learning score for adaptive crawling (used for stopping)"""
#
@@ -1021,7 +1021,7 @@ class EmbeddingStrategy(CrawlStrategy):
# # For stopping criteria, return learning score
# return float(learning_score)
async def rank_links(self, state: CrawlState, config: AdaptiveConfig) -> List[Tuple[Link, float]]:
async def rank_links(self, state: AdaptiveCrawlResult, config: AdaptiveConfig) -> List[Tuple[Link, float]]:
"""Main entry point for link ranking"""
# Store config for use in other methods
self.config = config
@@ -1052,7 +1052,7 @@ class EmbeddingStrategy(CrawlStrategy):
state.kb_embeddings
)
async def validate_coverage(self, state: CrawlState) -> float:
async def validate_coverage(self, state: AdaptiveCrawlResult) -> float:
"""Validate coverage using held-out queries with caching"""
if not hasattr(self, '_validation_queries') or not self._validation_queries:
return state.metrics.get('confidence', 0.0)
@@ -1088,7 +1088,7 @@ class EmbeddingStrategy(CrawlStrategy):
return validation_confidence
async def should_stop(self, state: CrawlState, config: AdaptiveConfig) -> bool:
async def should_stop(self, state: AdaptiveCrawlResult, config: AdaptiveConfig) -> bool:
"""Stop based on learning curve convergence"""
confidence = state.metrics.get('confidence', 0.0)
@@ -1139,7 +1139,7 @@ class EmbeddingStrategy(CrawlStrategy):
return False
def get_quality_confidence(self, state: CrawlState) -> float:
def get_quality_confidence(self, state: AdaptiveCrawlResult) -> float:
"""Calculate quality-based confidence score for display"""
learning_score = state.metrics.get('learning_score', 0.0)
validation_score = state.metrics.get('validation_confidence', 0.0)
@@ -1166,7 +1166,7 @@ class EmbeddingStrategy(CrawlStrategy):
return confidence
async def update_state(self, state: CrawlState, new_results: List[CrawlResult]) -> None:
async def update_state(self, state: AdaptiveCrawlResult, new_results: List[CrawlResult]) -> None:
"""Update embeddings and coverage metrics with deduplication"""
from .utils import get_text_embeddings
@@ -1246,7 +1246,7 @@ class AdaptiveCrawler:
self.strategy = self._create_strategy(self.config.strategy)
# Initialize state
self.state: Optional[CrawlState] = None
self.state: Optional[AdaptiveCrawlResult] = None
# Track if we own the crawler (for cleanup)
self._owns_crawler = crawler is None
@@ -1266,14 +1266,14 @@ class AdaptiveCrawler:
async def digest(self,
start_url: str,
query: str,
resume_from: Optional[str] = None) -> CrawlState:
resume_from: Optional[str] = None) -> AdaptiveCrawlResult:
"""Main entry point for adaptive crawling"""
# Initialize or resume state
if resume_from:
self.state = CrawlState.load(resume_from)
self.state = AdaptiveCrawlResult.load(resume_from)
self.state.query = query # Update query in case it changed
else:
self.state = CrawlState(
self.state = AdaptiveCrawlResult(
crawled_urls=set(),
knowledge_base=[],
pending_links=[],
@@ -1803,7 +1803,7 @@ class AdaptiveCrawler:
# Initialize state if needed
if not self.state:
self.state = CrawlState()
self.state = AdaptiveCrawlResult()
# Add imported results
self.state.knowledge_base.extend(imported_results)

View File

@@ -47,6 +47,7 @@ from .utils import (
get_error_context,
RobotsParser,
preprocess_html_for_schema,
should_crawl_based_on_head,
)
@@ -268,31 +269,56 @@ class AsyncWebCrawler:
cached_result = await async_db_manager.aget_cached_url(url)
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(
cached_result.extracted_content or ""
)
extracted_content = (
None
if not extracted_content or extracted_content == "[]"
else extracted_content
)
# If screenshot is requested but its not in cache, then set cache_result to None
screenshot_data = cached_result.screenshot
pdf_data = cached_result.pdf
# if config.screenshot and not screenshot or config.pdf and not pdf:
if config.screenshot and not screenshot_data:
cached_result = None
# Check if SMART mode requires validation
if cache_context.cache_mode == CacheMode.SMART:
# Perform HEAD check to see if content has changed
user_agent = self.crawler_strategy.user_agent if hasattr(self.crawler_strategy, 'user_agent') else "Mozilla/5.0"
should_crawl, reason = await should_crawl_based_on_head(
url=url,
cached_headers=cached_result.response_headers or {},
user_agent=user_agent,
timeout=5
)
if should_crawl:
self.logger.info(
f"SMART cache: {reason} - Re-crawling {url}",
tag="SMART"
)
cached_result = None # Force re-crawl
else:
self.logger.info(
f"SMART cache: {reason} - Using cache for {url}",
tag="SMART"
)
# Process cached result if still valid
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(
cached_result.extracted_content or ""
)
extracted_content = (
None
if not extracted_content or extracted_content == "[]"
else extracted_content
)
# If screenshot is requested but its not in cache, then set cache_result to None
screenshot_data = cached_result.screenshot
pdf_data = cached_result.pdf
# if config.screenshot and not screenshot or config.pdf and not pdf:
if config.screenshot and not screenshot_data:
cached_result = None
if config.pdf and not pdf_data:
cached_result = None
if config.pdf and not pdf_data:
cached_result = None
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - start_time,
tag="FETCH",
)
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - start_time,
tag="FETCH",
)
# Update proxy configuration from rotation strategy if available
if config and config.proxy_rotation_strategy:

View File

@@ -14,23 +14,8 @@ import hashlib
from .js_snippet import load_js_script
from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig
from playwright_stealth import StealthConfig
from .utils import get_chromium_path
stealth_config = StealthConfig(
webdriver=True,
chrome_app=True,
chrome_csi=True,
chrome_load_times=True,
chrome_runtime=True,
navigator_languages=True,
navigator_plugins=True,
navigator_permissions=True,
webgl_vendor=True,
outerdimensions=True,
navigator_hardware_concurrency=True,
media_codecs=True,
)
BROWSER_DISABLE_OPTIONS = [
"--disable-background-networking",

View File

@@ -11,6 +11,7 @@ class CacheMode(Enum):
- READ_ONLY: Only read from cache, don't write
- WRITE_ONLY: Only write to cache, don't read
- BYPASS: Bypass cache for this operation
- SMART: Validate cache with HEAD request before using
"""
ENABLED = "enabled"
@@ -18,6 +19,7 @@ class CacheMode(Enum):
READ_ONLY = "read_only"
WRITE_ONLY = "write_only"
BYPASS = "bypass"
SMART = "smart"
class CacheContext:
@@ -62,14 +64,14 @@ class CacheContext:
How it works:
1. If always_bypass is True or is_cacheable is False, return False.
2. If cache_mode is ENABLED or READ_ONLY, return True.
2. If cache_mode is ENABLED, READ_ONLY, or SMART, return True.
Returns:
bool: True if cache should be read, False otherwise.
"""
if self.always_bypass or not self.is_cacheable:
return False
return self.cache_mode in [CacheMode.ENABLED, CacheMode.READ_ONLY]
return self.cache_mode in [CacheMode.ENABLED, CacheMode.READ_ONLY, CacheMode.SMART]
def should_write(self) -> bool:
"""
@@ -77,14 +79,14 @@ class CacheContext:
How it works:
1. If always_bypass is True or is_cacheable is False, return False.
2. If cache_mode is ENABLED or WRITE_ONLY, return True.
2. If cache_mode is ENABLED, WRITE_ONLY, or SMART, return True.
Returns:
bool: True if cache should be written, False otherwise.
"""
if self.always_bypass or not self.is_cacheable:
return False
return self.cache_mode in [CacheMode.ENABLED, CacheMode.WRITE_ONLY]
return self.cache_mode in [CacheMode.ENABLED, CacheMode.WRITE_ONLY, CacheMode.SMART]
@property
def display_url(self) -> str:

View File

@@ -1145,10 +1145,10 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
link_data["intrinsic_score"] = intrinsic_score
except Exception:
# Fail gracefully - assign default score
link_data["intrinsic_score"] = float('inf')
link_data["intrinsic_score"] = 0
else:
# No scoring enabled - assign infinity (all links equal priority)
link_data["intrinsic_score"] = float('inf')
link_data["intrinsic_score"] = 0
is_external = is_external_url(normalized_href, base_domain)
if is_external:

View File

@@ -1088,111 +1088,147 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
@staticmethod
def generate_schema(
html: str,
schema_type: str = "CSS", # or XPATH
query: str = None,
target_json_example: str = None,
llm_config: 'LLMConfig' = create_llm_config(),
provider: str = None,
api_token: str = None,
**kwargs
*,
schema_type: str = "CSS", # "CSS" or "XPATH"
query: str | None = None,
target_json_example: str | None = None,
last_instruction: str | None = None, # extra “IMPORTANT” notes
llm_config: "LLMConfig" = create_llm_config(),
token_usages: Optional[list["TokenUsage"]] = None,
prompt: str | None = None,
**kwargs,
) -> dict:
"""
Generate extraction schema from HTML content and optional query.
Args:
html (str): The HTML content to analyze
query (str, optional): Natural language description of what data to extract
provider (str): Legacy Parameter. LLM provider to use
api_token (str): Legacy Parameter. API token for LLM provider
llm_config (LLMConfig): LLM configuration object
prompt (str, optional): Custom prompt template to use
**kwargs: Additional args passed to LLM processor
Returns:
dict: Generated schema following the JsonElementExtractionStrategy format
Produce a JSON extraction schema from raw HTML.
- If `query` is given, the task section echoes it.
- If no `query` but `target_json_example` exists,
we instruct the model to fit the schema to that example.
- If neither is provided, we ask the model to detect
the most obvious repeating data and build a schema.
Returns
-------
dict
A schema compliant with JsonElementExtractionStrategy.
"""
from .prompts import JSON_SCHEMA_BUILDER
import json, re, textwrap
from .prompts import JSON_SCHEMA_BUILDER, JSON_SCHEMA_BUILDER_XPATH
from .utils import perform_completion_with_backoff
for name, message in JsonElementExtractionStrategy._GENERATE_SCHEMA_UNWANTED_PROPS.items():
if locals()[name] is not None:
raise AttributeError(f"Setting '{name}' is deprecated. {message}")
# Use default or custom prompt
prompt_template = JSON_SCHEMA_BUILDER if schema_type == "CSS" else JSON_SCHEMA_BUILDER_XPATH
# Build the prompt
system_message = {
"role": "system",
"content": f"""You specialize in generating special JSON schemas for web scraping. This schema uses CSS or XPATH selectors to present a repetitive pattern in crawled HTML, such as a product in a product list or a search result item in a list of search results. We use this JSON schema to pass to a language model along with the HTML content to extract structured data from the HTML. The language model uses the JSON schema to extract data from the HTML and retrieve values for fields in the JSON schema, following the schema.
Generating this HTML manually is not feasible, so you need to generate the JSON schema using the HTML content. The HTML copied from the crawled website is provided below, which we believe contains the repetitive pattern.
# ─── basic validation ────────────────────────────────────
if not html or not html.strip():
raise ValueError("html must be non-empty")
if schema_type not in {"CSS", "XPATH"}:
raise ValueError("schema_type must be 'CSS' or 'XPATH'")
for name, msg in JsonElementExtractionStrategy._GENERATE_SCHEMA_UNWANTED_PROPS.items():
if locals().get(name) is not None:
raise AttributeError(f"Setting '{name}' is deprecated. {msg}")
# Schema main keys:
- name: This is the name of the schema.
- baseSelector: This is the CSS or XPATH selector that identifies the base element that contains all the repetitive patterns.
- baseFields: This is a list of fields that you extract from the base element itself.
- fields: This is a list of fields that you extract from the children of the base element. {{name, selector, type}} based on the type, you may have extra keys such as "attribute" when the type is "attribute".
# Extra Context:
In this context, the following items may or may not be present:
- Example of target JSON object: This is a sample of the final JSON object that we hope to extract from the HTML using the schema you are generating.
- Extra Instructions: This is optional instructions to consider when generating the schema provided by the user.
- Query or explanation of target/goal data item: This is a description of what data we are trying to extract from the HTML. This explanation means we're not sure about the rigid schema of the structures we want, so we leave it to you to use your expertise to create the best and most comprehensive structures aimed at maximizing data extraction from this page. You must ensure that you do not pick up nuances that may exist on a particular page. The focus should be on the data we are extracting, and it must be valid, safe, and robust based on the given HTML.
# What if there is no example of target JSON object and also no extra instructions or even no explanation of target/goal data item?
In this scenario, use your best judgment to generate the schema. You need to examine the content of the page and understand the data it provides. If the page contains repetitive data, such as lists of items, products, jobs, places, books, or movies, focus on one single item that repeats. If the page is a detailed page about one product or item, create a schema to extract the entire structured data. At this stage, you must think and decide for yourself. Try to maximize the number of fields that you can extract from the HTML.
# What are the instructions and details for this schema generation?
{prompt_template}"""
}
user_message = {
"role": "user",
"content": f"""
HTML to analyze:
```html
{html}
```
"""
}
# ─── prompt selection ────────────────────────────────────
prompt_template = (
prompt
if prompt is not None
else (JSON_SCHEMA_BUILDER if schema_type == "CSS" else JSON_SCHEMA_BUILDER_XPATH)
)
# ─── derive task description ─────────────────────────────
if query:
user_message["content"] += f"\n\n## Query or explanation of target/goal data item:\n{query}"
if target_json_example:
user_message["content"] += f"\n\n## Example of target JSON object:\n```json\n{target_json_example}\n```"
if query and not target_json_example:
user_message["content"] += """IMPORTANT: To remind you, in this process, we are not providing a rigid example of the adjacent objects we seek. We rely on your understanding of the explanation provided in the above section. Make sure to grasp what we are looking for and, based on that, create the best schema.."""
elif not query and target_json_example:
user_message["content"] += """IMPORTANT: Please remember that in this process, we provided a proper example of a target JSON object. Make sure to adhere to the structure and create a schema that exactly fits this example. If you find that some elements on the page do not match completely, vote for the majority."""
elif not query and not target_json_example:
user_message["content"] += """IMPORTANT: Since we neither have a query nor an example, it is crucial to rely solely on the HTML content provided. Leverage your expertise to determine the schema based on the repetitive patterns observed in the content."""
user_message["content"] += """IMPORTANT:
0/ Ensure your schema remains reliable by avoiding selectors that appear to generate dynamically and are not dependable. You want a reliable schema, as it consistently returns the same data even after many page reloads.
1/ DO NOT USE use base64 kind of classes, they are temporary and not reliable.
2/ Every selector must refer to only one unique element. You should ensure your selector points to a single element and is unique to the place that contains the information. You have to use available techniques based on CSS or XPATH requested schema to make sure your selector is unique and also not fragile, meaning if we reload the page now or in the future, the selector should remain reliable.
3/ Do not use Regex as much as possible.
Analyze the HTML and generate a JSON schema that follows the specified format. Only output valid JSON schema, nothing else.
"""
try:
# Call LLM with backoff handling
response = perform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_message["content"], user_message["content"]]),
json_response = True,
api_token=llm_config.api_token,
base_url=llm_config.base_url,
extra_args=kwargs
task_line = query.strip()
elif target_json_example:
task_line = (
"Use the example JSON below to infer all required fields, "
"then generate a schema that extracts matching data."
)
# Extract and return schema
return json.loads(response.choices[0].message.content)
except Exception as e:
raise Exception(f"Failed to generate schema: {str(e)}")
else:
task_line = (
"Detect the most obvious repeating data on this page and "
"generate a schema that captures it completely."
)
# ─── build user prompt body ──────────────────────────────
html_clean = re.sub(r"\s{2,}", " ", textwrap.dedent(html).strip())
parts: list[str] = [
f"{prompt_template}",
"\n\n## Extracted HTML\n"
"==================== Beginning of Html ====================\n",
html_clean,
"\n==================== End of Html ====================\n",
]
if target_json_example:
parts.extend(
[
"\n## Example of end result\n",
target_json_example.strip(),
"\n",
]
)
if last_instruction:
parts.extend(
[
"\n## Important\n",
last_instruction.strip(),
"\n",
]
)
parts.extend(
[
"\n## Task:\n",
task_line,
]
)
user_message = {"role": "user", "content": "".join(parts)}
# slim system message, JSON_SCHEMA_BUILDER already holds heavy guidance
system_message = {
"role": "system",
"content": (
"You generate reliable JSON schemas for structured extraction. "
"Return valid JSON only."
),
}
# ─── call LLM ─────────────────────────────────────────────
response = perform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join(
[system_message["content"], user_message["content"]]
),
json_response=True,
api_token=llm_config.api_token,
base_url=llm_config.base_url,
extra_args=kwargs,
)
# ─── token usage accounting ──────────────────────────────
if token_usages is not None and hasattr(response, "usage"):
token_usages.append(
TokenUsage(
completion_tokens=getattr(response.usage, "completion_tokens", 0),
prompt_tokens=getattr(response.usage, "prompt_tokens", 0),
total_tokens=getattr(response.usage, "total_tokens", 0),
)
)
# ─── parse and validate JSON answer ──────────────────────
try:
schema = json.loads(response.choices[0].message.content)
except Exception as exc:
raise ValueError(f"LLM returned invalid JSON: {exc}") from exc
required = {"name", "baseSelector", "fields"}
if not required.issubset(schema):
missing = required - set(schema)
raise ValueError(f"Generated schema missing required keys: {missing}")
return schema
class JsonCssExtractionStrategy(JsonElementExtractionStrategy):
"""

View File

@@ -1056,7 +1056,7 @@ Your output must:
</output_requirements>
"""
GENERATE_SCRIPT_PROMPT = """You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
GENERATE_SCRIPT_PROMPT = r"""You are a world-class browser automation specialist. Your sole purpose is to convert a natural language objective and a snippet of HTML into the most **efficient, robust, and simple** script possible to prepare a web page for data extraction.
Your scripts run **before the crawl** to handle dynamic content, user interactions, and other obstacles. You are a master of two tools: raw **JavaScript** and the high-level **Crawl4ai Script (c4a)**.

View File

@@ -3387,3 +3387,90 @@ def cosine_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""Calculate cosine distance (1 - similarity) between two vectors"""
return 1 - cosine_similarity(vec1, vec2)
async def should_crawl_based_on_head(
url: str,
cached_headers: Dict[str, str],
user_agent: str = "Mozilla/5.0",
timeout: int = 5
) -> tuple[bool, str]:
"""
Check if content has changed using HEAD request.
Args:
url: The URL to check
cached_headers: The cached response headers from previous crawl
user_agent: User agent string to use for the HEAD request
timeout: Timeout in seconds for the HEAD request
Returns:
Tuple of (should_crawl: bool, reason: str)
- should_crawl: True if content has changed and should be re-crawled, False otherwise
- reason: Explanation of the decision
"""
import email.utils
if not cached_headers:
return True, "No cached headers available, must crawl"
headers = {
"Accept-Encoding": "identity",
"User-Agent": user_agent,
"Want-Content-Digest": "sha-256", # Request RFC 9530 digest
}
# Add conditional headers if available in cache
if cached_headers.get("etag"):
headers["If-None-Match"] = cached_headers["etag"]
if cached_headers.get("last-modified"):
headers["If-Modified-Since"] = cached_headers["last-modified"]
try:
async with aiohttp.ClientSession() as session:
async with session.head(
url,
headers=headers,
timeout=aiohttp.ClientTimeout(total=timeout),
allow_redirects=True
) as response:
# 304 Not Modified - content hasn't changed
if response.status == 304:
return False, "304 Not Modified - Content unchanged"
# Check other headers if no 304 response
new_headers = dict(response.headers)
# Check Content-Digest (most reliable)
if new_headers.get("content-digest") and cached_headers.get("content-digest"):
if new_headers["content-digest"] == cached_headers["content-digest"]:
return False, "Content-Digest matches - Content unchanged"
# Check strong ETag
if new_headers.get("etag") and cached_headers.get("etag"):
# Strong ETags start with '"'
if (new_headers["etag"].startswith('"') and
new_headers["etag"] == cached_headers["etag"]):
return False, "Strong ETag matches - Content unchanged"
# Check Last-Modified
if new_headers.get("last-modified") and cached_headers.get("last-modified"):
try:
new_lm = email.utils.parsedate_to_datetime(new_headers["last-modified"])
cached_lm = email.utils.parsedate_to_datetime(cached_headers["last-modified"])
if new_lm <= cached_lm:
return False, "Last-Modified not newer - Content unchanged"
except Exception:
pass
# Content-Length changed is a positive signal
if (new_headers.get("content-length") and cached_headers.get("content-length") and
new_headers["content-length"] != cached_headers["content-length"]):
return True, f"Content-Length changed ({cached_headers['content-length']} -> {new_headers['content-length']})"
# Default: assume content has changed
return True, "No definitive cache headers matched - Assuming content changed"
except Exception as e:
# On error, assume content has changed (safe default)
return True, f"HEAD request failed: {str(e)} - Assuming content changed"

View File

@@ -10,9 +10,8 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -30,44 +29,41 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores
```python
from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig
import asyncio
# Initialize with custom learning parameters
config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to use learned patterns
max_history=100, # Remember last 100 crawls per domain
learning_rate=0.2, # How quickly to adapt to changes
patterns_per_page=3, # Patterns to learn per page type
extraction_strategy='css' # 'css' or 'xpath'
)
adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://news.example.com/article/12345",
config=CrawlerRunConfig(
adaptive_config=config,
extraction_hints={ # Optional hints to speed up learning
"title": "article h1",
"content": "article .body-content"
}
)
async def main():
# Configure adaptive crawler
config = AdaptiveConfig(
strategy="statistical", # or "embedding" for semantic understanding
max_pages=10,
confidence_threshold=0.7, # Stop at 70% confidence
top_k_links=3, # Follow top 3 links per page
min_gain_threshold=0.05 # Need 5% information gain to continue
)
# Crawler identifies and stores patterns
if result.success:
state = adaptive_crawler.get_state("news.example.com")
print(f"Learned {len(state.patterns)} patterns")
print(f"Confidence: {state.avg_confidence:.2%}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
print("Starting adaptive crawl about Python decorators...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/glossary.html",
query="python decorators functions wrapping"
)
print(f"\n✅ Crawling Complete!")
print(f"• Confidence Level: {adaptive.confidence:.0%}")
print(f"• Pages Crawled: {len(result.crawled_urls)}")
print(f"• Knowledge Base: {len(adaptive.state.knowledge_base)} documents")
# Get most relevant content
relevant = adaptive.get_relevant_content(top_k=3)
print(f"\nMost Relevant Pages:")
for i, page in enumerate(relevant, 1):
print(f"{i}. {page['url']} (relevance: {page['score']:.2%})")
# Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
asyncio.run(main())
```
**Expected Real-World Impact:**
@@ -92,9 +88,7 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
wait_after_scroll=1.0 # Let content load
)
# For e-commerce product grids (Instagram style)
@@ -102,8 +96,7 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid",
scroll_count=30,
scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
wait_after_scroll=1.5 # Images need time
)
# For news feeds with lazy loading
@@ -111,9 +104,7 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed",
scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
wait_after_scroll=0.5 # Wait for content to load
)
# Use it in your crawl
@@ -157,68 +148,63 @@ async with AsyncWebCrawler() as crawler:
**My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals.
### The Three-Layer Scoring System
### Intelligent Link Analysis and Scoring
```python
from crawl4ai import LinkPreviewConfig
import asyncio
from crawl4ai import CrawlerRunConfig, CacheMode, AsyncWebCrawler
from crawl4ai.adaptive_crawler import LinkPreviewConfig
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
# What to analyze
include_internal=True,
include_external=True,
max_links=100, # Analyze top 100 links
# Relevance scoring
query="machine learning tutorials", # Your interest
score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
)
# Use in your crawl
result = await crawler.arun(
"https://tech-blog.example.com",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True
async def main():
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
include_internal=True,
include_external=False,
max_links=10,
concurrency=5,
query="python tutorial", # For contextual scoring
score_threshold=0.3,
verbose=True
)
)
# Use in your crawl
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://www.geeksforgeeks.org/",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring
cache_mode=CacheMode.BYPASS
)
)
# Access scored and sorted links
for link in result.links["internal"][:10]: # Top 10 internal links
print(f"Score: {link['total_score']:.3f}")
print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
print(f" URL: {link['href']}")
print(f" Title: {link['head_data']['title']}")
print(f" Description: {link['head_data']['meta']['description'][:100]}...")
# Access scored and sorted links
if result.success and result.links:
for link in result.links.get("internal", []):
text = link.get('text', 'No text')[:40]
print(
text,
f"{link.get('intrinsic_score', 0):.1f}/10" if link.get('intrinsic_score') is not None else "0.0/10",
f"{link.get('contextual_score', 0):.2f}/1" if link.get('contextual_score') is not None else "0.00/1",
f"{link.get('total_score', 0):.3f}" if link.get('total_score') is not None else "0.000"
)
asyncio.run(main())
```
**Scoring Components:**
1. **Intrinsic Score (0-10)**: Based on link quality indicators
1. **Intrinsic Score**: Based on link quality indicators
- Position on page (navigation, content, footer)
- Link attributes (rel, title, class names)
- Anchor text quality and length
- URL structure and depth
2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
2. **Contextual Score**: Relevance to your query using BM25 algorithm
- Keyword matching in link text and title
- Meta description analysis
- Content preview scoring
3. **Total Score**: Weighted combination for final ranking
3. **Total Score**: Combined score for final ranking
**Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -235,58 +221,34 @@ for link in result.links["internal"][:10]: # Top 10 internal links
### Technical Architecture
```python
import asyncio
from crawl4ai import AsyncUrlSeeder, SeedingConfig
# Basic discovery - find all product pages
seeder_config = SeedingConfig(
# Discovery sources
source="sitemap+cc", # Sitemap + Common Crawl
# Filtering
pattern="*/product/*", # URL pattern matching
ignore_patterns=["*/reviews/*", "*/questions/*"],
# Validation
live_check=True, # Verify URLs are alive
max_urls=5000, # Stop at 5000 URLs
# Performance
concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limiting
)
async def main():
async with AsyncUrlSeeder() as seeder:
# Discover Python tutorial URLs
config = SeedingConfig(
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter
extract_head=True, # Get metadata
query="python tutorial", # For relevance scoring
scoring_method="bm25",
score_threshold=0.2,
max_urls=10
)
print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("https://www.geeksforgeeks.org/", config)
print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1):
print(f"\n{i}. {url_info['url']}")
if url_info.get('relevance_score'):
print(f" Relevance: {url_info['relevance_score']:.3f}")
if url_info.get('head_data', {}).get('title'):
print(f" Title: {url_info['head_data']['title'][:60]}...")
seeder = AsyncUrlSeeder(seeder_config)
urls = await seeder.discover("https://shop.example.com")
# Advanced: Relevance-based discovery
research_config = SeedingConfig(
source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only
# Content relevance
extract_head=True, # Get meta tags
query="quantum computing tutorials",
scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only
# Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache
)
# Discover with progress tracking
discovered = []
async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered.extend(batch)
print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata
for url_data in discovered[:5]:
print(f"URL: {url_data['url']}")
print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['title']}")
asyncio.run(main())
```
**Discovery Methods:**
@@ -309,35 +271,18 @@ This release includes significant performance improvements through optimized res
### What We Optimized
```python
# Before v0.7.0 (slow)
# Optimized crawling with v0.7.0 improvements
results = []
for url in urls:
result = await crawler.arun(url)
results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
result = await crawler.arun(
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
results.append(result)
```
**Performance Gains:**
@@ -347,24 +292,6 @@ async for result in crawler.arun_stream(large_url_list):
- **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes

View File

@@ -0,0 +1,43 @@
# 🛠️ Crawl4AI v0.7.1: Minor Cleanup Update
*July 17, 2025 • 2 min read*
---
A small maintenance release that removes unused code and improves documentation.
## 🎯 What's Changed
- **Removed unused StealthConfig** from `crawl4ai/browser_manager.py`
- **Updated documentation** with better examples and parameter explanations
- **Fixed virtual scroll configuration** examples in docs
## 🧹 Code Cleanup
Removed unused `StealthConfig` import and configuration that wasn't being used anywhere in the codebase. The project uses its own custom stealth implementation through JavaScript injection instead.
```python
# Removed unused code:
from playwright_stealth import StealthConfig
stealth_config = StealthConfig(...) # This was never used
```
## 📖 Documentation Updates
- Fixed adaptive crawling parameter examples
- Updated session management documentation
- Corrected virtual scroll configuration examples
## 🚀 Installation
```bash
pip install crawl4ai==0.7.1
```
No breaking changes - upgrade directly from v0.7.0.
---
Questions? Issues?
- GitHub: [github.com/unclecode/crawl4ai](https://github.com/unclecode/crawl4ai)
- Discord: [discord.gg/crawl4ai](https://discord.gg/jP8KfhDhyN)

View File

@@ -9,7 +9,7 @@ import asyncio
import re
from typing import List, Dict, Set
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig
from crawl4ai.adaptive_crawler import CrawlState, Link
from crawl4ai.adaptive_crawler import AdaptiveCrawlResult, Link
import math
@@ -45,7 +45,7 @@ class APIDocumentationStrategy:
r'/legal/'
]
def score_link(self, link: Link, query: str, state: CrawlState) -> float:
def score_link(self, link: Link, query: str, state: AdaptiveCrawlResult) -> float:
"""Custom link scoring for API documentation"""
score = 1.0
url = link.href.lower()
@@ -77,7 +77,7 @@ class APIDocumentationStrategy:
return score
def calculate_api_coverage(self, state: CrawlState, query: str) -> Dict[str, float]:
def calculate_api_coverage(self, state: AdaptiveCrawlResult, query: str) -> Dict[str, float]:
"""Calculate specialized coverage metrics for API documentation"""
metrics = {
'endpoint_coverage': 0.0,

View File

@@ -8,6 +8,8 @@ from crawl4ai import (
CrawlResult
)
from crawl4ai.prompts import GENERATE_SCRIPT_PROMPT
async def main():
browser_config = BrowserConfig(

View File

@@ -18,7 +18,7 @@ Usage:
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.async_configs import LinkPreviewConfig
from crawl4ai import LinkPreviewConfig
async def basic_link_head_extraction():

View File

@@ -0,0 +1,202 @@
"""
SMART Cache Mode Example for Crawl4AI
This example demonstrates how to use the SMART cache mode to intelligently
validate cached content before using it. SMART mode can save 70-95% bandwidth
on unchanged content while ensuring you always get fresh data when it changes.
SMART Cache Mode: Only Crawl When Changes
"""
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
import asyncio
import time
from crawl4ai import AsyncWebCrawler
from crawl4ai.cache_context import CacheMode
from crawl4ai.async_configs import CrawlerRunConfig
async def basic_smart_cache_example():
"""Basic example showing SMART cache mode in action"""
print("=== Basic SMART Cache Example ===\n")
async with AsyncWebCrawler(verbose=True) as crawler:
url = "https://example.com"
# First crawl: Cache the content
print("1. Initial crawl to cache the content:")
config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
result1 = await crawler.arun(url=url, config=config)
print(f" Initial crawl: {len(result1.html)} bytes\n")
# Second crawl: Use SMART mode
print("2. SMART mode crawl (should use cache for static content):")
smart_config = CrawlerRunConfig(cache_mode=CacheMode.SMART)
start_time = time.time()
result2 = await crawler.arun(url=url, config=smart_config)
elapsed = time.time() - start_time
print(f" SMART crawl: {len(result2.html)} bytes in {elapsed:.2f}s")
print(f" Content identical: {result1.html == result2.html}\n")
async def news_site_monitoring():
"""Monitor a news site for changes using SMART cache mode"""
print("=== News Site Monitoring Example ===\n")
async with AsyncWebCrawler(verbose=True) as crawler:
config = CrawlerRunConfig(cache_mode=CacheMode.SMART)
url = "https://news.ycombinator.com"
print("Monitoring Hacker News for changes...\n")
previous_length = 0
for i in range(3):
result = await crawler.arun(url=url, config=config)
current_length = len(result.html)
if i == 0:
print(f"Check {i+1}: Initial crawl - {current_length} bytes")
else:
if current_length != previous_length:
print(f"Check {i+1}: Content changed! {previous_length} -> {current_length} bytes")
else:
print(f"Check {i+1}: Content unchanged - {current_length} bytes")
previous_length = current_length
if i < 2: # Don't wait after last check
print(" Waiting 10 seconds before next check...")
await asyncio.sleep(10)
print()
async def compare_cache_modes():
"""Compare different cache modes to understand SMART mode benefits"""
print("=== Cache Mode Comparison ===\n")
async with AsyncWebCrawler(verbose=False) as crawler:
url = "https://www.wikipedia.org"
# First, populate the cache
config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
await crawler.arun(url=url, config=config)
print("Cache populated.\n")
# Test different cache modes
modes = [
(CacheMode.ENABLED, "ENABLED (always uses cache if available)"),
(CacheMode.BYPASS, "BYPASS (never uses cache)"),
(CacheMode.SMART, "SMART (validates cache before using)")
]
for mode, description in modes:
config = CrawlerRunConfig(cache_mode=mode)
start_time = time.time()
result = await crawler.arun(url=url, config=config)
elapsed = time.time() - start_time
print(f"{description}:")
print(f" Time: {elapsed:.2f}s")
print(f" Size: {len(result.html)} bytes\n")
async def dynamic_content_example():
"""Show how SMART mode handles dynamic content"""
print("=== Dynamic Content Example ===\n")
async with AsyncWebCrawler(verbose=True) as crawler:
# URL that returns different content each time
dynamic_url = "https://httpbin.org/uuid"
print("Testing with dynamic content (changes every request):\n")
# First crawl
config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
result1 = await crawler.arun(url=dynamic_url, config=config)
# Extract UUID from the response
import re
uuid1 = re.search(r'"uuid":\s*"([^"]+)"', result1.html)
if uuid1:
print(f"1. First crawl UUID: {uuid1.group(1)}")
# SMART mode crawl - should detect change and re-crawl
smart_config = CrawlerRunConfig(cache_mode=CacheMode.SMART)
result2 = await crawler.arun(url=dynamic_url, config=smart_config)
uuid2 = re.search(r'"uuid":\s*"([^"]+)"', result2.html)
if uuid2:
print(f"2. SMART crawl UUID: {uuid2.group(1)}")
print(f" Different UUIDs: {uuid1.group(1) != uuid2.group(1)} (should be True)")
async def bandwidth_savings_demo():
"""Demonstrate bandwidth savings with SMART mode"""
print("=== Bandwidth Savings Demo ===\n")
async with AsyncWebCrawler(verbose=True) as crawler:
# List of URLs to crawl
urls = [
"https://example.com",
"https://www.python.org",
"https://docs.python.org/3/",
]
print("Crawling multiple URLs twice to show bandwidth savings:\n")
# First pass: Cache all URLs
print("First pass - Caching all URLs:")
total_bytes_pass1 = 0
config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
for url in urls:
result = await crawler.arun(url=url, config=config)
total_bytes_pass1 += len(result.html)
print(f" {url}: {len(result.html)} bytes")
print(f"\nTotal downloaded in first pass: {total_bytes_pass1} bytes")
# Second pass: Use SMART mode
print("\nSecond pass - Using SMART mode:")
total_bytes_pass2 = 0
smart_config = CrawlerRunConfig(cache_mode=CacheMode.SMART)
for url in urls:
result = await crawler.arun(url=url, config=smart_config)
# In SMART mode, unchanged content uses cache (minimal bandwidth)
print(f" {url}: Using {'cache' if result else 'fresh crawl'}")
print(f"\nBandwidth saved: ~{total_bytes_pass1} bytes (only HEAD requests sent)")
async def main():
"""Run all examples"""
examples = [
basic_smart_cache_example,
news_site_monitoring,
compare_cache_modes,
dynamic_content_example,
bandwidth_savings_demo
]
for example in examples:
await example()
print("\n" + "="*50 + "\n")
await asyncio.sleep(2) # Brief pause between examples
if __name__ == "__main__":
print("""
Crawl4AI SMART Cache Mode Examples
==================================
These examples demonstrate the SMART cache mode that intelligently
validates cached content using HEAD requests before deciding whether
to use cache or perform a fresh crawl.
""")
asyncio.run(main())

View File

@@ -130,7 +130,7 @@ Factors:
```python
class CustomLinkScorer:
def score(self, link: Link, query: str, state: CrawlState) -> float:
def score(self, link: Link, query: str, state: AdaptiveCrawlResult) -> float:
# Prioritize specific URL patterns
if "/api/reference/" in link.href:
return 2.0 # Double the score
@@ -325,17 +325,17 @@ with open("crawl_analysis.json", "w") as f:
from crawl4ai.adaptive_crawler import BaseStrategy
class DomainSpecificStrategy(BaseStrategy):
def calculate_coverage(self, state: CrawlState) -> float:
def calculate_coverage(self, state: AdaptiveCrawlResult) -> float:
# Custom coverage calculation
# e.g., weight certain terms more heavily
pass
def calculate_consistency(self, state: CrawlState) -> float:
def calculate_consistency(self, state: AdaptiveCrawlResult) -> float:
# Custom consistency logic
# e.g., domain-specific validation
pass
def rank_links(self, links: List[Link], state: CrawlState) -> List[Link]:
def rank_links(self, links: List[Link], state: AdaptiveCrawlResult) -> List[Link]:
# Custom link ranking
# e.g., prioritize specific URL patterns
pass
@@ -359,7 +359,7 @@ class HybridStrategy(BaseStrategy):
URLPatternStrategy()
]
def calculate_confidence(self, state: CrawlState) -> float:
def calculate_confidence(self, state: AdaptiveCrawlResult) -> float:
# Weighted combination of strategies
scores = [s.calculate_confidence(state) for s in self.strategies]
weights = [0.5, 0.3, 0.2]

View File

@@ -49,46 +49,75 @@ from crawl4ai import JsonCssExtractionStrategy
from crawl4ai.cache_context import CacheMode
async def crawl_dynamic_content():
async with AsyncWebCrawler() as crawler:
session_id = "github_commits_session"
url = "https://github.com/microsoft/TypeScript/commits/main"
all_commits = []
url = "https://github.com/microsoft/TypeScript/commits/main"
session_id = "wait_for_session"
all_commits = []
# Define extraction schema
schema = {
"name": "Commit Extractor",
"baseSelector": "li.Box-sc-g0xbh4-0",
"fields": [{
"name": "title", "selector": "h4.markdown-title", "type": "text"
}],
}
extraction_strategy = JsonCssExtractionStrategy(schema)
js_next_page = """
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4');
if (commits.length > 0) {
window.lastCommit = commits[0].textContent.trim();
}
const button = document.querySelector('a[data-testid="pagination-next-button"]');
if (button) {button.click(); console.log('button clicked') }
"""
# JavaScript and wait configurations
js_next_page = """document.querySelector('a[data-testid="pagination-next-button"]').click();"""
wait_for = """() => document.querySelectorAll('li.Box-sc-g0xbh4-0').length > 0"""
# Crawl multiple pages
wait_for = """() => {
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4');
if (commits.length === 0) return false;
const firstCommit = commits[0].textContent.trim();
return firstCommit !== window.lastCommit;
}"""
schema = {
"name": "Commit Extractor",
"baseSelector": "li[data-testid='commit-row-item']",
"fields": [
{
"name": "title",
"selector": "h4 a",
"type": "text",
"transform": "strip",
},
],
}
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)
browser_config = BrowserConfig(
verbose=True,
headless=False,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
for page in range(3):
config = CrawlerRunConfig(
url=url,
crawler_config = CrawlerRunConfig(
session_id=session_id,
css_selector="li[data-testid='commit-row-item']",
extraction_strategy=extraction_strategy,
js_code=js_next_page if page > 0 else None,
wait_for=wait_for if page > 0 else None,
js_only=page > 0,
cache_mode=CacheMode.BYPASS
cache_mode=CacheMode.BYPASS,
capture_console_messages=True,
)
result = await crawler.arun(config=config)
if result.success:
result = await crawler.arun(url=url, config=crawler_config)
if result.console_messages:
print(f"Page {page + 1} console messages:", result.console_messages)
if result.extracted_content:
# print(f"Page {page + 1} result:", result.extracted_content)
commits = json.loads(result.extracted_content)
all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits")
else:
print(f"Page {page + 1}: No content extracted")
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Clean up session
await crawler.crawler_strategy.kill_session(session_id)
return all_commits
```
---

View File

@@ -91,13 +91,12 @@ async def crawl_twitter_timeline():
wait_after_scroll=1.0 # Twitter needs time to load
)
browser_config = BrowserConfig(headless=True) # Set to False to watch it work
config = CrawlerRunConfig(
virtual_scroll_config=virtual_config,
# Optional: Set headless=False to watch it work
# browser_config=BrowserConfig(headless=False)
virtual_scroll_config=virtual_config
)
async with AsyncWebCrawler() as crawler:
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://twitter.com/search?q=AI",
config=config
@@ -200,7 +199,7 @@ Use **scan_full_page** when:
Virtual Scroll works seamlessly with extraction strategies:
```python
from crawl4ai import LLMExtractionStrategy
from crawl4ai import LLMExtractionStrategy, LLMConfig
# Define extraction schema
schema = {
@@ -222,7 +221,7 @@ config = CrawlerRunConfig(
scroll_count=20
),
extraction_strategy=LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=schema
)
)

View File

@@ -27,7 +27,7 @@ async def digest(
start_url: str,
query: str,
resume_from: Optional[Union[str, Path]] = None
) -> CrawlState
) -> AdaptiveCrawlResult
```
#### Parameters
@@ -38,7 +38,7 @@ async def digest(
#### Returns
- **CrawlState**: The final crawl state containing all crawled URLs, knowledge base, and metrics
- **AdaptiveCrawlResult**: The final crawl state containing all crawled URLs, knowledge base, and metrics
#### Example
@@ -92,7 +92,7 @@ Access to the current crawl state.
```python
@property
def state(self) -> CrawlState
def state(self) -> AdaptiveCrawlResult
```
## Methods

View File

@@ -9,7 +9,7 @@ async def digest(
start_url: str,
query: str,
resume_from: Optional[Union[str, Path]] = None
) -> CrawlState
) -> AdaptiveCrawlResult
```
## Parameters
@@ -31,7 +31,7 @@ async def digest(
## Return Value
Returns a `CrawlState` object containing:
Returns a `AdaptiveCrawlResult` object containing:
- **crawled_urls** (`Set[str]`): All URLs that have been crawled
- **knowledge_base** (`List[CrawlResult]`): Collection of crawled pages with content

View File

@@ -10,9 +10,8 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -30,44 +29,41 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores
```python
from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig
import asyncio
# Initialize with custom learning parameters
config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to use learned patterns
max_history=100, # Remember last 100 crawls per domain
learning_rate=0.2, # How quickly to adapt to changes
patterns_per_page=3, # Patterns to learn per page type
extraction_strategy='css' # 'css' or 'xpath'
)
adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://news.example.com/article/12345",
config=CrawlerRunConfig(
adaptive_config=config,
extraction_hints={ # Optional hints to speed up learning
"title": "article h1",
"content": "article .body-content"
}
)
async def main():
# Configure adaptive crawler
config = AdaptiveConfig(
strategy="statistical", # or "embedding" for semantic understanding
max_pages=10,
confidence_threshold=0.7, # Stop at 70% confidence
top_k_links=3, # Follow top 3 links per page
min_gain_threshold=0.05 # Need 5% information gain to continue
)
# Crawler identifies and stores patterns
if result.success:
state = adaptive_crawler.get_state("news.example.com")
print(f"Learned {len(state.patterns)} patterns")
print(f"Confidence: {state.avg_confidence:.2%}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
print("Starting adaptive crawl about Python decorators...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/glossary.html",
query="python decorators functions wrapping"
)
print(f"\n✅ Crawling Complete!")
print(f"• Confidence Level: {adaptive.confidence:.0%}")
print(f"• Pages Crawled: {len(result.crawled_urls)}")
print(f"• Knowledge Base: {len(adaptive.state.knowledge_base)} documents")
# Get most relevant content
relevant = adaptive.get_relevant_content(top_k=3)
print(f"\nMost Relevant Pages:")
for i, page in enumerate(relevant, 1):
print(f"{i}. {page['url']} (relevance: {page['score']:.2%})")
# Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
asyncio.run(main())
```
**Expected Real-World Impact:**
@@ -92,9 +88,7 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
wait_after_scroll=1.0 # Let content load
)
# For e-commerce product grids (Instagram style)
@@ -102,8 +96,7 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid",
scroll_count=30,
scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
wait_after_scroll=1.5 # Images need time
)
# For news feeds with lazy loading
@@ -111,9 +104,7 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed",
scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
wait_after_scroll=0.5 # Wait for content to load
)
# Use it in your crawl
@@ -157,68 +148,63 @@ async with AsyncWebCrawler() as crawler:
**My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals.
### The Three-Layer Scoring System
### Intelligent Link Analysis and Scoring
```python
from crawl4ai import LinkPreviewConfig
import asyncio
from crawl4ai import CrawlerRunConfig, CacheMode, AsyncWebCrawler
from crawl4ai.adaptive_crawler import LinkPreviewConfig
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
# What to analyze
include_internal=True,
include_external=True,
max_links=100, # Analyze top 100 links
# Relevance scoring
query="machine learning tutorials", # Your interest
score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
)
# Use in your crawl
result = await crawler.arun(
"https://tech-blog.example.com",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True
async def main():
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
include_internal=True,
include_external=False,
max_links=10,
concurrency=5,
query="python tutorial", # For contextual scoring
score_threshold=0.3,
verbose=True
)
)
# Use in your crawl
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://www.geeksforgeeks.org/",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring
cache_mode=CacheMode.BYPASS
)
)
# Access scored and sorted links
for link in result.links["internal"][:10]: # Top 10 internal links
print(f"Score: {link['total_score']:.3f}")
print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
print(f" URL: {link['href']}")
print(f" Title: {link['head_data']['title']}")
print(f" Description: {link['head_data']['meta']['description'][:100]}...")
# Access scored and sorted links
if result.success and result.links:
for link in result.links.get("internal", []):
text = link.get('text', 'No text')[:40]
print(
text,
f"{link.get('intrinsic_score', 0):.1f}/10" if link.get('intrinsic_score') is not None else "0.0/10",
f"{link.get('contextual_score', 0):.2f}/1" if link.get('contextual_score') is not None else "0.00/1",
f"{link.get('total_score', 0):.3f}" if link.get('total_score') is not None else "0.000"
)
asyncio.run(main())
```
**Scoring Components:**
1. **Intrinsic Score (0-10)**: Based on link quality indicators
1. **Intrinsic Score**: Based on link quality indicators
- Position on page (navigation, content, footer)
- Link attributes (rel, title, class names)
- Anchor text quality and length
- URL structure and depth
2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
2. **Contextual Score**: Relevance to your query using BM25 algorithm
- Keyword matching in link text and title
- Meta description analysis
- Content preview scoring
3. **Total Score**: Weighted combination for final ranking
3. **Total Score**: Combined score for final ranking
**Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -235,58 +221,34 @@ for link in result.links["internal"][:10]: # Top 10 internal links
### Technical Architecture
```python
import asyncio
from crawl4ai import AsyncUrlSeeder, SeedingConfig
# Basic discovery - find all product pages
seeder_config = SeedingConfig(
# Discovery sources
source="sitemap+cc", # Sitemap + Common Crawl
# Filtering
pattern="*/product/*", # URL pattern matching
ignore_patterns=["*/reviews/*", "*/questions/*"],
# Validation
live_check=True, # Verify URLs are alive
max_urls=5000, # Stop at 5000 URLs
# Performance
concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limiting
)
async def main():
async with AsyncUrlSeeder() as seeder:
# Discover Python tutorial URLs
config = SeedingConfig(
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter
extract_head=True, # Get metadata
query="python tutorial", # For relevance scoring
scoring_method="bm25",
score_threshold=0.2,
max_urls=10
)
print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("https://www.geeksforgeeks.org/", config)
print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1):
print(f"\n{i}. {url_info['url']}")
if url_info.get('relevance_score'):
print(f" Relevance: {url_info['relevance_score']:.3f}")
if url_info.get('head_data', {}).get('title'):
print(f" Title: {url_info['head_data']['title'][:60]}...")
seeder = AsyncUrlSeeder(seeder_config)
urls = await seeder.discover("https://shop.example.com")
# Advanced: Relevance-based discovery
research_config = SeedingConfig(
source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only
# Content relevance
extract_head=True, # Get meta tags
query="quantum computing tutorials",
scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only
# Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache
)
# Discover with progress tracking
discovered = []
async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered.extend(batch)
print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata
for url_data in discovered[:5]:
print(f"URL: {url_data['url']}")
print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['title']}")
asyncio.run(main())
```
**Discovery Methods:**
@@ -309,35 +271,18 @@ This release includes significant performance improvements through optimized res
### What We Optimized
```python
# Before v0.7.0 (slow)
# Optimized crawling with v0.7.0 improvements
results = []
for url in urls:
result = await crawler.arun(url)
results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
result = await crawler.arun(
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
results.append(result)
```
**Performance Gains:**
@@ -347,24 +292,6 @@ async for result in crawler.arun_stream(large_url_list):
- **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes

View File

@@ -35,7 +35,7 @@ from crawl4ai import AsyncWebCrawler, AdaptiveCrawler
async def main():
async with AsyncWebCrawler() as crawler:
# Create an adaptive crawler
# Create an adaptive crawler (config is optional)
adaptive = AdaptiveCrawler(crawler)
# Start crawling with a query
@@ -59,13 +59,13 @@ async def main():
from crawl4ai import AdaptiveConfig
config = AdaptiveConfig(
confidence_threshold=0.7, # Stop when 70% confident (default: 0.8)
max_pages=20, # Maximum pages to crawl (default: 50)
top_k_links=3, # Links to follow per page (default: 5)
confidence_threshold=0.8, # Stop when 80% confident (default: 0.7)
max_pages=30, # Maximum pages to crawl (default: 20)
top_k_links=5, # Links to follow per page (default: 3)
min_gain_threshold=0.05 # Minimum expected gain to continue (default: 0.1)
)
adaptive = AdaptiveCrawler(crawler, config=config)
adaptive = AdaptiveCrawler(crawler, config)
```
## Crawling Strategies
@@ -198,8 +198,8 @@ if result.metrics.get('is_irrelevant', False):
The confidence score (0-1) indicates how sufficient the gathered information is:
- **0.0-0.3**: Insufficient information, needs more crawling
- **0.3-0.6**: Partial information, may answer basic queries
- **0.6-0.8**: Good coverage, can answer most queries
- **0.8-1.0**: Excellent coverage, comprehensive information
- **0.6-0.7**: Good coverage, can answer most queries
- **0.7-1.0**: Excellent coverage, comprehensive information
### Statistics Display
@@ -257,9 +257,9 @@ new_adaptive.import_knowledge_base("knowledge_base.jsonl")
- Avoid overly broad queries
### 2. Threshold Tuning
- Start with default (0.8) for general use
- Lower to 0.6-0.7 for exploratory crawling
- Raise to 0.9+ for exhaustive coverage
- Start with default (0.7) for general use
- Lower to 0.5-0.6 for exploratory crawling
- Raise to 0.8+ for exhaustive coverage
### 3. Performance Optimization
- Use appropriate `max_pages` limits

View File

@@ -19,6 +19,7 @@ The new system uses a single `CacheMode` enum:
- `CacheMode.READ_ONLY`: Only read from cache
- `CacheMode.WRITE_ONLY`: Only write to cache
- `CacheMode.BYPASS`: Skip cache for this operation
- `CacheMode.SMART`: **NEW** - Intelligently validate cache with HEAD requests
## Migration Example
@@ -72,4 +73,128 @@ if __name__ == "__main__":
| `bypass_cache=True` | `cache_mode=CacheMode.BYPASS` |
| `disable_cache=True` | `cache_mode=CacheMode.DISABLED`|
| `no_cache_read=True` | `cache_mode=CacheMode.WRITE_ONLY` |
| `no_cache_write=True` | `cache_mode=CacheMode.READ_ONLY` |
| `no_cache_write=True` | `cache_mode=CacheMode.READ_ONLY` |
## SMART Cache Mode: Only Crawl When Changes
Starting from version 0.7.1, Crawl4AI introduces the **SMART cache mode** - an intelligent caching strategy that validates cached content before using it. This mode uses HTTP HEAD requests to check if content has changed, potentially saving 70-95% bandwidth on unchanged content.
### How SMART Mode Works
When you use `CacheMode.SMART`, Crawl4AI:
1. **Retrieves cached content** (if available)
2. **Sends a HEAD request** with conditional headers (ETag, Last-Modified)
3. **Validates the response**:
- If server returns `304 Not Modified` → uses cache
- If content changed → performs fresh crawl
- If headers indicate changes → performs fresh crawl
### Benefits
- **Bandwidth Efficient**: Only downloads full content when necessary
- **Always Fresh**: Ensures you get the latest content when it changes
- **Cost Effective**: Reduces API calls and bandwidth usage
- **Intelligent**: Uses multiple signals to detect changes (ETag, Last-Modified, Content-Length)
### Basic Usage
```python
import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.cache_context import CacheMode
from crawl4ai.async_configs import CrawlerRunConfig
async def smart_crawl():
async with AsyncWebCrawler(verbose=True) as crawler:
# First crawl - caches the content
config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
result1 = await crawler.arun(
url="https://example.com",
config=config
)
print(f"First crawl: {len(result1.html)} bytes")
# Second crawl - uses SMART mode
smart_config = CrawlerRunConfig(cache_mode=CacheMode.SMART)
result2 = await crawler.arun(
url="https://example.com",
config=smart_config
)
print(f"SMART crawl: {len(result2.html)} bytes (from cache if unchanged)")
asyncio.run(smart_crawl())
```
### When to Use SMART Mode
SMART mode is ideal for:
- **Periodic crawling** of websites that update irregularly
- **News sites** where you want fresh content but avoid re-downloading unchanged pages
- **API endpoints** that provide proper caching headers
- **Large-scale crawling** where bandwidth costs are significant
### How It Detects Changes
SMART mode checks these signals in order:
1. **304 Not Modified** status (most reliable)
2. **Content-Digest** header (RFC 9530)
3. **Strong ETag** comparison
4. **Last-Modified** timestamp
5. **Content-Length** changes (as a hint)
### Example: News Site Monitoring
```python
async def monitor_news_site():
async with AsyncWebCrawler(verbose=True) as crawler:
config = CrawlerRunConfig(cache_mode=CacheMode.SMART)
# Check multiple times
for i in range(3):
result = await crawler.arun(
url="https://news.ycombinator.com",
config=config
)
# SMART mode will only re-crawl if content changed
print(f"Check {i+1}: Retrieved {len(result.html)} bytes")
await asyncio.sleep(300) # Wait 5 minutes
asyncio.run(monitor_news_site())
```
### Understanding SMART Mode Logs
When using SMART mode with `verbose=True`, you'll see informative logs:
```
[SMART] SMART cache: 304 Not Modified - Content unchanged - Using cache for https://example.com
[SMART] SMART cache: Content-Length changed (12345 -> 12789) - Re-crawling https://example.com
[SMART] SMART cache: No definitive cache headers matched - Assuming content changed - Re-crawling https://example.com
```
### Limitations
- Some servers don't properly support HEAD requests
- Dynamic content without proper cache headers will always be re-crawled
- Content changes must be reflected in HTTP headers for detection
### Advanced Example
For a complete example demonstrating SMART mode with both static and dynamic content, check out `docs/examples/smart_cache.py`.
## Cache Mode Reference
| Mode | Read from Cache | Write to Cache | Use Case |
|------|----------------|----------------|----------|
| `ENABLED` | ✓ | ✓ | Normal operation |
| `DISABLED` | ✗ | ✗ | No caching needed |
| `READ_ONLY` | ✓ | ✗ | Use existing cache only |
| `WRITE_ONLY` | ✗ | ✓ | Refresh cache only |
| `BYPASS` | ✗ | ✗ | Skip cache for this request |
| `SMART` | ✓* | ✓ | Validate before using cache |
*SMART mode reads from cache but validates it first with a HEAD request.

View File

@@ -37,6 +37,12 @@ This page provides a comprehensive list of example scripts that demonstrate vari
| Storage State | Tutorial on managing browser storage state for persistence. | [View Guide](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/storage_state_tutorial.md) |
| Network Console Capture | Demonstrates how to capture and analyze network requests and console logs. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/network_console_capture_example.py) |
## Caching & Performance
| Example | Description | Link |
|---------|-------------|------|
| SMART Cache Mode | Demonstrates the intelligent SMART cache mode that validates cached content using HEAD requests, saving 70-95% bandwidth while ensuring fresh content. | [View Code](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/smart_cache.py) |
## Extraction Strategies
| Example | Description | Link |

View File

@@ -125,7 +125,7 @@ Here's a full example you can copy, paste, and run immediately:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.async_configs import LinkPreviewConfig
from crawl4ai import LinkPreviewConfig
async def extract_link_heads_example():
"""
@@ -237,7 +237,7 @@ if __name__ == "__main__":
The `LinkPreviewConfig` class supports these options:
```python
from crawl4ai.async_configs import LinkPreviewConfig
from crawl4ai import LinkPreviewConfig
link_preview_config = LinkPreviewConfig(
# BASIC SETTINGS

View File

@@ -79,7 +79,7 @@ if __name__ == "__main__":
asyncio.run(main())
```
> IMPORTANT: By default cache mode is set to `CacheMode.ENABLED`. So to have fresh content, you need to set it to `CacheMode.BYPASS`
> IMPORTANT: By default cache mode is set to `CacheMode.ENABLED`. So to have fresh content, you need to set it to `CacheMode.BYPASS`. For intelligent caching that validates content before using cache, use the new `CacheMode.SMART` - it saves bandwidth while ensuring fresh content.
Well explore more advanced config in later tutorials (like enabling proxies, PDF output, multi-tab sessions, etc.). For now, just note how you pass these objects to manage crawling.

View File

@@ -137,7 +137,7 @@ async def smart_blog_crawler():
word_count_threshold=300 # Only substantial articles
)
# Extract URLs and stream results as they come
# Extract URLs and crawl them
tutorial_urls = [t["url"] for t in tutorials[:10]]
results = await crawler.arun_many(tutorial_urls, config=config)
@@ -231,7 +231,7 @@ Common Crawl is a massive public dataset that regularly crawls the entire web. I
```python
# Use both sources
config = SeedingConfig(source="cc+sitemap")
config = SeedingConfig(source="sitemap+cc")
urls = await seeder.urls("example.com", config)
```
@@ -241,13 +241,13 @@ The `SeedingConfig` object is your control panel. Here's everything you can conf
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `source` | str | "cc" | URL source: "cc" (Common Crawl), "sitemap", or "cc+sitemap" |
| `source` | str | "sitemap+cc" | URL source: "cc" (Common Crawl), "sitemap", or "sitemap+cc" |
| `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") |
| `extract_head` | bool | False | Extract metadata from page `<head>` |
| `live_check` | bool | False | Verify URLs are accessible |
| `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) |
| `concurrency` | int | 10 | Parallel workers for fetching |
| `hits_per_sec` | int | None | Rate limit for requests |
| `hits_per_sec` | int | 5 | Rate limit for requests |
| `force` | bool | False | Bypass cache, fetch fresh data |
| `verbose` | bool | False | Show detailed progress |
| `query` | str | None | Search query for BM25 scoring |
@@ -522,7 +522,7 @@ urls = await seeder.urls("docs.example.com", config)
```python
# Find specific products
config = SeedingConfig(
source="cc+sitemap", # Use both sources
source="sitemap+cc", # Use both sources
extract_head=True,
query="wireless headphones noise canceling",
scoring_method="bm25",
@@ -782,7 +782,7 @@ class ResearchAssistant:
# Step 1: Discover relevant URLs
config = SeedingConfig(
source="cc+sitemap", # Maximum coverage
source="sitemap+cc", # Maximum coverage
extract_head=True, # Get metadata
query=topic, # Research topic
scoring_method="bm25", # Smart scoring
@@ -832,7 +832,8 @@ class ResearchAssistant:
# Extract URLs and crawl all articles
article_urls = [article['url'] for article in top_articles]
results = []
async for result in await crawler.arun_many(article_urls, config=config):
crawl_results = await crawler.arun_many(article_urls, config=config)
async for result in crawl_results:
if result.success:
results.append({
'url': result.url,
@@ -933,10 +934,10 @@ config = SeedingConfig(concurrency=10, hits_per_sec=5)
# When crawling many URLs
async with AsyncWebCrawler() as crawler:
# Assuming urls is a list of URL strings
results = await crawler.arun_many(urls, config=config)
crawl_results = await crawler.arun_many(urls, config=config)
# Process as they arrive
async for result in results:
async for result in crawl_results:
process_immediately(result) # Don't wait for all
```
@@ -1020,7 +1021,7 @@ config = SeedingConfig(
# E-commerce product discovery
config = SeedingConfig(
source="cc+sitemap",
source="sitemap+cc",
pattern="*/product/*",
extract_head=True,
live_check=True

View File

@@ -28,7 +28,7 @@ from rich import box
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, AdaptiveCrawler, AdaptiveConfig, BrowserConfig, CacheMode
from crawl4ai import AsyncUrlSeeder, SeedingConfig
from crawl4ai.async_configs import LinkPreviewConfig, VirtualScrollConfig
from crawl4ai import LinkPreviewConfig, VirtualScrollConfig
from crawl4ai import c4a_compile, CompilationResult
# Initialize Rich console for beautiful output

View File

@@ -13,14 +13,13 @@ from crawl4ai import (
BrowserConfig,
CacheMode,
# New imports for v0.7.0
LinkPreviewConfig,
VirtualScrollConfig,
LinkPreviewConfig,
AdaptiveCrawler,
AdaptiveConfig,
AsyncUrlSeeder,
SeedingConfig,
c4a_compile,
CompilationResult
)
@@ -170,16 +169,16 @@ async def demo_url_seeder():
# Discover Python tutorial URLs
config = SeedingConfig(
source="sitemap", # Use sitemap
pattern="*tutorial*", # URL pattern filter
pattern="*python*", # URL pattern filter
extract_head=True, # Get metadata
query="python async programming", # For relevance scoring
query="python tutorial", # For relevance scoring
scoring_method="bm25",
score_threshold=0.2,
max_urls=10
)
print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("docs.python.org", config)
urls = await seeder.urls("https://www.geeksforgeeks.org/", config)
print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1):
@@ -245,39 +244,6 @@ IF (EXISTS `.price-filter`) THEN CLICK `input[data-max-price="100"]`
print(f"❌ Compilation error: {result.first_error.message}")
async def demo_pdf_support():
"""
Demo 6: PDF Parsing Support
Shows how to extract content from PDF files.
Note: Requires 'pip install crawl4ai[pdf]'
"""
print("\n" + "="*60)
print("📄 DEMO 6: PDF Parsing Support")
print("="*60)
try:
# Check if PDF support is installed
import PyPDF2
# Example: Process a PDF URL
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
pdf=True, # Enable PDF generation
extract_text_from_pdf=True # Extract text content
)
print("PDF parsing is available!")
print("You can now crawl PDF URLs and extract their content.")
print("\nExample usage:")
print(' result = await crawler.arun("https://example.com/document.pdf")')
print(' pdf_text = result.extracted_content # Contains extracted text')
except ImportError:
print("⚠️ PDF support not installed.")
print("Install with: pip install crawl4ai[pdf]")
async def main():
"""Run all demos"""
print("\n🚀 Crawl4AI v0.7.0 Feature Demonstrations")
@@ -289,7 +255,6 @@ async def main():
("Virtual Scroll", demo_virtual_scroll),
("URL Seeder", demo_url_seeder),
("C4A Script", demo_c4a_script),
("PDF Support", demo_pdf_support)
]
for name, demo_func in demos:
@@ -309,7 +274,6 @@ async def main():
print("• Virtual Scroll: Capture all content from modern web pages")
print("• URL Seeder: Pre-discover and filter URLs efficiently")
print("• C4A Script: Simple language for complex automations")
print("• PDF Support: Extract content from PDF documents")
if __name__ == "__main__":

View File

@@ -30,8 +30,6 @@ dependencies = [
"pydantic>=2.10",
"pyOpenSSL>=24.3.0",
"psutil>=6.1.1",
"nltk>=3.9.1",
"playwright",
"rich>=13.9.4",
"cssselect>=1.2.0",
"httpx>=0.27.2",
@@ -44,7 +42,6 @@ dependencies = [
"brotli>=1.1.0",
"humanize>=4.10.0",
"lark>=1.2.2",
"sentence-transformers>=2.2.0",
"alphashape>=1.3.1",
"shapely>=2.0.0"
]
@@ -60,20 +57,20 @@ classifiers = [
]
[project.optional-dependencies]
pdf = ["PyPDF2"]
torch = ["torch", "nltk", "scikit-learn"]
transformer = ["transformers", "tokenizers"]
cosine = ["torch", "transformers", "nltk"]
sync = ["selenium"]
pdf = ["pypdf>=3.0.0"] # PyPDF2 is deprecated, use pypdf instead
torch = ["torch>=2.0.0", "nltk>=3.9.1", "scikit-learn>=1.3.0"]
transformer = ["transformers>=4.34.0", "tokenizers>=0.15.0", "sentence-transformers>=2.2.0"]
cosine = ["torch>=2.0.0", "transformers>=4.34.0", "nltk>=3.9.1", "sentence-transformers>=2.2.0"]
sync = ["selenium>=4.0.0"]
all = [
"PyPDF2",
"torch",
"nltk",
"scikit-learn",
"transformers",
"tokenizers",
"selenium",
"PyPDF2"
"pypdf>=3.0.0",
"torch>=2.0.0",
"nltk>=3.9.1",
"scikit-learn>=1.3.0",
"transformers>=4.34.0",
"tokenizers>=0.15.0",
"sentence-transformers>=2.2.0",
"selenium>=4.0.0"
]
[project.scripts]

View File

@@ -23,7 +23,7 @@ from crawl4ai import (
AsyncWebCrawler,
AdaptiveCrawler,
AdaptiveConfig,
CrawlState
AdaptiveCrawlResult
)

View File

@@ -13,7 +13,7 @@ import math
sys.path.append(str(Path(__file__).parent.parent))
from crawl4ai import AsyncWebCrawler
from crawl4ai.adaptive_crawler import CrawlState, StatisticalStrategy
from crawl4ai.adaptive_crawler import AdaptiveCrawlResult, StatisticalStrategy
from crawl4ai.models import CrawlResult
@@ -37,7 +37,7 @@ class ConfidenceTestHarness:
print("=" * 80)
# Initialize state
state = CrawlState(query=self.query)
state = AdaptiveCrawlResult(query=self.query)
# Create crawler
async with AsyncWebCrawler() as crawler:
@@ -107,7 +107,7 @@ class ConfidenceTestHarness:
state.metrics['prev_confidence'] = confidence
def _debug_coverage_calculation(self, state: CrawlState, query_terms: List[str]):
def _debug_coverage_calculation(self, state: AdaptiveCrawlResult, query_terms: List[str]):
"""Debug coverage calculation step by step"""
coverage_score = 0.0
max_possible_score = 0.0
@@ -136,7 +136,7 @@ class ConfidenceTestHarness:
new_coverage = self._calculate_coverage_new(state, query_terms)
print(f" → New Coverage: {new_coverage:.3f}")
def _calculate_coverage_new(self, state: CrawlState, query_terms: List[str]) -> float:
def _calculate_coverage_new(self, state: AdaptiveCrawlResult, query_terms: List[str]) -> float:
"""New coverage calculation without IDF"""
if not query_terms or state.total_documents == 0:
return 0.0

View File

@@ -15,7 +15,7 @@ import os
sys.path.append(str(Path(__file__).parent.parent.parent))
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig
from crawl4ai.adaptive_crawler import EmbeddingStrategy, CrawlState
from crawl4ai.adaptive_crawler import EmbeddingStrategy, AdaptiveCrawlResult
from crawl4ai.models import CrawlResult
@@ -132,7 +132,7 @@ async def test_embedding_performance():
strategy.config = config
# Initialize state
state = CrawlState()
state = AdaptiveCrawlResult()
state.query = "async await coroutines event loops tasks"
# Start performance monitoring

View File

@@ -20,7 +20,7 @@ from crawl4ai import (
AsyncWebCrawler,
AdaptiveCrawler,
AdaptiveConfig,
CrawlState
AdaptiveCrawlResult
)
console = Console()

View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""
Simple API Test for Crawl4AI Docker Server v0.7.0
Uses only built-in Python modules to test all endpoints.
"""
import urllib.request
import urllib.parse
import json
import time
import sys
from typing import Dict, List, Optional
# Configuration
BASE_URL = "http://localhost:11234" # Change to your server URL
TEST_TIMEOUT = 30
class SimpleApiTester:
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url
self.token = None
self.results = []
def log(self, message: str):
print(f"[INFO] {message}")
def test_get_endpoint(self, endpoint: str) -> Dict:
"""Test a GET endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
req = urllib.request.Request(url)
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "GET",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "GET",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def test_post_endpoint(self, endpoint: str, payload: Dict) -> Dict:
"""Test a POST endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
req.add_header('Content-Type', 'application/json')
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "POST",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "POST",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def print_result(self, result: Dict):
"""Print a formatted test result"""
status_color = {
"PASS": "",
"FAIL": "",
"SKIP": "⏭️"
}
print(f"{status_color[result['status']]} {result['method']} {result['endpoint']} "
f"| {result['response_time']:.3f}s | Status: {result['status_code'] or 'N/A'}")
if result['status'] == 'FAIL' and 'error' in result:
print(f" Error: {result['error']}")
self.results.append(result)
def run_all_tests(self):
"""Run all API tests"""
print("🚀 Starting Crawl4AI v0.7.0 API Test Suite")
print(f"📡 Testing server at: {self.base_url}")
print("=" * 60)
# # Test basic endpoints
# print("\n=== BASIC ENDPOINTS ===")
# # Health check
# result = self.test_get_endpoint("/health")
# self.print_result(result)
# # Schema endpoint
# result = self.test_get_endpoint("/schema")
# self.print_result(result)
# # Metrics endpoint
# result = self.test_get_endpoint("/metrics")
# self.print_result(result)
# # Root redirect
# result = self.test_get_endpoint("/")
# self.print_result(result)
# # Test authentication
# print("\n=== AUTHENTICATION ===")
# # Get token
# token_payload = {"email": "test@example.com"}
# result = self.test_post_endpoint("/token", token_payload)
# self.print_result(result)
# # Extract token if successful
# if result['status'] == 'PASS' and 'data' in result:
# token = result['data'].get('access_token')
# if token:
# self.token = token
# self.log(f"Successfully obtained auth token: {token[:20]}...")
# Test core APIs
print("\n=== CORE APIs ===")
test_url = "https://example.com"
# Test markdown endpoint
md_payload = {
"url": test_url,
"f": "fit",
"q": "test query",
"c": "0"
}
result = self.test_post_endpoint("/md", md_payload)
# print(result['data'].get('markdown', ''))
self.print_result(result)
# Test HTML endpoint
html_payload = {"url": test_url}
result = self.test_post_endpoint("/html", html_payload)
self.print_result(result)
# Test screenshot endpoint
screenshot_payload = {
"url": test_url,
"screenshot_wait_for": 2
}
result = self.test_post_endpoint("/screenshot", screenshot_payload)
self.print_result(result)
# Test PDF endpoint
pdf_payload = {"url": test_url}
result = self.test_post_endpoint("/pdf", pdf_payload)
self.print_result(result)
# Test JavaScript execution
js_payload = {
"url": test_url,
"scripts": ["(() => document.title)()"]
}
result = self.test_post_endpoint("/execute_js", js_payload)
self.print_result(result)
# Test crawl endpoint
crawl_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl", crawl_payload)
self.print_result(result)
# Test config dump
config_payload = {"code": "CrawlerRunConfig()"}
result = self.test_post_endpoint("/config/dump", config_payload)
self.print_result(result)
# Test LLM endpoint
llm_endpoint = f"/llm/{test_url}?q=Extract%20main%20content"
result = self.test_get_endpoint(llm_endpoint)
self.print_result(result)
# Test ask endpoint
ask_endpoint = "/ask?context_type=all&query=crawl4ai&max_results=5"
result = self.test_get_endpoint(ask_endpoint)
print(result)
self.print_result(result)
# Test job APIs
print("\n=== JOB APIs ===")
# Test LLM job
llm_job_payload = {
"url": test_url,
"q": "Extract main content",
"cache": False
}
result = self.test_post_endpoint("/llm/job", llm_job_payload)
self.print_result(result)
# Test crawl job
crawl_job_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl/job", crawl_job_payload)
self.print_result(result)
# Test MCP
print("\n=== MCP APIs ===")
# Test MCP schema
result = self.test_get_endpoint("/mcp/schema")
self.print_result(result)
# Test error handling
print("\n=== ERROR HANDLING ===")
# Test invalid URL
invalid_payload = {"url": "invalid-url", "f": "fit"}
result = self.test_post_endpoint("/md", invalid_payload)
self.print_result(result)
# Test invalid endpoint
result = self.test_get_endpoint("/nonexistent")
self.print_result(result)
# Print summary
self.print_summary()
def print_summary(self):
"""Print test results summary"""
print("\n" + "=" * 60)
print("📊 TEST RESULTS SUMMARY")
print("=" * 60)
total = len(self.results)
passed = sum(1 for r in self.results if r['status'] == 'PASS')
failed = sum(1 for r in self.results if r['status'] == 'FAIL')
print(f"Total Tests: {total}")
print(f"✅ Passed: {passed}")
print(f"❌ Failed: {failed}")
print(f"📈 Success Rate: {(passed/total)*100:.1f}%")
if failed > 0:
print("\n❌ FAILED TESTS:")
for result in self.results:
if result['status'] == 'FAIL':
print(f"{result['method']} {result['endpoint']}")
if 'error' in result:
print(f" Error: {result['error']}")
# Performance statistics
response_times = [r['response_time'] for r in self.results if r['response_time'] > 0]
if response_times:
avg_time = sum(response_times) / len(response_times)
max_time = max(response_times)
print(f"\n⏱️ Average Response Time: {avg_time:.3f}s")
print(f"⏱️ Max Response Time: {max_time:.3f}s")
# Save detailed report
report_file = f"crawl4ai_test_report_{int(time.time())}.json"
with open(report_file, 'w') as f:
json.dump({
"timestamp": time.time(),
"server_url": self.base_url,
"version": "0.7.0",
"summary": {
"total": total,
"passed": passed,
"failed": failed
},
"results": self.results
}, f, indent=2)
print(f"\n📄 Detailed report saved to: {report_file}")
def main():
"""Main test runner"""
import argparse
parser = argparse.ArgumentParser(description='Crawl4AI v0.7.0 API Test Suite')
parser.add_argument('--url', default=BASE_URL, help='Base URL of the server')
args = parser.parse_args()
tester = SimpleApiTester(args.url)
try:
tester.run_all_tests()
except KeyboardInterrupt:
print("\n🛑 Test suite interrupted by user")
except Exception as e:
print(f"\n💥 Test suite failed with error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -5,7 +5,7 @@ Test script for Link Extractor functionality
from crawl4ai.models import Link
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.async_configs import LinkPreviewConfig
from crawl4ai import LinkPreviewConfig
import asyncio
import sys
import os
@@ -237,7 +237,7 @@ def test_config_examples():
print(f" {key}: {value}")
print(" Usage:")
print(" from crawl4ai.async_configs import LinkPreviewConfig")
print(" from crawl4ai import LinkPreviewConfig")
print(" config = CrawlerRunConfig(")
print(" link_preview_config=LinkPreviewConfig(")
for key, value in config_dict.items():

View File

@@ -0,0 +1,211 @@
import asyncio
import httpx
import email.utils
from datetime import datetime
import json
from typing import Dict, Optional
import time
async def should_crawl(url: str, cache: Optional[Dict[str, str]] = None) -> bool:
"""
Check if a URL should be crawled based on HEAD request headers.
Args:
url: The URL to check
cache: Previous cache data containing etag, last_modified, digest, content_length
Returns:
True if the page has changed and should be crawled, False otherwise
"""
if cache is None:
cache = {}
headers = {
"Accept-Encoding": "identity",
"Want-Content-Digest": "sha-256",
}
if cache.get("etag"):
headers["If-None-Match"] = cache["etag"]
if cache.get("last_modified"):
headers["If-Modified-Since"] = cache["last_modified"]
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=5) as client:
response = await client.head(url, headers=headers)
# 304 Not Modified - content hasn't changed
if response.status_code == 304:
print(f"✓ 304 Not Modified - No need to crawl {url}")
return False
h = response.headers
# Check Content-Digest (most reliable)
if h.get("content-digest") and h["content-digest"] == cache.get("digest"):
print(f"✓ Content-Digest matches - No need to crawl {url}")
return False
# Check strong ETag
if h.get("etag") and h["etag"].startswith('"') and h["etag"] == cache.get("etag"):
print(f"✓ Strong ETag matches - No need to crawl {url}")
return False
# Check Last-Modified
if h.get("last-modified") and cache.get("last_modified"):
try:
lm_new = email.utils.parsedate_to_datetime(h["last-modified"])
lm_old = email.utils.parsedate_to_datetime(cache["last_modified"])
if lm_new <= lm_old:
print(f"✓ Last-Modified not newer - No need to crawl {url}")
return False
except:
pass
# Check Content-Length (weakest signal - only as a hint, not definitive)
# Note: Same content length doesn't mean same content!
# This should only be used when no other signals are available
if h.get("content-length") and cache.get("content_length"):
try:
if int(h["content-length"]) != cache.get("content_length"):
print(f"✗ Content-Length changed - Should crawl {url}")
return True
else:
print(f"⚠️ Content-Length unchanged but content might have changed - Should crawl {url}")
return True # When in doubt, crawl!
except:
pass
print(f"✗ Content has changed - Should crawl {url}")
return True
except Exception as e:
print(f"✗ Error checking {url}: {e}")
return True # On error, assume we should crawl
async def crawl_page(url: str) -> Dict[str, str]:
"""
Simulate crawling a page and extracting cache headers.
"""
print(f"\n🕷️ Crawling {url}...")
async with httpx.AsyncClient(follow_redirects=True, timeout=10) as client:
response = await client.get(url)
cache_data = {}
h = response.headers
if h.get("etag"):
cache_data["etag"] = h["etag"]
print(f" Stored ETag: {h['etag']}")
if h.get("last-modified"):
cache_data["last_modified"] = h["last-modified"]
print(f" Stored Last-Modified: {h['last-modified']}")
if h.get("content-digest"):
cache_data["digest"] = h["content-digest"]
print(f" Stored Content-Digest: {h['content-digest']}")
if h.get("content-length"):
cache_data["content_length"] = int(h["content-length"])
print(f" Stored Content-Length: {h['content-length']}")
print(f" Response size: {len(response.content)} bytes")
return cache_data
async def test_static_site():
"""Test with a static website (example.com)"""
print("=" * 60)
print("Testing with static site: example.com")
print("=" * 60)
url = "https://example.com"
# First crawl - always happens
cache = await crawl_page(url)
# Wait a bit
await asyncio.sleep(2)
# Second check - should not need to crawl
print(f"\n📊 Checking if we need to re-crawl...")
needs_crawl = await should_crawl(url, cache)
if not needs_crawl:
print("✅ Correctly identified: No need to re-crawl static content")
else:
print("❌ Unexpected: Static content flagged as changed")
async def test_dynamic_site():
"""Test with dynamic websites that change frequently"""
print("\n" + "=" * 60)
print("Testing with dynamic sites")
print("=" * 60)
# Test with a few dynamic sites
dynamic_sites = [
"https://api.github.com/", # GitHub API root (changes with rate limit info)
"https://worldtimeapi.org/api/timezone/UTC", # Current time API
"https://httpbin.org/uuid", # Generates new UUID each request
]
for url in dynamic_sites:
print(f"\n🔄 Testing {url}")
try:
# First crawl
cache = await crawl_page(url)
# Wait a bit
await asyncio.sleep(2)
# Check if content changed
print(f"\n📊 Checking if we need to re-crawl...")
needs_crawl = await should_crawl(url, cache)
if needs_crawl:
print("✅ Correctly identified: Dynamic content has changed")
else:
print("⚠️ Note: Dynamic content appears unchanged (might have caching)")
except Exception as e:
print(f"❌ Error testing {url}: {e}")
async def test_conditional_get():
"""Test conditional GET fallback when HEAD doesn't provide enough info"""
print("\n" + "=" * 60)
print("Testing conditional GET scenario")
print("=" * 60)
url = "https://httpbin.org/etag/test-etag-123"
# Simulate a scenario where we have an ETag
cache = {"etag": '"test-etag-123"'}
print(f"Testing with cached ETag: {cache['etag']}")
needs_crawl = await should_crawl(url, cache)
if not needs_crawl:
print("✅ ETag matched - no crawl needed")
else:
print("✅ ETag didn't match - crawl needed")
async def main():
"""Run all tests"""
print("🚀 Starting HEAD request change detection tests\n")
await test_static_site()
await test_dynamic_site()
await test_conditional_get()
print("\n✨ All tests completed!")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,186 @@
import asyncio
import httpx
import email.utils
from datetime import datetime
import json
from typing import Dict, Optional
import time
async def should_crawl(url: str, cache: Optional[Dict[str, str]] = None) -> bool:
"""
Check if a URL should be crawled based on HEAD request headers.
"""
if cache is None:
cache = {}
headers = {
"Accept-Encoding": "identity",
"Want-Content-Digest": "sha-256",
"User-Agent": "Mozilla/5.0 (compatible; crawl4ai/1.0)"
}
if cache.get("etag"):
headers["If-None-Match"] = cache["etag"]
if cache.get("last_modified"):
headers["If-Modified-Since"] = cache["last_modified"]
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=5) as client:
response = await client.head(url, headers=headers)
print(f"\nHEAD Response Status: {response.status_code}")
print(f"Headers received: {dict(response.headers)}")
# 304 Not Modified
if response.status_code == 304:
return False
h = response.headers
# Check headers in order of reliability
if h.get("content-digest") and h["content-digest"] == cache.get("digest"):
return False
if h.get("etag") and h["etag"].startswith('"') and h["etag"] == cache.get("etag"):
return False
if h.get("last-modified") and cache.get("last_modified"):
try:
lm_new = email.utils.parsedate_to_datetime(h["last-modified"])
lm_old = email.utils.parsedate_to_datetime(cache["last_modified"])
if lm_new <= lm_old:
return False
except:
pass
# Check Content-Length (weakest signal - only as a hint, not definitive)
# Note: Same content length doesn't mean same content!
if h.get("content-length") and cache.get("content_length"):
try:
if int(h["content-length"]) != cache.get("content_length"):
return True # Length changed, likely content changed
# If length is same, we can't be sure - default to crawling
except:
pass
return True
except Exception as e:
print(f"Error during HEAD request: {e}")
return True
async def test_with_changing_content():
"""Test with a real changing website"""
print("=" * 60)
print("Testing with real changing content")
print("=" * 60)
# Using httpbin's cache endpoint that changes after specified seconds
url = "https://httpbin.org/cache/1" # Cache for 1 second
print(f"\n1⃣ First request to {url}")
async with httpx.AsyncClient() as client:
response1 = await client.get(url)
cache = {}
if response1.headers.get("etag"):
cache["etag"] = response1.headers["etag"]
if response1.headers.get("last-modified"):
cache["last_modified"] = response1.headers["last-modified"]
print(f"Cached ETag: {cache.get('etag', 'None')}")
print(f"Cached Last-Modified: {cache.get('last_modified', 'None')}")
# Check immediately (should not need crawl)
print(f"\n2⃣ Checking immediately after first request...")
needs_crawl = await should_crawl(url, cache)
print(f"Result: {'NEED TO CRAWL' if needs_crawl else 'NO NEED TO CRAWL'}")
# Wait for cache to expire
print(f"\n⏳ Waiting 2 seconds for cache to expire...")
await asyncio.sleep(2)
# Check again (should need crawl now)
print(f"\n3⃣ Checking after cache expiry...")
needs_crawl = await should_crawl(url, cache)
print(f"Result: {'NEED TO CRAWL' if needs_crawl else 'NO NEED TO CRAWL'}")
async def test_news_website():
"""Test with a news website that updates frequently"""
print("\n" + "=" * 60)
print("Testing with news website (BBC)")
print("=" * 60)
url = "https://www.bbc.com"
print(f"\n1⃣ First crawl of {url}")
async with httpx.AsyncClient() as client:
response1 = await client.get(url)
cache = {}
h = response1.headers
if h.get("etag"):
cache["etag"] = h["etag"]
print(f"Stored ETag: {h['etag'][:50]}...")
if h.get("last-modified"):
cache["last_modified"] = h["last-modified"]
print(f"Stored Last-Modified: {h['last-modified']}")
if h.get("content-length"):
cache["content_length"] = int(h["content-length"])
print(f"Stored Content-Length: {h['content-length']}")
# Check multiple times
for i in range(3):
await asyncio.sleep(5)
print(f"\n📊 Check #{i+2} - {datetime.now().strftime('%H:%M:%S')}")
needs_crawl = await should_crawl(url, cache)
print(f"Result: {'NEED TO CRAWL ✓' if needs_crawl else 'NO NEED TO CRAWL ✗'}")
async def test_api_endpoint():
"""Test with an API that provides proper caching headers"""
print("\n" + "=" * 60)
print("Testing with GitHub API")
print("=" * 60)
# GitHub user API (updates when user data changes)
url = "https://api.github.com/users/github"
headers = {"User-Agent": "crawl4ai-test"}
print(f"\n1⃣ First request to {url}")
async with httpx.AsyncClient() as client:
response1 = await client.get(url, headers=headers)
cache = {}
h = response1.headers
if h.get("etag"):
cache["etag"] = h["etag"]
print(f"Stored ETag: {h['etag']}")
if h.get("last-modified"):
cache["last_modified"] = h["last-modified"]
print(f"Stored Last-Modified: {h['last-modified']}")
# Print rate limit info
print(f"Rate Limit Remaining: {h.get('x-ratelimit-remaining', 'N/A')}")
# Check if content changed
print(f"\n2⃣ Checking if content changed...")
needs_crawl = await should_crawl(url, cache)
print(f"Result: {'NEED TO CRAWL' if needs_crawl else 'NO NEED TO CRAWL (content unchanged)'}")
async def main():
"""Run all tests"""
print("🚀 Testing HEAD request change detection with real websites\n")
await test_with_changing_content()
await test_news_website()
await test_api_endpoint()
print("\n✨ All tests completed!")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,196 @@
"""
Test SMART cache mode functionality in crawl4ai.
This test demonstrates:
1. Initial crawl with caching enabled
2. Re-crawl with SMART mode on static content (should use cache)
3. Re-crawl with SMART mode on dynamic content (should re-crawl)
"""
import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig
from crawl4ai.cache_context import CacheMode
import time
from datetime import datetime
async def test_smart_cache_mode():
"""Test the SMART cache mode with both static and dynamic URLs"""
print("=" * 60)
print("Testing SMART Cache Mode")
print("=" * 60)
# URLs for testing
static_url = "https://example.com" # Rarely changes
dynamic_url = "https://httpbin.org/uuid" # Changes every request
async with AsyncWebCrawler(verbose=True) as crawler:
# Test 1: Initial crawl with caching enabled
print("\n1⃣ Initial crawl with ENABLED cache mode")
print("-" * 40)
# Crawl static URL
config_static = CrawlerRunConfig(
cache_mode=CacheMode.ENABLED,
verbose=True
)
result_static_1 = await crawler.arun(url=static_url, config=config_static)
print(f"✓ Static URL crawled: {len(result_static_1.html)} bytes")
print(f" Response headers: {list(result_static_1.response_headers.keys())[:5]}...")
# Crawl dynamic URL
config_dynamic = CrawlerRunConfig(
cache_mode=CacheMode.ENABLED,
verbose=True
)
result_dynamic_1 = await crawler.arun(url=dynamic_url, config=config_dynamic)
print(f"✓ Dynamic URL crawled: {len(result_dynamic_1.html)} bytes")
dynamic_content_1 = result_dynamic_1.html
# Wait a bit
await asyncio.sleep(2)
# Test 2: Re-crawl static URL with SMART mode
print("\n2⃣ Re-crawl static URL with SMART cache mode")
print("-" * 40)
config_smart = CrawlerRunConfig(
cache_mode=CacheMode.SMART, # This will be our new mode
verbose=True
)
start_time = time.time()
result_static_2 = await crawler.arun(url=static_url, config=config_smart)
elapsed = time.time() - start_time
print(f"✓ Static URL with SMART mode completed in {elapsed:.2f}s")
print(f" Should use cache (content unchanged)")
print(f" HTML length: {len(result_static_2.html)} bytes")
# Test 3: Re-crawl dynamic URL with SMART mode
print("\n3⃣ Re-crawl dynamic URL with SMART cache mode")
print("-" * 40)
start_time = time.time()
result_dynamic_2 = await crawler.arun(url=dynamic_url, config=config_smart)
elapsed = time.time() - start_time
dynamic_content_2 = result_dynamic_2.html
print(f"✓ Dynamic URL with SMART mode completed in {elapsed:.2f}s")
print(f" Should re-crawl (content changes every request)")
print(f" HTML length: {len(result_dynamic_2.html)} bytes")
print(f" Content changed: {dynamic_content_1 != dynamic_content_2}")
# Test 4: Test with a news website (content changes frequently)
print("\n4⃣ Testing with news website")
print("-" * 40)
news_url = "https://news.ycombinator.com"
# First crawl
result_news_1 = await crawler.arun(
url=news_url,
config=CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
)
print(f"✓ News site initial crawl: {len(result_news_1.html)} bytes")
# Wait a bit
await asyncio.sleep(5)
# Re-crawl with SMART mode
start_time = time.time()
result_news_2 = await crawler.arun(
url=news_url,
config=CrawlerRunConfig(cache_mode=CacheMode.SMART)
)
elapsed = time.time() - start_time
print(f"✓ News site SMART mode completed in {elapsed:.2f}s")
print(f" Content length changed: {len(result_news_1.html) != len(result_news_2.html)}")
# Summary
print("\n" + "=" * 60)
print("Summary")
print("=" * 60)
print("✅ SMART cache mode should:")
print(" - Use cache for static content (example.com)")
print(" - Re-crawl dynamic content (httpbin.org/uuid)")
print(" - Make intelligent decisions based on HEAD requests")
print(" - Save bandwidth on unchanged content")
async def test_smart_cache_edge_cases():
"""Test edge cases for SMART cache mode"""
print("\n" + "=" * 60)
print("Testing SMART Cache Mode Edge Cases")
print("=" * 60)
async with AsyncWebCrawler(verbose=True) as crawler:
# Test with URL that doesn't support HEAD
print("\n🔧 Testing URL with potential HEAD issues")
print("-" * 40)
# Some servers don't handle HEAD well
problematic_url = "https://httpbin.org/status/200"
# Initial crawl
await crawler.arun(
url=problematic_url,
config=CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
)
# Try SMART mode
result = await crawler.arun(
url=problematic_url,
config=CrawlerRunConfig(cache_mode=CacheMode.SMART)
)
print(f"✓ Handled potentially problematic URL: {result.success}")
# Test with URL that has no caching headers
print("\n🔧 Testing URL with no cache headers")
print("-" * 40)
no_cache_url = "https://httpbin.org/html"
# Initial crawl
await crawler.arun(
url=no_cache_url,
config=CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
)
# SMART mode should handle gracefully
result = await crawler.arun(
url=no_cache_url,
config=CrawlerRunConfig(cache_mode=CacheMode.SMART)
)
print(f"✓ Handled URL with no cache headers: {result.success}")
async def main():
"""Run all tests"""
try:
# Run main test
await test_smart_cache_mode()
# Run edge case tests
await test_smart_cache_edge_cases()
print("\n✨ All tests completed!")
except Exception as e:
print(f"\n❌ Error during testing: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
# Note: This test will fail until SMART mode is implemented
print("⚠️ Note: This test expects CacheMode.SMART to be implemented")
print("⚠️ It will fail with AttributeError until the feature is added\n")
asyncio.run(main())

View File

@@ -0,0 +1,69 @@
"""
Simple test for SMART cache mode functionality.
"""
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig
from crawl4ai.cache_context import CacheMode
import time
async def test_smart_cache():
"""Test SMART cache mode with a simple example"""
print("Testing SMART Cache Mode")
print("-" * 40)
# Test URL
url = "https://example.com"
async with AsyncWebCrawler(verbose=True) as crawler:
# First crawl with normal caching
print("\n1. Initial crawl with ENABLED mode:")
config1 = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
result1 = await crawler.arun(url=url, config=config1)
print(f" Crawled: {len(result1.html)} bytes")
print(f" Headers: {list(result1.response_headers.keys())[:3]}...")
# Wait a moment
await asyncio.sleep(2)
# Re-crawl with SMART mode
print("\n2. Re-crawl with SMART mode:")
config2 = CrawlerRunConfig(cache_mode=CacheMode.SMART)
start = time.time()
result2 = await crawler.arun(url=url, config=config2)
elapsed = time.time() - start
print(f" Time: {elapsed:.2f}s")
print(f" Result: {len(result2.html)} bytes")
print(f" Should use cache (content unchanged)")
# Test with dynamic content
print("\n3. Testing with dynamic URL:")
dynamic_url = "https://httpbin.org/uuid"
# First crawl
config3 = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
result3 = await crawler.arun(url=dynamic_url, config=config3)
content1 = result3.html
# Re-crawl with SMART
config4 = CrawlerRunConfig(cache_mode=CacheMode.SMART)
result4 = await crawler.arun(url=dynamic_url, config=config4)
content2 = result4.html
print(f" Content changed: {content1 != content2}")
print(f" Should re-crawl (dynamic content)")
if __name__ == "__main__":
print(f"Python path: {sys.path[0]}")
print(f"CacheMode values: {[e.value for e in CacheMode]}")
print()
asyncio.run(test_smart_cache())