Compare commits

..

3 Commits

Author SHA1 Message Date
AHMET YILMAZ
65902a4773 feat: Enhance stealth compatibility with new and legacy APIs, add configuration support 2025-07-16 17:41:47 +08:00
AHMET YILMAZ
5c13baf574 feat: Add stealth option to BrowserConfig for enhanced browser behavior 2025-07-15 15:48:23 +08:00
AHMET YILMAZ
d2759824ef fix: Update playwright-stealth to v2.0.0+ compatibility
Fixes #1273

- Replace deprecated stealth_async import with Stealth class
- Add stealth flag to BrowserConfig (default: true)
- Update async_crawler_strategy to use Stealth().apply_stealth_async()
- Remove obsolete StealthConfig from browser_manager
- Maintain backward compatibility with existing stealth functionality

This fixes compatibility issues with playwright-stealth v2.0.0+ where the API changed from stealth_async function to Stealth class.

test: Add comprehensive tests for playwright-stealth v2.0.0+ compatibility

- Test Stealth class import and instantiation
- Test apply_stealth_async method availability
- Test BrowserConfig stealth flag functionality
- Test stealth flag serialization
- Verify backward compatibility with existing stealth functionality
2025-07-15 15:31:15 +08:00
29 changed files with 850 additions and 1072 deletions

View File

@@ -1,141 +0,0 @@
name: Release Pipeline
on:
push:
tags:
- 'v*'
- '!test-v*' # Exclude test tags
jobs:
release:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Extract version from tag
id: get_version
run: |
TAG_VERSION=${GITHUB_REF#refs/tags/v}
echo "VERSION=$TAG_VERSION" >> $GITHUB_OUTPUT
echo "Releasing version: $TAG_VERSION"
- name: Install package dependencies
run: |
pip install -e .
- name: Check version consistency
run: |
TAG_VERSION=${{ steps.get_version.outputs.VERSION }}
PACKAGE_VERSION=$(python -c "from crawl4ai.__version__ import __version__; print(__version__)")
echo "Tag version: $TAG_VERSION"
echo "Package version: $PACKAGE_VERSION"
if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then
echo "❌ Version mismatch! Tag: $TAG_VERSION, Package: $PACKAGE_VERSION"
echo "Please update crawl4ai/__version__.py to match the tag version"
exit 1
fi
echo "✅ Version check passed: $TAG_VERSION"
- name: Install build dependencies
run: |
python -m pip install --upgrade pip
pip install build twine
- name: Build package
run: python -m build
- name: Check package
run: twine check dist/*
- name: Upload to PyPI
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
run: |
echo "📦 Uploading to PyPI..."
twine upload dist/*
echo "✅ Package uploaded to https://pypi.org/project/crawl4ai/"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_TOKEN }}
- name: Extract major and minor versions
id: versions
run: |
VERSION=${{ steps.get_version.outputs.VERSION }}
MAJOR=$(echo $VERSION | cut -d. -f1)
MINOR=$(echo $VERSION | cut -d. -f1-2)
echo "MAJOR=$MAJOR" >> $GITHUB_OUTPUT
echo "MINOR=$MINOR" >> $GITHUB_OUTPUT
- name: Build and push Docker images
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: |
unclecode/crawl4ai:${{ steps.get_version.outputs.VERSION }}
unclecode/crawl4ai:${{ steps.versions.outputs.MINOR }}
unclecode/crawl4ai:${{ steps.versions.outputs.MAJOR }}
unclecode/crawl4ai:latest
platforms: linux/amd64,linux/arm64
- name: Create GitHub Release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: v${{ steps.get_version.outputs.VERSION }}
release_name: Release v${{ steps.get_version.outputs.VERSION }}
body: |
## 🎉 Crawl4AI v${{ steps.get_version.outputs.VERSION }} Released!
### 📦 Installation
**PyPI:**
```bash
pip install crawl4ai==${{ steps.get_version.outputs.VERSION }}
```
**Docker:**
```bash
docker pull unclecode/crawl4ai:${{ steps.get_version.outputs.VERSION }}
docker pull unclecode/crawl4ai:latest
```
### 📝 What's Changed
See [CHANGELOG.md](https://github.com/${{ github.repository }}/blob/main/CHANGELOG.md) for details.
draft: false
prerelease: false
- name: Summary
run: |
echo "## 🚀 Release Complete!" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### 📦 PyPI Package" >> $GITHUB_STEP_SUMMARY
echo "- Version: ${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_STEP_SUMMARY
echo "- URL: https://pypi.org/project/crawl4ai/" >> $GITHUB_STEP_SUMMARY
echo "- Install: \`pip install crawl4ai==${{ steps.get_version.outputs.VERSION }}\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### 🐳 Docker Images" >> $GITHUB_STEP_SUMMARY
echo "- \`unclecode/crawl4ai:${{ steps.get_version.outputs.VERSION }}\`" >> $GITHUB_STEP_SUMMARY
echo "- \`unclecode/crawl4ai:${{ steps.versions.outputs.MINOR }}\`" >> $GITHUB_STEP_SUMMARY
echo "- \`unclecode/crawl4ai:${{ steps.versions.outputs.MAJOR }}\`" >> $GITHUB_STEP_SUMMARY
echo "- \`unclecode/crawl4ai:latest\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### 📋 GitHub Release" >> $GITHUB_STEP_SUMMARY
echo "https://github.com/${{ github.repository }}/releases/tag/v${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_STEP_SUMMARY

View File

@@ -1,116 +0,0 @@
name: Test Release Pipeline
on:
push:
tags:
- 'test-v*'
jobs:
test-release:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Extract version from tag
id: get_version
run: |
TAG_VERSION=${GITHUB_REF#refs/tags/test-v}
echo "VERSION=$TAG_VERSION" >> $GITHUB_OUTPUT
echo "Testing with version: $TAG_VERSION"
- name: Install package dependencies
run: |
pip install -e .
- name: Check version consistency
run: |
TAG_VERSION=${{ steps.get_version.outputs.VERSION }}
PACKAGE_VERSION=$(python -c "from crawl4ai.__version__ import __version__; print(__version__)")
echo "Tag version: $TAG_VERSION"
echo "Package version: $PACKAGE_VERSION"
if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then
echo "❌ Version mismatch! Tag: $TAG_VERSION, Package: $PACKAGE_VERSION"
echo "Please update crawl4ai/__version__.py to match the tag version"
exit 1
fi
echo "✅ Version check passed: $TAG_VERSION"
- name: Install build dependencies
run: |
python -m pip install --upgrade pip
pip install build twine
- name: Build package
run: python -m build
- name: Check package
run: twine check dist/*
- name: Upload to Test PyPI
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.TEST_PYPI_TOKEN }}
run: |
echo "📦 Uploading to Test PyPI..."
twine upload --repository testpypi dist/* || {
if [ $? -eq 1 ]; then
echo "⚠️ Upload failed - likely version already exists on Test PyPI"
echo "Continuing anyway for test purposes..."
else
exit 1
fi
}
echo "✅ Test PyPI step complete"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Docker test images
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: |
unclecode/crawl4ai:test-${{ steps.get_version.outputs.VERSION }}
unclecode/crawl4ai:test-latest
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Summary
run: |
echo "## 🎉 Test Release Complete!" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### 📦 Test PyPI Package" >> $GITHUB_STEP_SUMMARY
echo "- Version: ${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_STEP_SUMMARY
echo "- URL: https://test.pypi.org/project/crawl4ai/" >> $GITHUB_STEP_SUMMARY
echo "- Install: \`pip install -i https://test.pypi.org/simple/ crawl4ai==${{ steps.get_version.outputs.VERSION }}\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### 🐳 Docker Test Images" >> $GITHUB_STEP_SUMMARY
echo "- \`unclecode/crawl4ai:test-${{ steps.get_version.outputs.VERSION }}\`" >> $GITHUB_STEP_SUMMARY
echo "- \`unclecode/crawl4ai:test-latest\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### 🧹 Cleanup Commands" >> $GITHUB_STEP_SUMMARY
echo "\`\`\`bash" >> $GITHUB_STEP_SUMMARY
echo "# Remove test tag" >> $GITHUB_STEP_SUMMARY
echo "git tag -d test-v${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_STEP_SUMMARY
echo "git push origin :test-v${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "# Remove Docker test images" >> $GITHUB_STEP_SUMMARY
echo "docker rmi unclecode/crawl4ai:test-${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_STEP_SUMMARY
echo "docker rmi unclecode/crawl4ai:test-latest" >> $GITHUB_STEP_SUMMARY
echo "\`\`\`" >> $GITHUB_STEP_SUMMARY

View File

@@ -28,7 +28,7 @@ Crawl4AI is the #1 trending GitHub repository, actively maintained by a vibrant
[✨ Check out latest update v0.7.0](#-recent-updates)
🎉 **Version 0.7.0 is now available!** The Adaptive Intelligence Update introduces groundbreaking features: Adaptive Crawling that learns website patterns, Virtual Scroll support for infinite pages, intelligent Link Preview with 3-layer scoring, Async URL Seeder for massive discovery, and significant performance improvements. [Read the release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.0.md)
🎉 **Version 0.7.0 is now available!** The Adaptive Intelligence Update introduces groundbreaking features: Adaptive Crawling that learns website patterns, Virtual Scroll support for infinite pages, intelligent Link Preview with 3-layer scoring, Async URL Seeder for massive discovery, and significant performance improvements. [Read the release notes →](https://docs.crawl4ai.com/blog/release-v0.7.0)
<details>
<summary>🤓 <strong>My Personal Story</strong></summary>
@@ -523,18 +523,15 @@ async def test_news_crawl():
- **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically:
```python
config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to stop crawling
max_depth=5, # Maximum crawl depth
max_pages=20, # Maximum number of pages to crawl
strategy="statistical"
confidence_threshold=0.7,
max_history=100,
learning_rate=0.2
)
async with AsyncWebCrawler() as crawler:
adaptive_crawler = AdaptiveCrawler(crawler, config)
state = await adaptive_crawler.digest(
start_url="https://news.example.com",
query="latest news content"
)
result = await crawler.arun(
"https://news.example.com",
config=CrawlerRunConfig(adaptive_config=config)
)
# Crawler learns patterns and improves extraction over time
```

View File

@@ -3,7 +3,7 @@ import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
# MODIFIED: Add SeedingConfig and VirtualScrollConfig here
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig, LinkPreviewConfig
from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig
from .content_scraping_strategy import (
ContentScrapingStrategy,
@@ -173,7 +173,6 @@ __all__ = [
"CompilationResult",
"ValidationResult",
"ErrorDetail",
"LinkPreviewConfig"
]

View File

@@ -1,7 +1,7 @@
# crawl4ai/__version__.py
# This is the version that will be used for stable releases
__version__ = "0.7.2"
__version__ = "0.7.0"
# For nightly builds, this gets set during build process
__nightly_version__ = None

View File

@@ -12,6 +12,20 @@ from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
import hashlib
# Backward compatible stealth import
try:
# Try new tf-playwright-stealth API (Stealth class)
from playwright_stealth import Stealth
STEALTH_NEW_API = True
except ImportError:
try:
# Try old playwright-stealth API (stealth_async function)
from playwright_stealth import stealth_async
STEALTH_NEW_API = False
except ImportError:
# No stealth available
STEALTH_NEW_API = None
import uuid
from .js_snippet import load_js_script
from .models import AsyncCrawlResponse
@@ -31,6 +45,107 @@ from types import MappingProxyType
import contextlib
from functools import partial
# Add StealthConfig class for backward compatibility and new features
class StealthConfig:
"""
Configuration class for stealth settings that works with tf-playwright-stealth.
This maintains backward compatibility while supporting all tf-playwright-stealth features.
"""
def __init__(
self,
# Common settings
enabled: bool = True,
# Core tf-playwright-stealth parameters (matching the actual library)
chrome_app: bool = True,
chrome_csi: bool = True,
chrome_load_times: bool = True,
chrome_runtime: bool = False, # Note: library default is False
hairline: bool = True,
iframe_content_window: bool = True,
media_codecs: bool = True,
navigator_hardware_concurrency: bool = True,
navigator_languages: bool = True,
navigator_permissions: bool = True,
navigator_platform: bool = True,
navigator_plugins: bool = True,
navigator_user_agent: bool = True,
navigator_vendor: bool = True,
navigator_webdriver: bool = True,
sec_ch_ua: bool = True,
webgl_vendor: bool = True,
# Override parameters
navigator_languages_override: tuple = ("en-US", "en"),
navigator_platform_override: str = "Win32",
navigator_user_agent_override: str = None,
navigator_vendor_override: str = None,
sec_ch_ua_override: str = None,
webgl_renderer_override: str = None,
webgl_vendor_override: str = None,
# Advanced parameters
init_scripts_only: bool = False,
script_logging: bool = False,
# Legacy parameters for backward compatibility
webdriver: bool = None, # This will be mapped to navigator_webdriver
user_agent_override: bool = None, # This will be mapped to navigator_user_agent
window_outerdimensions: bool = None, # This parameter doesn't exist in tf-playwright-stealth
):
self.enabled = enabled
# Handle legacy parameter mapping for backward compatibility
if webdriver is not None:
navigator_webdriver = webdriver
if user_agent_override is not None:
navigator_user_agent = user_agent_override
# Store all stealth options for the Stealth class - filter out None values
self.stealth_options = {
k: v for k, v in {
'chrome_app': chrome_app,
'chrome_csi': chrome_csi,
'chrome_load_times': chrome_load_times,
'chrome_runtime': chrome_runtime,
'hairline': hairline,
'iframe_content_window': iframe_content_window,
'media_codecs': media_codecs,
'navigator_hardware_concurrency': navigator_hardware_concurrency,
'navigator_languages': navigator_languages,
'navigator_permissions': navigator_permissions,
'navigator_platform': navigator_platform,
'navigator_plugins': navigator_plugins,
'navigator_user_agent': navigator_user_agent,
'navigator_vendor': navigator_vendor,
'navigator_webdriver': navigator_webdriver,
'sec_ch_ua': sec_ch_ua,
'webgl_vendor': webgl_vendor,
'navigator_languages_override': navigator_languages_override,
'navigator_platform_override': navigator_platform_override,
'navigator_user_agent_override': navigator_user_agent_override,
'navigator_vendor_override': navigator_vendor_override,
'sec_ch_ua_override': sec_ch_ua_override,
'webgl_renderer_override': webgl_renderer_override,
'webgl_vendor_override': webgl_vendor_override,
'init_scripts_only': init_scripts_only,
'script_logging': script_logging,
}.items() if v is not None
}
@classmethod
def from_dict(cls, config_dict: dict) -> 'StealthConfig':
"""Create StealthConfig from dictionary for easy configuration"""
return cls(**config_dict)
def to_dict(self) -> dict:
"""Convert to dictionary for serialization"""
return {
'enabled': self.enabled,
**self.stealth_options
}
class AsyncCrawlerStrategy(ABC):
"""
Abstract base class for crawler strategies.
@@ -39,7 +154,7 @@ class AsyncCrawlerStrategy(ABC):
@abstractmethod
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
pass # 4 + 3
pass # 4 + 3
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
@@ -220,6 +335,79 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
self.headers = headers
async def _apply_stealth(self, page: Page, stealth_config: Optional[StealthConfig] = None):
"""
Apply stealth measures to the page with backward compatibility and enhanced configuration.
This method automatically applies stealth measures and now supports configuration
through StealthConfig while maintaining backward compatibility.
Currently supports:
- tf-playwright-stealth (Stealth class with extensive configuration)
- Old playwright-stealth v1.x (stealth_async function) - legacy support
Args:
page (Page): The Playwright page object
stealth_config (Optional[StealthConfig]): Configuration for stealth settings
"""
if STEALTH_NEW_API is None:
# No stealth library available - silently continue
if self.logger and hasattr(self.logger, 'debug'):
self.logger.debug(
message="playwright-stealth not available, skipping stealth measures",
tag="STEALTH"
)
return
# Use default config if none provided
if stealth_config is None:
stealth_config = StealthConfig()
# Skip if stealth is disabled
if not stealth_config.enabled:
if self.logger and hasattr(self.logger, 'debug'):
self.logger.debug(
message="Stealth measures disabled in configuration",
tag="STEALTH"
)
return
try:
if STEALTH_NEW_API:
# Use tf-playwright-stealth API with configuration support
# Filter out any invalid parameters that might cause issues
valid_options = {}
for key, value in stealth_config.stealth_options.items():
# Accept boolean parameters and specific string/tuple parameters
if isinstance(value, (bool, str, tuple)):
valid_options[key] = value
stealth = Stealth(**valid_options)
await stealth.apply_stealth_async(page)
config_info = f"with {len(valid_options)} options"
else:
# Use old API (v1.x) - configuration options are limited
await stealth_async(page)
config_info = "default (v1.x legacy)"
# Only log if logger is available and in debug mode
if self.logger and hasattr(self.logger, 'debug'):
api_version = "tf-playwright-stealth" if STEALTH_NEW_API else "v1.x"
self.logger.debug(
message="Applied stealth measures using {version} {config}",
tag="STEALTH",
params={"version": api_version, "config": config_info}
)
except Exception as e:
# Silently continue if stealth fails - don't break the crawling process
if self.logger:
self.logger.warning(
message="Stealth measures failed, continuing without stealth: {error}",
tag="STEALTH",
params={"error": str(e)}
)
async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
"""
Wait for a condition in a smart way. This functions works as below:
@@ -532,6 +720,24 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Get page for session
page, context = await self.browser_manager.get_page(crawlerRunConfig=config)
# Apply stealth measures automatically (backward compatible) with optional config
# Check multiple possible locations for stealth config for flexibility
stealth_config = None
if hasattr(config, 'stealth_config') and config.stealth_config:
stealth_config = config.stealth_config
elif hasattr(config, 'stealth') and config.stealth:
# Alternative attribute name for backward compatibility
stealth_config = config.stealth if isinstance(config.stealth, StealthConfig) else StealthConfig.from_dict(config.stealth)
elif config.magic:
# Enable more aggressive stealth in magic mode
stealth_config = StealthConfig(
navigator_webdriver=False, # More aggressive stealth
webdriver=False,
chrome_app=False
)
await self._apply_stealth(page, stealth_config)
# await page.goto(URL)
# Add default cookie
@@ -824,7 +1030,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
except Error:
visibility_info = await self.check_visibility(page)
if self.browser_config.verbose:
if self.browser_config.config.verbose:
self.logger.debug(
message="Body visibility info: {info}",
tag="DEBUG",
@@ -933,7 +1139,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
tag="VIEWPORT",
params={"error": str(e)},
)
# Handle full page scanning
if config.scan_full_page:
# await self._handle_full_page_scan(page, config.scroll_delay)
@@ -1837,8 +2042,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# }}
# }})();
# """
# )
# """ NEW VERSION:
# When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.

View File

@@ -502,12 +502,9 @@ class AsyncWebCrawler:
metadata = result.get("metadata", {})
else:
cleaned_html = sanitize_input_encode(result.cleaned_html)
# media = result.media.model_dump()
# tables = media.pop("tables", [])
# links = result.links.model_dump()
media = result.media.model_dump() if hasattr(result.media, 'model_dump') else result.media
tables = media.pop("tables", []) if isinstance(media, dict) else []
links = result.links.model_dump() if hasattr(result.links, 'model_dump') else result.links
media = result.media.model_dump()
tables = media.pop("tables", [])
links = result.links.model_dump()
metadata = result.metadata
fit_html = preprocess_html_for_schema(html_content=html, text_threshold= 500, max_size= 300_000)

View File

@@ -16,7 +16,6 @@ from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig
from .utils import get_chromium_path
BROWSER_DISABLE_OPTIONS = [
"--disable-background-networking",
"--disable-background-timer-throttling",

View File

@@ -27,10 +27,7 @@ from crawl4ai import (
PruningContentFilter,
BrowserProfiler,
DefaultMarkdownGenerator,
LLMConfig,
BFSDeepCrawlStrategy,
DFSDeepCrawlStrategy,
BestFirstCrawlingStrategy,
LLMConfig
)
from crawl4ai.config import USER_SETTINGS
from litellm import completion
@@ -1017,11 +1014,9 @@ def cdp_cmd(user_data_dir: Optional[str], port: int, browser_type: str, headless
@click.option("--question", "-q", help="Ask a question about the crawled content")
@click.option("--verbose", "-v", is_flag=True)
@click.option("--profile", "-p", help="Use a specific browser profile (by name)")
@click.option("--deep-crawl", type=click.Choice(["bfs", "dfs", "best-first"]), help="Enable deep crawling with specified strategy (bfs, dfs, or best-first)")
@click.option("--max-pages", type=int, default=10, help="Maximum number of pages to crawl in deep crawl mode")
def crawl_cmd(url: str, browser_config: str, crawler_config: str, filter_config: str,
extraction_config: str, json_extract: str, schema: str, browser: Dict, crawler: Dict,
output: str, output_file: str, bypass_cache: bool, question: str, verbose: bool, profile: str, deep_crawl: str, max_pages: int):
output: str, output_file: str, bypass_cache: bool, question: str, verbose: bool, profile: str):
"""Crawl a website and extract content
Simple Usage:
@@ -1161,27 +1156,6 @@ Always return valid, properly formatted JSON."""
crawler_cfg.scraping_strategy = LXMLWebScrapingStrategy()
# Handle deep crawling configuration
if deep_crawl:
if deep_crawl == "bfs":
crawler_cfg.deep_crawl_strategy = BFSDeepCrawlStrategy(
max_depth=3,
max_pages=max_pages
)
elif deep_crawl == "dfs":
crawler_cfg.deep_crawl_strategy = DFSDeepCrawlStrategy(
max_depth=3,
max_pages=max_pages
)
elif deep_crawl == "best-first":
crawler_cfg.deep_crawl_strategy = BestFirstCrawlingStrategy(
max_depth=3,
max_pages=max_pages
)
if verbose:
console.print(f"[green]Deep crawling enabled:[/green] {deep_crawl} strategy, max {max_pages} pages")
config = get_global_config()
browser_cfg.verbose = config.get("VERBOSE", False)
@@ -1196,60 +1170,39 @@ Always return valid, properly formatted JSON."""
verbose
)
# Handle deep crawl results (list) vs single result
if isinstance(result, list):
if len(result) == 0:
click.echo("No results found during deep crawling")
return
# Use the first result for question answering and output
main_result = result[0]
all_results = result
else:
# Single result from regular crawling
main_result = result
all_results = [result]
# Handle question
if question:
provider, token = setup_llm_config()
markdown = main_result.markdown.raw_markdown
markdown = result.markdown.raw_markdown
anyio.run(stream_llm_response, url, markdown, question, provider, token)
return
# Handle output
if not output_file:
if output == "all":
if isinstance(result, list):
output_data = [r.model_dump() for r in all_results]
click.echo(json.dumps(output_data, indent=2))
else:
click.echo(json.dumps(main_result.model_dump(), indent=2))
click.echo(json.dumps(result.model_dump(), indent=2))
elif output == "json":
print(main_result.extracted_content)
extracted_items = json.loads(main_result.extracted_content)
print(result.extracted_content)
extracted_items = json.loads(result.extracted_content)
click.echo(json.dumps(extracted_items, indent=2))
elif output in ["markdown", "md"]:
click.echo(main_result.markdown.raw_markdown)
click.echo(result.markdown.raw_markdown)
elif output in ["markdown-fit", "md-fit"]:
click.echo(main_result.markdown.fit_markdown)
click.echo(result.markdown.fit_markdown)
else:
if output == "all":
with open(output_file, "w") as f:
if isinstance(result, list):
output_data = [r.model_dump() for r in all_results]
f.write(json.dumps(output_data, indent=2))
else:
f.write(json.dumps(main_result.model_dump(), indent=2))
f.write(json.dumps(result.model_dump(), indent=2))
elif output == "json":
with open(output_file, "w") as f:
f.write(main_result.extracted_content)
f.write(result.extracted_content)
elif output in ["markdown", "md"]:
with open(output_file, "w") as f:
f.write(main_result.markdown.raw_markdown)
f.write(result.markdown.raw_markdown)
elif output in ["markdown-fit", "md-fit"]:
with open(output_file, "w") as f:
f.write(main_result.markdown.fit_markdown)
f.write(result.markdown.fit_markdown)
except Exception as e:
raise click.ClickException(str(e))
@@ -1401,11 +1354,9 @@ def profiles_cmd():
@click.option("--question", "-q", help="Ask a question about the crawled content")
@click.option("--verbose", "-v", is_flag=True)
@click.option("--profile", "-p", help="Use a specific browser profile (by name)")
@click.option("--deep-crawl", type=click.Choice(["bfs", "dfs", "best-first"]), help="Enable deep crawling with specified strategy")
@click.option("--max-pages", type=int, default=10, help="Maximum number of pages to crawl in deep crawl mode")
def default(url: str, example: bool, browser_config: str, crawler_config: str, filter_config: str,
extraction_config: str, json_extract: str, schema: str, browser: Dict, crawler: Dict,
output: str, bypass_cache: bool, question: str, verbose: bool, profile: str, deep_crawl: str, max_pages: int):
output: str, bypass_cache: bool, question: str, verbose: bool, profile: str):
"""Crawl4AI CLI - Web content extraction tool
Simple Usage:
@@ -1455,9 +1406,7 @@ def default(url: str, example: bool, browser_config: str, crawler_config: str, f
bypass_cache=bypass_cache,
question=question,
verbose=verbose,
profile=profile,
deep_crawl=deep_crawl,
max_pages=max_pages
profile=profile
)
def main():

View File

@@ -1145,10 +1145,10 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
link_data["intrinsic_score"] = intrinsic_score
except Exception:
# Fail gracefully - assign default score
link_data["intrinsic_score"] = 0
link_data["intrinsic_score"] = float('inf')
else:
# No scoring enabled - assign infinity (all links equal priority)
link_data["intrinsic_score"] = 0
link_data["intrinsic_score"] = float('inf')
is_external = is_external_url(normalized_href, base_domain)
if is_external:

View File

@@ -3342,13 +3342,7 @@ async def get_text_embeddings(
# Default: use sentence-transformers
else:
# Lazy load to avoid importing heavy libraries unless needed
try:
from sentence_transformers import SentenceTransformer
except ImportError:
raise ImportError(
"sentence-transformers is required for local embeddings. "
"Install it with: pip install 'crawl4ai[transformer]' or pip install sentence-transformers"
)
from sentence_transformers import SentenceTransformer
# Cache the model in function attribute to avoid reloading
if not hasattr(get_text_embeddings, '_models'):

View File

@@ -5,7 +5,6 @@ from typing import List, Tuple, Dict
from functools import partial
from uuid import uuid4
from datetime import datetime
from base64 import b64encode
import logging
from typing import Optional, AsyncGenerator
@@ -372,9 +371,6 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
server_memory_mb = _get_memory_mb()
result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = json.dumps(result_dict, default=datetime_handler) + "\n"
yield data.encode('utf-8')
@@ -447,19 +443,10 @@ async def handle_crawl_request(
mem_delta_mb = end_mem_mb - start_mem_mb # <--- Calculate delta
peak_mem_mb = max(peak_mem_mb if peak_mem_mb else 0, end_mem_mb) # <--- Get peak memory
logger.info(f"Memory usage: Start: {start_mem_mb} MB, End: {end_mem_mb} MB, Delta: {mem_delta_mb} MB, Peak: {peak_mem_mb} MB")
# Process results to handle PDF bytes
processed_results = []
for result in results:
result_dict = result.model_dump()
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
processed_results.append(result_dict)
return {
"success": True,
"results": processed_results,
"results": [result.model_dump() for result in results],
"server_processing_time_s": end_time - start_time,
"server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb

View File

@@ -10,8 +10,9 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization
- **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -29,41 +30,44 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores
```python
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig
import asyncio
from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
async def main():
# Configure adaptive crawler
config = AdaptiveConfig(
strategy="statistical", # or "embedding" for semantic understanding
max_pages=10,
confidence_threshold=0.7, # Stop at 70% confidence
top_k_links=3, # Follow top 3 links per page
min_gain_threshold=0.05 # Need 5% information gain to continue
# Initialize with custom learning parameters
config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to use learned patterns
max_history=100, # Remember last 100 crawls per domain
learning_rate=0.2, # How quickly to adapt to changes
patterns_per_page=3, # Patterns to learn per page type
extraction_strategy='css' # 'css' or 'xpath'
)
adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://news.example.com/article/12345",
config=CrawlerRunConfig(
adaptive_config=config,
extraction_hints={ # Optional hints to speed up learning
"title": "article h1",
"content": "article .body-content"
}
)
)
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
print("Starting adaptive crawl about Python decorators...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/glossary.html",
query="python decorators functions wrapping"
)
print(f"\n✅ Crawling Complete!")
print(f"• Confidence Level: {adaptive.confidence:.0%}")
print(f"• Pages Crawled: {len(result.crawled_urls)}")
print(f"• Knowledge Base: {len(adaptive.state.knowledge_base)} documents")
# Get most relevant content
relevant = adaptive.get_relevant_content(top_k=3)
print(f"\nMost Relevant Pages:")
for i, page in enumerate(relevant, 1):
print(f"{i}. {page['url']} (relevance: {page['score']:.2%})")
# Crawler identifies and stores patterns
if result.success:
state = adaptive_crawler.get_state("news.example.com")
print(f"Learned {len(state.patterns)} patterns")
print(f"Confidence: {state.avg_confidence:.2%}")
asyncio.run(main())
# Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
```
**Expected Real-World Impact:**
@@ -88,7 +92,9 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0 # Let content load
wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
)
# For e-commerce product grids (Instagram style)
@@ -96,7 +102,8 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid",
scroll_count=30,
scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5 # Images need time
wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
)
# For news feeds with lazy loading
@@ -104,7 +111,9 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed",
scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5 # Wait for content to load
wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
)
# Use it in your crawl
@@ -148,63 +157,68 @@ async with AsyncWebCrawler() as crawler:
**My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals.
### Intelligent Link Analysis and Scoring
### The Three-Layer Scoring System
```python
import asyncio
from crawl4ai import CrawlerRunConfig, CacheMode, AsyncWebCrawler
from crawl4ai.adaptive_crawler import LinkPreviewConfig
from crawl4ai import LinkPreviewConfig
async def main():
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
include_internal=True,
include_external=False,
max_links=10,
concurrency=5,
query="python tutorial", # For contextual scoring
score_threshold=0.3,
verbose=True
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
# What to analyze
include_internal=True,
include_external=True,
max_links=100, # Analyze top 100 links
# Relevance scoring
query="machine learning tutorials", # Your interest
score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
)
# Use in your crawl
result = await crawler.arun(
"https://tech-blog.example.com",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True
)
# Use in your crawl
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://www.geeksforgeeks.org/",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring
cache_mode=CacheMode.BYPASS
)
)
)
# Access scored and sorted links
if result.success and result.links:
for link in result.links.get("internal", []):
text = link.get('text', 'No text')[:40]
print(
text,
f"{link.get('intrinsic_score', 0):.1f}/10" if link.get('intrinsic_score') is not None else "0.0/10",
f"{link.get('contextual_score', 0):.2f}/1" if link.get('contextual_score') is not None else "0.00/1",
f"{link.get('total_score', 0):.3f}" if link.get('total_score') is not None else "0.000"
)
asyncio.run(main())
# Access scored and sorted links
for link in result.links["internal"][:10]: # Top 10 internal links
print(f"Score: {link['total_score']:.3f}")
print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
print(f" URL: {link['href']}")
print(f" Title: {link['head_data']['title']}")
print(f" Description: {link['head_data']['meta']['description'][:100]}...")
```
**Scoring Components:**
1. **Intrinsic Score**: Based on link quality indicators
1. **Intrinsic Score (0-10)**: Based on link quality indicators
- Position on page (navigation, content, footer)
- Link attributes (rel, title, class names)
- Anchor text quality and length
- URL structure and depth
2. **Contextual Score**: Relevance to your query using BM25 algorithm
2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
- Keyword matching in link text and title
- Meta description analysis
- Content preview scoring
3. **Total Score**: Combined score for final ranking
3. **Total Score**: Weighted combination for final ranking
**Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -221,34 +235,58 @@ asyncio.run(main())
### Technical Architecture
```python
import asyncio
from crawl4ai import AsyncUrlSeeder, SeedingConfig
async def main():
async with AsyncUrlSeeder() as seeder:
# Discover Python tutorial URLs
config = SeedingConfig(
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter
extract_head=True, # Get metadata
query="python tutorial", # For relevance scoring
scoring_method="bm25",
score_threshold=0.2,
max_urls=10
)
print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("https://www.geeksforgeeks.org/", config)
print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1):
print(f"\n{i}. {url_info['url']}")
if url_info.get('relevance_score'):
print(f" Relevance: {url_info['relevance_score']:.3f}")
if url_info.get('head_data', {}).get('title'):
print(f" Title: {url_info['head_data']['title'][:60]}...")
# Basic discovery - find all product pages
seeder_config = SeedingConfig(
# Discovery sources
source="sitemap+cc", # Sitemap + Common Crawl
# Filtering
pattern="*/product/*", # URL pattern matching
ignore_patterns=["*/reviews/*", "*/questions/*"],
# Validation
live_check=True, # Verify URLs are alive
max_urls=5000, # Stop at 5000 URLs
# Performance
concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limiting
)
asyncio.run(main())
seeder = AsyncUrlSeeder(seeder_config)
urls = await seeder.discover("https://shop.example.com")
# Advanced: Relevance-based discovery
research_config = SeedingConfig(
source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only
# Content relevance
extract_head=True, # Get meta tags
query="quantum computing tutorials",
scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only
# Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache
)
# Discover with progress tracking
discovered = []
async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered.extend(batch)
print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata
for url_data in discovered[:5]:
print(f"URL: {url_data['url']}")
print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['title']}")
```
**Discovery Methods:**
@@ -271,18 +309,35 @@ This release includes significant performance improvements through optimized res
### What We Optimized
```python
# Optimized crawling with v0.7.0 improvements
# Before v0.7.0 (slow)
results = []
for url in urls:
result = await crawler.arun(
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
result = await crawler.arun(url)
results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
```
**Performance Gains:**
@@ -292,6 +347,24 @@ for url in urls:
- **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes

View File

@@ -1,43 +0,0 @@
# 🛠️ Crawl4AI v0.7.1: Minor Cleanup Update
*July 17, 2025 • 2 min read*
---
A small maintenance release that removes unused code and improves documentation.
## 🎯 What's Changed
- **Removed unused StealthConfig** from `crawl4ai/browser_manager.py`
- **Updated documentation** with better examples and parameter explanations
- **Fixed virtual scroll configuration** examples in docs
## 🧹 Code Cleanup
Removed unused `StealthConfig` import and configuration that wasn't being used anywhere in the codebase. The project uses its own custom stealth implementation through JavaScript injection instead.
```python
# Removed unused code:
from playwright_stealth import StealthConfig
stealth_config = StealthConfig(...) # This was never used
```
## 📖 Documentation Updates
- Fixed adaptive crawling parameter examples
- Updated session management documentation
- Corrected virtual scroll configuration examples
## 🚀 Installation
```bash
pip install crawl4ai==0.7.1
```
No breaking changes - upgrade directly from v0.7.0.
---
Questions? Issues?
- GitHub: [github.com/unclecode/crawl4ai](https://github.com/unclecode/crawl4ai)
- Discord: [discord.gg/crawl4ai](https://discord.gg/jP8KfhDhyN)

View File

@@ -18,7 +18,7 @@ Usage:
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai import LinkPreviewConfig
from crawl4ai.async_configs import LinkPreviewConfig
async def basic_link_head_extraction():

View File

@@ -49,75 +49,46 @@ from crawl4ai import JsonCssExtractionStrategy
from crawl4ai.cache_context import CacheMode
async def crawl_dynamic_content():
url = "https://github.com/microsoft/TypeScript/commits/main"
session_id = "wait_for_session"
all_commits = []
async with AsyncWebCrawler() as crawler:
session_id = "github_commits_session"
url = "https://github.com/microsoft/TypeScript/commits/main"
all_commits = []
js_next_page = """
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4');
if (commits.length > 0) {
window.lastCommit = commits[0].textContent.trim();
}
const button = document.querySelector('a[data-testid="pagination-next-button"]');
if (button) {button.click(); console.log('button clicked') }
"""
# Define extraction schema
schema = {
"name": "Commit Extractor",
"baseSelector": "li.Box-sc-g0xbh4-0",
"fields": [{
"name": "title", "selector": "h4.markdown-title", "type": "text"
}],
}
extraction_strategy = JsonCssExtractionStrategy(schema)
wait_for = """() => {
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4');
if (commits.length === 0) return false;
const firstCommit = commits[0].textContent.trim();
return firstCommit !== window.lastCommit;
}"""
schema = {
"name": "Commit Extractor",
"baseSelector": "li[data-testid='commit-row-item']",
"fields": [
{
"name": "title",
"selector": "h4 a",
"type": "text",
"transform": "strip",
},
],
}
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)
browser_config = BrowserConfig(
verbose=True,
headless=False,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# JavaScript and wait configurations
js_next_page = """document.querySelector('a[data-testid="pagination-next-button"]').click();"""
wait_for = """() => document.querySelectorAll('li.Box-sc-g0xbh4-0').length > 0"""
# Crawl multiple pages
for page in range(3):
crawler_config = CrawlerRunConfig(
config = CrawlerRunConfig(
url=url,
session_id=session_id,
css_selector="li[data-testid='commit-row-item']",
extraction_strategy=extraction_strategy,
js_code=js_next_page if page > 0 else None,
wait_for=wait_for if page > 0 else None,
js_only=page > 0,
cache_mode=CacheMode.BYPASS,
capture_console_messages=True,
cache_mode=CacheMode.BYPASS
)
result = await crawler.arun(url=url, config=crawler_config)
if result.console_messages:
print(f"Page {page + 1} console messages:", result.console_messages)
if result.extracted_content:
# print(f"Page {page + 1} result:", result.extracted_content)
result = await crawler.arun(config=config)
if result.success:
commits = json.loads(result.extracted_content)
all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits")
else:
print(f"Page {page + 1}: No content extracted")
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Clean up session
await crawler.crawler_strategy.kill_session(session_id)
return all_commits
```
---

View File

@@ -91,12 +91,13 @@ async def crawl_twitter_timeline():
wait_after_scroll=1.0 # Twitter needs time to load
)
browser_config = BrowserConfig(headless=True) # Set to False to watch it work
config = CrawlerRunConfig(
virtual_scroll_config=virtual_config
virtual_scroll_config=virtual_config,
# Optional: Set headless=False to watch it work
# browser_config=BrowserConfig(headless=False)
)
async with AsyncWebCrawler(config=browser_config) as crawler:
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://twitter.com/search?q=AI",
config=config
@@ -199,7 +200,7 @@ Use **scan_full_page** when:
Virtual Scroll works seamlessly with extraction strategies:
```python
from crawl4ai import LLMExtractionStrategy, LLMConfig
from crawl4ai import LLMExtractionStrategy
# Define extraction schema
schema = {
@@ -221,7 +222,7 @@ config = CrawlerRunConfig(
scroll_count=20
),
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
provider="openai/gpt-4o-mini",
schema=schema
)
)

View File

@@ -10,8 +10,9 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization
- **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -29,41 +30,44 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores
```python
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig
import asyncio
from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
async def main():
# Configure adaptive crawler
config = AdaptiveConfig(
strategy="statistical", # or "embedding" for semantic understanding
max_pages=10,
confidence_threshold=0.7, # Stop at 70% confidence
top_k_links=3, # Follow top 3 links per page
min_gain_threshold=0.05 # Need 5% information gain to continue
# Initialize with custom learning parameters
config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to use learned patterns
max_history=100, # Remember last 100 crawls per domain
learning_rate=0.2, # How quickly to adapt to changes
patterns_per_page=3, # Patterns to learn per page type
extraction_strategy='css' # 'css' or 'xpath'
)
adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://news.example.com/article/12345",
config=CrawlerRunConfig(
adaptive_config=config,
extraction_hints={ # Optional hints to speed up learning
"title": "article h1",
"content": "article .body-content"
}
)
)
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
print("Starting adaptive crawl about Python decorators...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/glossary.html",
query="python decorators functions wrapping"
)
print(f"\n✅ Crawling Complete!")
print(f"• Confidence Level: {adaptive.confidence:.0%}")
print(f"• Pages Crawled: {len(result.crawled_urls)}")
print(f"• Knowledge Base: {len(adaptive.state.knowledge_base)} documents")
# Get most relevant content
relevant = adaptive.get_relevant_content(top_k=3)
print(f"\nMost Relevant Pages:")
for i, page in enumerate(relevant, 1):
print(f"{i}. {page['url']} (relevance: {page['score']:.2%})")
# Crawler identifies and stores patterns
if result.success:
state = adaptive_crawler.get_state("news.example.com")
print(f"Learned {len(state.patterns)} patterns")
print(f"Confidence: {state.avg_confidence:.2%}")
asyncio.run(main())
# Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
```
**Expected Real-World Impact:**
@@ -88,7 +92,9 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0 # Let content load
wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
)
# For e-commerce product grids (Instagram style)
@@ -96,7 +102,8 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid",
scroll_count=30,
scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5 # Images need time
wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
)
# For news feeds with lazy loading
@@ -104,7 +111,9 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed",
scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5 # Wait for content to load
wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
)
# Use it in your crawl
@@ -148,63 +157,68 @@ async with AsyncWebCrawler() as crawler:
**My Solution:** I implemented a three-layer scoring system that analyzes links like a human would—considering their position, context, and relevance to your goals.
### Intelligent Link Analysis and Scoring
### The Three-Layer Scoring System
```python
import asyncio
from crawl4ai import CrawlerRunConfig, CacheMode, AsyncWebCrawler
from crawl4ai.adaptive_crawler import LinkPreviewConfig
from crawl4ai import LinkPreviewConfig
async def main():
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
include_internal=True,
include_external=False,
max_links=10,
concurrency=5,
query="python tutorial", # For contextual scoring
score_threshold=0.3,
verbose=True
# Configure intelligent link analysis
link_config = LinkPreviewConfig(
# What to analyze
include_internal=True,
include_external=True,
max_links=100, # Analyze top 100 links
# Relevance scoring
query="machine learning tutorials", # Your interest
score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
)
# Use in your crawl
result = await crawler.arun(
"https://tech-blog.example.com",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True
)
# Use in your crawl
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
"https://www.geeksforgeeks.org/",
config=CrawlerRunConfig(
link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring
cache_mode=CacheMode.BYPASS
)
)
)
# Access scored and sorted links
if result.success and result.links:
for link in result.links.get("internal", []):
text = link.get('text', 'No text')[:40]
print(
text,
f"{link.get('intrinsic_score', 0):.1f}/10" if link.get('intrinsic_score') is not None else "0.0/10",
f"{link.get('contextual_score', 0):.2f}/1" if link.get('contextual_score') is not None else "0.00/1",
f"{link.get('total_score', 0):.3f}" if link.get('total_score') is not None else "0.000"
)
asyncio.run(main())
# Access scored and sorted links
for link in result.links["internal"][:10]: # Top 10 internal links
print(f"Score: {link['total_score']:.3f}")
print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
print(f" URL: {link['href']}")
print(f" Title: {link['head_data']['title']}")
print(f" Description: {link['head_data']['meta']['description'][:100]}...")
```
**Scoring Components:**
1. **Intrinsic Score**: Based on link quality indicators
1. **Intrinsic Score (0-10)**: Based on link quality indicators
- Position on page (navigation, content, footer)
- Link attributes (rel, title, class names)
- Anchor text quality and length
- URL structure and depth
2. **Contextual Score**: Relevance to your query using BM25 algorithm
2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
- Keyword matching in link text and title
- Meta description analysis
- Content preview scoring
3. **Total Score**: Combined score for final ranking
3. **Total Score**: Weighted combination for final ranking
**Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -221,34 +235,58 @@ asyncio.run(main())
### Technical Architecture
```python
import asyncio
from crawl4ai import AsyncUrlSeeder, SeedingConfig
async def main():
async with AsyncUrlSeeder() as seeder:
# Discover Python tutorial URLs
config = SeedingConfig(
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter
extract_head=True, # Get metadata
query="python tutorial", # For relevance scoring
scoring_method="bm25",
score_threshold=0.2,
max_urls=10
)
print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("https://www.geeksforgeeks.org/", config)
print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1):
print(f"\n{i}. {url_info['url']}")
if url_info.get('relevance_score'):
print(f" Relevance: {url_info['relevance_score']:.3f}")
if url_info.get('head_data', {}).get('title'):
print(f" Title: {url_info['head_data']['title'][:60]}...")
# Basic discovery - find all product pages
seeder_config = SeedingConfig(
# Discovery sources
source="sitemap+cc", # Sitemap + Common Crawl
# Filtering
pattern="*/product/*", # URL pattern matching
ignore_patterns=["*/reviews/*", "*/questions/*"],
# Validation
live_check=True, # Verify URLs are alive
max_urls=5000, # Stop at 5000 URLs
# Performance
concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limiting
)
asyncio.run(main())
seeder = AsyncUrlSeeder(seeder_config)
urls = await seeder.discover("https://shop.example.com")
# Advanced: Relevance-based discovery
research_config = SeedingConfig(
source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only
# Content relevance
extract_head=True, # Get meta tags
query="quantum computing tutorials",
scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only
# Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache
)
# Discover with progress tracking
discovered = []
async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered.extend(batch)
print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata
for url_data in discovered[:5]:
print(f"URL: {url_data['url']}")
print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['title']}")
```
**Discovery Methods:**
@@ -271,18 +309,35 @@ This release includes significant performance improvements through optimized res
### What We Optimized
```python
# Optimized crawling with v0.7.0 improvements
# Before v0.7.0 (slow)
results = []
for url in urls:
result = await crawler.arun(
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
result = await crawler.arun(url)
results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
```
**Performance Gains:**
@@ -292,6 +347,24 @@ for url in urls:
- **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes

View File

@@ -35,7 +35,7 @@ from crawl4ai import AsyncWebCrawler, AdaptiveCrawler
async def main():
async with AsyncWebCrawler() as crawler:
# Create an adaptive crawler (config is optional)
# Create an adaptive crawler
adaptive = AdaptiveCrawler(crawler)
# Start crawling with a query
@@ -59,13 +59,13 @@ async def main():
from crawl4ai import AdaptiveConfig
config = AdaptiveConfig(
confidence_threshold=0.8, # Stop when 80% confident (default: 0.7)
max_pages=30, # Maximum pages to crawl (default: 20)
top_k_links=5, # Links to follow per page (default: 3)
confidence_threshold=0.7, # Stop when 70% confident (default: 0.8)
max_pages=20, # Maximum pages to crawl (default: 50)
top_k_links=3, # Links to follow per page (default: 5)
min_gain_threshold=0.05 # Minimum expected gain to continue (default: 0.1)
)
adaptive = AdaptiveCrawler(crawler, config)
adaptive = AdaptiveCrawler(crawler, config=config)
```
## Crawling Strategies
@@ -198,8 +198,8 @@ if result.metrics.get('is_irrelevant', False):
The confidence score (0-1) indicates how sufficient the gathered information is:
- **0.0-0.3**: Insufficient information, needs more crawling
- **0.3-0.6**: Partial information, may answer basic queries
- **0.6-0.7**: Good coverage, can answer most queries
- **0.7-1.0**: Excellent coverage, comprehensive information
- **0.6-0.8**: Good coverage, can answer most queries
- **0.8-1.0**: Excellent coverage, comprehensive information
### Statistics Display
@@ -257,9 +257,9 @@ new_adaptive.import_knowledge_base("knowledge_base.jsonl")
- Avoid overly broad queries
### 2. Threshold Tuning
- Start with default (0.7) for general use
- Lower to 0.5-0.6 for exploratory crawling
- Raise to 0.8+ for exhaustive coverage
- Start with default (0.8) for general use
- Lower to 0.6-0.7 for exploratory crawling
- Raise to 0.9+ for exhaustive coverage
### 3. Performance Optimization
- Use appropriate `max_pages` limits

View File

@@ -52,9 +52,11 @@ That's it! In just a few lines, you've automated a complete search workflow.
Want to learn by doing? We've got you covered:
**🚀 [Live Demo](https://docs.crawl4ai.com/apps/c4a-script/)** - Try C4A-Script in your browser right now!
**🚀 [Live Demo](https://docs.crawl4ai.com/c4a-script/demo)** - Try C4A-Script in your browser right now!
**📁 [Tutorial Examples](https://github.com/unclecode/crawl4ai/blob/main/docs/examples/c4a_script/)** - Complete examples with source code
**📁 [Tutorial Examples](/examples/c4a_script/)** - Complete examples with source code
**🛠️ [Local Tutorial](/examples/c4a_script/tutorial/)** - Run the interactive tutorial on your machine
### Running the Tutorial Locally

View File

@@ -125,7 +125,7 @@ Here's a full example you can copy, paste, and run immediately:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai import LinkPreviewConfig
from crawl4ai.async_configs import LinkPreviewConfig
async def extract_link_heads_example():
"""
@@ -237,7 +237,7 @@ if __name__ == "__main__":
The `LinkPreviewConfig` class supports these options:
```python
from crawl4ai import LinkPreviewConfig
from crawl4ai.async_configs import LinkPreviewConfig
link_preview_config = LinkPreviewConfig(
# BASIC SETTINGS

View File

@@ -137,7 +137,7 @@ async def smart_blog_crawler():
word_count_threshold=300 # Only substantial articles
)
# Extract URLs and crawl them
# Extract URLs and stream results as they come
tutorial_urls = [t["url"] for t in tutorials[:10]]
results = await crawler.arun_many(tutorial_urls, config=config)
@@ -231,7 +231,7 @@ Common Crawl is a massive public dataset that regularly crawls the entire web. I
```python
# Use both sources
config = SeedingConfig(source="sitemap+cc")
config = SeedingConfig(source="cc+sitemap")
urls = await seeder.urls("example.com", config)
```
@@ -241,13 +241,13 @@ The `SeedingConfig` object is your control panel. Here's everything you can conf
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `source` | str | "sitemap+cc" | URL source: "cc" (Common Crawl), "sitemap", or "sitemap+cc" |
| `source` | str | "cc" | URL source: "cc" (Common Crawl), "sitemap", or "cc+sitemap" |
| `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") |
| `extract_head` | bool | False | Extract metadata from page `<head>` |
| `live_check` | bool | False | Verify URLs are accessible |
| `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) |
| `concurrency` | int | 10 | Parallel workers for fetching |
| `hits_per_sec` | int | 5 | Rate limit for requests |
| `hits_per_sec` | int | None | Rate limit for requests |
| `force` | bool | False | Bypass cache, fetch fresh data |
| `verbose` | bool | False | Show detailed progress |
| `query` | str | None | Search query for BM25 scoring |
@@ -522,7 +522,7 @@ urls = await seeder.urls("docs.example.com", config)
```python
# Find specific products
config = SeedingConfig(
source="sitemap+cc", # Use both sources
source="cc+sitemap", # Use both sources
extract_head=True,
query="wireless headphones noise canceling",
scoring_method="bm25",
@@ -782,7 +782,7 @@ class ResearchAssistant:
# Step 1: Discover relevant URLs
config = SeedingConfig(
source="sitemap+cc", # Maximum coverage
source="cc+sitemap", # Maximum coverage
extract_head=True, # Get metadata
query=topic, # Research topic
scoring_method="bm25", # Smart scoring
@@ -832,8 +832,7 @@ class ResearchAssistant:
# Extract URLs and crawl all articles
article_urls = [article['url'] for article in top_articles]
results = []
crawl_results = await crawler.arun_many(article_urls, config=config)
async for result in crawl_results:
async for result in await crawler.arun_many(article_urls, config=config):
if result.success:
results.append({
'url': result.url,
@@ -934,10 +933,10 @@ config = SeedingConfig(concurrency=10, hits_per_sec=5)
# When crawling many URLs
async with AsyncWebCrawler() as crawler:
# Assuming urls is a list of URL strings
crawl_results = await crawler.arun_many(urls, config=config)
results = await crawler.arun_many(urls, config=config)
# Process as they arrive
async for result in crawl_results:
async for result in results:
process_immediately(result) # Don't wait for all
```
@@ -1021,7 +1020,7 @@ config = SeedingConfig(
# E-commerce product discovery
config = SeedingConfig(
source="sitemap+cc",
source="cc+sitemap",
pattern="*/product/*",
extract_head=True,
live_check=True

View File

@@ -28,7 +28,7 @@ from rich import box
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, AdaptiveCrawler, AdaptiveConfig, BrowserConfig, CacheMode
from crawl4ai import AsyncUrlSeeder, SeedingConfig
from crawl4ai import LinkPreviewConfig, VirtualScrollConfig
from crawl4ai.async_configs import LinkPreviewConfig, VirtualScrollConfig
from crawl4ai import c4a_compile, CompilationResult
# Initialize Rich console for beautiful output

View File

@@ -13,13 +13,14 @@ from crawl4ai import (
BrowserConfig,
CacheMode,
# New imports for v0.7.0
VirtualScrollConfig,
LinkPreviewConfig,
VirtualScrollConfig,
AdaptiveCrawler,
AdaptiveConfig,
AsyncUrlSeeder,
SeedingConfig,
c4a_compile,
CompilationResult
)
@@ -169,16 +170,16 @@ async def demo_url_seeder():
# Discover Python tutorial URLs
config = SeedingConfig(
source="sitemap", # Use sitemap
pattern="*python*", # URL pattern filter
pattern="*tutorial*", # URL pattern filter
extract_head=True, # Get metadata
query="python tutorial", # For relevance scoring
query="python async programming", # For relevance scoring
scoring_method="bm25",
score_threshold=0.2,
max_urls=10
)
print("Discovering Python async tutorial URLs...")
urls = await seeder.urls("https://www.geeksforgeeks.org/", config)
urls = await seeder.urls("docs.python.org", config)
print(f"\n✅ Found {len(urls)} relevant URLs:")
for i, url_info in enumerate(urls[:5], 1):
@@ -244,6 +245,39 @@ IF (EXISTS `.price-filter`) THEN CLICK `input[data-max-price="100"]`
print(f"❌ Compilation error: {result.first_error.message}")
async def demo_pdf_support():
"""
Demo 6: PDF Parsing Support
Shows how to extract content from PDF files.
Note: Requires 'pip install crawl4ai[pdf]'
"""
print("\n" + "="*60)
print("📄 DEMO 6: PDF Parsing Support")
print("="*60)
try:
# Check if PDF support is installed
import PyPDF2
# Example: Process a PDF URL
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
pdf=True, # Enable PDF generation
extract_text_from_pdf=True # Extract text content
)
print("PDF parsing is available!")
print("You can now crawl PDF URLs and extract their content.")
print("\nExample usage:")
print(' result = await crawler.arun("https://example.com/document.pdf")')
print(' pdf_text = result.extracted_content # Contains extracted text')
except ImportError:
print("⚠️ PDF support not installed.")
print("Install with: pip install crawl4ai[pdf]")
async def main():
"""Run all demos"""
print("\n🚀 Crawl4AI v0.7.0 Feature Demonstrations")
@@ -255,6 +289,7 @@ async def main():
("Virtual Scroll", demo_virtual_scroll),
("URL Seeder", demo_url_seeder),
("C4A Script", demo_c4a_script),
("PDF Support", demo_pdf_support)
]
for name, demo_func in demos:
@@ -274,6 +309,7 @@ async def main():
print("• Virtual Scroll: Capture all content from modern web pages")
print("• URL Seeder: Pre-discover and filter URLs efficiently")
print("• C4A Script: Simple language for complex automations")
print("• PDF Support: Extract content from PDF documents")
if __name__ == "__main__":

View File

@@ -44,6 +44,7 @@ dependencies = [
"brotli>=1.1.0",
"humanize>=4.10.0",
"lark>=1.2.2",
"sentence-transformers>=2.2.0",
"alphashape>=1.3.1",
"shapely>=2.0.0"
]
@@ -61,8 +62,8 @@ classifiers = [
[project.optional-dependencies]
pdf = ["PyPDF2"]
torch = ["torch", "nltk", "scikit-learn"]
transformer = ["transformers", "tokenizers", "sentence-transformers"]
cosine = ["torch", "transformers", "nltk", "sentence-transformers"]
transformer = ["transformers", "tokenizers"]
cosine = ["torch", "transformers", "nltk"]
sync = ["selenium"]
all = [
"PyPDF2",
@@ -71,8 +72,8 @@ all = [
"scikit-learn",
"transformers",
"tokenizers",
"sentence-transformers",
"selenium"
"selenium",
"PyPDF2"
]
[project.scripts]

View File

@@ -24,6 +24,7 @@ cssselect>=1.2.0
chardet>=5.2.0
brotli>=1.1.0
httpx[http2]>=0.27.2
sentence-transformers>=2.2.0
alphashape>=1.3.1
shapely>=2.0.0

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Test suite for playwright-stealth backward compatibility.
Tests that stealth functionality works automatically without user configuration.
"""
import pytest
import asyncio
from unittest.mock import Mock, patch, MagicMock
class TestPlaywrightStealthCompatibility:
"""Test playwright-stealth backward compatibility with transparent operation"""
def test_api_detection_works(self):
"""Test that API detection works correctly"""
from crawl4ai.async_crawler_strategy import STEALTH_NEW_API
# The value depends on which version is installed, but should not be undefined
assert STEALTH_NEW_API is not None or STEALTH_NEW_API is False or STEALTH_NEW_API is None
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
@patch('crawl4ai.async_crawler_strategy.Stealth')
async def test_apply_stealth_new_api(self, mock_stealth_class):
"""Test stealth application with new API works transparently"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Setup mock
mock_stealth_instance = Mock()
mock_stealth_instance.apply_stealth_async = Mock()
mock_stealth_class.return_value = mock_stealth_instance
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should work transparently
await strategy._apply_stealth(mock_page)
# Verify new API was used
mock_stealth_class.assert_called_once()
mock_stealth_instance.apply_stealth_async.assert_called_once_with(mock_page)
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', False)
async def test_apply_stealth_legacy_api(self):
"""Test stealth application with legacy API works transparently"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Mock stealth_async function by setting it as a module attribute
mock_stealth_async = Mock()
mock_stealth_async.return_value = None
# Import the module to add the mock function
import crawl4ai.async_crawler_strategy
crawl4ai.async_crawler_strategy.stealth_async = mock_stealth_async
try:
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should work transparently
await strategy._apply_stealth(mock_page)
# Verify legacy API was used
mock_stealth_async.assert_called_once_with(mock_page)
finally:
# Clean up
if hasattr(crawl4ai.async_crawler_strategy, 'stealth_async'):
delattr(crawl4ai.async_crawler_strategy, 'stealth_async')
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', None)
async def test_apply_stealth_no_library(self):
"""Test stealth application when no stealth library is available"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should work transparently even without stealth
await strategy._apply_stealth(mock_page)
# Should complete without error even when no stealth is available
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
@patch('crawl4ai.async_crawler_strategy.Stealth')
async def test_stealth_error_handling(self, mock_stealth_class):
"""Test that stealth errors are handled gracefully without breaking crawling"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Setup mock to raise an error
mock_stealth_instance = Mock()
mock_stealth_instance.apply_stealth_async = Mock(side_effect=Exception("Stealth failed"))
mock_stealth_class.return_value = mock_stealth_instance
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should not raise an error, continue silently
await strategy._apply_stealth(mock_page)
# Should complete without raising the stealth error
def test_strategy_creation_without_config(self):
"""Test that strategy can be created without any stealth configuration"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Should work without any stealth-related parameters
strategy = AsyncPlaywrightCrawlerStrategy()
assert strategy is not None
assert hasattr(strategy, '_apply_stealth')
def test_browser_config_works_without_stealth_param(self):
"""Test that BrowserConfig works without stealth parameter"""
from crawl4ai.async_configs import BrowserConfig
# Should work without stealth parameter
config = BrowserConfig()
assert config is not None
# Should also work with other parameters
config = BrowserConfig(headless=False, browser_type="firefox")
assert config.headless == False
assert config.browser_type == "firefox"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,345 +0,0 @@
#!/usr/bin/env python3
"""
Simple API Test for Crawl4AI Docker Server v0.7.0
Uses only built-in Python modules to test all endpoints.
"""
import urllib.request
import urllib.parse
import json
import time
import sys
from typing import Dict, List, Optional
# Configuration
BASE_URL = "http://localhost:11234" # Change to your server URL
TEST_TIMEOUT = 30
class SimpleApiTester:
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url
self.token = None
self.results = []
def log(self, message: str):
print(f"[INFO] {message}")
def test_get_endpoint(self, endpoint: str) -> Dict:
"""Test a GET endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
req = urllib.request.Request(url)
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "GET",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "GET",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def test_post_endpoint(self, endpoint: str, payload: Dict) -> Dict:
"""Test a POST endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
req.add_header('Content-Type', 'application/json')
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "POST",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "POST",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def print_result(self, result: Dict):
"""Print a formatted test result"""
status_color = {
"PASS": "",
"FAIL": "",
"SKIP": "⏭️"
}
print(f"{status_color[result['status']]} {result['method']} {result['endpoint']} "
f"| {result['response_time']:.3f}s | Status: {result['status_code'] or 'N/A'}")
if result['status'] == 'FAIL' and 'error' in result:
print(f" Error: {result['error']}")
self.results.append(result)
def run_all_tests(self):
"""Run all API tests"""
print("🚀 Starting Crawl4AI v0.7.0 API Test Suite")
print(f"📡 Testing server at: {self.base_url}")
print("=" * 60)
# # Test basic endpoints
# print("\n=== BASIC ENDPOINTS ===")
# # Health check
# result = self.test_get_endpoint("/health")
# self.print_result(result)
# # Schema endpoint
# result = self.test_get_endpoint("/schema")
# self.print_result(result)
# # Metrics endpoint
# result = self.test_get_endpoint("/metrics")
# self.print_result(result)
# # Root redirect
# result = self.test_get_endpoint("/")
# self.print_result(result)
# # Test authentication
# print("\n=== AUTHENTICATION ===")
# # Get token
# token_payload = {"email": "test@example.com"}
# result = self.test_post_endpoint("/token", token_payload)
# self.print_result(result)
# # Extract token if successful
# if result['status'] == 'PASS' and 'data' in result:
# token = result['data'].get('access_token')
# if token:
# self.token = token
# self.log(f"Successfully obtained auth token: {token[:20]}...")
# Test core APIs
print("\n=== CORE APIs ===")
test_url = "https://example.com"
# Test markdown endpoint
md_payload = {
"url": test_url,
"f": "fit",
"q": "test query",
"c": "0"
}
result = self.test_post_endpoint("/md", md_payload)
# print(result['data'].get('markdown', ''))
self.print_result(result)
# Test HTML endpoint
html_payload = {"url": test_url}
result = self.test_post_endpoint("/html", html_payload)
self.print_result(result)
# Test screenshot endpoint
screenshot_payload = {
"url": test_url,
"screenshot_wait_for": 2
}
result = self.test_post_endpoint("/screenshot", screenshot_payload)
self.print_result(result)
# Test PDF endpoint
pdf_payload = {"url": test_url}
result = self.test_post_endpoint("/pdf", pdf_payload)
self.print_result(result)
# Test JavaScript execution
js_payload = {
"url": test_url,
"scripts": ["(() => document.title)()"]
}
result = self.test_post_endpoint("/execute_js", js_payload)
self.print_result(result)
# Test crawl endpoint
crawl_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl", crawl_payload)
self.print_result(result)
# Test config dump
config_payload = {"code": "CrawlerRunConfig()"}
result = self.test_post_endpoint("/config/dump", config_payload)
self.print_result(result)
# Test LLM endpoint
llm_endpoint = f"/llm/{test_url}?q=Extract%20main%20content"
result = self.test_get_endpoint(llm_endpoint)
self.print_result(result)
# Test ask endpoint
ask_endpoint = "/ask?context_type=all&query=crawl4ai&max_results=5"
result = self.test_get_endpoint(ask_endpoint)
print(result)
self.print_result(result)
# Test job APIs
print("\n=== JOB APIs ===")
# Test LLM job
llm_job_payload = {
"url": test_url,
"q": "Extract main content",
"cache": False
}
result = self.test_post_endpoint("/llm/job", llm_job_payload)
self.print_result(result)
# Test crawl job
crawl_job_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl/job", crawl_job_payload)
self.print_result(result)
# Test MCP
print("\n=== MCP APIs ===")
# Test MCP schema
result = self.test_get_endpoint("/mcp/schema")
self.print_result(result)
# Test error handling
print("\n=== ERROR HANDLING ===")
# Test invalid URL
invalid_payload = {"url": "invalid-url", "f": "fit"}
result = self.test_post_endpoint("/md", invalid_payload)
self.print_result(result)
# Test invalid endpoint
result = self.test_get_endpoint("/nonexistent")
self.print_result(result)
# Print summary
self.print_summary()
def print_summary(self):
"""Print test results summary"""
print("\n" + "=" * 60)
print("📊 TEST RESULTS SUMMARY")
print("=" * 60)
total = len(self.results)
passed = sum(1 for r in self.results if r['status'] == 'PASS')
failed = sum(1 for r in self.results if r['status'] == 'FAIL')
print(f"Total Tests: {total}")
print(f"✅ Passed: {passed}")
print(f"❌ Failed: {failed}")
print(f"📈 Success Rate: {(passed/total)*100:.1f}%")
if failed > 0:
print("\n❌ FAILED TESTS:")
for result in self.results:
if result['status'] == 'FAIL':
print(f"{result['method']} {result['endpoint']}")
if 'error' in result:
print(f" Error: {result['error']}")
# Performance statistics
response_times = [r['response_time'] for r in self.results if r['response_time'] > 0]
if response_times:
avg_time = sum(response_times) / len(response_times)
max_time = max(response_times)
print(f"\n⏱️ Average Response Time: {avg_time:.3f}s")
print(f"⏱️ Max Response Time: {max_time:.3f}s")
# Save detailed report
report_file = f"crawl4ai_test_report_{int(time.time())}.json"
with open(report_file, 'w') as f:
json.dump({
"timestamp": time.time(),
"server_url": self.base_url,
"version": "0.7.0",
"summary": {
"total": total,
"passed": passed,
"failed": failed
},
"results": self.results
}, f, indent=2)
print(f"\n📄 Detailed report saved to: {report_file}")
def main():
"""Main test runner"""
import argparse
parser = argparse.ArgumentParser(description='Crawl4AI v0.7.0 API Test Suite')
parser.add_argument('--url', default=BASE_URL, help='Base URL of the server')
args = parser.parse_args()
tester = SimpleApiTester(args.url)
try:
tester.run_all_tests()
except KeyboardInterrupt:
print("\n🛑 Test suite interrupted by user")
except Exception as e:
print(f"\n💥 Test suite failed with error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -5,7 +5,7 @@ Test script for Link Extractor functionality
from crawl4ai.models import Link
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai import LinkPreviewConfig
from crawl4ai.async_configs import LinkPreviewConfig
import asyncio
import sys
import os
@@ -237,7 +237,7 @@ def test_config_examples():
print(f" {key}: {value}")
print(" Usage:")
print(" from crawl4ai import LinkPreviewConfig")
print(" from crawl4ai.async_configs import LinkPreviewConfig")
print(" config = CrawlerRunConfig(")
print(" link_preview_config=LinkPreviewConfig(")
for key, value in config_dict.items():