Compare commits

..

3 Commits

Author SHA1 Message Date
AHMET YILMAZ
65902a4773 feat: Enhance stealth compatibility with new and legacy APIs, add configuration support 2025-07-16 17:41:47 +08:00
AHMET YILMAZ
5c13baf574 feat: Add stealth option to BrowserConfig for enhanced browser behavior 2025-07-15 15:48:23 +08:00
AHMET YILMAZ
d2759824ef fix: Update playwright-stealth to v2.0.0+ compatibility
Fixes #1273

- Replace deprecated stealth_async import with Stealth class
- Add stealth flag to BrowserConfig (default: true)
- Update async_crawler_strategy to use Stealth().apply_stealth_async()
- Remove obsolete StealthConfig from browser_manager
- Maintain backward compatibility with existing stealth functionality

This fixes compatibility issues with playwright-stealth v2.0.0+ where the API changed from stealth_async function to Stealth class.

test: Add comprehensive tests for playwright-stealth v2.0.0+ compatibility

- Test Stealth class import and instantiation
- Test apply_stealth_async method availability
- Test BrowserConfig stealth flag functionality
- Test stealth flag serialization
- Verify backward compatibility with existing stealth functionality
2025-07-15 15:31:15 +08:00
29 changed files with 850 additions and 1072 deletions

View File

@@ -1,141 +0,0 @@
# Release pipeline.
#
# Triggered by pushing a `v*` tag. The single job:
#   1. checks that the tag version equals crawl4ai/__version__.py,
#   2. builds the package and uploads it to PyPI,
#   3. builds and pushes multi-arch Docker images,
#   4. creates a GitHub Release and writes a run summary.
name: Release Pipeline

on:
  push:
    tags:
      - 'v*'
      - '!test-v*' # Exclude test tags

# The "Create GitHub Release" step authenticates with GITHUB_TOKEN and needs
# write access to repository contents. Declaring it here keeps the workflow
# independent of the repository's default token permissions.
permissions:
  contents: write

jobs:
  release:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      # Strip the `refs/tags/v` prefix so VERSION is the bare version string.
      - name: Extract version from tag
        id: get_version
        run: |
          TAG_VERSION="${GITHUB_REF#refs/tags/v}"
          echo "VERSION=$TAG_VERSION" >> "$GITHUB_OUTPUT"
          echo "Releasing version: $TAG_VERSION"

      - name: Install package dependencies
        run: |
          pip install -e .

      # Fail early when the tag and the packaged version disagree.
      # The tag-derived value is passed through `env` instead of being
      # interpolated into the script text, so its content is never parsed
      # as shell code.
      - name: Check version consistency
        env:
          TAG_VERSION: ${{ steps.get_version.outputs.VERSION }}
        run: |
          PACKAGE_VERSION=$(python -c "from crawl4ai.__version__ import __version__; print(__version__)")
          echo "Tag version: $TAG_VERSION"
          echo "Package version: $PACKAGE_VERSION"
          if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then
            echo "❌ Version mismatch! Tag: $TAG_VERSION, Package: $PACKAGE_VERSION"
            echo "Please update crawl4ai/__version__.py to match the tag version"
            exit 1
          fi
          echo "✅ Version check passed: $TAG_VERSION"

      - name: Install build dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build twine

      - name: Build package
        run: python -m build

      - name: Check package
        run: twine check dist/*

      - name: Upload to PyPI
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
        run: |
          echo "📦 Uploading to PyPI..."
          twine upload dist/*
          echo "✅ Package uploaded to https://pypi.org/project/crawl4ai/"

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_TOKEN }}

      # MAJOR is the first dot-separated field, MINOR is the first two
      # (e.g. 0.7.2 -> MAJOR=0, MINOR=0.7); both become floating image tags.
      - name: Extract major and minor versions
        id: versions
        env:
          VERSION: ${{ steps.get_version.outputs.VERSION }}
        run: |
          MAJOR=$(echo "$VERSION" | cut -d. -f1)
          MINOR=$(echo "$VERSION" | cut -d. -f1-2)
          echo "MAJOR=$MAJOR" >> "$GITHUB_OUTPUT"
          echo "MINOR=$MINOR" >> "$GITHUB_OUTPUT"

      - name: Build and push Docker images
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: |
            unclecode/crawl4ai:${{ steps.get_version.outputs.VERSION }}
            unclecode/crawl4ai:${{ steps.versions.outputs.MINOR }}
            unclecode/crawl4ai:${{ steps.versions.outputs.MAJOR }}
            unclecode/crawl4ai:latest
          platforms: linux/amd64,linux/arm64

      # NOTE(review): actions/create-release is archived upstream; consider
      # migrating to a maintained alternative (e.g. `gh release create`).
      - name: Create GitHub Release
        uses: actions/create-release@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          tag_name: v${{ steps.get_version.outputs.VERSION }}
          release_name: Release v${{ steps.get_version.outputs.VERSION }}
          body: |
            ## 🎉 Crawl4AI v${{ steps.get_version.outputs.VERSION }} Released!
            ### 📦 Installation
            **PyPI:**
            ```bash
            pip install crawl4ai==${{ steps.get_version.outputs.VERSION }}
            ```
            **Docker:**
            ```bash
            docker pull unclecode/crawl4ai:${{ steps.get_version.outputs.VERSION }}
            docker pull unclecode/crawl4ai:latest
            ```
            ### 📝 What's Changed
            See [CHANGELOG.md](https://github.com/${{ github.repository }}/blob/main/CHANGELOG.md) for details.
          draft: false
          prerelease: false

      # Write a Markdown recap to the run's summary page. Values reach the
      # script as environment variables, never as inlined expression text.
      - name: Summary
        env:
          VERSION: ${{ steps.get_version.outputs.VERSION }}
          MINOR: ${{ steps.versions.outputs.MINOR }}
          MAJOR: ${{ steps.versions.outputs.MAJOR }}
        run: |
          {
            echo "## 🚀 Release Complete!"
            echo ""
            echo "### 📦 PyPI Package"
            echo "- Version: ${VERSION}"
            echo "- URL: https://pypi.org/project/crawl4ai/"
            echo "- Install: \`pip install crawl4ai==${VERSION}\`"
            echo ""
            echo "### 🐳 Docker Images"
            echo "- \`unclecode/crawl4ai:${VERSION}\`"
            echo "- \`unclecode/crawl4ai:${MINOR}\`"
            echo "- \`unclecode/crawl4ai:${MAJOR}\`"
            echo "- \`unclecode/crawl4ai:latest\`"
            echo ""
            echo "### 📋 GitHub Release"
            echo "https://github.com/${GITHUB_REPOSITORY}/releases/tag/v${VERSION}"
          } >> "$GITHUB_STEP_SUMMARY"

View File

@@ -1,116 +0,0 @@
# Test release pipeline.
#
# Triggered by pushing a `test-v*` tag. Mirrors the real release flow but
# targets Test PyPI and `test-` prefixed Docker tags, and prints cleanup
# commands in the run summary.
name: Test Release Pipeline

on:
  push:
    tags:
      - 'test-v*'

# No step writes to the repository through GITHUB_TOKEN; read access for the
# checkout is sufficient.
permissions:
  contents: read

jobs:
  test-release:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      # Strip the `refs/tags/test-v` prefix so VERSION is the bare version.
      - name: Extract version from tag
        id: get_version
        run: |
          TAG_VERSION="${GITHUB_REF#refs/tags/test-v}"
          echo "VERSION=$TAG_VERSION" >> "$GITHUB_OUTPUT"
          echo "Testing with version: $TAG_VERSION"

      - name: Install package dependencies
        run: |
          pip install -e .

      # The tag-derived value is passed through `env` instead of being
      # interpolated into the script text, so its content is never parsed
      # as shell code.
      - name: Check version consistency
        env:
          TAG_VERSION: ${{ steps.get_version.outputs.VERSION }}
        run: |
          PACKAGE_VERSION=$(python -c "from crawl4ai.__version__ import __version__; print(__version__)")
          echo "Tag version: $TAG_VERSION"
          echo "Package version: $PACKAGE_VERSION"
          if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then
            echo "❌ Version mismatch! Tag: $TAG_VERSION, Package: $PACKAGE_VERSION"
            echo "Please update crawl4ai/__version__.py to match the tag version"
            exit 1
          fi
          echo "✅ Version check passed: $TAG_VERSION"

      - name: Install build dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build twine

      - name: Build package
        run: python -m build

      - name: Check package
        run: twine check dist/*

      # `--skip-existing` tolerates a version that is already on Test PyPI
      # (the expected case when a test tag is re-pushed) while still failing
      # the job on real errors such as bad credentials or network failures.
      # The previous `|| { if [ $? -eq 1 ] ... }` handler could not tell these
      # apart and ignored every exit-code-1 failure.
      - name: Upload to Test PyPI
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.TEST_PYPI_TOKEN }}
        run: |
          echo "📦 Uploading to Test PyPI..."
          twine upload --repository testpypi --skip-existing dist/*
          echo "✅ Test PyPI step complete"

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_TOKEN }}

      - name: Build and push Docker test images
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: |
            unclecode/crawl4ai:test-${{ steps.get_version.outputs.VERSION }}
            unclecode/crawl4ai:test-latest
          platforms: linux/amd64,linux/arm64
          cache-from: type=gha
          cache-to: type=gha,mode=max

      # Write a Markdown recap, including commands to remove the test tag and
      # test images, to the run's summary page.
      - name: Summary
        env:
          VERSION: ${{ steps.get_version.outputs.VERSION }}
        run: |
          {
            echo "## 🎉 Test Release Complete!"
            echo ""
            echo "### 📦 Test PyPI Package"
            echo "- Version: ${VERSION}"
            echo "- URL: https://test.pypi.org/project/crawl4ai/"
            echo "- Install: \`pip install -i https://test.pypi.org/simple/ crawl4ai==${VERSION}\`"
            echo ""
            echo "### 🐳 Docker Test Images"
            echo "- \`unclecode/crawl4ai:test-${VERSION}\`"
            echo "- \`unclecode/crawl4ai:test-latest\`"
            echo ""
            echo "### 🧹 Cleanup Commands"
            echo "\`\`\`bash"
            echo "# Remove test tag"
            echo "git tag -d test-v${VERSION}"
            echo "git push origin :test-v${VERSION}"
            echo ""
            echo "# Remove Docker test images"
            echo "docker rmi unclecode/crawl4ai:test-${VERSION}"
            echo "docker rmi unclecode/crawl4ai:test-latest"
            echo "\`\`\`"
          } >> "$GITHUB_STEP_SUMMARY"

View File

@@ -28,7 +28,7 @@ Crawl4AI is the #1 trending GitHub repository, actively maintained by a vibrant
[✨ Check out latest update v0.7.0](#-recent-updates) [✨ Check out latest update v0.7.0](#-recent-updates)
🎉 **Version 0.7.0 is now available!** The Adaptive Intelligence Update introduces groundbreaking features: Adaptive Crawling that learns website patterns, Virtual Scroll support for infinite pages, intelligent Link Preview with 3-layer scoring, Async URL Seeder for massive discovery, and significant performance improvements. [Read the release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.0.md) 🎉 **Version 0.7.0 is now available!** The Adaptive Intelligence Update introduces groundbreaking features: Adaptive Crawling that learns website patterns, Virtual Scroll support for infinite pages, intelligent Link Preview with 3-layer scoring, Async URL Seeder for massive discovery, and significant performance improvements. [Read the release notes →](https://docs.crawl4ai.com/blog/release-v0.7.0)
<details> <details>
<summary>🤓 <strong>My Personal Story</strong></summary> <summary>🤓 <strong>My Personal Story</strong></summary>
@@ -523,18 +523,15 @@ async def test_news_crawl():
- **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically: - **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically:
```python ```python
config = AdaptiveConfig( config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to stop crawling confidence_threshold=0.7,
max_depth=5, # Maximum crawl depth max_history=100,
max_pages=20, # Maximum number of pages to crawl learning_rate=0.2
strategy="statistical"
) )
async with AsyncWebCrawler() as crawler: result = await crawler.arun(
adaptive_crawler = AdaptiveCrawler(crawler, config) "https://news.example.com",
state = await adaptive_crawler.digest( config=CrawlerRunConfig(adaptive_config=config)
start_url="https://news.example.com", )
query="latest news content"
)
# Crawler learns patterns and improves extraction over time # Crawler learns patterns and improves extraction over time
``` ```

View File

@@ -3,7 +3,7 @@ import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode from .async_webcrawler import AsyncWebCrawler, CacheMode
# MODIFIED: Add SeedingConfig and VirtualScrollConfig here # MODIFIED: Add SeedingConfig and VirtualScrollConfig here
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig, LinkPreviewConfig from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig
from .content_scraping_strategy import ( from .content_scraping_strategy import (
ContentScrapingStrategy, ContentScrapingStrategy,
@@ -173,7 +173,6 @@ __all__ = [
"CompilationResult", "CompilationResult",
"ValidationResult", "ValidationResult",
"ErrorDetail", "ErrorDetail",
"LinkPreviewConfig"
] ]

View File

@@ -1,7 +1,7 @@
# crawl4ai/__version__.py # crawl4ai/__version__.py
# This is the version that will be used for stable releases # This is the version that will be used for stable releases
__version__ = "0.7.2" __version__ = "0.7.0"
# For nightly builds, this gets set during build process # For nightly builds, this gets set during build process
__nightly_version__ = None __nightly_version__ = None

View File

@@ -12,6 +12,20 @@ from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from io import BytesIO from io import BytesIO
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
import hashlib import hashlib
# Backward compatible stealth import
try:
# Try new tf-playwright-stealth API (Stealth class)
from playwright_stealth import Stealth
STEALTH_NEW_API = True
except ImportError:
try:
# Try old playwright-stealth API (stealth_async function)
from playwright_stealth import stealth_async
STEALTH_NEW_API = False
except ImportError:
# No stealth available
STEALTH_NEW_API = None
import uuid import uuid
from .js_snippet import load_js_script from .js_snippet import load_js_script
from .models import AsyncCrawlResponse from .models import AsyncCrawlResponse
@@ -31,6 +45,107 @@ from types import MappingProxyType
import contextlib import contextlib
from functools import partial from functools import partial
# Add StealthConfig class for backward compatibility and new features
class StealthConfig:
    """
    Configuration holder for stealth settings.

    The constructor collects every non-None option into ``stealth_options``,
    a plain dict that the crawler strategy forwards as keyword arguments to
    the ``Stealth`` class. ``enabled`` is kept separately and is not part of
    ``stealth_options``.

    Legacy keyword names are still accepted:
      * ``webdriver`` overrides ``navigator_webdriver``
      * ``user_agent_override`` overrides ``navigator_user_agent``
      * ``window_outerdimensions`` is accepted and ignored

    NOTE(review): parameter names and defaults are presumed to mirror
    tf-playwright-stealth's ``Stealth`` signature — confirm against the
    installed library version.
    """

    def __init__(
        self,
        # Common settings
        enabled: bool = True,
        # Core evasion toggles
        chrome_app: bool = True,
        chrome_csi: bool = True,
        chrome_load_times: bool = True,
        chrome_runtime: bool = False,  # Note: library default is False
        hairline: bool = True,
        iframe_content_window: bool = True,
        media_codecs: bool = True,
        navigator_hardware_concurrency: bool = True,
        navigator_languages: bool = True,
        navigator_permissions: bool = True,
        navigator_platform: bool = True,
        navigator_plugins: bool = True,
        navigator_user_agent: bool = True,
        navigator_vendor: bool = True,
        navigator_webdriver: bool = True,
        sec_ch_ua: bool = True,
        webgl_vendor: bool = True,
        # Override parameters
        navigator_languages_override: tuple = ("en-US", "en"),
        navigator_platform_override: str = "Win32",
        navigator_user_agent_override: Optional[str] = None,
        navigator_vendor_override: Optional[str] = None,
        sec_ch_ua_override: Optional[str] = None,
        webgl_renderer_override: Optional[str] = None,
        webgl_vendor_override: Optional[str] = None,
        # Advanced parameters
        init_scripts_only: bool = False,
        script_logging: bool = False,
        # Legacy parameters for backward compatibility
        webdriver: Optional[bool] = None,  # mapped to navigator_webdriver
        user_agent_override: Optional[bool] = None,  # mapped to navigator_user_agent
        window_outerdimensions: Optional[bool] = None,  # accepted but unused
    ):
        """
        Build the configuration.

        Args:
            enabled: Master switch; stored on ``self.enabled``.
            chrome_app ... webgl_vendor: Boolean toggles stored verbatim in
                ``stealth_options``.
            *_override: Replacement values; ``None`` means "not set" and the
                key is omitted from ``stealth_options``.
            init_scripts_only, script_logging: Stored verbatim in
                ``stealth_options``.
            webdriver, user_agent_override: Legacy aliases; when not ``None``
                they take precedence over the modern parameter.
            window_outerdimensions: Legacy parameter, ignored.
        """
        self.enabled = enabled

        # Legacy aliases win over the modern names when explicitly supplied.
        if webdriver is not None:
            navigator_webdriver = webdriver
        if user_agent_override is not None:
            navigator_user_agent = user_agent_override

        # A config rebuilt from JSON (to_dict -> json -> from_dict) delivers
        # the languages as a list. Normalise to a tuple so the value keeps the
        # declared type and is not rejected by consumers that only accept
        # bool/str/tuple option values.
        if isinstance(navigator_languages_override, list):
            navigator_languages_override = tuple(navigator_languages_override)

        candidates = {
            'chrome_app': chrome_app,
            'chrome_csi': chrome_csi,
            'chrome_load_times': chrome_load_times,
            'chrome_runtime': chrome_runtime,
            'hairline': hairline,
            'iframe_content_window': iframe_content_window,
            'media_codecs': media_codecs,
            'navigator_hardware_concurrency': navigator_hardware_concurrency,
            'navigator_languages': navigator_languages,
            'navigator_permissions': navigator_permissions,
            'navigator_platform': navigator_platform,
            'navigator_plugins': navigator_plugins,
            'navigator_user_agent': navigator_user_agent,
            'navigator_vendor': navigator_vendor,
            'navigator_webdriver': navigator_webdriver,
            'sec_ch_ua': sec_ch_ua,
            'webgl_vendor': webgl_vendor,
            'navigator_languages_override': navigator_languages_override,
            'navigator_platform_override': navigator_platform_override,
            'navigator_user_agent_override': navigator_user_agent_override,
            'navigator_vendor_override': navigator_vendor_override,
            'sec_ch_ua_override': sec_ch_ua_override,
            'webgl_renderer_override': webgl_renderer_override,
            'webgl_vendor_override': webgl_vendor_override,
            'init_scripts_only': init_scripts_only,
            'script_logging': script_logging,
        }
        # Store all stealth options, dropping the ones left unset (None).
        self.stealth_options = {
            key: value for key, value in candidates.items() if value is not None
        }

    @classmethod
    def from_dict(cls, config_dict: dict) -> 'StealthConfig':
        """Create a StealthConfig by passing ``config_dict`` as keyword arguments.

        Raises:
            TypeError: if ``config_dict`` contains a key that is not a
                constructor parameter.
        """
        return cls(**config_dict)

    def to_dict(self) -> dict:
        """Return ``enabled`` plus every stored stealth option as one flat dict."""
        return {
            'enabled': self.enabled,
            **self.stealth_options
        }
class AsyncCrawlerStrategy(ABC): class AsyncCrawlerStrategy(ABC):
""" """
Abstract base class for crawler strategies. Abstract base class for crawler strategies.
@@ -39,7 +154,7 @@ class AsyncCrawlerStrategy(ABC):
@abstractmethod @abstractmethod
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse: async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
pass # 4 + 3 pass # 4 + 3
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
""" """
@@ -220,6 +335,79 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
""" """
self.headers = headers self.headers = headers
async def _apply_stealth(self, page: Page, stealth_config: Optional[StealthConfig] = None):
    """
    Apply stealth measures to ``page``, never raising on failure.

    Behaviour depends on the module-level ``STEALTH_NEW_API`` flag:
      * ``None``  - no stealth library was importable; do nothing.
      * truthy    - build ``Stealth(**options)`` from the config and call
                    ``apply_stealth_async(page)``.
      * falsy     - call the legacy ``stealth_async(page)``; the config's
                    options are not used.

    Args:
        page (Page): The Playwright page object
        stealth_config (Optional[StealthConfig]): Stealth settings; a default
            ``StealthConfig()`` is used when omitted. If its ``enabled`` flag
            is false, nothing is applied.
    """

    def emit_debug(**fields):
        # Debug output is optional: needs a logger that exposes `debug`.
        if self.logger and hasattr(self.logger, 'debug'):
            self.logger.debug(**fields)

    # No stealth library available - silently continue
    if STEALTH_NEW_API is None:
        emit_debug(
            message="playwright-stealth not available, skipping stealth measures",
            tag="STEALTH"
        )
        return

    # Fall back to the default configuration when none was supplied
    settings = StealthConfig() if stealth_config is None else stealth_config

    # Respect an explicit opt-out
    if not settings.enabled:
        emit_debug(
            message="Stealth measures disabled in configuration",
            tag="STEALTH"
        )
        return

    try:
        if STEALTH_NEW_API:
            # Forward only bool / str / tuple option values to Stealth
            accepted = {
                name: option
                for name, option in settings.stealth_options.items()
                if isinstance(option, (bool, str, tuple))
            }
            await Stealth(**accepted).apply_stealth_async(page)
            config_info = f"with {len(accepted)} options"
        else:
            # Legacy function-style API takes no configuration
            await stealth_async(page)
            config_info = "default (v1.x legacy)"

        emit_debug(
            message="Applied stealth measures using {version} {config}",
            tag="STEALTH",
            params={
                "version": "tf-playwright-stealth" if STEALTH_NEW_API else "v1.x",
                "config": config_info,
            }
        )
    except Exception as e:
        # Best effort: a stealth failure must not break the crawling process
        if self.logger:
            self.logger.warning(
                message="Stealth measures failed, continuing without stealth: {error}",
                tag="STEALTH",
                params={"error": str(e)}
            )
async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000): async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
""" """
Wait for a condition in a smart way. This functions works as below: Wait for a condition in a smart way. This functions works as below:
@@ -532,6 +720,24 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Get page for session # Get page for session
page, context = await self.browser_manager.get_page(crawlerRunConfig=config) page, context = await self.browser_manager.get_page(crawlerRunConfig=config)
# Apply stealth measures automatically (backward compatible) with optional config
# Check multiple possible locations for stealth config for flexibility
stealth_config = None
if hasattr(config, 'stealth_config') and config.stealth_config:
stealth_config = config.stealth_config
elif hasattr(config, 'stealth') and config.stealth:
# Alternative attribute name for backward compatibility
stealth_config = config.stealth if isinstance(config.stealth, StealthConfig) else StealthConfig.from_dict(config.stealth)
elif config.magic:
# Enable more aggressive stealth in magic mode
stealth_config = StealthConfig(
navigator_webdriver=False, # More aggressive stealth
webdriver=False,
chrome_app=False
)
await self._apply_stealth(page, stealth_config)
# await page.goto(URL) # await page.goto(URL)
# Add default cookie # Add default cookie
@@ -824,7 +1030,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
except Error: except Error:
visibility_info = await self.check_visibility(page) visibility_info = await self.check_visibility(page)
if self.browser_config.verbose: if self.browser_config.config.verbose:
self.logger.debug( self.logger.debug(
message="Body visibility info: {info}", message="Body visibility info: {info}",
tag="DEBUG", tag="DEBUG",
@@ -933,7 +1139,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
tag="VIEWPORT", tag="VIEWPORT",
params={"error": str(e)}, params={"error": str(e)},
) )
# Handle full page scanning # Handle full page scanning
if config.scan_full_page: if config.scan_full_page:
# await self._handle_full_page_scan(page, config.scroll_delay) # await self._handle_full_page_scan(page, config.scroll_delay)
@@ -1837,8 +2042,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# }} # }}
# }})(); # }})();
# """ # """
# )
# """ NEW VERSION: # """ NEW VERSION:
# When {script} contains statements (e.g., const link = …; link.click();), # When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'. # this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.

View File

@@ -502,12 +502,9 @@ class AsyncWebCrawler:
metadata = result.get("metadata", {}) metadata = result.get("metadata", {})
else: else:
cleaned_html = sanitize_input_encode(result.cleaned_html) cleaned_html = sanitize_input_encode(result.cleaned_html)
# media = result.media.model_dump() media = result.media.model_dump()
# tables = media.pop("tables", []) tables = media.pop("tables", [])
# links = result.links.model_dump() links = result.links.model_dump()
media = result.media.model_dump() if hasattr(result.media, 'model_dump') else result.media
tables = media.pop("tables", []) if isinstance(media, dict) else []
links = result.links.model_dump() if hasattr(result.links, 'model_dump') else result.links
metadata = result.metadata metadata = result.metadata
fit_html = preprocess_html_for_schema(html_content=html, text_threshold= 500, max_size= 300_000) fit_html = preprocess_html_for_schema(html_content=html, text_threshold= 500, max_size= 300_000)

View File

@@ -16,7 +16,6 @@ from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig from .async_configs import BrowserConfig, CrawlerRunConfig
from .utils import get_chromium_path from .utils import get_chromium_path
BROWSER_DISABLE_OPTIONS = [ BROWSER_DISABLE_OPTIONS = [
"--disable-background-networking", "--disable-background-networking",
"--disable-background-timer-throttling", "--disable-background-timer-throttling",

View File

@@ -27,10 +27,7 @@ from crawl4ai import (
PruningContentFilter, PruningContentFilter,
BrowserProfiler, BrowserProfiler,
DefaultMarkdownGenerator, DefaultMarkdownGenerator,
LLMConfig, LLMConfig
BFSDeepCrawlStrategy,
DFSDeepCrawlStrategy,
BestFirstCrawlingStrategy,
) )
from crawl4ai.config import USER_SETTINGS from crawl4ai.config import USER_SETTINGS
from litellm import completion from litellm import completion
@@ -1017,11 +1014,9 @@ def cdp_cmd(user_data_dir: Optional[str], port: int, browser_type: str, headless
@click.option("--question", "-q", help="Ask a question about the crawled content") @click.option("--question", "-q", help="Ask a question about the crawled content")
@click.option("--verbose", "-v", is_flag=True) @click.option("--verbose", "-v", is_flag=True)
@click.option("--profile", "-p", help="Use a specific browser profile (by name)") @click.option("--profile", "-p", help="Use a specific browser profile (by name)")
@click.option("--deep-crawl", type=click.Choice(["bfs", "dfs", "best-first"]), help="Enable deep crawling with specified strategy (bfs, dfs, or best-first)")
@click.option("--max-pages", type=int, default=10, help="Maximum number of pages to crawl in deep crawl mode")
def crawl_cmd(url: str, browser_config: str, crawler_config: str, filter_config: str, def crawl_cmd(url: str, browser_config: str, crawler_config: str, filter_config: str,
extraction_config: str, json_extract: str, schema: str, browser: Dict, crawler: Dict, extraction_config: str, json_extract: str, schema: str, browser: Dict, crawler: Dict,
output: str, output_file: str, bypass_cache: bool, question: str, verbose: bool, profile: str, deep_crawl: str, max_pages: int): output: str, output_file: str, bypass_cache: bool, question: str, verbose: bool, profile: str):
"""Crawl a website and extract content """Crawl a website and extract content
Simple Usage: Simple Usage:
@@ -1161,27 +1156,6 @@ Always return valid, properly formatted JSON."""
crawler_cfg.scraping_strategy = LXMLWebScrapingStrategy() crawler_cfg.scraping_strategy = LXMLWebScrapingStrategy()
# Handle deep crawling configuration
if deep_crawl:
if deep_crawl == "bfs":
crawler_cfg.deep_crawl_strategy = BFSDeepCrawlStrategy(
max_depth=3,
max_pages=max_pages
)
elif deep_crawl == "dfs":
crawler_cfg.deep_crawl_strategy = DFSDeepCrawlStrategy(
max_depth=3,
max_pages=max_pages
)
elif deep_crawl == "best-first":
crawler_cfg.deep_crawl_strategy = BestFirstCrawlingStrategy(
max_depth=3,
max_pages=max_pages
)
if verbose:
console.print(f"[green]Deep crawling enabled:[/green] {deep_crawl} strategy, max {max_pages} pages")
config = get_global_config() config = get_global_config()
browser_cfg.verbose = config.get("VERBOSE", False) browser_cfg.verbose = config.get("VERBOSE", False)
@@ -1196,60 +1170,39 @@ Always return valid, properly formatted JSON."""
verbose verbose
) )
# Handle deep crawl results (list) vs single result
if isinstance(result, list):
if len(result) == 0:
click.echo("No results found during deep crawling")
return
# Use the first result for question answering and output
main_result = result[0]
all_results = result
else:
# Single result from regular crawling
main_result = result
all_results = [result]
# Handle question # Handle question
if question: if question:
provider, token = setup_llm_config() provider, token = setup_llm_config()
markdown = main_result.markdown.raw_markdown markdown = result.markdown.raw_markdown
anyio.run(stream_llm_response, url, markdown, question, provider, token) anyio.run(stream_llm_response, url, markdown, question, provider, token)
return return
# Handle output # Handle output
if not output_file: if not output_file:
if output == "all": if output == "all":
if isinstance(result, list): click.echo(json.dumps(result.model_dump(), indent=2))
output_data = [r.model_dump() for r in all_results]
click.echo(json.dumps(output_data, indent=2))
else:
click.echo(json.dumps(main_result.model_dump(), indent=2))
elif output == "json": elif output == "json":
print(main_result.extracted_content) print(result.extracted_content)
extracted_items = json.loads(main_result.extracted_content) extracted_items = json.loads(result.extracted_content)
click.echo(json.dumps(extracted_items, indent=2)) click.echo(json.dumps(extracted_items, indent=2))
elif output in ["markdown", "md"]: elif output in ["markdown", "md"]:
click.echo(main_result.markdown.raw_markdown) click.echo(result.markdown.raw_markdown)
elif output in ["markdown-fit", "md-fit"]: elif output in ["markdown-fit", "md-fit"]:
click.echo(main_result.markdown.fit_markdown) click.echo(result.markdown.fit_markdown)
else: else:
if output == "all": if output == "all":
with open(output_file, "w") as f: with open(output_file, "w") as f:
if isinstance(result, list): f.write(json.dumps(result.model_dump(), indent=2))
output_data = [r.model_dump() for r in all_results]
f.write(json.dumps(output_data, indent=2))
else:
f.write(json.dumps(main_result.model_dump(), indent=2))
elif output == "json": elif output == "json":
with open(output_file, "w") as f: with open(output_file, "w") as f:
f.write(main_result.extracted_content) f.write(result.extracted_content)
elif output in ["markdown", "md"]: elif output in ["markdown", "md"]:
with open(output_file, "w") as f: with open(output_file, "w") as f:
f.write(main_result.markdown.raw_markdown) f.write(result.markdown.raw_markdown)
elif output in ["markdown-fit", "md-fit"]: elif output in ["markdown-fit", "md-fit"]:
with open(output_file, "w") as f: with open(output_file, "w") as f:
f.write(main_result.markdown.fit_markdown) f.write(result.markdown.fit_markdown)
except Exception as e: except Exception as e:
raise click.ClickException(str(e)) raise click.ClickException(str(e))
@@ -1401,11 +1354,9 @@ def profiles_cmd():
@click.option("--question", "-q", help="Ask a question about the crawled content") @click.option("--question", "-q", help="Ask a question about the crawled content")
@click.option("--verbose", "-v", is_flag=True) @click.option("--verbose", "-v", is_flag=True)
@click.option("--profile", "-p", help="Use a specific browser profile (by name)") @click.option("--profile", "-p", help="Use a specific browser profile (by name)")
@click.option("--deep-crawl", type=click.Choice(["bfs", "dfs", "best-first"]), help="Enable deep crawling with specified strategy")
@click.option("--max-pages", type=int, default=10, help="Maximum number of pages to crawl in deep crawl mode")
def default(url: str, example: bool, browser_config: str, crawler_config: str, filter_config: str, def default(url: str, example: bool, browser_config: str, crawler_config: str, filter_config: str,
extraction_config: str, json_extract: str, schema: str, browser: Dict, crawler: Dict, extraction_config: str, json_extract: str, schema: str, browser: Dict, crawler: Dict,
output: str, bypass_cache: bool, question: str, verbose: bool, profile: str, deep_crawl: str, max_pages: int): output: str, bypass_cache: bool, question: str, verbose: bool, profile: str):
"""Crawl4AI CLI - Web content extraction tool """Crawl4AI CLI - Web content extraction tool
Simple Usage: Simple Usage:
@@ -1455,9 +1406,7 @@ def default(url: str, example: bool, browser_config: str, crawler_config: str, f
bypass_cache=bypass_cache, bypass_cache=bypass_cache,
question=question, question=question,
verbose=verbose, verbose=verbose,
profile=profile, profile=profile
deep_crawl=deep_crawl,
max_pages=max_pages
) )
def main(): def main():

View File

@@ -1145,10 +1145,10 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
link_data["intrinsic_score"] = intrinsic_score link_data["intrinsic_score"] = intrinsic_score
except Exception: except Exception:
# Fail gracefully - assign default score # Fail gracefully - assign default score
link_data["intrinsic_score"] = 0 link_data["intrinsic_score"] = float('inf')
else: else:
# No scoring enabled - assign infinity (all links equal priority) # No scoring enabled - assign infinity (all links equal priority)
link_data["intrinsic_score"] = 0 link_data["intrinsic_score"] = float('inf')
is_external = is_external_url(normalized_href, base_domain) is_external = is_external_url(normalized_href, base_domain)
if is_external: if is_external:

View File

@@ -3342,13 +3342,7 @@ async def get_text_embeddings(
# Default: use sentence-transformers # Default: use sentence-transformers
else: else:
# Lazy load to avoid importing heavy libraries unless needed # Lazy load to avoid importing heavy libraries unless needed
try: from sentence_transformers import SentenceTransformer
from sentence_transformers import SentenceTransformer
except ImportError:
raise ImportError(
"sentence-transformers is required for local embeddings. "
"Install it with: pip install 'crawl4ai[transformer]' or pip install sentence-transformers"
)
# Cache the model in function attribute to avoid reloading # Cache the model in function attribute to avoid reloading
if not hasattr(get_text_embeddings, '_models'): if not hasattr(get_text_embeddings, '_models'):

View File

@@ -5,7 +5,6 @@ from typing import List, Tuple, Dict
from functools import partial from functools import partial
from uuid import uuid4 from uuid import uuid4
from datetime import datetime from datetime import datetime
from base64 import b64encode
import logging import logging
from typing import Optional, AsyncGenerator from typing import Optional, AsyncGenerator
@@ -372,9 +371,6 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
server_memory_mb = _get_memory_mb() server_memory_mb = _get_memory_mb()
result_dict = result.model_dump() result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb result_dict['server_memory_mb'] = server_memory_mb
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}") logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = json.dumps(result_dict, default=datetime_handler) + "\n" data = json.dumps(result_dict, default=datetime_handler) + "\n"
yield data.encode('utf-8') yield data.encode('utf-8')
@@ -447,19 +443,10 @@ async def handle_crawl_request(
mem_delta_mb = end_mem_mb - start_mem_mb # <--- Calculate delta mem_delta_mb = end_mem_mb - start_mem_mb # <--- Calculate delta
peak_mem_mb = max(peak_mem_mb if peak_mem_mb else 0, end_mem_mb) # <--- Get peak memory peak_mem_mb = max(peak_mem_mb if peak_mem_mb else 0, end_mem_mb) # <--- Get peak memory
logger.info(f"Memory usage: Start: {start_mem_mb} MB, End: {end_mem_mb} MB, Delta: {mem_delta_mb} MB, Peak: {peak_mem_mb} MB") logger.info(f"Memory usage: Start: {start_mem_mb} MB, End: {end_mem_mb} MB, Delta: {mem_delta_mb} MB, Peak: {peak_mem_mb} MB")
# Process results to handle PDF bytes
processed_results = []
for result in results:
result_dict = result.model_dump()
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
processed_results.append(result_dict)
return { return {
"success": True, "success": True,
"results": processed_results, "results": [result.model_dump() for result in results],
"server_processing_time_s": end_time - start_time, "server_processing_time_s": end_time - start_time,
"server_memory_delta_mb": mem_delta_mb, "server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb "server_peak_memory_mb": peak_mem_mb

View File

@@ -10,8 +10,9 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns - **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages - **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization - **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering - **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements - **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning ## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -29,41 +30,44 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores - Extraction confidence scores
```python ```python
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
import asyncio
async def main(): # Initialize with custom learning parameters
config = AdaptiveConfig(
# Configure adaptive crawler confidence_threshold=0.7, # Min confidence to use learned patterns
config = AdaptiveConfig( max_history=100, # Remember last 100 crawls per domain
strategy="statistical", # or "embedding" for semantic understanding learning_rate=0.2, # How quickly to adapt to changes
max_pages=10, patterns_per_page=3, # Patterns to learn per page type
confidence_threshold=0.7, # Stop at 70% confidence extraction_strategy='css' # 'css' or 'xpath'
top_k_links=3, # Follow top 3 links per page )
min_gain_threshold=0.05 # Need 5% information gain to continue
adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://news.example.com/article/12345",
config=CrawlerRunConfig(
adaptive_config=config,
extraction_hints={ # Optional hints to speed up learning
"title": "article h1",
"content": "article .body-content"
}
)
) )
async with AsyncWebCrawler(verbose=False) as crawler: # Crawler identifies and stores patterns
adaptive = AdaptiveCrawler(crawler, config) if result.success:
state = adaptive_crawler.get_state("news.example.com")
print("Starting adaptive crawl about Python decorators...") print(f"Learned {len(state.patterns)} patterns")
result = await adaptive.digest( print(f"Confidence: {state.avg_confidence:.2%}")
start_url="https://docs.python.org/3/glossary.html",
query="python decorators functions wrapping"
)
print(f"\n✅ Crawling Complete!")
print(f"• Confidence Level: {adaptive.confidence:.0%}")
print(f"• Pages Crawled: {len(result.crawled_urls)}")
print(f"• Knowledge Base: {len(adaptive.state.knowledge_base)} documents")
# Get most relevant content
relevant = adaptive.get_relevant_content(top_k=3)
print(f"\nMost Relevant Pages:")
for i, page in enumerate(relevant, 1):
print(f"{i}. {page['url']} (relevance: {page['score']:.2%})")
asyncio.run(main()) # Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
``` ```
**Expected Real-World Impact:** **Expected Real-World Impact:**
@@ -88,7 +92,9 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']", container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0 # Let content load wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
) )
# For e-commerce product grids (Instagram style) # For e-commerce product grids (Instagram style)
@@ -96,7 +102,8 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid", container_selector="main .product-grid",
scroll_count=30, scroll_count=30,
scroll_by=800, # Fixed pixel scrolling scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5 # Images need time wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
) )
# For news feeds with lazy loading # For news feeds with lazy loading
@@ -104,7 +111,9 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed", container_selector=".article-feed",
scroll_count=50, scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5 # Wait for content to load wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
) )
# Use it in your crawl # Use it in your crawl
@@ -148,63 +157,68 @@ async with AsyncWebCrawler() as crawler:
**My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals. **My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals.
### Intelligent Link Analysis and Scoring ### The Three-Layer Scoring System
```python ```python
import asyncio from crawl4ai import LinkPreviewConfig
from crawl4ai import CrawlerRunConfig, CacheMode, AsyncWebCrawler
from crawl4ai.adaptive_crawler import LinkPreviewConfig
async def main(): # Configure intelligent link analysis
# Configure intelligent link analysis link_config = LinkPreviewConfig(
link_config = LinkPreviewConfig( # What to analyze
include_internal=True, include_internal=True,
include_external=False, include_external=True,
max_links=10, max_links=100, # Analyze top 100 links
concurrency=5,
query="python tutorial", # For contextual scoring # Relevance scoring
score_threshold=0.3, query="machine learning tutorials", # Your interest
verbose=True score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
)
# Use in your crawl
result = await crawler.arun(
"https://tech-blog.example.com",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True
) )
# Use in your crawl )
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://www.geeksforgeeks.org/",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring
cache_mode=CacheMode.BYPASS
)
)
# Access scored and sorted links # Access scored and sorted links
if result.success and result.links: for link in result.links["internal"][:10]: # Top 10 internal links
for link in result.links.get("internal", []): print(f"Score: {link['total_score']:.3f}")
text = link.get('text', 'No text')[:40] print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
print( print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
text, print(f" URL: {link['href']}")
f"{link.get('intrinsic_score', 0):.1f}/10" if link.get('intrinsic_score') is not None else "0.0/10", print(f" Title: {link['head_data']['title']}")
f"{link.get('contextual_score', 0):.2f}/1" if link.get('contextual_score') is not None else "0.00/1", print(f" Description: {link['head_data']['meta']['description'][:100]}...")
f"{link.get('total_score', 0):.3f}" if link.get('total_score') is not None else "0.000"
)
asyncio.run(main())
``` ```
**Scoring Components:** **Scoring Components:**
1. **Intrinsic Score**: Based on link quality indicators 1. **Intrinsic Score (0-10)**: Based on link quality indicators
- Position on page (navigation, content, footer) - Position on page (navigation, content, footer)
- Link attributes (rel, title, class names) - Link attributes (rel, title, class names)
- Anchor text quality and length - Anchor text quality and length
- URL structure and depth - URL structure and depth
2. **Contextual Score**: Relevance to your query using BM25 algorithm 2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
- Keyword matching in link text and title - Keyword matching in link text and title
- Meta description analysis - Meta description analysis
- Content preview scoring - Content preview scoring
3. **Total Score**: Combined score for final ranking 3. **Total Score**: Weighted combination for final ranking
**Expected Real-World Impact:** **Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links - **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -221,34 +235,58 @@ asyncio.run(main())
### Technical Architecture ### Technical Architecture
```python ```python
import asyncio
from crawl4ai import AsyncUrlSeeder, SeedingConfig from crawl4ai import AsyncUrlSeeder, SeedingConfig
async def main(): # Basic discovery - find all product pages
async with AsyncUrlSeeder() as seeder: seeder_config = SeedingConfig(
# Discover Python tutorial URLs # Discovery sources
config = SeedingConfig( source="sitemap+cc", # Sitemap + Common Crawl
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter # Filtering
extract_head=True, # Get metadata pattern="*/product/*", # URL pattern matching
query="python tutorial", # For relevance scoring ignore_patterns=["*/reviews/*", "*/questions/*"],
scoring_method="bm25",
score_threshold=0.2, # Validation
max_urls=10 live_check=True, # Verify URLs are alive
) max_urls=5000, # Stop at 5000 URLs
print("Discovering Python async tutorial URLs...") # Performance
urls = await seeder.urls("https://www.geeksforgeeks.org/", config) concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limiting
print(f"\n✅ Found {len(urls)} relevant URLs:") )
for i, url_info in enumerate(urls[:5], 1):
print(f"\n{i}. {url_info['url']}")
if url_info.get('relevance_score'):
print(f" Relevance: {url_info['relevance_score']:.3f}")
if url_info.get('head_data', {}).get('title'):
print(f" Title: {url_info['head_data']['title'][:60]}...")
asyncio.run(main()) seeder = AsyncUrlSeeder(seeder_config)
urls = await seeder.discover("https://shop.example.com")
# Advanced: Relevance-based discovery
research_config = SeedingConfig(
source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only
# Content relevance
extract_head=True, # Get meta tags
query="quantum computing tutorials",
scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only
# Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache
)
# Discover with progress tracking
discovered = []
async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered.extend(batch)
print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata
for url_data in discovered[:5]:
print(f"URL: {url_data['url']}")
print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['title']}")
``` ```
**Discovery Methods:** **Discovery Methods:**
@@ -271,18 +309,35 @@ This release includes significant performance improvements through optimized res
### What We Optimized ### What We Optimized
```python ```python
# Optimized crawling with v0.7.0 improvements # Before v0.7.0 (slow)
results = [] results = []
for url in urls: for url in urls:
result = await crawler.arun( result = await crawler.arun(url)
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
results.append(result) results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
``` ```
**Performance Gains:** **Performance Gains:**
@@ -292,6 +347,24 @@ for url in urls:
- **Memory Usage**: 60% reduction with streaming processing - **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests - **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes ## 🔧 Important Changes

View File

@@ -1,43 +0,0 @@
# 🛠️ Crawl4AI v0.7.1: Minor Cleanup Update
*July 17, 2025 • 2 min read*
---
A small maintenance release that removes unused code and improves documentation.
## 🎯 What's Changed
- **Removed unused StealthConfig** from `crawl4ai/browser_manager.py`
- **Updated documentation** with better examples and parameter explanations
- **Fixed virtual scroll configuration** examples in docs
## 🧹 Code Cleanup
Removed unused `StealthConfig` import and configuration that wasn't being used anywhere in the codebase. The project uses its own custom stealth implementation through JavaScript injection instead.
```python
# Removed unused code:
from playwright_stealth import StealthConfig
stealth_config = StealthConfig(...) # This was never used
```
## 📖 Documentation Updates
- Fixed adaptive crawling parameter examples
- Updated session management documentation
- Corrected virtual scroll configuration examples
## 🚀 Installation
```bash
pip install crawl4ai==0.7.1
```
No breaking changes - upgrade directly from v0.7.0.
---
Questions? Issues?
- GitHub: [github.com/unclecode/crawl4ai](https://github.com/unclecode/crawl4ai)
- Discord: [discord.gg/crawl4ai](https://discord.gg/jP8KfhDhyN)

View File

@@ -18,7 +18,7 @@ Usage:
import asyncio import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai import LinkPreviewConfig from crawl4ai.async_configs import LinkPreviewConfig
async def basic_link_head_extraction(): async def basic_link_head_extraction():

View File

@@ -49,75 +49,46 @@ from crawl4ai import JsonCssExtractionStrategy
from crawl4ai.cache_context import CacheMode from crawl4ai.cache_context import CacheMode
async def crawl_dynamic_content(): async def crawl_dynamic_content():
url = "https://github.com/microsoft/TypeScript/commits/main" async with AsyncWebCrawler() as crawler:
session_id = "wait_for_session" session_id = "github_commits_session"
all_commits = [] url = "https://github.com/microsoft/TypeScript/commits/main"
all_commits = []
js_next_page = """ # Define extraction schema
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4'); schema = {
if (commits.length > 0) { "name": "Commit Extractor",
window.lastCommit = commits[0].textContent.trim(); "baseSelector": "li.Box-sc-g0xbh4-0",
} "fields": [{
const button = document.querySelector('a[data-testid="pagination-next-button"]'); "name": "title", "selector": "h4.markdown-title", "type": "text"
if (button) {button.click(); console.log('button clicked') } }],
""" }
extraction_strategy = JsonCssExtractionStrategy(schema)
wait_for = """() => { # JavaScript and wait configurations
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4'); js_next_page = """document.querySelector('a[data-testid="pagination-next-button"]').click();"""
if (commits.length === 0) return false; wait_for = """() => document.querySelectorAll('li.Box-sc-g0xbh4-0').length > 0"""
const firstCommit = commits[0].textContent.trim();
return firstCommit !== window.lastCommit; # Crawl multiple pages
}"""
schema = {
"name": "Commit Extractor",
"baseSelector": "li[data-testid='commit-row-item']",
"fields": [
{
"name": "title",
"selector": "h4 a",
"type": "text",
"transform": "strip",
},
],
}
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)
browser_config = BrowserConfig(
verbose=True,
headless=False,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
for page in range(3): for page in range(3):
crawler_config = CrawlerRunConfig( config = CrawlerRunConfig(
url=url,
session_id=session_id, session_id=session_id,
css_selector="li[data-testid='commit-row-item']",
extraction_strategy=extraction_strategy, extraction_strategy=extraction_strategy,
js_code=js_next_page if page > 0 else None, js_code=js_next_page if page > 0 else None,
wait_for=wait_for if page > 0 else None, wait_for=wait_for if page > 0 else None,
js_only=page > 0, js_only=page > 0,
cache_mode=CacheMode.BYPASS, cache_mode=CacheMode.BYPASS
capture_console_messages=True,
) )
result = await crawler.arun(url=url, config=crawler_config) result = await crawler.arun(config=config)
if result.success:
if result.console_messages:
print(f"Page {page + 1} console messages:", result.console_messages)
if result.extracted_content:
# print(f"Page {page + 1} result:", result.extracted_content)
commits = json.loads(result.extracted_content) commits = json.loads(result.extracted_content)
all_commits.extend(commits) all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits") print(f"Page {page + 1}: Found {len(commits)} commits")
else:
print(f"Page {page + 1}: No content extracted")
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Clean up session # Clean up session
await crawler.crawler_strategy.kill_session(session_id) await crawler.crawler_strategy.kill_session(session_id)
return all_commits
``` ```
--- ---

View File

@@ -91,12 +91,13 @@ async def crawl_twitter_timeline():
wait_after_scroll=1.0 # Twitter needs time to load wait_after_scroll=1.0 # Twitter needs time to load
) )
browser_config = BrowserConfig(headless=True) # Set to False to watch it work
config = CrawlerRunConfig( config = CrawlerRunConfig(
virtual_scroll_config=virtual_config virtual_scroll_config=virtual_config,
# Optional: Set headless=False to watch it work
# browser_config=BrowserConfig(headless=False)
) )
async with AsyncWebCrawler(config=browser_config) as crawler: async with AsyncWebCrawler() as crawler:
result = await crawler.arun( result = await crawler.arun(
url="https://twitter.com/search?q=AI", url="https://twitter.com/search?q=AI",
config=config config=config
@@ -199,7 +200,7 @@ Use **scan_full_page** when:
Virtual Scroll works seamlessly with extraction strategies: Virtual Scroll works seamlessly with extraction strategies:
```python ```python
from crawl4ai import LLMExtractionStrategy, LLMConfig from crawl4ai import LLMExtractionStrategy
# Define extraction schema # Define extraction schema
schema = { schema = {
@@ -221,7 +222,7 @@ config = CrawlerRunConfig(
scroll_count=20 scroll_count=20
), ),
extraction_strategy=LLMExtractionStrategy( extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"), provider="openai/gpt-4o-mini",
schema=schema schema=schema
) )
) )

View File

@@ -10,8 +10,9 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns - **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages - **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization - **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering - **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements - **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning ## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -29,41 +30,44 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores - Extraction confidence scores
```python ```python
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
import asyncio
async def main(): # Initialize with custom learning parameters
config = AdaptiveConfig(
# Configure adaptive crawler confidence_threshold=0.7, # Min confidence to use learned patterns
config = AdaptiveConfig( max_history=100, # Remember last 100 crawls per domain
strategy="statistical", # or "embedding" for semantic understanding learning_rate=0.2, # How quickly to adapt to changes
max_pages=10, patterns_per_page=3, # Patterns to learn per page type
confidence_threshold=0.7, # Stop at 70% confidence extraction_strategy='css' # 'css' or 'xpath'
top_k_links=3, # Follow top 3 links per page )
min_gain_threshold=0.05 # Need 5% information gain to continue
adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://news.example.com/article/12345",
config=CrawlerRunConfig(
adaptive_config=config,
extraction_hints={ # Optional hints to speed up learning
"title": "article h1",
"content": "article .body-content"
}
)
) )
async with AsyncWebCrawler(verbose=False) as crawler: # Crawler identifies and stores patterns
adaptive = AdaptiveCrawler(crawler, config) if result.success:
state = adaptive_crawler.get_state("news.example.com")
print("Starting adaptive crawl about Python decorators...") print(f"Learned {len(state.patterns)} patterns")
result = await adaptive.digest( print(f"Confidence: {state.avg_confidence:.2%}")
start_url="https://docs.python.org/3/glossary.html",
query="python decorators functions wrapping"
)
print(f"\n✅ Crawling Complete!")
print(f"• Confidence Level: {adaptive.confidence:.0%}")
print(f"• Pages Crawled: {len(result.crawled_urls)}")
print(f"• Knowledge Base: {len(adaptive.state.knowledge_base)} documents")
# Get most relevant content
relevant = adaptive.get_relevant_content(top_k=3)
print(f"\nMost Relevant Pages:")
for i, page in enumerate(relevant, 1):
print(f"{i}. {page['url']} (relevance: {page['score']:.2%})")
asyncio.run(main()) # Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
``` ```
**Expected Real-World Impact:** **Expected Real-World Impact:**
@@ -88,7 +92,9 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']", container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0 # Let content load wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
) )
# For e-commerce product grids (Instagram style) # For e-commerce product grids (Instagram style)
@@ -96,7 +102,8 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid", container_selector="main .product-grid",
scroll_count=30, scroll_count=30,
scroll_by=800, # Fixed pixel scrolling scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5 # Images need time wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
) )
# For news feeds with lazy loading # For news feeds with lazy loading
@@ -104,7 +111,9 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed", container_selector=".article-feed",
scroll_count=50, scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5 # Wait for content to load wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
) )
# Use it in your crawl # Use it in your crawl
@@ -148,63 +157,68 @@ async with AsyncWebCrawler() as crawler:
**My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals. **My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals.
### Intelligent Link Analysis and Scoring ### The Three-Layer Scoring System
```python ```python
import asyncio from crawl4ai import LinkPreviewConfig
from crawl4ai import CrawlerRunConfig, CacheMode, AsyncWebCrawler
from crawl4ai.adaptive_crawler import LinkPreviewConfig
async def main(): # Configure intelligent link analysis
# Configure intelligent link analysis link_config = LinkPreviewConfig(
link_config = LinkPreviewConfig( # What to analyze
include_internal=True, include_internal=True,
include_external=False, include_external=True,
max_links=10, max_links=100, # Analyze top 100 links
concurrency=5,
query="python tutorial", # For contextual scoring # Relevance scoring
score_threshold=0.3, query="machine learning tutorials", # Your interest
verbose=True score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
)
# Use in your crawl
result = await crawler.arun(
"https://tech-blog.example.com",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True
) )
# Use in your crawl )
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://www.geeksforgeeks.org/",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring
cache_mode=CacheMode.BYPASS
)
)
# Access scored and sorted links # Access scored and sorted links
if result.success and result.links: for link in result.links["internal"][:10]: # Top 10 internal links
for link in result.links.get("internal", []): print(f"Score: {link['total_score']:.3f}")
text = link.get('text', 'No text')[:40] print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
print( print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
text, print(f" URL: {link['href']}")
f"{link.get('intrinsic_score', 0):.1f}/10" if link.get('intrinsic_score') is not None else "0.0/10", print(f" Title: {link['head_data']['title']}")
f"{link.get('contextual_score', 0):.2f}/1" if link.get('contextual_score') is not None else "0.00/1", print(f" Description: {link['head_data']['meta']['description'][:100]}...")
f"{link.get('total_score', 0):.3f}" if link.get('total_score') is not None else "0.000"
)
asyncio.run(main())
``` ```
**Scoring Components:** **Scoring Components:**
1. **Intrinsic Score**: Based on link quality indicators 1. **Intrinsic Score (0-10)**: Based on link quality indicators
- Position on page (navigation, content, footer) - Position on page (navigation, content, footer)
- Link attributes (rel, title, class names) - Link attributes (rel, title, class names)
- Anchor text quality and length - Anchor text quality and length
- URL structure and depth - URL structure and depth
2. **Contextual Score**: Relevance to your query using BM25 algorithm 2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
- Keyword matching in link text and title - Keyword matching in link text and title
- Meta description analysis - Meta description analysis
- Content preview scoring - Content preview scoring
3. **Total Score**: Combined score for final ranking 3. **Total Score**: Weighted combination for final ranking
**Expected Real-World Impact:** **Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links - **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -221,34 +235,58 @@ asyncio.run(main())
### Technical Architecture ### Technical Architecture
```python ```python
import asyncio
from crawl4ai import AsyncUrlSeeder, SeedingConfig from crawl4ai import AsyncUrlSeeder, SeedingConfig
async def main(): # Basic discovery - find all product pages
async with AsyncUrlSeeder() as seeder: seeder_config = SeedingConfig(
# Discover Python tutorial URLs # Discovery sources
config = SeedingConfig( source="sitemap+cc", # Sitemap + Common Crawl
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter # Filtering
extract_head=True, # Get metadata pattern="*/product/*", # URL pattern matching
query="python tutorial", # For relevance scoring ignore_patterns=["*/reviews/*", "*/questions/*"],
scoring_method="bm25",
score_threshold=0.2, # Validation
max_urls=10 live_check=True, # Verify URLs are alive
) max_urls=5000, # Stop at 5000 URLs
print("Discovering Python async tutorial URLs...") # Performance
urls = await seeder.urls("https://www.geeksforgeeks.org/", config) concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limiting
print(f"\n✅ Found {len(urls)} relevant URLs:") )
for i, url_info in enumerate(urls[:5], 1):
print(f"\n{i}. {url_info['url']}")
if url_info.get('relevance_score'):
print(f" Relevance: {url_info['relevance_score']:.3f}")
if url_info.get('head_data', {}).get('title'):
print(f" Title: {url_info['head_data']['title'][:60]}...")
asyncio.run(main()) seeder = AsyncUrlSeeder(seeder_config)
urls = await seeder.discover("https://shop.example.com")
# Advanced: Relevance-based discovery
research_config = SeedingConfig(
source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only
# Content relevance
extract_head=True, # Get meta tags
query="quantum computing tutorials",
scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only
# Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache
)
# Discover with progress tracking
discovered = []
async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered.extend(batch)
print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata
for url_data in discovered[:5]:
print(f"URL: {url_data['url']}")
print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['title']}")
``` ```
**Discovery Methods:** **Discovery Methods:**
@@ -271,18 +309,35 @@ This release includes significant performance improvements through optimized res
### What We Optimized ### What We Optimized
```python ```python
# Optimized crawling with v0.7.0 improvements # Before v0.7.0 (slow)
results = [] results = []
for url in urls: for url in urls:
result = await crawler.arun( result = await crawler.arun(url)
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
results.append(result) results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
``` ```
**Performance Gains:** **Performance Gains:**
@@ -292,6 +347,24 @@ for url in urls:
- **Memory Usage**: 60% reduction with streaming processing - **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests - **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes ## 🔧 Important Changes

View File

@@ -35,7 +35,7 @@ from crawl4ai import AsyncWebCrawler, AdaptiveCrawler
async def main(): async def main():
async with AsyncWebCrawler() as crawler: async with AsyncWebCrawler() as crawler:
# Create an adaptive crawler (config is optional) # Create an adaptive crawler
adaptive = AdaptiveCrawler(crawler) adaptive = AdaptiveCrawler(crawler)
# Start crawling with a query # Start crawling with a query
@@ -59,13 +59,13 @@ async def main():
from crawl4ai import AdaptiveConfig from crawl4ai import AdaptiveConfig
config = AdaptiveConfig( config = AdaptiveConfig(
confidence_threshold=0.8, # Stop when 80% confident (default: 0.7) confidence_threshold=0.7, # Stop when 70% confident (default: 0.8)
max_pages=30, # Maximum pages to crawl (default: 20) max_pages=20, # Maximum pages to crawl (default: 50)
top_k_links=5, # Links to follow per page (default: 3) top_k_links=3, # Links to follow per page (default: 5)
min_gain_threshold=0.05 # Minimum expected gain to continue (default: 0.1) min_gain_threshold=0.05 # Minimum expected gain to continue (default: 0.1)
) )
adaptive = AdaptiveCrawler(crawler, config) adaptive = AdaptiveCrawler(crawler, config=config)
``` ```
## Crawling Strategies ## Crawling Strategies
@@ -198,8 +198,8 @@ if result.metrics.get('is_irrelevant', False):
The confidence score (0-1) indicates how sufficient the gathered information is: The confidence score (0-1) indicates how sufficient the gathered information is:
- **0.0-0.3**: Insufficient information, needs more crawling - **0.0-0.3**: Insufficient information, needs more crawling
- **0.3-0.6**: Partial information, may answer basic queries - **0.3-0.6**: Partial information, may answer basic queries
- **0.6-0.7**: Good coverage, can answer most queries - **0.6-0.8**: Good coverage, can answer most queries
- **0.7-1.0**: Excellent coverage, comprehensive information - **0.8-1.0**: Excellent coverage, comprehensive information
### Statistics Display ### Statistics Display
@@ -257,9 +257,9 @@ new_adaptive.import_knowledge_base("knowledge_base.jsonl")
- Avoid overly broad queries - Avoid overly broad queries
### 2. Threshold Tuning ### 2. Threshold Tuning
- Start with default (0.7) for general use - Start with default (0.8) for general use
- Lower to 0.5-0.6 for exploratory crawling - Lower to 0.6-0.7 for exploratory crawling
- Raise to 0.8+ for exhaustive coverage - Raise to 0.9+ for exhaustive coverage
### 3. Performance Optimization ### 3. Performance Optimization
- Use appropriate `max_pages` limits - Use appropriate `max_pages` limits

View File

@@ -52,9 +52,11 @@ That's it! In just a few lines, you've automated a complete search workflow.
Want to learn by doing? We've got you covered: Want to learn by doing? We've got you covered:
**🚀 [Live Demo](https://docs.crawl4ai.com/apps/c4a-script/)** - Try C4A-Script in your browser right now! **🚀 [Live Demo](https://docs.crawl4ai.com/c4a-script/demo)** - Try C4A-Script in your browser right now!
**📁 [Tutorial Examples](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/c4a_script/)** - Complete examples with source code **📁 [Tutorial Examples](/examples/c4a_script/)** - Complete examples with source code
**🛠️ [Local Tutorial](/examples/c4a_script/tutorial/)** - Run the interactive tutorial on your machine
### Running the Tutorial Locally ### Running the Tutorial Locally

View File

@@ -125,7 +125,7 @@ Here's a full example you can copy, paste, and run immediately:
```python ```python
import asyncio import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai import LinkPreviewConfig from crawl4ai.async_configs import LinkPreviewConfig
async def extract_link_heads_example(): async def extract_link_heads_example():
""" """
@@ -237,7 +237,7 @@ if __name__ == "__main__":
The `LinkPreviewConfig` class supports these options: The `LinkPreviewConfig` class supports these options:
```python ```python
from crawl4ai import LinkPreviewConfig from crawl4ai.async_configs import LinkPreviewConfig
link_preview_config = LinkPreviewConfig( link_preview_config = LinkPreviewConfig(
# BASIC SETTINGS # BASIC SETTINGS

View File

@@ -137,7 +137,7 @@ async def smart_blog_crawler():
word_count_threshold=300 # Only substantial articles word_count_threshold=300 # Only substantial articles
) )
# Extract URLs and crawl them # Extract URLs and stream results as they come
tutorial_urls = [t["url"] for t in tutorials[:10]] tutorial_urls = [t["url"] for t in tutorials[:10]]
results = await crawler.arun_many(tutorial_urls, config=config) results = await crawler.arun_many(tutorial_urls, config=config)
@@ -231,7 +231,7 @@ Common Crawl is a massive public dataset that regularly crawls the entire web. I
```python ```python
# Use both sources # Use both sources
config = SeedingConfig(source="sitemap+cc") config = SeedingConfig(source="cc+sitemap")
urls = await seeder.urls("example.com", config) urls = await seeder.urls("example.com", config)
``` ```
@@ -241,13 +241,13 @@ The `SeedingConfig` object is your control panel. Here's everything you can conf
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
|-----------|------|---------|-------------| |-----------|------|---------|-------------|
| `source` | str | "sitemap+cc" | URL source: "cc" (Common Crawl), "sitemap", or "sitemap+cc" | | `source` | str | "cc" | URL source: "cc" (Common Crawl), "sitemap", or "cc+sitemap" |
| `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") | | `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") |
| `extract_head` | bool | False | Extract metadata from page `<head>` | | `extract_head` | bool | False | Extract metadata from page `<head>` |
| `live_check` | bool | False | Verify URLs are accessible | | `live_check` | bool | False | Verify URLs are accessible |
| `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) | | `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) |
| `concurrency` | int | 10 | Parallel workers for fetching | | `concurrency` | int | 10 | Parallel workers for fetching |
| `hits_per_sec` | int | 5 | Rate limit for requests | | `hits_per_sec` | int | None | Rate limit for requests |
| `force` | bool | False | Bypass cache, fetch fresh data | | `force` | bool | False | Bypass cache, fetch fresh data |
| `verbose` | bool | False | Show detailed progress | | `verbose` | bool | False | Show detailed progress |
| `query` | str | None | Search query for BM25 scoring | | `query` | str | None | Search query for BM25 scoring |
@@ -522,7 +522,7 @@ urls = await seeder.urls("docs.example.com", config)
```python ```python
# Find specific products # Find specific products
config = SeedingConfig( config = SeedingConfig(
source="sitemap+cc", # Use both sources source="cc+sitemap", # Use both sources
extract_head=True, extract_head=True,
query="wireless headphones noise canceling", query="wireless headphones noise canceling",
scoring_method="bm25", scoring_method="bm25",
@@ -782,7 +782,7 @@ class ResearchAssistant:
# Step 1: Discover relevant URLs # Step 1: Discover relevant URLs
config = SeedingConfig( config = SeedingConfig(
source="sitemap+cc", # Maximum coverage source="cc+sitemap", # Maximum coverage
extract_head=True, # Get metadata extract_head=True, # Get metadata
query=topic, # Research topic query=topic, # Research topic
scoring_method="bm25", # Smart scoring scoring_method="bm25", # Smart scoring
@@ -832,8 +832,7 @@ class ResearchAssistant:
# Extract URLs and crawl all articles # Extract URLs and crawl all articles
article_urls = [article['url'] for article in top_articles] article_urls = [article['url'] for article in top_articles]
results = [] results = []
crawl_results = await crawler.arun_many(article_urls, config=config) async for result in await crawler.arun_many(article_urls, config=config):
async for result in crawl_results:
if result.success: if result.success:
results.append({ results.append({
'url': result.url, 'url': result.url,
@@ -934,10 +933,10 @@ config = SeedingConfig(concurrency=10, hits_per_sec=5)
# When crawling many URLs # When crawling many URLs
async with AsyncWebCrawler() as crawler: async with AsyncWebCrawler() as crawler:
# Assuming urls is a list of URL strings # Assuming urls is a list of URL strings
crawl_results = await crawler.arun_many(urls, config=config) results = await crawler.arun_many(urls, config=config)
# Process as they arrive # Process as they arrive
async for result in crawl_results: async for result in results:
process_immediately(result) # Don't wait for all process_immediately(result) # Don't wait for all
``` ```
@@ -1021,7 +1020,7 @@ config = SeedingConfig(
# E-commerce product discovery # E-commerce product discovery
config = SeedingConfig( config = SeedingConfig(
source="sitemap+cc", source="cc+sitemap",
pattern="*/product/*", pattern="*/product/*",
extract_head=True, extract_head=True,
live_check=True live_check=True

View File

@@ -28,7 +28,7 @@ from rich import box
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, AdaptiveCrawler, AdaptiveConfig, BrowserConfig, CacheMode from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, AdaptiveCrawler, AdaptiveConfig, BrowserConfig, CacheMode
from crawl4ai import AsyncUrlSeeder, SeedingConfig from crawl4ai import AsyncUrlSeeder, SeedingConfig
from crawl4ai import LinkPreviewConfig, VirtualScrollConfig from crawl4ai.async_configs import LinkPreviewConfig, VirtualScrollConfig
from crawl4ai import c4a_compile, CompilationResult from crawl4ai import c4a_compile, CompilationResult
# Initialize Rich console for beautiful output # Initialize Rich console for beautiful output

View File

@@ -13,13 +13,14 @@ from crawl4ai import (
BrowserConfig, BrowserConfig,
CacheMode, CacheMode,
# New imports for v0.7.0 # New imports for v0.7.0
VirtualScrollConfig,
LinkPreviewConfig, LinkPreviewConfig,
VirtualScrollConfig,
AdaptiveCrawler, AdaptiveCrawler,
AdaptiveConfig, AdaptiveConfig,
AsyncUrlSeeder, AsyncUrlSeeder,
SeedingConfig, SeedingConfig,
c4a_compile, c4a_compile,
CompilationResult
) )
@@ -169,16 +170,16 @@ async def demo_url_seeder():
# Discover Python tutorial URLs # Discover Python tutorial URLs
config = SeedingConfig( config = SeedingConfig(
source="sitemap", # Use sitemap source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter pattern="*tutorial*", # URL pattern filter
extract_head=True, # Get metadata extract_head=True, # Get metadata
query="python tutorial", # For relevance scoring query="python async programming", # For relevance scoring
scoring_method="bm25", scoring_method="bm25",
score_threshold=0.2, score_threshold=0.2,
max_urls=10 max_urls=10
) )
print("Discovering Python async tutorial URLs...") print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("https://www.geeksforgeeks.org/", config) urls = await seeder.urls("docs.python.org", config)
print(f"\n✅ Found {len(urls)} relevant URLs:") print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1): for i, url_info in enumerate(urls[:5], 1):
@@ -244,6 +245,39 @@ IF (EXISTS `.price-filter`) THEN CLICK `input[data-max-price="100"]`
print(f"❌ Compilation error: {result.first_error.message}") print(f"❌ Compilation error: {result.first_error.message}")
async def demo_pdf_support():
    """
    Demo 6: PDF Parsing Support

    Prints a banner, then probes for the optional ``PyPDF2`` package. When the
    import succeeds, an example ``CrawlerRunConfig`` is built and usage hints
    are printed; when it raises ``ImportError``, install instructions are
    printed instead.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("📄 DEMO 6: PDF Parsing Support")
    print(rule)

    try:
        # Availability probe only: the module object itself is never used.
        import PyPDF2

        # Illustrative configuration; it is constructed but not passed to a crawler here.
        config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            pdf=True,
            extract_text_from_pdf=True,
        )

        available_lines = (
            "PDF parsing is available!",
            "You can now crawl PDF URLs and extract their content.",
            "\nExample usage:",
            ' result = await crawler.arun("https://example.com/document.pdf")',
            ' pdf_text = result.extracted_content # Contains extracted text',
        )
        for text in available_lines:
            print(text)
    except ImportError:
        missing_lines = (
            "⚠️ PDF support not installed.",
            "Install with: pip install crawl4ai[pdf]",
        )
        for text in missing_lines:
            print(text)
async def main(): async def main():
"""Run all demos""" """Run all demos"""
print("\n🚀 Crawl4AI v0.7.0 Feature Demonstrations") print("\n🚀 Crawl4AI v0.7.0 Feature Demonstrations")
@@ -255,6 +289,7 @@ async def main():
("Virtual Scroll", demo_virtual_scroll), ("Virtual Scroll", demo_virtual_scroll),
("URL Seeder", demo_url_seeder), ("URL Seeder", demo_url_seeder),
("C4A Script", demo_c4a_script), ("C4A Script", demo_c4a_script),
("PDF Support", demo_pdf_support)
] ]
for name, demo_func in demos: for name, demo_func in demos:
@@ -274,6 +309,7 @@ async def main():
print("• Virtual Scroll: Capture all content from modern web pages") print("• Virtual Scroll: Capture all content from modern web pages")
print("• URL Seeder: Pre-discover and filter URLs efficiently") print("• URL Seeder: Pre-discover and filter URLs efficiently")
print("• C4A Script: Simple language for complex automations") print("• C4A Script: Simple language for complex automations")
print("• PDF Support: Extract content from PDF documents")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -44,6 +44,7 @@ dependencies = [
"brotli>=1.1.0", "brotli>=1.1.0",
"humanize>=4.10.0", "humanize>=4.10.0",
"lark>=1.2.2", "lark>=1.2.2",
"sentence-transformers>=2.2.0",
"alphashape>=1.3.1", "alphashape>=1.3.1",
"shapely>=2.0.0" "shapely>=2.0.0"
] ]
@@ -61,8 +62,8 @@ classifiers = [
[project.optional-dependencies] [project.optional-dependencies]
pdf = ["PyPDF2"] pdf = ["PyPDF2"]
torch = ["torch", "nltk", "scikit-learn"] torch = ["torch", "nltk", "scikit-learn"]
transformer = ["transformers", "tokenizers", "sentence-transformers"] transformer = ["transformers", "tokenizers"]
cosine = ["torch", "transformers", "nltk", "sentence-transformers"] cosine = ["torch", "transformers", "nltk"]
sync = ["selenium"] sync = ["selenium"]
all = [ all = [
"PyPDF2", "PyPDF2",
@@ -71,8 +72,8 @@ all = [
"scikit-learn", "scikit-learn",
"transformers", "transformers",
"tokenizers", "tokenizers",
"sentence-transformers", "selenium",
"selenium" "PyPDF2"
] ]
[project.scripts] [project.scripts]

View File

@@ -24,6 +24,7 @@ cssselect>=1.2.0
chardet>=5.2.0 chardet>=5.2.0
brotli>=1.1.0 brotli>=1.1.0
httpx[http2]>=0.27.2 httpx[http2]>=0.27.2
sentence-transformers>=2.2.0
alphashape>=1.3.1 alphashape>=1.3.1
shapely>=2.0.0 shapely>=2.0.0

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Test suite for playwright-stealth backward compatibility.
Tests that stealth functionality works automatically without user configuration.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest
class TestPlaywrightStealthCompatibility:
    """Test playwright-stealth backward compatibility with transparent operation.

    The tests drive ``AsyncPlaywrightCrawlerStrategy._apply_stealth`` with the
    module-level ``STEALTH_NEW_API`` flag patched to ``True`` (new ``Stealth``
    class), ``False`` (legacy ``stealth_async`` function) and ``None`` (no
    stealth library).
    """

    def test_api_detection_works(self):
        """The detection flag must hold one of the three states used by these tests."""
        from crawl4ai.async_crawler_strategy import STEALTH_NEW_API

        # The earlier form (`x is not None or x is False or x is None`) was true
        # for every possible value, so it could never fail. Accept exactly the
        # states the other tests patch in: True, False or None.
        assert STEALTH_NEW_API is None or isinstance(STEALTH_NEW_API, bool)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
    @patch('crawl4ai.async_crawler_strategy.Stealth')
    async def test_apply_stealth_new_api(self, mock_stealth_class):
        """Test stealth application with new API works transparently"""
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # AsyncMock returns an awaitable. A plain Mock would make an
        # `await ...apply_stealth_async(page)` raise TypeError, and the test
        # would then only pass if that error were swallowed.
        mock_stealth_instance = Mock()
        mock_stealth_instance.apply_stealth_async = AsyncMock()
        mock_stealth_class.return_value = mock_stealth_instance

        strategy = AsyncPlaywrightCrawlerStrategy()
        mock_page = Mock()

        # Should work transparently
        await strategy._apply_stealth(mock_page)

        # Verify new API was used
        mock_stealth_class.assert_called_once()
        mock_stealth_instance.apply_stealth_async.assert_called_once_with(mock_page)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', False)
    async def test_apply_stealth_legacy_api(self):
        """Test stealth application with legacy API works transparently"""
        import crawl4ai.async_crawler_strategy as strategy_module
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # Awaitable stand-in for the legacy module-level function.
        mock_stealth_async = AsyncMock(return_value=None)

        # create=True installs the attribute when the module does not define it,
        # and on exit patch restores whatever was there before (or removes the
        # attribute if it did not exist). Manual setattr/delattr would instead
        # delete a genuine `stealth_async` and leak into later tests.
        with patch.object(strategy_module, 'stealth_async', mock_stealth_async, create=True):
            strategy = AsyncPlaywrightCrawlerStrategy()
            mock_page = Mock()

            # Should work transparently
            await strategy._apply_stealth(mock_page)

            # Verify legacy API was used
            mock_stealth_async.assert_called_once_with(mock_page)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', None)
    async def test_apply_stealth_no_library(self):
        """Test stealth application when no stealth library is available"""
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        strategy = AsyncPlaywrightCrawlerStrategy()
        mock_page = Mock()

        # Passing means: completes without raising even when no stealth is available.
        await strategy._apply_stealth(mock_page)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
    @patch('crawl4ai.async_crawler_strategy.Stealth')
    async def test_stealth_error_handling(self, mock_stealth_class):
        """Test that stealth errors are handled gracefully without breaking crawling"""
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # The mock raises as soon as it is called, simulating a stealth failure.
        mock_stealth_instance = Mock()
        mock_stealth_instance.apply_stealth_async = Mock(side_effect=Exception("Stealth failed"))
        mock_stealth_class.return_value = mock_stealth_instance

        strategy = AsyncPlaywrightCrawlerStrategy()
        mock_page = Mock()

        # Passing means: the stealth error does not propagate to the caller.
        await strategy._apply_stealth(mock_page)

    def test_strategy_creation_without_config(self):
        """Test that strategy can be created without any stealth configuration"""
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # Should work without any stealth-related parameters
        strategy = AsyncPlaywrightCrawlerStrategy()
        assert strategy is not None
        assert hasattr(strategy, '_apply_stealth')

    def test_browser_config_works_without_stealth_param(self):
        """Test that BrowserConfig works without stealth parameter"""
        from crawl4ai.async_configs import BrowserConfig

        # Should work without stealth parameter
        config = BrowserConfig()
        assert config is not None

        # Should also work with other parameters
        config = BrowserConfig(headless=False, browser_type="firefox")
        assert config.headless == False
        assert config.browser_type == "firefox"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,345 +0,0 @@
#!/usr/bin/env python3
"""
Simple API Test for Crawl4AI Docker Server v0.7.0
Uses only built-in Python modules to test all endpoints.
"""
import urllib.request
import urllib.parse
import json
import time
import sys
from typing import Dict, List, Optional
# Configuration
BASE_URL = "http://localhost:11234" # Change to your server URL
TEST_TIMEOUT = 30
class SimpleApiTester:
    """Smoke-tests the HTTP endpoints of a Crawl4AI server using only the stdlib.

    Each request is converted into a result dict with the keys ``endpoint``,
    ``method``, ``status`` ("PASS"/"FAIL"), ``status_code``, ``response_time``
    and either ``data`` (success) or ``error`` (failure). Results handed to
    :meth:`print_result` accumulate in ``self.results`` and are summarised and
    written to a JSON report by :meth:`print_summary`.
    """

    def __init__(self, base_url: str = BASE_URL):
        self.base_url = base_url
        # Sent as "Authorization: Bearer <token>" on every request when set.
        self.token = None
        # Result dicts, appended by print_result().
        self.results = []

    def log(self, message: str):
        """Print an informational message with an ``[INFO]`` prefix."""
        print(f"[INFO] {message}")

    def _request(self, method: str, endpoint: str, payload: Optional[Dict] = None) -> Dict:
        """Send one request and describe the outcome as a result dict.

        Args:
            method: "GET" or "POST"; a POST sends ``payload`` as a JSON body.
            endpoint: Path (and optional query string) appended to ``base_url``.
            payload: JSON-serialisable body, used only for POST.

        Returns:
            A result dict as described on the class. This method never raises:
            any ``Exception`` during the request becomes a "FAIL" result.
        """
        url = f"{self.base_url}{endpoint}"
        start_time = time.time()

        try:
            if method == "POST":
                body = json.dumps(payload).encode('utf-8')
                req = urllib.request.Request(url, data=body, method='POST')
                req.add_header('Content-Type', 'application/json')
            else:
                req = urllib.request.Request(url)

            if self.token:
                req.add_header('Authorization', f'Bearer {self.token}')

            with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
                # Timing stops once the response headers arrive, before the body is read.
                response_time = time.time() - start_time
                status_code = response.getcode()
                content = response.read().decode('utf-8')

            # Non-JSON bodies are kept as a short raw preview. Only decode
            # errors are caught (JSONDecodeError is a ValueError) so that
            # KeyboardInterrupt/SystemExit are not swallowed here.
            try:
                data = json.loads(content)
            except ValueError:
                data = {"raw_response": content[:200]}

            return {
                "endpoint": endpoint,
                "method": method,
                "status": "PASS" if status_code < 400 else "FAIL",
                "status_code": status_code,
                "response_time": response_time,
                "data": data
            }
        except Exception as e:
            response_time = time.time() - start_time
            return {
                "endpoint": endpoint,
                "method": method,
                "status": "FAIL",
                # HTTP errors raised by urlopen carry the status in `.code`;
                # other failures (connection refused, timeout) have none.
                "status_code": getattr(e, "code", None),
                "response_time": response_time,
                "error": str(e)
            }

    def test_get_endpoint(self, endpoint: str) -> Dict:
        """Test a GET endpoint"""
        return self._request("GET", endpoint)

    def test_post_endpoint(self, endpoint: str, payload: Dict) -> Dict:
        """Test a POST endpoint"""
        return self._request("POST", endpoint, payload)

    def print_result(self, result: Dict):
        """Print a formatted test result and record it in ``self.results``."""
        # NOTE(review): the PASS/FAIL markers are empty strings in the source;
        # possibly icons that were lost. Confirm before changing.
        status_color = {
            "PASS": "",
            "FAIL": "",
            "SKIP": "⏭️"
        }

        print(f"{status_color[result['status']]} {result['method']} {result['endpoint']} "
              f"| {result['response_time']:.3f}s | Status: {result['status_code'] or 'N/A'}")

        if result['status'] == 'FAIL' and 'error' in result:
            print(f" Error: {result['error']}")

        self.results.append(result)

    def run_all_tests(self):
        """Run all API tests"""
        print("🚀 Starting Crawl4AI v0.7.0 API Test Suite")
        print(f"📡 Testing server at: {self.base_url}")
        print("=" * 60)

        # # Test basic endpoints
        # print("\n=== BASIC ENDPOINTS ===")

        # # Health check
        # result = self.test_get_endpoint("/health")
        # self.print_result(result)

        # # Schema endpoint
        # result = self.test_get_endpoint("/schema")
        # self.print_result(result)

        # # Metrics endpoint
        # result = self.test_get_endpoint("/metrics")
        # self.print_result(result)

        # # Root redirect
        # result = self.test_get_endpoint("/")
        # self.print_result(result)

        # # Test authentication
        # print("\n=== AUTHENTICATION ===")

        # # Get token
        # token_payload = {"email": "test@example.com"}
        # result = self.test_post_endpoint("/token", token_payload)
        # self.print_result(result)

        # # Extract token if successful
        # if result['status'] == 'PASS' and 'data' in result:
        #     token = result['data'].get('access_token')
        #     if token:
        #         self.token = token
        #         self.log(f"Successfully obtained auth token: {token[:20]}...")

        # Test core APIs
        print("\n=== CORE APIs ===")

        test_url = "https://example.com"

        # Test markdown endpoint
        md_payload = {
            "url": test_url,
            "f": "fit",
            "q": "test query",
            "c": "0"
        }
        result = self.test_post_endpoint("/md", md_payload)
        # print(result['data'].get('markdown', ''))
        self.print_result(result)

        # Test HTML endpoint
        html_payload = {"url": test_url}
        result = self.test_post_endpoint("/html", html_payload)
        self.print_result(result)

        # Test screenshot endpoint
        screenshot_payload = {
            "url": test_url,
            "screenshot_wait_for": 2
        }
        result = self.test_post_endpoint("/screenshot", screenshot_payload)
        self.print_result(result)

        # Test PDF endpoint
        pdf_payload = {"url": test_url}
        result = self.test_post_endpoint("/pdf", pdf_payload)
        self.print_result(result)

        # Test JavaScript execution
        js_payload = {
            "url": test_url,
            "scripts": ["(() => document.title)()"]
        }
        result = self.test_post_endpoint("/execute_js", js_payload)
        self.print_result(result)

        # Test crawl endpoint
        crawl_payload = {
            "urls": [test_url],
            "browser_config": {},
            "crawler_config": {}
        }
        result = self.test_post_endpoint("/crawl", crawl_payload)
        self.print_result(result)

        # Test config dump
        config_payload = {"code": "CrawlerRunConfig()"}
        result = self.test_post_endpoint("/config/dump", config_payload)
        self.print_result(result)

        # Test LLM endpoint
        llm_endpoint = f"/llm/{test_url}?q=Extract%20main%20content"
        result = self.test_get_endpoint(llm_endpoint)
        self.print_result(result)

        # Test ask endpoint (the raw result dict is also printed, as before)
        ask_endpoint = "/ask?context_type=all&query=crawl4ai&max_results=5"
        result = self.test_get_endpoint(ask_endpoint)
        print(result)
        self.print_result(result)

        # Test job APIs
        print("\n=== JOB APIs ===")

        # Test LLM job
        llm_job_payload = {
            "url": test_url,
            "q": "Extract main content",
            "cache": False
        }
        result = self.test_post_endpoint("/llm/job", llm_job_payload)
        self.print_result(result)

        # Test crawl job
        crawl_job_payload = {
            "urls": [test_url],
            "browser_config": {},
            "crawler_config": {}
        }
        result = self.test_post_endpoint("/crawl/job", crawl_job_payload)
        self.print_result(result)

        # Test MCP
        print("\n=== MCP APIs ===")

        # Test MCP schema
        result = self.test_get_endpoint("/mcp/schema")
        self.print_result(result)

        # Test error handling
        print("\n=== ERROR HANDLING ===")

        # Test invalid URL
        invalid_payload = {"url": "invalid-url", "f": "fit"}
        result = self.test_post_endpoint("/md", invalid_payload)
        self.print_result(result)

        # Test invalid endpoint
        result = self.test_get_endpoint("/nonexistent")
        self.print_result(result)

        # Print summary
        self.print_summary()

    def print_summary(self):
        """Print test results summary and write a JSON report to the working directory."""
        print("\n" + "=" * 60)
        print("📊 TEST RESULTS SUMMARY")
        print("=" * 60)

        total = len(self.results)
        passed = sum(1 for r in self.results if r['status'] == 'PASS')
        failed = sum(1 for r in self.results if r['status'] == 'FAIL')

        # Guard against ZeroDivisionError when no result was recorded.
        success_rate = (passed / total) * 100 if total else 0.0

        print(f"Total Tests: {total}")
        print(f"✅ Passed: {passed}")
        print(f"❌ Failed: {failed}")
        print(f"📈 Success Rate: {success_rate:.1f}%")

        if failed > 0:
            print("\n❌ FAILED TESTS:")
            for result in self.results:
                if result['status'] == 'FAIL':
                    print(f"{result['method']} {result['endpoint']}")
                    if 'error' in result:
                        print(f" Error: {result['error']}")

        # Performance statistics
        response_times = [r['response_time'] for r in self.results if r['response_time'] > 0]
        if response_times:
            avg_time = sum(response_times) / len(response_times)
            max_time = max(response_times)
            print(f"\n⏱️ Average Response Time: {avg_time:.3f}s")
            print(f"⏱️ Max Response Time: {max_time:.3f}s")

        # Save detailed report
        report_file = f"crawl4ai_test_report_{int(time.time())}.json"
        with open(report_file, 'w') as f:
            json.dump({
                "timestamp": time.time(),
                "server_url": self.base_url,
                "version": "0.7.0",
                "summary": {
                    "total": total,
                    "passed": passed,
                    "failed": failed
                },
                "results": self.results
            }, f, indent=2)

        print(f"\n📄 Detailed report saved to: {report_file}")
def main():
    """Command-line entry point: read ``--url`` and run the full API test suite.

    A ``KeyboardInterrupt`` during the run is reported and the function returns
    normally; any other exception is reported and the process exits with
    status 1.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Crawl4AI v0.7.0 API Test Suite')
    cli.add_argument('--url', default=BASE_URL, help='Base URL of the server')
    options = cli.parse_args()

    suite = SimpleApiTester(options.url)

    try:
        suite.run_all_tests()
    except KeyboardInterrupt:
        print("\n🛑 Test suite interrupted by user")
    except Exception as exc:
        print(f"\n💥 Test suite failed with error: {exc}")
        sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -5,7 +5,7 @@ Test script for Link Extractor functionality
from crawl4ai.models import Link from crawl4ai.models import Link
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai import LinkPreviewConfig from crawl4ai.async_configs import LinkPreviewConfig
import asyncio import asyncio
import sys import sys
import os import os
@@ -237,7 +237,7 @@ def test_config_examples():
print(f" {key}: {value}") print(f" {key}: {value}")
print(" Usage:") print(" Usage:")
print(" from crawl4ai import LinkPreviewConfig") print(" from crawl4ai.async_configs import LinkPreviewConfig")
print(" config = CrawlerRunConfig(") print(" config = CrawlerRunConfig(")
print(" link_preview_config=LinkPreviewConfig(") print(" link_preview_config=LinkPreviewConfig(")
for key, value in config_dict.items(): for key, value in config_dict.items():