Compare commits

..

1 Commits

Author SHA1 Message Date
ntohidi
0eaa9f9895 fix: handle infinity values in JSON serialization for API responses
- Add sanitize_json_data() function to convert infinity/NaN to JSON-compliant strings
- Fix /execute_js endpoint returning ValueError: Out of range float values are not JSON compliant: inf
- Fix /crawl endpoint batch responses with infinity values
- Fix /crawl/stream endpoint streaming responses with infinity values
- Fix /crawl/job endpoint background job responses with infinity values

The sanitize_json_data() function recursively processes response data:
- float('inf') → "Infinity"
- float('-inf') → "-Infinity"
- float('nan') → "NaN"

This prevents JSON serialization errors when JavaScript execution or crawling operations produce infinity values, ensuring all API endpoints return valid JSON.

Fixes: API endpoints crashing with infinity JSON serialization errors
Affects: /execute_js, /crawl, /crawl/stream, /crawl/job endpoints
2025-07-15 13:49:07 +02:00
10 changed files with 715 additions and 262 deletions

View File

@@ -523,18 +523,15 @@ async def test_news_crawl():
- **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically: - **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically:
```python ```python
config = AdaptiveConfig( config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to stop crawling confidence_threshold=0.7,
max_depth=5, # Maximum crawl depth max_history=100,
max_pages=20, # Maximum number of pages to crawl learning_rate=0.2
strategy="statistical"
) )
async with AsyncWebCrawler() as crawler: result = await crawler.arun(
adaptive_crawler = AdaptiveCrawler(crawler, config) "https://news.example.com",
state = await adaptive_crawler.digest( config=CrawlerRunConfig(adaptive_config=config)
start_url="https://news.example.com", )
query="latest news content"
)
# Crawler learns patterns and improves extraction over time # Crawler learns patterns and improves extraction over time
``` ```

View File

@@ -54,6 +54,27 @@ def _get_memory_mb():
logger.warning(f"Could not get memory info: {e}") logger.warning(f"Could not get memory info: {e}")
return None return None
# --- Helper to sanitize JSON data ---
def sanitize_json_data(data):
    """
    Recursively sanitize data to handle infinity and NaN values that are not
    JSON compliant.

    Walks dicts, lists, and tuples; replaces float('inf'), float('-inf'),
    and float('nan') with the strings "Infinity", "-Infinity", and "NaN"
    so json.dumps never raises "Out of range float values are not JSON
    compliant". All other values are returned unchanged.

    Note: tuples are returned as lists — json.dumps serializes both as JSON
    arrays, so the emitted JSON is identical, but previously a tuple holding
    an infinity slipped through unsanitized and still crashed serialization.
    """
    import math
    if isinstance(data, dict):
        return {k: sanitize_json_data(v) for k, v in data.items()}
    elif isinstance(data, (list, tuple)):
        return [sanitize_json_data(item) for item in data]
    elif isinstance(data, float):
        # bool is not a float subclass in Python, so True/False never reach here
        if math.isinf(data):
            return "Infinity" if data > 0 else "-Infinity"
        if math.isnan(data):
            return "NaN"
        return data
    else:
        return data
async def handle_llm_qa( async def handle_llm_qa(
url: str, url: str,
@@ -371,8 +392,10 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
server_memory_mb = _get_memory_mb() server_memory_mb = _get_memory_mb()
result_dict = result.model_dump() result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb result_dict['server_memory_mb'] = server_memory_mb
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}") # Sanitize data to handle infinity values
data = json.dumps(result_dict, default=datetime_handler) + "\n" sanitized_dict = sanitize_json_data(result_dict)
logger.info(f"Streaming result for {sanitized_dict.get('url', 'unknown')}")
data = json.dumps(sanitized_dict, default=datetime_handler) + "\n"
yield data.encode('utf-8') yield data.encode('utf-8')
except Exception as e: except Exception as e:
logger.error(f"Serialization error: {e}") logger.error(f"Serialization error: {e}")
@@ -446,7 +469,7 @@ async def handle_crawl_request(
return { return {
"success": True, "success": True,
"results": [result.model_dump() for result in results], "results": [sanitize_json_data(result.model_dump()) for result in results],
"server_processing_time_s": end_time - start_time, "server_processing_time_s": end_time - start_time,
"server_memory_delta_mb": mem_delta_mb, "server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb "server_peak_memory_mb": peak_mem_mb

View File

@@ -331,6 +331,27 @@ async def generate_pdf(
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()} return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
def sanitize_json_data(data):
    """
    Recursively sanitize data to handle infinity and NaN values that are not
    JSON compliant.

    Walks dicts, lists, and tuples; replaces float('inf'), float('-inf'),
    and float('nan') with the strings "Infinity", "-Infinity", and "NaN"
    so json.dumps never raises "Out of range float values are not JSON
    compliant". All other values are returned unchanged.

    Note: tuples are returned as lists — json.dumps serializes both as JSON
    arrays, so the emitted JSON is identical, but previously a tuple holding
    an infinity slipped through unsanitized and still crashed serialization.
    """
    import math
    if isinstance(data, dict):
        return {k: sanitize_json_data(v) for k, v in data.items()}
    elif isinstance(data, (list, tuple)):
        return [sanitize_json_data(item) for item in data]
    elif isinstance(data, float):
        # bool is not a float subclass in Python, so True/False never reach here
        if math.isinf(data):
            return "Infinity" if data > 0 else "-Infinity"
        if math.isnan(data):
            return "NaN"
        return data
    else:
        return data
@app.post("/execute_js") @app.post("/execute_js")
@limiter.limit(config["rate_limiting"]["default_limit"]) @limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("execute_js") @mcp_tool("execute_js")
@@ -389,7 +410,9 @@ async def execute_js(
results = await crawler.arun(url=body.url, config=cfg) results = await crawler.arun(url=body.url, config=cfg)
# Return JSON-serializable dict of the first CrawlResult # Return JSON-serializable dict of the first CrawlResult
data = results[0].model_dump() data = results[0].model_dump()
return JSONResponse(data) # Sanitize data to handle infinity values
sanitized_data = sanitize_json_data(data)
return JSONResponse(sanitized_data)
@app.get("/llm/{url:path}") @app.get("/llm/{url:path}")

View File

@@ -10,8 +10,9 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns - **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages - **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization - **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering - **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements - **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning ## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -29,34 +30,44 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores - Extraction confidence scores
```python ```python
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
# Initialize with custom adaptive parameters # Initialize with custom learning parameters
config = AdaptiveConfig( config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to stop crawling confidence_threshold=0.7, # Min confidence to use learned patterns
max_depth=5, # Maximum crawl depth max_history=100, # Remember last 100 crawls per domain
max_pages=20, # Maximum number of pages to crawl learning_rate=0.2, # How quickly to adapt to changes
top_k_links=3, # Number of top links to follow per page patterns_per_page=3, # Patterns to learn per page type
strategy="statistical", # 'statistical' or 'embedding' extraction_strategy='css' # 'css' or 'xpath'
coverage_weight=0.4, # Weight for coverage in confidence calculation
consistency_weight=0.3, # Weight for consistency in confidence calculation
saturation_weight=0.3 # Weight for saturation in confidence calculation
) )
# Initialize adaptive crawler with web crawler adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler: async with AsyncWebCrawler() as crawler:
adaptive_crawler = AdaptiveCrawler(crawler, config) result = await crawler.arun(
"https://news.example.com/article/12345",
# Crawl and learn patterns config=CrawlerRunConfig(
state = await adaptive_crawler.digest( adaptive_config=config,
start_url="https://news.example.com/article/12345", extraction_hints={ # Optional hints to speed up learning
query="latest news articles and content" "title": "article h1",
"content": "article .body-content"
}
)
) )
# Access results and confidence # Crawler identifies and stores patterns
print(f"Confidence Level: {adaptive_crawler.confidence:.0%}") if result.success:
print(f"Pages Crawled: {len(state.crawled_urls)}") state = adaptive_crawler.get_state("news.example.com")
print(f"Knowledge Base: {len(adaptive_crawler.state.knowledge_base)} documents") print(f"Learned {len(state.patterns)} patterns")
print(f"Confidence: {state.avg_confidence:.2%}")
# Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
``` ```
**Expected Real-World Impact:** **Expected Real-World Impact:**
@@ -81,7 +92,9 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']", container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0 # Let content load wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
) )
# For e-commerce product grids (Instagram style) # For e-commerce product grids (Instagram style)
@@ -89,7 +102,8 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid", container_selector="main .product-grid",
scroll_count=30, scroll_count=30,
scroll_by=800, # Fixed pixel scrolling scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5 # Images need time wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
) )
# For news feeds with lazy loading # For news feeds with lazy loading
@@ -97,7 +111,9 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed", container_selector=".article-feed",
scroll_count=50, scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5 # Wait for content to load wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
) )
# Use it in your crawl # Use it in your crawl
@@ -144,17 +160,29 @@ async with AsyncWebCrawler() as crawler:
### The Three-Layer Scoring System ### The Three-Layer Scoring System
```python ```python
from crawl4ai import LinkPreviewConfig, CrawlerRunConfig, CacheMode from crawl4ai import LinkPreviewConfig
# Configure intelligent link analysis # Configure intelligent link analysis
link_config = LinkPreviewConfig( link_config = LinkPreviewConfig(
# What to analyze
include_internal=True, include_internal=True,
include_external=False, include_external=True,
max_links=10, max_links=100, # Analyze top 100 links
concurrency=5,
query="python tutorial", # For contextual scoring # Relevance scoring
score_threshold=0.3, query="machine learning tutorials", # Your interest
verbose=True score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
) )
# Use in your crawl # Use in your crawl
@@ -162,51 +190,35 @@ result = await crawler.arun(
"https://tech-blog.example.com", "https://tech-blog.example.com",
config=CrawlerRunConfig( config=CrawlerRunConfig(
link_preview_config=link_config, link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring score_links=True
cache_mode=CacheMode.BYPASS
) )
) )
# Access scored and sorted links # Access scored and sorted links
if result.success and result.links: for link in result.links["internal"][:10]: # Top 10 internal links
# Get scored links print(f"Score: {link['total_score']:.3f}")
internal_links = result.links.get("internal", []) print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
scored_links = [l for l in internal_links if l.get("total_score")] print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
scored_links.sort(key=lambda x: x.get("total_score", 0), reverse=True) print(f" URL: {link['href']}")
print(f" Title: {link['head_data']['title']}")
# Create a scoring table print(f" Description: {link['head_data']['meta']['description'][:100]}...")
table = Table(title="Link Scoring Results", box=box.ROUNDED)
table.add_column("Link Text", style="cyan", width=40)
table.add_column("Intrinsic Score", justify="center")
table.add_column("Contextual Score", justify="center")
table.add_column("Total Score", justify="center", style="bold green")
for link in scored_links[:5]:
text = link.get('text', 'No text')[:40]
table.add_row(
text,
f"{link.get('intrinsic_score', 0):.1f}/10",
f"{link.get('contextual_score', 0):.2f}/1",
f"{link.get('total_score', 0):.3f}"
)
console.print(table)
``` ```
**Scoring Components:** **Scoring Components:**
1. **Intrinsic Score**: Based on link quality indicators 1. **Intrinsic Score (0-10)**: Based on link quality indicators
- Position on page (navigation, content, footer) - Position on page (navigation, content, footer)
- Link attributes (rel, title, class names) - Link attributes (rel, title, class names)
- Anchor text quality and length - Anchor text quality and length
- URL structure and depth - URL structure and depth
2. **Contextual Score**: Relevance to your query using BM25 algorithm 2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
- Keyword matching in link text and title - Keyword matching in link text and title
- Meta description analysis - Meta description analysis
- Content preview scoring - Content preview scoring
3. **Total Score**: Combined score for final ranking 3. **Total Score**: Weighted combination for final ranking
**Expected Real-World Impact:** **Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links - **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -228,53 +240,53 @@ from crawl4ai import AsyncUrlSeeder, SeedingConfig
# Basic discovery - find all product pages # Basic discovery - find all product pages
seeder_config = SeedingConfig( seeder_config = SeedingConfig(
# Discovery sources # Discovery sources
source="cc+sitemap", # Sitemap + Common Crawl source="sitemap+cc", # Sitemap + Common Crawl
# Filtering # Filtering
pattern="*/product/*", # URL pattern matching pattern="*/product/*", # URL pattern matching
ignore_patterns=["*/reviews/*", "*/questions/*"],
# Validation # Validation
live_check=True, # Verify URLs are alive live_check=True, # Verify URLs are alive
max_urls=50, # Stop at 50 URLs max_urls=5000, # Stop at 5000 URLs
# Performance # Performance
concurrency=100, # Maximum concurrent requests for live checks/head extraction concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limit in requests per second to avoid overwhelming servers hits_per_sec=10 # Rate limiting
) )
async with AsyncUrlSeeder() as seeder: seeder = AsyncUrlSeeder(seeder_config)
console.print("Discovering URLs from Python docs...") urls = await seeder.discover("https://shop.example.com")
urls = await seeder.urls("docs.python.org", seeding_config)
console.print(f"\n✓ Discovered {len(urls)} URLs")
# Advanced: Relevance-based discovery # Advanced: Relevance-based discovery
research_config = SeedingConfig( research_config = SeedingConfig(
source="sitemap+cc", # Sitemap + Common Crawl source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only pattern="*/blog/*", # Blog posts only
# Content relevance # Content relevance
extract_head=True, # Get meta tags extract_head=True, # Get meta tags
query="quantum computing tutorials", query="quantum computing tutorials",
scoring_method="bm25", # BM25 scoring method scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only score_threshold=0.4, # High relevance only
# Smart filtering # Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc. filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache force=True # Bypass cache
) )
# Discover with progress tracking # Discover with progress tracking
discovered = [] discovered = []
async with AsyncUrlSeeder() as seeder: async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered = await seeder.urls("https://physics-blog.com", research_config) discovered.extend(batch)
console.print(f"\n✓ Discovered {len(discovered)} URLs") print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata # Results include scores and metadata
for url_data in discovered[:5]: for url_data in discovered[:5]:
print(f"URL: {url_data['url']}") print(f"URL: {url_data['url']}")
print(f"Score: {url_data['relevance_score']:.3f}") print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['head_data']['title']}") print(f"Title: {url_data['title']}")
``` ```
**Discovery Methods:** **Discovery Methods:**
@@ -297,18 +309,35 @@ This release includes significant performance improvements through optimized res
### What We Optimized ### What We Optimized
```python ```python
# Optimized crawling with v0.7.0 improvements # Before v0.7.0 (slow)
results = [] results = []
for url in urls: for url in urls:
result = await crawler.arun( result = await crawler.arun(url)
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
results.append(result) results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
``` ```
**Performance Gains:** **Performance Gains:**
@@ -318,6 +347,24 @@ for url in urls:
- **Memory Usage**: 60% reduction with streaming processing - **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests - **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes ## 🔧 Important Changes

View File

@@ -49,75 +49,46 @@ from crawl4ai import JsonCssExtractionStrategy
from crawl4ai.cache_context import CacheMode from crawl4ai.cache_context import CacheMode
async def crawl_dynamic_content(): async def crawl_dynamic_content():
url = "https://github.com/microsoft/TypeScript/commits/main" async with AsyncWebCrawler() as crawler:
session_id = "wait_for_session" session_id = "github_commits_session"
all_commits = [] url = "https://github.com/microsoft/TypeScript/commits/main"
all_commits = []
js_next_page = """ # Define extraction schema
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4'); schema = {
if (commits.length > 0) { "name": "Commit Extractor",
window.lastCommit = commits[0].textContent.trim(); "baseSelector": "li.Box-sc-g0xbh4-0",
} "fields": [{
const button = document.querySelector('a[data-testid="pagination-next-button"]'); "name": "title", "selector": "h4.markdown-title", "type": "text"
if (button) {button.click(); console.log('button clicked') } }],
""" }
extraction_strategy = JsonCssExtractionStrategy(schema)
wait_for = """() => { # JavaScript and wait configurations
const commits = document.querySelectorAll('li[data-testid="commit-row-item"] h4'); js_next_page = """document.querySelector('a[data-testid="pagination-next-button"]').click();"""
if (commits.length === 0) return false; wait_for = """() => document.querySelectorAll('li.Box-sc-g0xbh4-0').length > 0"""
const firstCommit = commits[0].textContent.trim();
return firstCommit !== window.lastCommit; # Crawl multiple pages
}"""
schema = {
"name": "Commit Extractor",
"baseSelector": "li[data-testid='commit-row-item']",
"fields": [
{
"name": "title",
"selector": "h4 a",
"type": "text",
"transform": "strip",
},
],
}
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)
browser_config = BrowserConfig(
verbose=True,
headless=False,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
for page in range(3): for page in range(3):
crawler_config = CrawlerRunConfig( config = CrawlerRunConfig(
url=url,
session_id=session_id, session_id=session_id,
css_selector="li[data-testid='commit-row-item']",
extraction_strategy=extraction_strategy, extraction_strategy=extraction_strategy,
js_code=js_next_page if page > 0 else None, js_code=js_next_page if page > 0 else None,
wait_for=wait_for if page > 0 else None, wait_for=wait_for if page > 0 else None,
js_only=page > 0, js_only=page > 0,
cache_mode=CacheMode.BYPASS, cache_mode=CacheMode.BYPASS
capture_console_messages=True,
) )
result = await crawler.arun(url=url, config=crawler_config) result = await crawler.arun(config=config)
if result.success:
if result.console_messages:
print(f"Page {page + 1} console messages:", result.console_messages)
if result.extracted_content:
# print(f"Page {page + 1} result:", result.extracted_content)
commits = json.loads(result.extracted_content) commits = json.loads(result.extracted_content)
all_commits.extend(commits) all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits") print(f"Page {page + 1}: Found {len(commits)} commits")
else:
print(f"Page {page + 1}: No content extracted")
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Clean up session # Clean up session
await crawler.crawler_strategy.kill_session(session_id) await crawler.crawler_strategy.kill_session(session_id)
return all_commits
``` ```
--- ---

View File

@@ -91,12 +91,13 @@ async def crawl_twitter_timeline():
wait_after_scroll=1.0 # Twitter needs time to load wait_after_scroll=1.0 # Twitter needs time to load
) )
browser_config = BrowserConfig(headless=True) # Set to False to watch it work
config = CrawlerRunConfig( config = CrawlerRunConfig(
virtual_scroll_config=virtual_config virtual_scroll_config=virtual_config,
# Optional: Set headless=False to watch it work
# browser_config=BrowserConfig(headless=False)
) )
async with AsyncWebCrawler(config=browser_config) as crawler: async with AsyncWebCrawler() as crawler:
result = await crawler.arun( result = await crawler.arun(
url="https://twitter.com/search?q=AI", url="https://twitter.com/search?q=AI",
config=config config=config
@@ -199,7 +200,7 @@ Use **scan_full_page** when:
Virtual Scroll works seamlessly with extraction strategies: Virtual Scroll works seamlessly with extraction strategies:
```python ```python
from crawl4ai import LLMExtractionStrategy, LLMConfig from crawl4ai import LLMExtractionStrategy
# Define extraction schema # Define extraction schema
schema = { schema = {
@@ -221,7 +222,7 @@ config = CrawlerRunConfig(
scroll_count=20 scroll_count=20
), ),
extraction_strategy=LLMExtractionStrategy( extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"), provider="openai/gpt-4o-mini",
schema=schema schema=schema
) )
) )

View File

@@ -10,8 +10,9 @@ Today I'm releasing Crawl4AI v0.7.0—the Adaptive Intelligence Update. This rel
- **Adaptive Crawling**: Your crawler now learns and adapts to website patterns - **Adaptive Crawling**: Your crawler now learns and adapts to website patterns
- **Virtual Scroll Support**: Complete content extraction from infinite scroll pages - **Virtual Scroll Support**: Complete content extraction from infinite scroll pages
- **Link Preview with Intelligent Scoring**: Intelligent link analysis and prioritization - **Link Preview with 3-Layer Scoring**: Intelligent link analysis and prioritization
- **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering - **Async URL Seeder**: Discover thousands of URLs in seconds with intelligent filtering
- **PDF Parsing**: Extract data from PDF documents
- **Performance Optimizations**: Significant speed and memory improvements - **Performance Optimizations**: Significant speed and memory improvements
## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning ## 🧠 Adaptive Crawling: Intelligence Through Pattern Learning
@@ -29,34 +30,44 @@ The Adaptive Crawler maintains a persistent state for each domain, tracking:
- Extraction confidence scores - Extraction confidence scores
```python ```python
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig from crawl4ai import AdaptiveCrawler, AdaptiveConfig, CrawlState
# Initialize with custom adaptive parameters # Initialize with custom learning parameters
config = AdaptiveConfig( config = AdaptiveConfig(
confidence_threshold=0.7, # Min confidence to stop crawling confidence_threshold=0.7, # Min confidence to use learned patterns
max_depth=5, # Maximum crawl depth max_history=100, # Remember last 100 crawls per domain
max_pages=20, # Maximum number of pages to crawl learning_rate=0.2, # How quickly to adapt to changes
top_k_links=3, # Number of top links to follow per page patterns_per_page=3, # Patterns to learn per page type
strategy="statistical", # 'statistical' or 'embedding' extraction_strategy='css' # 'css' or 'xpath'
coverage_weight=0.4, # Weight for coverage in confidence calculation
consistency_weight=0.3, # Weight for consistency in confidence calculation
saturation_weight=0.3 # Weight for saturation in confidence calculation
) )
# Initialize adaptive crawler with web crawler adaptive_crawler = AdaptiveCrawler(config)
# First crawl - crawler learns the structure
async with AsyncWebCrawler() as crawler: async with AsyncWebCrawler() as crawler:
adaptive_crawler = AdaptiveCrawler(crawler, config) result = await crawler.arun(
"https://news.example.com/article/12345",
# Crawl and learn patterns config=CrawlerRunConfig(
state = await adaptive_crawler.digest( adaptive_config=config,
start_url="https://news.example.com/article/12345", extraction_hints={ # Optional hints to speed up learning
query="latest news articles and content" "title": "article h1",
"content": "article .body-content"
}
)
) )
# Access results and confidence # Crawler identifies and stores patterns
print(f"Confidence Level: {adaptive_crawler.confidence:.0%}") if result.success:
print(f"Pages Crawled: {len(state.crawled_urls)}") state = adaptive_crawler.get_state("news.example.com")
print(f"Knowledge Base: {len(adaptive_crawler.state.knowledge_base)} documents") print(f"Learned {len(state.patterns)} patterns")
print(f"Confidence: {state.avg_confidence:.2%}")
# Subsequent crawls - uses learned patterns
result2 = await crawler.arun(
"https://news.example.com/article/67890",
config=CrawlerRunConfig(adaptive_config=config)
)
# Automatically extracts using learned patterns!
``` ```
**Expected Real-World Impact:** **Expected Real-World Impact:**
@@ -81,7 +92,9 @@ twitter_config = VirtualScrollConfig(
container_selector="[data-testid='primaryColumn']", container_selector="[data-testid='primaryColumn']",
scroll_count=20, # Number of scrolls scroll_count=20, # Number of scrolls
scroll_by="container_height", # Smart scrolling by container size scroll_by="container_height", # Smart scrolling by container size
wait_after_scroll=1.0 # Let content load wait_after_scroll=1.0, # Let content load
capture_method="incremental", # Capture new content on each scroll
deduplicate=True # Remove duplicate elements
) )
# For e-commerce product grids (Instagram style) # For e-commerce product grids (Instagram style)
@@ -89,7 +102,8 @@ grid_config = VirtualScrollConfig(
container_selector="main .product-grid", container_selector="main .product-grid",
scroll_count=30, scroll_count=30,
scroll_by=800, # Fixed pixel scrolling scroll_by=800, # Fixed pixel scrolling
wait_after_scroll=1.5 # Images need time wait_after_scroll=1.5, # Images need time
stop_on_no_change=True # Smart stopping
) )
# For news feeds with lazy loading # For news feeds with lazy loading
@@ -97,7 +111,9 @@ news_config = VirtualScrollConfig(
container_selector=".article-feed", container_selector=".article-feed",
scroll_count=50, scroll_count=50,
scroll_by="page_height", # Viewport-based scrolling scroll_by="page_height", # Viewport-based scrolling
wait_after_scroll=0.5 # Wait for content to load wait_after_scroll=0.5,
wait_for_selector=".article-card", # Wait for specific elements
timeout=30000 # Max 30 seconds total
) )
# Use it in your crawl # Use it in your crawl
@@ -144,17 +160,29 @@ async with AsyncWebCrawler() as crawler:
### The Three-Layer Scoring System ### The Three-Layer Scoring System
```python ```python
from crawl4ai import LinkPreviewConfig, CrawlerRunConfig, CacheMode from crawl4ai import LinkPreviewConfig
# Configure intelligent link analysis # Configure intelligent link analysis
link_config = LinkPreviewConfig( link_config = LinkPreviewConfig(
# What to analyze
include_internal=True, include_internal=True,
include_external=False, include_external=True,
max_links=10, max_links=100, # Analyze top 100 links
concurrency=5,
query="python tutorial", # For contextual scoring # Relevance scoring
score_threshold=0.3, query="machine learning tutorials", # Your interest
verbose=True score_threshold=0.3, # Minimum relevance score
# Performance
concurrent_requests=10, # Parallel processing
timeout_per_link=5000, # 5s per link
# Advanced scoring weights
scoring_weights={
"intrinsic": 0.3, # Link quality indicators
"contextual": 0.5, # Relevance to query
"popularity": 0.2 # Link prominence
}
) )
# Use in your crawl # Use in your crawl
@@ -162,51 +190,35 @@ result = await crawler.arun(
"https://tech-blog.example.com", "https://tech-blog.example.com",
config=CrawlerRunConfig( config=CrawlerRunConfig(
link_preview_config=link_config, link_preview_config=link_config,
score_links=True, # Enable intrinsic scoring score_links=True
cache_mode=CacheMode.BYPASS
) )
) )
# Access scored and sorted links # Access scored and sorted links
if result.success and result.links: for link in result.links["internal"][:10]: # Top 10 internal links
# Get scored links print(f"Score: {link['total_score']:.3f}")
internal_links = result.links.get("internal", []) print(f" Intrinsic: {link['intrinsic_score']:.1f}/10") # Position, attributes
scored_links = [l for l in internal_links if l.get("total_score")] print(f" Contextual: {link['contextual_score']:.1f}/1") # Relevance to query
scored_links.sort(key=lambda x: x.get("total_score", 0), reverse=True) print(f" URL: {link['href']}")
print(f" Title: {link['head_data']['title']}")
# Create a scoring table print(f" Description: {link['head_data']['meta']['description'][:100]}...")
table = Table(title="Link Scoring Results", box=box.ROUNDED)
table.add_column("Link Text", style="cyan", width=40)
table.add_column("Intrinsic Score", justify="center")
table.add_column("Contextual Score", justify="center")
table.add_column("Total Score", justify="center", style="bold green")
for link in scored_links[:5]:
text = link.get('text', 'No text')[:40]
table.add_row(
text,
f"{link.get('intrinsic_score', 0):.1f}/10",
f"{link.get('contextual_score', 0):.2f}/1",
f"{link.get('total_score', 0):.3f}"
)
console.print(table)
``` ```
**Scoring Components:** **Scoring Components:**
1. **Intrinsic Score**: Based on link quality indicators 1. **Intrinsic Score (0-10)**: Based on link quality indicators
- Position on page (navigation, content, footer) - Position on page (navigation, content, footer)
- Link attributes (rel, title, class names) - Link attributes (rel, title, class names)
- Anchor text quality and length - Anchor text quality and length
- URL structure and depth - URL structure and depth
2. **Contextual Score**: Relevance to your query using BM25 algorithm 2. **Contextual Score (0-1)**: Relevance to your query
- Semantic similarity using embeddings
- Keyword matching in link text and title - Keyword matching in link text and title
- Meta description analysis - Meta description analysis
- Content preview scoring - Content preview scoring
3. **Total Score**: Combined score for final ranking 3. **Total Score**: Weighted combination for final ranking
**Expected Real-World Impact:** **Expected Real-World Impact:**
- **Research Efficiency**: Find relevant papers 10x faster by following only high-score links - **Research Efficiency**: Find relevant papers 10x faster by following only high-score links
@@ -228,53 +240,53 @@ from crawl4ai import AsyncUrlSeeder, SeedingConfig
# Basic discovery - find all product pages # Basic discovery - find all product pages
seeder_config = SeedingConfig( seeder_config = SeedingConfig(
# Discovery sources # Discovery sources
source="cc+sitemap", # Sitemap + Common Crawl source="sitemap+cc", # Sitemap + Common Crawl
# Filtering # Filtering
pattern="*/product/*", # URL pattern matching pattern="*/product/*", # URL pattern matching
ignore_patterns=["*/reviews/*", "*/questions/*"],
# Validation # Validation
live_check=True, # Verify URLs are alive live_check=True, # Verify URLs are alive
max_urls=50, # Stop at 50 URLs max_urls=5000, # Stop at 5000 URLs
# Performance # Performance
concurrency=100, # Maximum concurrent requests for live checks/head extraction concurrency=100, # Parallel requests
hits_per_sec=10 # Rate limit in requests per second to avoid overwhelming servers hits_per_sec=10 # Rate limiting
) )
async with AsyncUrlSeeder() as seeder: seeder = AsyncUrlSeeder(seeder_config)
console.print("Discovering URLs from Python docs...") urls = await seeder.discover("https://shop.example.com")
urls = await seeder.urls("docs.python.org", seeding_config)
console.print(f"\n✓ Discovered {len(urls)} URLs")
# Advanced: Relevance-based discovery # Advanced: Relevance-based discovery
research_config = SeedingConfig( research_config = SeedingConfig(
source="sitemap+cc", # Sitemap + Common Crawl source="crawl+sitemap", # Deep crawl + sitemap
pattern="*/blog/*", # Blog posts only pattern="*/blog/*", # Blog posts only
# Content relevance # Content relevance
extract_head=True, # Get meta tags extract_head=True, # Get meta tags
query="quantum computing tutorials", query="quantum computing tutorials",
scoring_method="bm25", # BM25 scoring method scoring_method="bm25", # Or "semantic" (coming soon)
score_threshold=0.4, # High relevance only score_threshold=0.4, # High relevance only
# Smart filtering # Smart filtering
filter_nonsense_urls=True, # Remove .xml, .txt, etc. filter_nonsense_urls=True, # Remove .xml, .txt, etc.
min_content_length=500, # Skip thin content
force=True # Bypass cache force=True # Bypass cache
) )
# Discover with progress tracking # Discover with progress tracking
discovered = [] discovered = []
async with AsyncUrlSeeder() as seeder: async for batch in seeder.discover_iter("https://physics-blog.com", research_config):
discovered = await seeder.urls("https://physics-blog.com", research_config) discovered.extend(batch)
console.print(f"\n✓ Discovered {len(discovered)} URLs") print(f"Found {len(discovered)} relevant URLs so far...")
# Results include scores and metadata # Results include scores and metadata
for url_data in discovered[:5]: for url_data in discovered[:5]:
print(f"URL: {url_data['url']}") print(f"URL: {url_data['url']}")
print(f"Score: {url_data['relevance_score']:.3f}") print(f"Score: {url_data['score']:.3f}")
print(f"Title: {url_data['head_data']['title']}") print(f"Title: {url_data['title']}")
``` ```
**Discovery Methods:** **Discovery Methods:**
@@ -297,18 +309,35 @@ This release includes significant performance improvements through optimized res
### What We Optimized ### What We Optimized
```python ```python
# Optimized crawling with v0.7.0 improvements # Before v0.7.0 (slow)
results = [] results = []
for url in urls: for url in urls:
result = await crawler.arun( result = await crawler.arun(url)
url,
config=CrawlerRunConfig(
# Performance optimizations
wait_until="domcontentloaded", # Faster than networkidle
cache_mode=CacheMode.ENABLED # Enable caching
)
)
results.append(result) results.append(result)
# After v0.7.0 (fast)
# Automatic batching and connection pooling
results = await crawler.arun_batch(
urls,
config=CrawlerRunConfig(
# New performance options
batch_size=10, # Process 10 URLs concurrently
reuse_browser=True, # Keep browser warm
eager_loading=False, # Load only what's needed
streaming_extraction=True, # Stream large extractions
# Optimized defaults
wait_until="domcontentloaded", # Faster than networkidle
exclude_external_resources=True, # Skip third-party assets
block_ads=True # Ad blocking built-in
)
)
# Memory-efficient streaming for large crawls
async for result in crawler.arun_stream(large_url_list):
# Process results as they complete
await process_result(result)
# Memory is freed after each iteration
``` ```
**Performance Gains:** **Performance Gains:**
@@ -318,6 +347,24 @@ for url in urls:
- **Memory Usage**: 60% reduction with streaming processing - **Memory Usage**: 60% reduction with streaming processing
- **Concurrent Crawls**: Handle 5x more parallel requests - **Concurrent Crawls**: Handle 5x more parallel requests
## 📄 PDF Support
PDF extraction is now natively supported in Crawl4AI.
```python
# Extract data from PDF documents
result = await crawler.arun(
"https://example.com/report.pdf",
config=CrawlerRunConfig(
pdf_extraction=True,
extraction_strategy=JsonCssExtractionStrategy({
# Works on converted PDF structure
"title": {"selector": "h1", "type": "text"},
"sections": {"selector": "h2", "type": "list"}
})
)
)
```
## 🔧 Important Changes ## 🔧 Important Changes

View File

@@ -35,7 +35,7 @@ from crawl4ai import AsyncWebCrawler, AdaptiveCrawler
async def main(): async def main():
async with AsyncWebCrawler() as crawler: async with AsyncWebCrawler() as crawler:
# Create an adaptive crawler (config is optional) # Create an adaptive crawler
adaptive = AdaptiveCrawler(crawler) adaptive = AdaptiveCrawler(crawler)
# Start crawling with a query # Start crawling with a query
@@ -59,13 +59,13 @@ async def main():
from crawl4ai import AdaptiveConfig from crawl4ai import AdaptiveConfig
config = AdaptiveConfig( config = AdaptiveConfig(
confidence_threshold=0.8, # Stop when 80% confident (default: 0.7) confidence_threshold=0.7, # Stop when 70% confident (default: 0.8)
max_pages=30, # Maximum pages to crawl (default: 20) max_pages=20, # Maximum pages to crawl (default: 50)
top_k_links=5, # Links to follow per page (default: 3) top_k_links=3, # Links to follow per page (default: 5)
min_gain_threshold=0.05 # Minimum expected gain to continue (default: 0.1) min_gain_threshold=0.05 # Minimum expected gain to continue (default: 0.1)
) )
adaptive = AdaptiveCrawler(crawler, config) adaptive = AdaptiveCrawler(crawler, config=config)
``` ```
## Crawling Strategies ## Crawling Strategies
@@ -198,8 +198,8 @@ if result.metrics.get('is_irrelevant', False):
The confidence score (0-1) indicates how sufficient the gathered information is: The confidence score (0-1) indicates how sufficient the gathered information is:
- **0.0-0.3**: Insufficient information, needs more crawling - **0.0-0.3**: Insufficient information, needs more crawling
- **0.3-0.6**: Partial information, may answer basic queries - **0.3-0.6**: Partial information, may answer basic queries
- **0.6-0.7**: Good coverage, can answer most queries - **0.6-0.8**: Good coverage, can answer most queries
- **0.7-1.0**: Excellent coverage, comprehensive information - **0.8-1.0**: Excellent coverage, comprehensive information
### Statistics Display ### Statistics Display
@@ -257,9 +257,9 @@ new_adaptive.import_knowledge_base("knowledge_base.jsonl")
- Avoid overly broad queries - Avoid overly broad queries
### 2. Threshold Tuning ### 2. Threshold Tuning
- Start with default (0.7) for general use - Start with default (0.8) for general use
- Lower to 0.5-0.6 for exploratory crawling - Lower to 0.6-0.7 for exploratory crawling
- Raise to 0.8+ for exhaustive coverage - Raise to 0.9+ for exhaustive coverage
### 3. Performance Optimization ### 3. Performance Optimization
- Use appropriate `max_pages` limits - Use appropriate `max_pages` limits

View File

@@ -137,7 +137,7 @@ async def smart_blog_crawler():
word_count_threshold=300 # Only substantial articles word_count_threshold=300 # Only substantial articles
) )
# Extract URLs and crawl them # Extract URLs and stream results as they come
tutorial_urls = [t["url"] for t in tutorials[:10]] tutorial_urls = [t["url"] for t in tutorials[:10]]
results = await crawler.arun_many(tutorial_urls, config=config) results = await crawler.arun_many(tutorial_urls, config=config)
@@ -231,7 +231,7 @@ Common Crawl is a massive public dataset that regularly crawls the entire web. I
```python ```python
# Use both sources # Use both sources
config = SeedingConfig(source="sitemap+cc") config = SeedingConfig(source="cc+sitemap")
urls = await seeder.urls("example.com", config) urls = await seeder.urls("example.com", config)
``` ```
@@ -241,13 +241,13 @@ The `SeedingConfig` object is your control panel. Here's everything you can conf
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
|-----------|------|---------|-------------| |-----------|------|---------|-------------|
| `source` | str | "sitemap+cc" | URL source: "cc" (Common Crawl), "sitemap", or "sitemap+cc" | | `source` | str | "cc" | URL source: "cc" (Common Crawl), "sitemap", or "cc+sitemap" |
| `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") | | `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") |
| `extract_head` | bool | False | Extract metadata from page `<head>` | | `extract_head` | bool | False | Extract metadata from page `<head>` |
| `live_check` | bool | False | Verify URLs are accessible | | `live_check` | bool | False | Verify URLs are accessible |
| `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) | | `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) |
| `concurrency` | int | 10 | Parallel workers for fetching | | `concurrency` | int | 10 | Parallel workers for fetching |
| `hits_per_sec` | int | 5 | Rate limit for requests | | `hits_per_sec` | int | None | Rate limit for requests |
| `force` | bool | False | Bypass cache, fetch fresh data | | `force` | bool | False | Bypass cache, fetch fresh data |
| `verbose` | bool | False | Show detailed progress | | `verbose` | bool | False | Show detailed progress |
| `query` | str | None | Search query for BM25 scoring | | `query` | str | None | Search query for BM25 scoring |
@@ -522,7 +522,7 @@ urls = await seeder.urls("docs.example.com", config)
```python ```python
# Find specific products # Find specific products
config = SeedingConfig( config = SeedingConfig(
source="sitemap+cc", # Use both sources source="cc+sitemap", # Use both sources
extract_head=True, extract_head=True,
query="wireless headphones noise canceling", query="wireless headphones noise canceling",
scoring_method="bm25", scoring_method="bm25",
@@ -782,7 +782,7 @@ class ResearchAssistant:
# Step 1: Discover relevant URLs # Step 1: Discover relevant URLs
config = SeedingConfig( config = SeedingConfig(
source="sitemap+cc", # Maximum coverage source="cc+sitemap", # Maximum coverage
extract_head=True, # Get metadata extract_head=True, # Get metadata
query=topic, # Research topic query=topic, # Research topic
scoring_method="bm25", # Smart scoring scoring_method="bm25", # Smart scoring
@@ -832,8 +832,7 @@ class ResearchAssistant:
# Extract URLs and crawl all articles # Extract URLs and crawl all articles
article_urls = [article['url'] for article in top_articles] article_urls = [article['url'] for article in top_articles]
results = [] results = []
crawl_results = await crawler.arun_many(article_urls, config=config) async for result in await crawler.arun_many(article_urls, config=config):
async for result in crawl_results:
if result.success: if result.success:
results.append({ results.append({
'url': result.url, 'url': result.url,
@@ -934,10 +933,10 @@ config = SeedingConfig(concurrency=10, hits_per_sec=5)
# When crawling many URLs # When crawling many URLs
async with AsyncWebCrawler() as crawler: async with AsyncWebCrawler() as crawler:
# Assuming urls is a list of URL strings # Assuming urls is a list of URL strings
crawl_results = await crawler.arun_many(urls, config=config) results = await crawler.arun_many(urls, config=config)
# Process as they arrive # Process as they arrive
async for result in crawl_results: async for result in results:
process_immediately(result) # Don't wait for all process_immediately(result) # Don't wait for all
``` ```
@@ -1021,7 +1020,7 @@ config = SeedingConfig(
# E-commerce product discovery # E-commerce product discovery
config = SeedingConfig( config = SeedingConfig(
source="sitemap+cc", source="cc+sitemap",
pattern="*/product/*", pattern="*/product/*",
extract_head=True, extract_head=True,
live_check=True live_check=True

View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""
Simple API Test for Crawl4AI Docker Server v0.7.0
Uses only built-in Python modules to test all endpoints.
"""
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Dict, List, Optional
# Configuration
BASE_URL = "http://localhost:11234" # Change to your server URL
TEST_TIMEOUT = 30  # Per-request timeout (seconds) passed to urllib.request.urlopen
class SimpleApiTester:
    """Minimal smoke tester for the Crawl4AI Docker server's HTTP API.

    Uses only the standard library (urllib). Each endpoint check returns a
    result dict which is accumulated in ``self.results`` and summarized at
    the end of the run.
    """

    def __init__(self, base_url: str = BASE_URL) -> None:
        # Server root, e.g. "http://localhost:11234" (no trailing slash).
        self.base_url = base_url
        # Bearer token attached to requests once obtained; None = unauthenticated.
        self.token: Optional[str] = None
        # Accumulated per-endpoint result dicts (appended by print_result).
        self.results: List[Dict] = []

    def log(self, message: str) -> None:
        """Print an informational message with a uniform "[INFO]" prefix."""
        print(f"[INFO] {message}")
def test_get_endpoint(self, endpoint: str) -> Dict:
"""Test a GET endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
req = urllib.request.Request(url)
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "GET",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "GET",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def test_post_endpoint(self, endpoint: str, payload: Dict) -> Dict:
"""Test a POST endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
req.add_header('Content-Type', 'application/json')
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "POST",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "POST",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def print_result(self, result: Dict):
"""Print a formatted test result"""
status_color = {
"PASS": "",
"FAIL": "",
"SKIP": "⏭️"
}
print(f"{status_color[result['status']]} {result['method']} {result['endpoint']} "
f"| {result['response_time']:.3f}s | Status: {result['status_code'] or 'N/A'}")
if result['status'] == 'FAIL' and 'error' in result:
print(f" Error: {result['error']}")
self.results.append(result)
def run_all_tests(self):
"""Run all API tests"""
print("🚀 Starting Crawl4AI v0.7.0 API Test Suite")
print(f"📡 Testing server at: {self.base_url}")
print("=" * 60)
# # Test basic endpoints
# print("\n=== BASIC ENDPOINTS ===")
# # Health check
# result = self.test_get_endpoint("/health")
# self.print_result(result)
# # Schema endpoint
# result = self.test_get_endpoint("/schema")
# self.print_result(result)
# # Metrics endpoint
# result = self.test_get_endpoint("/metrics")
# self.print_result(result)
# # Root redirect
# result = self.test_get_endpoint("/")
# self.print_result(result)
# # Test authentication
# print("\n=== AUTHENTICATION ===")
# # Get token
# token_payload = {"email": "test@example.com"}
# result = self.test_post_endpoint("/token", token_payload)
# self.print_result(result)
# # Extract token if successful
# if result['status'] == 'PASS' and 'data' in result:
# token = result['data'].get('access_token')
# if token:
# self.token = token
# self.log(f"Successfully obtained auth token: {token[:20]}...")
# Test core APIs
print("\n=== CORE APIs ===")
test_url = "https://example.com"
# Test markdown endpoint
md_payload = {
"url": test_url,
"f": "fit",
"q": "test query",
"c": "0"
}
result = self.test_post_endpoint("/md", md_payload)
# print(result['data'].get('markdown', ''))
self.print_result(result)
# Test HTML endpoint
html_payload = {"url": test_url}
result = self.test_post_endpoint("/html", html_payload)
self.print_result(result)
# Test screenshot endpoint
screenshot_payload = {
"url": test_url,
"screenshot_wait_for": 2
}
result = self.test_post_endpoint("/screenshot", screenshot_payload)
self.print_result(result)
# Test PDF endpoint
pdf_payload = {"url": test_url}
result = self.test_post_endpoint("/pdf", pdf_payload)
self.print_result(result)
# Test JavaScript execution
js_payload = {
"url": test_url,
"scripts": ["(() => document.title)()"]
}
result = self.test_post_endpoint("/execute_js", js_payload)
self.print_result(result)
# Test crawl endpoint
crawl_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl", crawl_payload)
self.print_result(result)
# Test config dump
config_payload = {"code": "CrawlerRunConfig()"}
result = self.test_post_endpoint("/config/dump", config_payload)
self.print_result(result)
# Test LLM endpoint
llm_endpoint = f"/llm/{test_url}?q=Extract%20main%20content"
result = self.test_get_endpoint(llm_endpoint)
self.print_result(result)
# Test ask endpoint
ask_endpoint = "/ask?context_type=all&query=crawl4ai&max_results=5"
result = self.test_get_endpoint(ask_endpoint)
print(result)
self.print_result(result)
# Test job APIs
print("\n=== JOB APIs ===")
# Test LLM job
llm_job_payload = {
"url": test_url,
"q": "Extract main content",
"cache": False
}
result = self.test_post_endpoint("/llm/job", llm_job_payload)
self.print_result(result)
# Test crawl job
crawl_job_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl/job", crawl_job_payload)
self.print_result(result)
# Test MCP
print("\n=== MCP APIs ===")
# Test MCP schema
result = self.test_get_endpoint("/mcp/schema")
self.print_result(result)
# Test error handling
print("\n=== ERROR HANDLING ===")
# Test invalid URL
invalid_payload = {"url": "invalid-url", "f": "fit"}
result = self.test_post_endpoint("/md", invalid_payload)
self.print_result(result)
# Test invalid endpoint
result = self.test_get_endpoint("/nonexistent")
self.print_result(result)
# Print summary
self.print_summary()
def print_summary(self):
"""Print test results summary"""
print("\n" + "=" * 60)
print("📊 TEST RESULTS SUMMARY")
print("=" * 60)
total = len(self.results)
passed = sum(1 for r in self.results if r['status'] == 'PASS')
failed = sum(1 for r in self.results if r['status'] == 'FAIL')
print(f"Total Tests: {total}")
print(f"✅ Passed: {passed}")
print(f"❌ Failed: {failed}")
print(f"📈 Success Rate: {(passed/total)*100:.1f}%")
if failed > 0:
print("\n❌ FAILED TESTS:")
for result in self.results:
if result['status'] == 'FAIL':
print(f"{result['method']} {result['endpoint']}")
if 'error' in result:
print(f" Error: {result['error']}")
# Performance statistics
response_times = [r['response_time'] for r in self.results if r['response_time'] > 0]
if response_times:
avg_time = sum(response_times) / len(response_times)
max_time = max(response_times)
print(f"\n⏱️ Average Response Time: {avg_time:.3f}s")
print(f"⏱️ Max Response Time: {max_time:.3f}s")
# Save detailed report
report_file = f"crawl4ai_test_report_{int(time.time())}.json"
with open(report_file, 'w') as f:
json.dump({
"timestamp": time.time(),
"server_url": self.base_url,
"version": "0.7.0",
"summary": {
"total": total,
"passed": passed,
"failed": failed
},
"results": self.results
}, f, indent=2)
print(f"\n📄 Detailed report saved to: {report_file}")
def main():
    """Parse CLI arguments and run the full API test suite."""
    import argparse

    arg_parser = argparse.ArgumentParser(description='Crawl4AI v0.7.0 API Test Suite')
    arg_parser.add_argument('--url', default=BASE_URL, help='Base URL of the server')
    options = arg_parser.parse_args()

    suite = SimpleApiTester(options.url)
    try:
        suite.run_all_tests()
    except KeyboardInterrupt:
        print("\n🛑 Test suite interrupted by user")
    except Exception as e:
        print(f"\n💥 Test suite failed with error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()